mirror of
https://github.com/oceanprotocol/market.git
synced 2024-12-02 05:57:29 +01:00
more getComputeJobs refactor
* move to compute utils * abstract out more loops into separate methods * fix results display
This commit is contained in:
parent
e3900e8326
commit
b2bec2abe4
@ -11,7 +11,7 @@ import { graphql, useStaticQuery } from 'gatsby'
|
||||
|
||||
export const contentQuery = graphql`
|
||||
query HistoryPageComputeResultsQuery {
|
||||
content: allFile(filter: { relativePath: { eq: "pages/account.json" } }) {
|
||||
content: allFile(filter: { relativePath: { eq: "pages/history.json" } }) {
|
||||
edges {
|
||||
node {
|
||||
childPagesJson {
|
||||
|
@ -13,7 +13,7 @@ import { ReactComponent as Refresh } from '../../../../../images/refresh.svg'
|
||||
import { useUserPreferences } from '../../../../../providers/UserPreferences'
|
||||
import { getOceanConfig } from '../../../../../utils/ocean'
|
||||
import NetworkName from '../../../../atoms/NetworkName'
|
||||
import { getComputeJobs } from './utils'
|
||||
import { getComputeJobs } from '../../../../../utils/compute'
|
||||
import styles from './index.module.css'
|
||||
import { useAsset } from '../../../../../providers/Asset'
|
||||
|
||||
@ -100,15 +100,7 @@ export default function ComputeJobs({
|
||||
|
||||
try {
|
||||
setIsLoading(true)
|
||||
|
||||
const jobs = await getComputeJobs(
|
||||
ddo?.dataTokenInfo?.address,
|
||||
chainIds,
|
||||
ddo?.chainId,
|
||||
config,
|
||||
ocean,
|
||||
account
|
||||
)
|
||||
const jobs = await getComputeJobs(chainIds, config, ocean, account, ddo)
|
||||
setJobs(jobs)
|
||||
} catch (error) {
|
||||
Logger.error(error.message)
|
||||
@ -123,7 +115,7 @@ export default function ComputeJobs({
|
||||
|
||||
return accountId ? (
|
||||
<>
|
||||
{(jobs?.length || !minimal) && (
|
||||
{jobs?.length >= 0 && !minimal && (
|
||||
<Button
|
||||
style="text"
|
||||
size="small"
|
||||
|
@ -1,232 +0,0 @@
|
||||
import {
|
||||
DDO,
|
||||
Service,
|
||||
Logger,
|
||||
Provider,
|
||||
Config,
|
||||
Ocean,
|
||||
Account
|
||||
} from '@oceanprotocol/lib'
|
||||
import web3 from 'web3'
|
||||
import { ComputeJob } from '@oceanprotocol/lib/dist/node/ocean/interfaces/Compute'
|
||||
import axios, { CancelToken } from 'axios'
|
||||
import { gql } from 'urql'
|
||||
import { ComputeJobMetaData } from '../../../../../@types/ComputeJobMetaData'
|
||||
import {
|
||||
transformChainIdsListToQuery,
|
||||
queryMetadata
|
||||
} from '../../../../../utils/aquarius'
|
||||
import { fetchDataForMultipleChains } from '../../../../../utils/subgraph'
|
||||
import { OrdersData_tokenOrders_datatokenId as OrdersDatatoken } from '../../../../../@types/apollo/OrdersData'
|
||||
|
||||
const getComputeOrders = gql`
|
||||
query ComputeOrders($user: String!) {
|
||||
tokenOrders(
|
||||
orderBy: timestamp
|
||||
orderDirection: desc
|
||||
where: { payer: $user }
|
||||
) {
|
||||
id
|
||||
serviceId
|
||||
datatokenId {
|
||||
address
|
||||
}
|
||||
tx
|
||||
timestamp
|
||||
}
|
||||
}
|
||||
`
|
||||
|
||||
const getComputeOrdersByDatatokenAddress = gql`
|
||||
query ComputeOrdersByDatatokenAddress(
|
||||
$user: String!
|
||||
$datatokenAddress: String!
|
||||
) {
|
||||
tokenOrders(
|
||||
orderBy: timestamp
|
||||
orderDirection: desc
|
||||
where: { payer: $user, datatokenId: $datatokenAddress }
|
||||
) {
|
||||
id
|
||||
serviceId
|
||||
datatokenId {
|
||||
address
|
||||
}
|
||||
tx
|
||||
timestamp
|
||||
}
|
||||
}
|
||||
`
|
||||
|
||||
interface TokenOrder {
|
||||
id: string
|
||||
serviceId: number
|
||||
datatokenId: OrdersDatatoken
|
||||
tx: any | null
|
||||
timestamp: number
|
||||
}
|
||||
|
||||
async function getAssetMetadata(
|
||||
queryDtList: string,
|
||||
cancelToken: CancelToken,
|
||||
chainIds: number[]
|
||||
): Promise<DDO[]> {
|
||||
const queryDid = {
|
||||
page: 1,
|
||||
offset: 100,
|
||||
query: {
|
||||
query_string: {
|
||||
query: `(${queryDtList}) AND (${transformChainIdsListToQuery(
|
||||
chainIds
|
||||
)}) AND service.attributes.main.type:dataset AND service.type:compute`,
|
||||
fields: ['dataToken']
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
const result = await queryMetadata(queryDid, cancelToken)
|
||||
return result.results
|
||||
}
|
||||
|
||||
export async function getComputeJobs(
|
||||
assetDTAddress: string,
|
||||
chainIds: number[],
|
||||
chainId: number,
|
||||
config: Config,
|
||||
ocean: Ocean,
|
||||
account: Account
|
||||
): Promise<ComputeJobMetaData[]> {
|
||||
const variables = assetDTAddress
|
||||
? {
|
||||
user: account?.getId().toLowerCase(),
|
||||
datatokenAddress: assetDTAddress.toLowerCase()
|
||||
}
|
||||
: {
|
||||
user: account?.getId().toLowerCase()
|
||||
}
|
||||
|
||||
const result = await fetchDataForMultipleChains(
|
||||
assetDTAddress ? getComputeOrdersByDatatokenAddress : getComputeOrders,
|
||||
variables,
|
||||
assetDTAddress ? [chainId] : chainIds
|
||||
)
|
||||
let data: TokenOrder[] = []
|
||||
for (let i = 0; i < result.length; i++) {
|
||||
if (!result[i]?.tokenOrders) continue
|
||||
result[i]?.tokenOrders.forEach((tokenOrder: TokenOrder) => {
|
||||
data.push(tokenOrder)
|
||||
})
|
||||
}
|
||||
|
||||
if (!ocean || !account || !data) return []
|
||||
|
||||
data = data.sort((a, b) => b.timestamp - a.timestamp)
|
||||
const dtList = []
|
||||
const computeJobs: ComputeJobMetaData[] = []
|
||||
|
||||
for (let i = 0; i < data.length; i++) {
|
||||
dtList.push(data[i].datatokenId.address)
|
||||
}
|
||||
const queryDtList = JSON.stringify(dtList)
|
||||
.replace(/,/g, ' ')
|
||||
.replace(/"/g, '')
|
||||
.replace(/(\[|\])/g, '')
|
||||
|
||||
if (queryDtList === '') return []
|
||||
|
||||
const source = axios.CancelToken.source()
|
||||
const assets = await getAssetMetadata(queryDtList, source.token, chainIds)
|
||||
const providers: Provider[] = []
|
||||
const serviceEndpoints: string[] = []
|
||||
|
||||
try {
|
||||
for (let i = 0; i < data.length; i++) {
|
||||
try {
|
||||
const did = web3.utils
|
||||
.toChecksumAddress(data[i].datatokenId.address)
|
||||
.replace('0x', 'did:op:')
|
||||
const ddo = assets.filter((x) => x.id === did)[0]
|
||||
if (ddo === undefined) continue
|
||||
|
||||
const service = ddo.service.filter(
|
||||
(x: Service) => x.index === data[i].serviceId
|
||||
)[0]
|
||||
|
||||
if (!service || service.type !== 'compute') continue
|
||||
const { serviceEndpoint } = service
|
||||
|
||||
const wasProviderQueried =
|
||||
serviceEndpoints.filter((x) => x === serviceEndpoint).length > 0
|
||||
|
||||
if (wasProviderQueried) continue
|
||||
serviceEndpoints.push(serviceEndpoint)
|
||||
} catch (err) {
|
||||
Logger.error(err)
|
||||
}
|
||||
}
|
||||
|
||||
for (let i = 0; i < serviceEndpoints.length; i++) {
|
||||
const instanceConfig = {
|
||||
config,
|
||||
web3: config.web3Provider,
|
||||
logger: Logger,
|
||||
ocean
|
||||
}
|
||||
const provider = await Provider.getInstance(instanceConfig)
|
||||
await provider.setBaseUrl(serviceEndpoints[i])
|
||||
const hasSameCompute =
|
||||
providers.filter((x) => x.computeAddress === provider.computeAddress)
|
||||
.length > 0
|
||||
if (!hasSameCompute) providers.push(provider)
|
||||
}
|
||||
} catch (err) {
|
||||
Logger.error(err)
|
||||
}
|
||||
|
||||
for (let i = 0; i < providers.length; i++) {
|
||||
try {
|
||||
const providerComputeJobs = (await providers[i].computeStatus(
|
||||
'',
|
||||
account,
|
||||
undefined,
|
||||
undefined,
|
||||
false
|
||||
)) as ComputeJob[]
|
||||
|
||||
// means the provider uri is not good, so we ignore it and move on
|
||||
if (!providerComputeJobs) continue
|
||||
providerComputeJobs.sort((a, b) => {
|
||||
if (a.dateCreated > b.dateCreated) {
|
||||
return -1
|
||||
}
|
||||
if (a.dateCreated < b.dateCreated) {
|
||||
return 1
|
||||
}
|
||||
return 0
|
||||
})
|
||||
|
||||
for (let j = 0; j < providerComputeJobs.length; j++) {
|
||||
const job = providerComputeJobs[j]
|
||||
const did = job.inputDID[0]
|
||||
const ddo = assets.filter((x) => x.id === did)[0]
|
||||
|
||||
if (!ddo) continue
|
||||
const serviceMetadata = ddo.service.filter(
|
||||
(x: Service) => x.type === 'metadata'
|
||||
)[0]
|
||||
|
||||
const compJob: ComputeJobMetaData = {
|
||||
...job,
|
||||
assetName: serviceMetadata.attributes.main.name,
|
||||
assetDtSymbol: ddo.dataTokenInfo.symbol,
|
||||
networkId: ddo.chainId
|
||||
}
|
||||
computeJobs.push(compJob)
|
||||
}
|
||||
} catch (err) {
|
||||
Logger.error(err)
|
||||
}
|
||||
}
|
||||
|
||||
return computeJobs
|
||||
}
|
@ -1,10 +1,272 @@
|
||||
import {
|
||||
DDO,
|
||||
Ocean,
|
||||
ServiceComputePrivacy,
|
||||
publisherTrustedAlgorithm as PublisherTrustedAlgorithm
|
||||
publisherTrustedAlgorithm as PublisherTrustedAlgorithm,
|
||||
DDO,
|
||||
Service,
|
||||
Logger,
|
||||
Provider,
|
||||
Config,
|
||||
Ocean,
|
||||
Account
|
||||
} from '@oceanprotocol/lib'
|
||||
import { ComputePrivacyForm } from '../models/FormEditComputeDataset'
|
||||
import web3 from 'web3'
|
||||
import { ComputeJob } from '@oceanprotocol/lib/dist/node/ocean/interfaces/Compute'
|
||||
import axios, { CancelToken } from 'axios'
|
||||
import { gql } from 'urql'
|
||||
import { ComputeJobMetaData } from '../@types/ComputeJobMetaData'
|
||||
import { transformChainIdsListToQuery, queryMetadata } from './aquarius'
|
||||
import { fetchDataForMultipleChains } from './subgraph'
|
||||
import { OrdersData_tokenOrders_datatokenId as OrdersDatatoken } from '../@types/apollo/OrdersData'
|
||||
|
||||
// Subgraph query: every token order the given user paid for, newest first.
// Used when listing compute jobs across ALL of the user's assets.
// NOTE(review): assumes the deployed subgraph exposes `tokenOrders` with a
// `payer` filter — confirm against the subgraph schema version in use.
const getComputeOrders = gql`
  query ComputeOrders($user: String!) {
    tokenOrders(
      orderBy: timestamp
      orderDirection: desc
      where: { payer: $user }
    ) {
      id
      serviceId
      datatokenId {
        address
      }
      tx
      timestamp
    }
  }
