1
0
mirror of https://github.com/oceanprotocol/market.git synced 2024-06-30 05:41:41 +02:00

refactor that huge ComputeJobs index component

This commit is contained in:
Matthias Kretschmann 2021-09-01 15:01:39 +02:00
parent ed9930dfb3
commit e998dc777a
Signed by: m
GPG Key ID: 606EEEF3C479A91F
2 changed files with 254 additions and 224 deletions

View File

@ -1,76 +1,20 @@
import React, { ReactElement, useEffect, useState } from 'react'
import web3 from 'web3'
import React, { ReactElement, useEffect, useState, useCallback } from 'react'
import Time from '../../../../atoms/Time'
import { Link } from 'gatsby'
import { DDO, Logger, Service, Provider } from '@oceanprotocol/lib'
import { Logger } from '@oceanprotocol/lib'
import { ComputeJobMetaData } from '../../../../../@types/ComputeJobMetaData'
import Dotdotdot from 'react-dotdotdot'
import Table from '../../../../atoms/Table'
import Button from '../../../../atoms/Button'
import { useOcean } from '../../../../../providers/Ocean'
import { gql } from 'urql'
import { useWeb3 } from '../../../../../providers/Web3'
import {
queryMetadata,
transformChainIdsListToQuery
} from '../../../../../utils/aquarius'
import axios, { CancelToken } from 'axios'
import Details from './Details'
import { ComputeJob } from '@oceanprotocol/lib/dist/node/ocean/interfaces/Compute'
import { ReactComponent as Refresh } from '../../../../../images/refresh.svg'
import styles from './index.module.css'
import { useUserPreferences } from '../../../../../providers/UserPreferences'
import { getOceanConfig } from '../../../../../utils/ocean'
import { fetchDataForMultipleChains } from '../../../../../utils/subgraph'
import { OrdersData_tokenOrders_datatokenId as OrdersDatatoken } from '../../../../../@types/apollo/OrdersData'
import NetworkName from '../../../../atoms/NetworkName'
const getComputeOrders = gql`
query ComputeOrders($user: String!) {
tokenOrders(
orderBy: timestamp
orderDirection: desc
where: { payer: $user }
) {
id
serviceId
datatokenId {
address
}
tx
timestamp
}
}
`
const getComputeOrdersByDatatokenAddress = gql`
query ComputeOrdersByDatatokenAddress(
$user: String!
$datatokenAddress: String!
) {
tokenOrders(
orderBy: timestamp
orderDirection: desc
where: { payer: $user, datatokenId: $datatokenAddress }
) {
id
serviceId
datatokenId {
address
}
tx
timestamp
}
}
`
interface TokenOrder {
id: string
serviceId: number
datatokenId: OrdersDatatoken
tx: any | null
timestamp: number
}
import { getComputeJobs } from './utils'
import styles from './index.module.css'
export function Status({ children }: { children: string }): ReactElement {
return <div className={styles.status}>{children}</div>
@ -123,28 +67,6 @@ const columns = [
}
]
async function getAssetMetadata(
queryDtList: string,
cancelToken: CancelToken,
chainIds: number[]
): Promise<DDO[]> {
const queryDid = {
page: 1,
offset: 100,
query: {
query_string: {
query: `(${queryDtList}) AND (${transformChainIdsListToQuery(
chainIds
)}) AND service.attributes.main.type:dataset AND service.type:compute`,
fields: ['dataToken']
}
}
}
const result = await queryMetadata(queryDid, cancelToken)
return result.results
}
export default function ComputeJobs({
minimal,
assetDTAddress,
@ -170,161 +92,37 @@ export default function ComputeJobs({
if (ocean === undefined) {
initOcean()
}
}, [networkId, ocean])
}, [networkId, ocean, connect])
async function getJobs() {
if (!accountId) return
setIsLoading(true)
const variables = assetDTAddress
? {
user: accountId?.toLowerCase(),
datatokenAddress: assetDTAddress.toLowerCase()
}
: {
user: accountId?.toLowerCase()
}
const result = await fetchDataForMultipleChains(
assetDTAddress ? getComputeOrdersByDatatokenAddress : getComputeOrders,
variables,
assetDTAddress ? [chainId] : chainIds
)
let data: TokenOrder[] = []
for (let i = 0; i < result.length; i++) {
if (!result[i].tokenOrders) continue
result[i].tokenOrders.forEach((tokenOrder: TokenOrder) => {
data.push(tokenOrder)
})
}
if (!ocean || !account || !data) {
return
}
data = data.sort((a, b) => b.timestamp - a.timestamp)
const dtList = []
const computeJobs: ComputeJobMetaData[] = []
for (let i = 0; i < data.length; i++) {
dtList.push(data[i].datatokenId.address)
}
const queryDtList = JSON.stringify(dtList)
.replace(/,/g, ' ')
.replace(/"/g, '')
.replace(/(\[|\])/g, '')
if (queryDtList === '') {
setJobs([])
const fetchJobs = useCallback(async () => {
if (!chainIds || !accountId) {
setIsLoading(false)
return
}
try {
setIsLoading(true)
const source = axios.CancelToken.source()
const assets = await getAssetMetadata(queryDtList, source.token, chainIds)
const providers: Provider[] = []
const serviceEndpoints: string[] = []
for (let i = 0; i < data.length; i++) {
try {
const did = web3.utils
.toChecksumAddress(data[i].datatokenId.address)
.replace('0x', 'did:op:')
const ddo = assets.filter((x) => x.id === did)[0]
if (ddo === undefined) continue
const service = ddo.service.filter(
(x: Service) => x.index === data[i].serviceId
)[0]
if (!service || service.type !== 'compute') continue
const { serviceEndpoint } = service
const wasProviderQueried =
serviceEndpoints.filter((x) => x === serviceEndpoint).length > 0
if (wasProviderQueried) continue
serviceEndpoints.push(serviceEndpoint)
} catch (err) {
Logger.error(err)
}
}
try {
setIsLoading(true)
for (let i = 0; i < serviceEndpoints.length; i++) {
const instanceConfig = {
config,
web3: config.web3Provider,
logger: Logger,
ocean: ocean
}
const provider = await Provider.getInstance(instanceConfig)
await provider.setBaseUrl(serviceEndpoints[i])
const hasSameCompute =
providers.filter(
(x) => x.computeAddress === provider.computeAddress
).length > 0
if (!hasSameCompute) providers.push(provider)
}
} catch (err) {
Logger.error(err)
}
for (let i = 0; i < providers.length; i++) {
try {
const providerComputeJobs = (await providers[i].computeStatus(
'',
account,
undefined,
undefined,
false
)) as ComputeJob[]
// means the provider uri is not good, so we ignore it and move on
if (!providerComputeJobs) continue
providerComputeJobs.sort((a, b) => {
if (a.dateCreated > b.dateCreated) {
return -1
}
if (a.dateCreated < b.dateCreated) {
return 1
}
return 0
})
for (let j = 0; j < providerComputeJobs.length; j++) {
const job = providerComputeJobs[j]
const did = job.inputDID[0]
const ddo = assets.filter((x) => x.id === did)[0]
if (!ddo) continue
const serviceMetadata = ddo.service.filter(
(x: Service) => x.type === 'metadata'
)[0]
const compJob: ComputeJobMetaData = {
...job,
assetName: serviceMetadata.attributes.main.name,
assetDtSymbol: ddo.dataTokenInfo.symbol,
networkId: ddo.chainId
}
computeJobs.push(compJob)
}
} catch (err) {
Logger.error(err)
}
}
setJobs(computeJobs)
const jobs = await getComputeJobs(
accountId,
assetDTAddress,
chainIds,
chainId,
config,
ocean,
account
)
setJobs(jobs)
} catch (error) {
Logger.log(error.message)
Logger.error(error.message)
} finally {
setIsLoading(false)
}
return true
}
}, [account, accountId, assetDTAddress, chainIds, chainId, config, ocean])
useEffect(() => {
if (!chainIds || !accountId) {
setIsLoading(false)
return
}
getJobs()
}, [ocean, account, chainIds, accountId])
fetchJobs()
}, [fetchJobs])
return accountId ? (
<>
@ -333,7 +131,7 @@ export default function ComputeJobs({
style="text"
size="small"
title="Refresh compute jobs"
onClick={() => getJobs()}
onClick={async () => await fetchJobs()}
disabled={isLoading}
className={styles.refresh}
>

View File

@ -0,0 +1,232 @@
import {
DDO,
Service,
Logger,
Provider,
Config,
Ocean,
Account
} from '@oceanprotocol/lib'
import web3 from 'web3'
import { ComputeJob } from '@oceanprotocol/lib/dist/node/ocean/interfaces/Compute'
import axios, { CancelToken } from 'axios'
import { gql } from 'urql'
import { ComputeJobMetaData } from '../../../../../@types/ComputeJobMetaData'
import {
transformChainIdsListToQuery,
queryMetadata
} from '../../../../../utils/aquarius'
import { fetchDataForMultipleChains } from '../../../../../utils/subgraph'
import { OrdersData_tokenOrders_datatokenId as OrdersDatatoken } from '../../../../../@types/apollo/OrdersData'
const getComputeOrders = gql`
query ComputeOrders($user: String!) {
tokenOrders(
orderBy: timestamp
orderDirection: desc
where: { payer: $user }
) {
id
serviceId
datatokenId {
address
}
tx
timestamp
}
}
`
const getComputeOrdersByDatatokenAddress = gql`
query ComputeOrdersByDatatokenAddress(
$user: String!
$datatokenAddress: String!
) {
tokenOrders(
orderBy: timestamp
orderDirection: desc
where: { payer: $user, datatokenId: $datatokenAddress }
) {
id
serviceId
datatokenId {
address
}
tx
timestamp
}
}
`
interface TokenOrder {
id: string
serviceId: number
datatokenId: OrdersDatatoken
tx: any | null
timestamp: number
}
async function getAssetMetadata(
queryDtList: string,
cancelToken: CancelToken,
chainIds: number[]
): Promise<DDO[]> {
const queryDid = {
page: 1,
offset: 100,
query: {
query_string: {
query: `(${queryDtList}) AND (${transformChainIdsListToQuery(
chainIds
)}) AND service.attributes.main.type:dataset AND service.type:compute`,
fields: ['dataToken']
}
}
}
const result = await queryMetadata(queryDid, cancelToken)
return result.results
}
export async function getComputeJobs(
accountId: string,
assetDTAddress: string,
chainIds: number[],
chainId: number,
config: Config,
ocean: Ocean,
account: Account
): Promise<ComputeJobMetaData[]> {
const variables = assetDTAddress
? {
user: accountId?.toLowerCase(),
datatokenAddress: assetDTAddress.toLowerCase()
}
: {
user: accountId?.toLowerCase()
}
const result = await fetchDataForMultipleChains(
assetDTAddress ? getComputeOrdersByDatatokenAddress : getComputeOrders,
variables,
assetDTAddress ? [chainId] : chainIds
)
let data: TokenOrder[] = []
for (let i = 0; i < result.length; i++) {
if (!result[i].tokenOrders) continue
result[i].tokenOrders.forEach((tokenOrder: TokenOrder) => {
data.push(tokenOrder)
})
}
if (!ocean || !account || !data) return []
data = data.sort((a, b) => b.timestamp - a.timestamp)
const dtList = []
const computeJobs: ComputeJobMetaData[] = []
for (let i = 0; i < data.length; i++) {
dtList.push(data[i].datatokenId.address)
}
const queryDtList = JSON.stringify(dtList)
.replace(/,/g, ' ')
.replace(/"/g, '')
.replace(/(\[|\])/g, '')
if (queryDtList === '') return []
const source = axios.CancelToken.source()
const assets = await getAssetMetadata(queryDtList, source.token, chainIds)
const providers: Provider[] = []
const serviceEndpoints: string[] = []
try {
for (let i = 0; i < data.length; i++) {
try {
const did = web3.utils
.toChecksumAddress(data[i].datatokenId.address)
.replace('0x', 'did:op:')
const ddo = assets.filter((x) => x.id === did)[0]
if (ddo === undefined) continue
const service = ddo.service.filter(
(x: Service) => x.index === data[i].serviceId
)[0]
if (!service || service.type !== 'compute') continue
const { serviceEndpoint } = service
const wasProviderQueried =
serviceEndpoints.filter((x) => x === serviceEndpoint).length > 0
if (wasProviderQueried) continue
serviceEndpoints.push(serviceEndpoint)
} catch (err) {
Logger.error(err)
}
}
for (let i = 0; i < serviceEndpoints.length; i++) {
const instanceConfig = {
config,
web3: config.web3Provider,
logger: Logger,
ocean
}
const provider = await Provider.getInstance(instanceConfig)
await provider.setBaseUrl(serviceEndpoints[i])
const hasSameCompute =
providers.filter((x) => x.computeAddress === provider.computeAddress)
.length > 0
if (!hasSameCompute) providers.push(provider)
}
} catch (err) {
Logger.error(err)
}
for (let i = 0; i < providers.length; i++) {
try {
const providerComputeJobs = (await providers[i].computeStatus(
'',
account,
undefined,
undefined,
false
)) as ComputeJob[]
// means the provider uri is not good, so we ignore it and move on
if (!providerComputeJobs) continue
providerComputeJobs.sort((a, b) => {
if (a.dateCreated > b.dateCreated) {
return -1
}
if (a.dateCreated < b.dateCreated) {
return 1
}
return 0
})
for (let j = 0; j < providerComputeJobs.length; j++) {
const job = providerComputeJobs[j]
const did = job.inputDID[0]
const ddo = assets.filter((x) => x.id === did)[0]
if (!ddo) continue
const serviceMetadata = ddo.service.filter(
(x: Service) => x.type === 'metadata'
)[0]
const compJob: ComputeJobMetaData = {
...job,
assetName: serviceMetadata.attributes.main.name,
assetDtSymbol: ddo.dataTokenInfo.symbol,
networkId: ddo.chainId
}
computeJobs.push(compJob)
}
} catch (err) {
Logger.error(err)
}
}
return computeJobs
}