1
0
mirror of https://github.com/oceanprotocol/market.git synced 2024-12-02 05:57:29 +01:00

fixed compute jobs logic

This commit is contained in:
Bogdan Fazakas 2022-03-10 01:10:57 +02:00
parent 8aaae669f7
commit a8b31131c4
3 changed files with 87 additions and 144 deletions

View File

@ -1,5 +1,5 @@
import { ComputeJob } from '@oceanprotocol/lib'
import { OrdersData_tokenOrders_datatokenId as OrdersDatatoken } from './apollo/OrdersData'
import { OrdersData_orders_datatoken as OrdersDatatoken } from '../@types/subgraph/OrdersData'
// declaring into global scope to be able to use this as
// ambiant types despite the above imports
@ -22,10 +22,10 @@ declare global {
interface TokenOrder {
id: string
serviceId: number
datatokenId: OrdersDatatoken
serviceIndex: number
datatoken: OrdersDatatoken
tx: any
timestamp: number
createdTimestamp: number
}
interface ComputeResults {

View File

@ -74,16 +74,16 @@ async function getAssetMetadata(
const baseQueryparams = {
chainIds,
filters: [
getFilterTerm('dataToken', queryDtList),
getFilterTerm('datatokens', queryDtList),
getFilterTerm('service.type', 'compute'),
getFilterTerm('service.attributes.main.type', 'dataset')
getFilterTerm('metadata.type', 'dataset')
],
ignorePurgatory: true
} as BaseQueryParams
const query = generateBaseQuery(baseQueryparams)
const result = await queryMetadata(query, cancelToken)
return result.results
return result?.results
}
export async function isOrderable(
@ -209,125 +209,58 @@ export async function getAlgorithmAssetSelectionList(
return algorithmSelectionList
}
function getServiceEndpoints(data: TokenOrder[], assets: Asset[]): string[] {
// const serviceEndpoints: string[] = []
async function getJobs(
providerUrls: string[],
accountId: string,
assets: Asset[]
): Promise<ComputeJobMetaData[]> {
const computeJobs: ComputeJobMetaData[] = []
providerUrls.forEach(async (providerUrl) => {
try {
const providerComputeJobs = (await ProviderInstance.computeStatus(
providerUrl,
null,
null,
null,
accountId
)) as ComputeJob[]
// for (let i = 0; i < data.length; i++) {
// try {
// const did = web3.utils
// .toChecksumAddress(data[i].datatokenId.address)
// .replace('0x', 'did:op:')
// const ddo = assets.filter((x) => x.id === did)[0]
// if (ddo === undefined) continue
if (!providerComputeJobs) {
providerComputeJobs.sort((a, b) => {
if (a.dateCreated > b.dateCreated) {
return -1
}
if (a.dateCreated < b.dateCreated) {
return 1
}
return 0
})
providerComputeJobs.forEach((job) => {
const did = job.inputDID[0]
const asset = assets.filter((x) => x.id === did)[0]
// const service = ddo.services.filter(
// (x: Service) => x.index === data[i].serviceId
// )[0]
// if (!service || service.type !== 'compute') continue
// const { providerEndpoint } = service
// const wasProviderQueried =
// serviceEndpoints?.filter((x) => x === providerEndpoint).length > 0
// if (wasProviderQueried) continue
// serviceEndpoints.push(providerEndpoint)
// } catch (err) {
// LoggerInstance.error(err.message)
// }
// }
// return serviceEndpoints
return ['dummy']
if (!asset) {
const compJob: ComputeJobMetaData = {
...job,
assetName: asset.metadata.name,
assetDtSymbol: asset.datatokens[0].symbol,
networkId: asset.chainId
}
computeJobs.push(compJob)
}
})
}
} catch (err) {
LoggerInstance.error(err.message)
}
})
return computeJobs
}
// async function getProviders(
// serviceEndpoints: string[],
// config: Config,
// ocean: Ocean
// ): Promise<Provider[]> {
// const providers: Provider[] = []
// try {
// for (let i = 0; i < serviceEndpoints?.length; i++) {
// const instanceConfig = {
// config,
// web3: config.web3Provider,
// logger: LoggerInstance,
// ocean
// }
// const provider = await Provider.getInstance(instanceConfig)
// await provider.setBaseUrl(serviceEndpoints[i])
// const hasSameCompute =
// providers.filter((x) => x.computeAddress === provider.computeAddress)
// .length > 0
// if (!hasSameCompute) providers.push(provider)
// }
// } catch (err) {
// LoggerInstance.error(err.message)
// }
// return providers
// }
// async function getJobs(
// providers: Provider[],
// account: Account,
// assets: Asset[]
// ): Promise<ComputeJobMetaData[]> {
// const computeJobs: ComputeJobMetaData[] = []
// for (let i = 0; i < providers.length; i++) {
// try {
// const providerComputeJobs = (await providers[i].computeStatus(
// '',
// account,
// undefined,
// undefined,
// false
// )) as ComputeJob[]
// // means the provider uri is not good, so we ignore it and move on
// if (!providerComputeJobs) continue
// providerComputeJobs.sort((a, b) => {
// if (a.dateCreated > b.dateCreated) {
// return -1
// }
// if (a.dateCreated < b.dateCreated) {
// return 1
// }
// return 0
// })
// for (let j = 0; j < providerComputeJobs?.length; j++) {
// const job = providerComputeJobs[j]
// const did = job.inputDID[0]
// const ddo = assets.filter((x) => x.id === did)[0]
// if (!ddo) continue
// const compJob: ComputeJobMetaData = {
// ...job,
// assetName: ddo.metadata.name,
// assetDtSymbol: ddo.dataTokenInfo.symbol,
// networkId: ddo.chainId
// }
// computeJobs.push(compJob)
// }
// } catch (err) {
// LoggerInstance.error(err.message)
// }
// }
// return computeJobs
// }
export async function getComputeJobs(
chainIds: number[],
accountId: string,
asset?: AssetExtended,
token?: CancelToken
cancelToken?: CancelToken
): Promise<ComputeResults> {
if (!accountId) return
const assetDTAddress = asset.datatokens[0].address
@ -335,7 +268,6 @@ export async function getComputeJobs(
computeJobs: [],
isLoaded: false
}
let isLoading = true
const variables = assetDTAddress
? {
user: accountId.toLowerCase(),
@ -351,33 +283,37 @@ export async function getComputeJobs(
assetDTAddress ? [asset?.chainId] : chainIds
)
let data: TokenOrder[] = results.map((result) =>
result.tokenOrders.forEach((tokenOrder: TokenOrder) => {
return tokenOrder
})
)
let tokenOrders: TokenOrder[] = []
results.map((result) => {
result.orders.forEach((tokenOrder: TokenOrder) =>
tokenOrders.push(tokenOrder)
)
})
if (data.length === 0) {
if (tokenOrders.length === 0) {
return computeResult
}
data = data.sort((a, b) => b.timestamp - a.timestamp)
const queryDtList = data.map((tokenOrder) => tokenOrder.datatokenId.address)
if (!queryDtList) return
tokenOrders = tokenOrders.sort(
(a, b) => b.createdTimestamp - a.createdTimestamp
)
const datatokenAddressList = tokenOrders.map(
(tokenOrder: TokenOrder) => tokenOrder.datatoken.address
)
if (!datatokenAddressList) return
const assets = await getAssetMetadata(queryDtList, token, chainIds)
const serviceEndpoints = getServiceEndpoints(data, assets)
// const providers: Provider[] = await getProviders(
// serviceEndpoints,
// config,
// ocean
// )
// const computeJobs = await getJobs(providers, account, assets)
isLoading = false
// computeResult = {
// computeJobs: computeJobs,
// isLoaded: isLoading
// }
const assets = await getAssetMetadata(
datatokenAddressList,
cancelToken,
chainIds
)
const providerUrls: string[] = []
assets.forEach((asset: Asset) =>
providerUrls.push(asset.services[0].serviceEndpoint)
)
computeResult.computeJobs = await getJobs(providerUrls, accountId, assets)
computeResult.isLoaded = true
return computeResult
}

View File

@ -14,6 +14,7 @@ import { getComputeJobs } from '@utils/compute'
import styles from './index.module.css'
import { useAsset } from '@context/Asset'
import { useIsMounted } from '@hooks/useIsMounted'
import { useCancelToken } from '@hooks/useCancelToken'
export function Status({ children }: { children: string }): ReactElement {
return <div className={styles.status}>{children}</div>
@ -79,6 +80,7 @@ export default function ComputeJobs({
const [isLoading, setIsLoading] = useState(false)
const [jobs, setJobs] = useState<ComputeJobMetaData[]>([])
const isMounted = useIsMounted()
const newCancelToken = useCancelToken()
const columnsMinimal = [columns[4], columns[5], columns[3]]
@ -90,9 +92,14 @@ export default function ComputeJobs({
}
try {
setIsLoading(true)
const jobs = await getComputeJobs(chainIds, accountId, asset)
const jobs = await getComputeJobs(
chainIds,
accountId,
asset,
newCancelToken()
)
isMounted() && setJobs(jobs.computeJobs)
setIsLoading(jobs.isLoaded)
setIsLoading(!jobs.isLoaded)
} catch (error) {
LoggerInstance.error(error.message)
}