`
|
||||
|
||||
// Subgraph query: token orders paid for by the given user, restricted to a
// single datatoken. Used when listing compute jobs for one specific asset.
const getComputeOrdersByDatatokenAddress = gql`
  query ComputeOrdersByDatatokenAddress(
    $user: String!
    $datatokenAddress: String!
  ) {
    tokenOrders(
      orderBy: timestamp
      orderDirection: desc
      where: { payer: $user, datatokenId: $datatokenAddress }
    ) {
      id
      serviceId
      datatokenId {
        address
      }
      tx
      timestamp
    }
  }
`
|
||||
|
||||
// Shape of a single `tokenOrders` entry as returned by the compute-order
// subgraph queries in this file.
interface TokenOrder {
  id: string
  serviceId: number
  datatokenId: OrdersDatatoken
  // Raw transaction field from the subgraph; its shape is not constrained
  // anywhere in this file — TODO(review): narrow from `any` once known.
  tx: any
  timestamp: number
}
|
||||
|
||||
async function getAssetMetadata(
|
||||
queryDtList: string,
|
||||
cancelToken: CancelToken,
|
||||
chainIds: number[]
|
||||
): Promise<DDO[]> {
|
||||
const queryDid = {
|
||||
page: 1,
|
||||
offset: 100,
|
||||
query: {
|
||||
query_string: {
|
||||
query: `(${queryDtList}) AND (${transformChainIdsListToQuery(
|
||||
chainIds
|
||||
)}) AND service.attributes.main.type:dataset AND service.type:compute`,
|
||||
fields: ['dataToken']
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
const result = await queryMetadata(queryDid, cancelToken)
|
||||
return result.results
|
||||
}
|
||||
|
||||
function getServiceEndpoints(data: TokenOrder[], assets: DDO[]): string[] {
|
||||
const serviceEndpoints: string[] = []
|
||||
|
||||
for (let i = 0; i < data.length; i++) {
|
||||
try {
|
||||
const did = web3.utils
|
||||
.toChecksumAddress(data[i].datatokenId.address)
|
||||
.replace('0x', 'did:op:')
|
||||
const ddo = assets.filter((x) => x.id === did)[0]
|
||||
if (ddo === undefined) continue
|
||||
|
||||
const service = ddo.service.filter(
|
||||
(x: Service) => x.index === data[i].serviceId
|
||||
)[0]
|
||||
|
||||
if (!service || service.type !== 'compute') continue
|
||||
const { serviceEndpoint } = service
|
||||
|
||||
const wasProviderQueried =
|
||||
serviceEndpoints?.filter((x) => x === serviceEndpoint).length > 0
|
||||
|
||||
if (wasProviderQueried) continue
|
||||
serviceEndpoints.push(serviceEndpoint)
|
||||
} catch (err) {
|
||||
Logger.error(err.message)
|
||||
}
|
||||
}
|
||||
|
||||
return serviceEndpoints
|
||||
}
|
||||
|
||||
async function getProviders(
|
||||
serviceEndpoints: string[],
|
||||
config: Config,
|
||||
ocean: Ocean
|
||||
): Promise<Provider[]> {
|
||||
const providers: Provider[] = []
|
||||
|
||||
try {
|
||||
for (let i = 0; i < serviceEndpoints?.length; i++) {
|
||||
const instanceConfig = {
|
||||
config,
|
||||
web3: config.web3Provider,
|
||||
logger: Logger,
|
||||
ocean
|
||||
}
|
||||
const provider = await Provider.getInstance(instanceConfig)
|
||||
await provider.setBaseUrl(serviceEndpoints[i])
|
||||
const hasSameCompute =
|
||||
providers.filter((x) => x.computeAddress === provider.computeAddress)
|
||||
.length > 0
|
||||
if (!hasSameCompute) providers.push(provider)
|
||||
}
|
||||
} catch (err) {
|
||||
Logger.error(err.message)
|
||||
}
|
||||
|
||||
return providers
|
||||
}
|
||||
|
||||
async function getJobs(
|
||||
providers: Provider[],
|
||||
account: Account,
|
||||
assets: DDO[]
|
||||
): Promise<ComputeJobMetaData[]> {
|
||||
const computeJobs: ComputeJobMetaData[] = []
|
||||
|
||||
for (let i = 0; i < providers.length; i++) {
|
||||
try {
|
||||
const providerComputeJobs = (await providers[i].computeStatus(
|
||||
'',
|
||||
account,
|
||||
undefined,
|
||||
undefined,
|
||||
false
|
||||
)) as ComputeJob[]
|
||||
|
||||
// means the provider uri is not good, so we ignore it and move on
|
||||
if (!providerComputeJobs) continue
|
||||
providerComputeJobs.sort((a, b) => {
|
||||
if (a.dateCreated > b.dateCreated) {
|
||||
return -1
|
||||
}
|
||||
if (a.dateCreated < b.dateCreated) {
|
||||
return 1
|
||||
}
|
||||
return 0
|
||||
})
|
||||
|
||||
for (let j = 0; j < providerComputeJobs?.length; j++) {
|
||||
const job = providerComputeJobs[j]
|
||||
const did = job.inputDID[0]
|
||||
const ddo = assets.filter((x) => x.id === did)[0]
|
||||
|
||||
if (!ddo) continue
|
||||
const serviceMetadata = ddo.service.filter(
|
||||
(x: Service) => x.type === 'metadata'
|
||||
)[0]
|
||||
|
||||
const compJob: ComputeJobMetaData = {
|
||||
...job,
|
||||
assetName: serviceMetadata.attributes.main.name,
|
||||
assetDtSymbol: ddo.dataTokenInfo.symbol,
|
||||
networkId: ddo.chainId
|
||||
}
|
||||
computeJobs.push(compJob)
|
||||
}
|
||||
} catch (err) {
|
||||
Logger.error(err.message)
|
||||
}
|
||||
}
|
||||
|
||||
return computeJobs
|
||||
}
|
||||
|
||||
function getDtList(data: TokenOrder[]) {
|
||||
const dtList = []
|
||||
|
||||
for (let i = 0; i < data.length; i++) {
|
||||
dtList.push(data[i].datatokenId.address)
|
||||
}
|
||||
const queryDtList = JSON.stringify(dtList)
|
||||
.replace(/,/g, ' ')
|
||||
.replace(/"/g, '')
|
||||
.replace(/(\[|\])/g, '')
|
||||
|
||||
return queryDtList
|
||||
}
|
||||
|
||||
export async function getComputeJobs(
|
||||
chainIds: number[],
|
||||
config: Config,
|
||||
ocean: Ocean,
|
||||
account: Account,
|
||||
ddo?: DDO
|
||||
): Promise<ComputeJobMetaData[]> {
|
||||
const assetDTAddress = ddo?.dataTokenInfo?.address
|
||||
|
||||
const variables = assetDTAddress
|
||||
? {
|
||||
user: account?.getId().toLowerCase(),
|
||||
datatokenAddress: assetDTAddress.toLowerCase()
|
||||
}
|
||||
: {
|
||||
user: account?.getId().toLowerCase()
|
||||
}
|
||||
|
||||
const result = await fetchDataForMultipleChains(
|
||||
assetDTAddress ? getComputeOrdersByDatatokenAddress : getComputeOrders,
|
||||
variables,
|
||||
assetDTAddress ? [ddo?.chainId] : chainIds
|
||||
)
|
||||
let data: TokenOrder[] = []
|
||||
|
||||
for (let i = 0; i < result.length; i++) {
|
||||
if (!result[i]?.tokenOrders) continue
|
||||
result[i]?.tokenOrders.forEach((tokenOrder: TokenOrder) => {
|
||||
data.push(tokenOrder)
|
||||
})
|
||||
}
|
||||
|
||||
if (!ocean || !account || !data) return
|
||||
|
||||
data = data.sort((a, b) => b.timestamp - a.timestamp)
|
||||
const queryDtList = getDtList(data)
|
||||
if (queryDtList === '') return
|
||||
|
||||
const source = axios.CancelToken.source()
|
||||
const assets = await getAssetMetadata(queryDtList, source.token, chainIds)
|
||||
const serviceEndpoints = getServiceEndpoints(data, assets)
|
||||
const providers: Provider[] = await getProviders(
|
||||
serviceEndpoints,
|
||||
config,
|
||||
ocean
|
||||
)
|
||||
const computeJobs = await getJobs(providers, account, assets)
|
||||
|
||||
return computeJobs
|
||||
}
|
||||
|
||||
export async function createTrustedAlgorithmList(
|
||||
selectedAlgorithms: string[], // list of DIDs
|
||||
|
Loading…
Reference in New Issue
Block a user