1
0
mirror of https://github.com/oceanprotocol/ocean.js.git synced 2024-11-26 20:39:05 +01:00

simple compute flow (#1458)

* refactor compute flow
This commit is contained in:
Alex Coseru 2022-05-10 08:52:24 +03:00 committed by GitHub
parent b66fffc635
commit 6d9f9fed92
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 392 additions and 205 deletions

View File

@ -58,8 +58,6 @@ jobs:
working-directory: ${{ github.workspace }}/barge
run: |
bash -x start_ocean.sh --no-aquarius --no-elasticsearch --no-provider --no-dashboard 2>&1 > start_ocean.log &
env:
CONTRACTS_VERSION: v1.0.0-alpha.31
- run: npm ci
- name: Wait for contracts deployment
working-directory: ${{ github.workspace }}/barge
@ -117,8 +115,6 @@ jobs:
working-directory: ${{ github.workspace }}/barge
run: |
bash -x start_ocean.sh --with-provider2 --no-dashboard --with-c2d 2>&1 > start_ocean.log &
env:
CONTRACTS_VERSION: v1.0.0-alpha.31
- run: npm ci
- run: npm run build:metadata

View File

@ -15,3 +15,14 @@ export interface ProviderInitialize {
computeAddress: string
providerFee: ProviderFees
}
export interface ProviderComputeInitialize {
datatoken?: string
validOrder?: string
providerFee?: ProviderFees
}
export interface ProviderComputeInitializeResults {
algorithm?: ProviderComputeInitialize
datasets?: ProviderComputeInitialize[]
}

View File

@ -7,7 +7,8 @@ import {
ComputeAlgorithm,
ComputeAsset,
ComputeEnvironment,
ProviderInitialize
ProviderInitialize,
ProviderComputeInitializeResults
} from '../@types/'
import { noZeroX } from '../utils/ConversionTypeHelper'
import fetch from 'cross-fetch'
@ -327,6 +328,60 @@ export class Provider {
}
}
/** Initialize a compute request.
* @param {ComputeAsset} assets
* @param {ComputeAlgorithmber} algorithm
* @param {string} computeEnv
* @param {number} validUntil
* @param {string} providerUri Identifier of the asset to be registered in ocean
* @param {string} accountId
* @param {AbortSignal} signal abort signal
* @return {Promise<ProviderComputeInitialize>} ProviderComputeInitialize data
*/
public async initializeCompute(
assets: ComputeAsset[],
algorithm: ComputeAlgorithm,
computeEnv: string,
validUntil: number,
providerUri: string,
accountId: string,
signal?: AbortSignal
): Promise<ProviderComputeInitializeResults> {
const providerEndpoints = await this.getEndpoints(providerUri)
const serviceEndpoints = await this.getServiceEndpoints(
providerUri,
providerEndpoints
)
const providerData = {
datasets: assets,
algorithm: algorithm,
compute: {
env: computeEnv,
validUntil: validUntil
},
consumerAddress: accountId
}
const initializeUrl = this.getEndpointURL(serviceEndpoints, 'initializeCompute')
? this.getEndpointURL(serviceEndpoints, 'initializeCompute').urlPath
: null
if (!initializeUrl) return null
try {
const response = await fetch(initializeUrl, {
method: 'POST',
body: JSON.stringify(providerData),
headers: {
'Content-Type': 'application/json'
},
signal: signal
})
const results = await response.json()
return results
} catch (e) {
LoggerInstance.error(e)
throw new Error('ComputeJob cannot be initialized')
}
}
/** Gets fully signed URL for download
* @param {string} did
* @param {string} accountId

View File

@ -8,11 +8,39 @@ import {
NftFactory,
NftCreateData,
Datatoken,
getHash,
Nft,
sleep
sleep,
ZERO_ADDRESS
} from '../../src'
import { ProviderFees, Erc20CreateParams, ComputeJob, Asset } from '../../src/@types'
import {
Erc20CreateParams,
ComputeJob,
ComputeAsset,
ComputeAlgorithm,
ProviderComputeInitialize,
ConsumeMarketFee
} from '../../src/@types'
let config: Config
let aquarius: Aquarius
let datatoken: Datatoken
let providerUrl: string
let consumerAccount: string
let publisherAccount: string
let computeJobId: string
let providerInitializeComputeResults
let computeEnvs
let addresses: any
let ddoWith1mTimeoutId
let ddoWithNoTimeoutId
let algoDdoWith1mTimeoutId
let algoDdoWithNoTimeoutId
let resolvedDdoWith1mTimeout
let resolvedDdoWithNoTimeout
let resolvedAlgoDdoWith1mTimeout
let resolvedAlgoDdoWithNoTimeout
const assetUrl = [
{
@ -21,7 +49,7 @@ const assetUrl = [
method: 'GET'
}
]
const ddo = {
const ddoWithNoTimeout = {
'@context': ['https://w3id.org/did/v1'],
id: 'did:op:efba17455c127a885ec7830d687a8f6e64f5ba559f8506f8723c1f10f05c049c',
version: '4.0.0',
@ -52,13 +80,44 @@ const ddo = {
publisherTrustedAlgorithmPublishers: [],
publisherTrustedAlgorithms: [],
allowRawAlgorithm: true,
allowNetworkAccess: true,
namespace: 'ocean-compute',
cpus: 2,
gpus: 4,
gpuType: 'NVIDIA Tesla V100 GPU',
memory: '128M',
volumeSize: '2G'
allowNetworkAccess: true
}
}
]
}
const ddoWith1mTimeout = {
'@context': ['https://w3id.org/did/v1'],
id: 'did:op:efba17455c127a885ec7830d687a8f6e64f5ba559f8506f8723c1f10f05c049c',
version: '4.0.0',
chainId: 4,
nftAddress: '0x0',
metadata: {
created: '2021-12-20T14:35:20Z',
updated: '2021-12-20T14:35:20Z',
type: 'dataset',
name: 'dfgdfgdg',
description: 'd dfgd fgd dfg dfgdfgd dfgdf',
tags: [''],
author: 'dd',
license: 'https://market.oceanprotocol.com/terms',
additionalInformation: {
termsAndConditions: true
}
},
services: [
{
id: 'notAnId',
type: 'compute',
files: '',
datatokenAddress: '0xa15024b732A8f2146423D14209eFd074e61964F3',
serviceEndpoint: 'https://providerv4.rinkeby.oceanprotocol.com',
timeout: 60,
compute: {
publisherTrustedAlgorithmPublishers: [],
publisherTrustedAlgorithms: [],
allowRawAlgorithm: true,
allowNetworkAccess: true
}
}
]
@ -70,7 +129,7 @@ const algoAssetUrl = [
method: 'GET'
}
]
const algoDdo = {
const algoDdoWithNoTimeout = {
'@context': ['https://w3id.org/did/v1'],
id: 'did:op:efba17455c127a885ec7830d687a8f6e64f5ba559f8506f8723c1f10f05c049c',
version: '4.0.0',
@ -110,144 +169,246 @@ const algoDdo = {
}
]
}
let providerUrl: string
let consumerAccount: string
let computeJobId: string
let resolvedDDOAsset: Asset
const algoDdoWith1mTimeout = {
'@context': ['https://w3id.org/did/v1'],
id: 'did:op:efba17455c127a885ec7830d687a8f6e64f5ba559f8506f8723c1f10f05c049c',
version: '4.0.0',
chainId: 4,
nftAddress: '0x0',
metadata: {
created: '2021-12-20T14:35:20Z',
updated: '2021-12-20T14:35:20Z',
type: 'algorithm',
name: 'dfgdfgdg',
description: 'd dfgd fgd dfg dfgdfgd dfgdf',
tags: [''],
author: 'dd',
license: 'https://market.oceanprotocol.com/terms',
additionalInformation: {
termsAndConditions: true
},
algorithm: {
language: 'Node.js',
version: '1.0.0',
container: {
entrypoint: 'node $ALGO',
image: 'ubuntu',
tag: 'latest',
checksum: '44e10daa6637893f4276bb8d7301eb35306ece50f61ca34dcab550'
}
}
},
services: [
{
id: 'notAnId',
type: 'access',
files: '',
datatokenAddress: '0xa15024b732A8f2146423D14209eFd074e61964F3',
serviceEndpoint: 'https://providerv4.rinkeby.oceanprotocol.com',
timeout: 60
}
]
}
async function createAsset(
name: string,
symbol: string,
owner: string,
assetUrl: any,
ddo: any,
providerUrl: string
) {
const nft = new Nft(web3)
const Factory = new NftFactory(addresses.ERC721Factory, web3)
const chain = await web3.eth.getChainId()
ddo.chainId = parseInt(chain.toString(10))
const nftParamsAsset: NftCreateData = {
name: name,
symbol: symbol,
templateIndex: 1,
tokenURI: 'aaa',
transferable: true,
owner: owner
}
const erc20ParamsAsset: Erc20CreateParams = {
templateIndex: 1,
cap: '100000',
feeAmount: '0',
paymentCollector: ZERO_ADDRESS,
feeToken: ZERO_ADDRESS,
minter: owner,
mpFeeAddress: ZERO_ADDRESS
}
const result = await Factory.createNftWithErc20(owner, nftParamsAsset, erc20ParamsAsset)
const erc721AddressAsset = result.events.NFTCreated.returnValues[0]
const datatokenAddressAsset = result.events.TokenCreated.returnValues[0]
// create the files encrypted string
let providerResponse = await ProviderInstance.encrypt(assetUrl, providerUrl)
ddo.services[0].files = await providerResponse
ddo.services[0].datatokenAddress = datatokenAddressAsset
ddo.services[0].serviceEndpoint = providerUrl
// update ddo and set the right did
ddo.nftAddress = web3.utils.toChecksumAddress(erc721AddressAsset)
ddo.id =
'did:op:' +
SHA256(web3.utils.toChecksumAddress(erc721AddressAsset) + chain.toString(10))
providerResponse = await ProviderInstance.encrypt(ddo, providerUrl)
const encryptedResponse = await providerResponse
const validateResult = await aquarius.validate(ddo)
assert(validateResult.valid, 'Could not validate metadata')
await nft.setMetadata(
erc721AddressAsset,
owner,
0,
providerUrl,
'',
'0x2',
encryptedResponse,
validateResult.hash
)
return ddo.id
}
async function handleOrder(
order: ProviderComputeInitialize,
datatokenAddress: string,
payerAccount: string,
consumerAccount: string,
serviceIndex: number,
consumeMarkerFee?: ConsumeMarketFee
) {
/* We do have 3 possible situations:
- have validOrder and no providerFees -> then order is valid, providerFees are valid, just use it in startCompute
- have validOrder and providerFees -> then order is valid but providerFees are not valid, we need to call reuseOrder and pay only providerFees
- no validOrder -> we need to call startOrder, to pay 1 DT & providerFees
*/
if (order.providerFee && order.providerFee.providerFeeAmount) {
await datatoken.approve(
order.providerFee.providerFeeToken,
datatokenAddress,
order.providerFee.providerFeeAmount,
payerAccount
)
}
if (order.validOrder) {
if (!order.providerFee) return order.validOrder
const tx = await datatoken.reuseOrder(
datatokenAddress,
payerAccount,
order.validOrder,
order.providerFee
)
return tx.transactionHash
}
const tx = await datatoken.startOrder(
datatokenAddress,
payerAccount,
consumerAccount,
serviceIndex,
order.providerFee,
consumeMarkerFee
)
return tx.transactionHash
}
describe('Simple compute tests', async () => {
let config: Config
let addresses: any
let aquarius: Aquarius
before(async () => {
config = await getTestConfig(web3)
addresses = getAddresses()
aquarius = new Aquarius(config.metadataCacheUri)
providerUrl = config.providerUri
datatoken = new Datatoken(web3)
})
it('should publish a dataset, algorithm and start a compute job', async () => {
const nft = new Nft(web3)
const datatoken = new Datatoken(web3)
const Factory = new NftFactory(addresses.ERC721Factory, web3)
it('should publish datasets and algorithms', async () => {
const accounts = await web3.eth.getAccounts()
const publisherAccount = accounts[0]
publisherAccount = accounts[0]
consumerAccount = accounts[1]
const chain = await web3.eth.getChainId()
const nftParamsAsset: NftCreateData = {
name: 'testNFT',
symbol: 'TST',
templateIndex: 1,
tokenURI: 'aaa',
transferable: true,
owner: publisherAccount
}
const erc20ParamsAsset: Erc20CreateParams = {
templateIndex: 1,
cap: '100000',
feeAmount: '0',
paymentCollector: '0x0000000000000000000000000000000000000000',
feeToken: '0x0000000000000000000000000000000000000000',
minter: publisherAccount,
mpFeeAddress: '0x0000000000000000000000000000000000000000'
}
const result = await Factory.createNftWithErc20(
ddoWith1mTimeoutId = await createAsset(
'D1Min',
'D1M',
publisherAccount,
nftParamsAsset,
erc20ParamsAsset
assetUrl,
ddoWith1mTimeout,
providerUrl
)
ddoWithNoTimeoutId = await createAsset(
'D1Min',
'D1M',
publisherAccount,
assetUrl,
ddoWithNoTimeout,
providerUrl
)
algoDdoWith1mTimeoutId = await createAsset(
'A1Min',
'A1M',
publisherAccount,
algoAssetUrl,
algoDdoWith1mTimeout,
providerUrl
)
const erc721AddressAsset = result.events.NFTCreated.returnValues[0]
const datatokenAddressAsset = result.events.TokenCreated.returnValues[0]
// create the files encrypted string
let providerResponse = await ProviderInstance.encrypt(assetUrl, providerUrl)
ddo.services[0].files = await providerResponse
ddo.services[0].datatokenAddress = datatokenAddressAsset
// update ddo and set the right did
ddo.nftAddress = erc721AddressAsset
ddo.id =
'did:op:' +
SHA256(web3.utils.toChecksumAddress(erc721AddressAsset) + chain.toString(10))
providerResponse = await ProviderInstance.encrypt(ddo, providerUrl)
let encryptedResponse = await providerResponse
let metadataHash = getHash(JSON.stringify(ddo))
let res = await nft.setMetadata(
erc721AddressAsset,
algoDdoWithNoTimeoutId = await createAsset(
'A1Min',
'A1M',
publisherAccount,
0,
providerUrl,
'',
'0x2',
encryptedResponse,
'0x' + metadataHash
algoAssetUrl,
algoDdoWithNoTimeout,
providerUrl
)
// let's publish the algorithm as well
const nftParamsAlgo: NftCreateData = {
name: 'testNFT',
symbol: 'TST',
templateIndex: 1,
tokenURI: '',
transferable: true,
owner: publisherAccount
}
const erc20ParamsAlgo: Erc20CreateParams = {
templateIndex: 1,
cap: '100000',
feeAmount: '0',
paymentCollector: '0x0000000000000000000000000000000000000000',
feeToken: '0x0000000000000000000000000000000000000000',
minter: publisherAccount,
mpFeeAddress: '0x0000000000000000000000000000000000000000'
}
const resultAlgo = await Factory.createNftWithErc20(
})
it('should resolve published datasets and algorithms', async () => {
resolvedDdoWith1mTimeout = await aquarius.waitForAqua(ddoWith1mTimeoutId)
assert(resolvedDdoWith1mTimeout, 'Cannot fetch DDO from Aquarius')
resolvedDdoWithNoTimeout = await aquarius.waitForAqua(ddoWithNoTimeoutId)
assert(resolvedDdoWithNoTimeout, 'Cannot fetch DDO from Aquarius')
resolvedAlgoDdoWith1mTimeout = await aquarius.waitForAqua(algoDdoWith1mTimeoutId)
assert(resolvedAlgoDdoWith1mTimeout, 'Cannot fetch DDO from Aquarius')
resolvedAlgoDdoWithNoTimeout = await aquarius.waitForAqua(algoDdoWithNoTimeoutId)
assert(resolvedAlgoDdoWithNoTimeout, 'Cannot fetch DDO from Aquarius')
})
it('should send DT to consumer', async () => {
const datatoken = new Datatoken(web3)
await datatoken.mint(
resolvedDdoWith1mTimeout.services[0].datatokenAddress,
publisherAccount,
nftParamsAlgo,
erc20ParamsAlgo
'10',
consumerAccount
)
const erc721AddressAlgo = resultAlgo.events.NFTCreated.returnValues[0]
const datatokenAddressAlgo = resultAlgo.events.TokenCreated.returnValues[0]
// create the files encrypted string
providerResponse = await ProviderInstance.encrypt(algoAssetUrl, providerUrl)
algoDdo.services[0].files = await providerResponse
algoDdo.services[0].datatokenAddress = datatokenAddressAlgo
// update ddo and set the right did
algoDdo.nftAddress = erc721AddressAlgo
algoDdo.id =
'did:op:' +
SHA256(web3.utils.toChecksumAddress(erc721AddressAlgo) + chain.toString(10))
providerResponse = await ProviderInstance.encrypt(algoDdo, providerUrl)
encryptedResponse = await providerResponse
metadataHash = getHash(JSON.stringify(algoDdo))
res = await nft.setMetadata(
erc721AddressAlgo,
await datatoken.mint(
resolvedDdoWithNoTimeout.services[0].datatokenAddress,
publisherAccount,
0,
providerUrl,
'',
'0x2',
encryptedResponse,
'0x' + metadataHash
'10',
consumerAccount
)
await datatoken.mint(
resolvedAlgoDdoWith1mTimeout.services[0].datatokenAddress,
publisherAccount,
'10',
consumerAccount
)
await datatoken.mint(
resolvedAlgoDdoWithNoTimeout.services[0].datatokenAddress,
publisherAccount,
'10',
consumerAccount
)
})
// let's wait
resolvedDDOAsset = await aquarius.waitForAqua(ddo.id)
assert(resolvedDDOAsset, 'Cannot fetch DDO from Aquarius')
const resolvedDDOAlgo = await aquarius.waitForAqua(algoDdo.id)
assert(resolvedDDOAlgo, 'Cannot fetch DDO from Aquarius')
// mint 1 ERC20 and send it to the consumer
await datatoken.mint(datatokenAddressAsset, publisherAccount, '1', consumerAccount)
await datatoken.mint(datatokenAddressAlgo, publisherAccount, '1', consumerAccount)
it('should fetch compute environments from provider', async () => {
// get compute environments
const computeEnvs = await ProviderInstance.getComputeEnvironments(providerUrl)
computeEnvs = await ProviderInstance.getComputeEnvironments(providerUrl)
assert(computeEnvs, 'No Compute environments found')
})
it('should start a computeJob', async () => {
// we choose the first env
const computeEnv = computeEnvs[0].id
const computeConsumerAddress = computeEnvs[0].consumerAddress
@ -255,89 +416,53 @@ describe('Simple compute tests', async () => {
const mytime = new Date()
mytime.setMinutes(mytime.getMinutes() + 19)
const computeValidUntil = Math.floor(mytime.getTime() / 1000)
// initialize provider orders for algo
const initializeDataAlgo = await ProviderInstance.initialize(
resolvedDDOAlgo.id,
resolvedDDOAlgo.services[0].id,
0,
consumerAccount,
providerUrl,
null,
null,
computeEnv,
computeValidUntil
)
const providerAlgoFees: ProviderFees = {
providerFeeAddress: initializeDataAlgo.providerFee.providerFeeAddress,
providerFeeToken: initializeDataAlgo.providerFee.providerFeeToken,
providerFeeAmount: initializeDataAlgo.providerFee.providerFeeAmount,
v: initializeDataAlgo.providerFee.v,
r: initializeDataAlgo.providerFee.r,
s: initializeDataAlgo.providerFee.s,
providerData: initializeDataAlgo.providerFee.providerData,
validUntil: initializeDataAlgo.providerFee.validUntil
const assets: ComputeAsset[] = [
{
documentId: resolvedDdoWith1mTimeout.id,
serviceId: resolvedDdoWith1mTimeout.services[0].id
}
]
const dtAddressArray = [resolvedDdoWith1mTimeout.services[0].datatokenAddress]
const algo: ComputeAlgorithm = {
documentId: resolvedAlgoDdoWith1mTimeout.id,
serviceId: resolvedAlgoDdoWith1mTimeout.services[0].id
}
// make the payment
const txidAlgo = await datatoken.startOrder(
datatokenAddressAlgo,
consumerAccount,
computeConsumerAddress, // this is important because the c2d is the consumer, and user is the payer. Otherwise, c2d will have no access to the asset
0,
providerAlgoFees
)
assert(txidAlgo, 'Failed to order algo')
const providerValidUntil = new Date()
providerValidUntil.setHours(providerValidUntil.getHours() + 1)
// initialize provider orders for asset
const initializeData = await ProviderInstance.initialize(
resolvedDDOAsset.id,
resolvedDDOAsset.services[0].id,
0,
consumerAccount,
providerUrl,
null,
null,
providerInitializeComputeResults = await ProviderInstance.initializeCompute(
assets,
algo,
computeEnv,
computeValidUntil
computeValidUntil,
providerUrl,
consumerAccount
)
const providerDatasetFees: ProviderFees = {
providerFeeAddress: initializeData.providerFee.providerFeeAddress,
providerFeeToken: initializeData.providerFee.providerFeeToken,
providerFeeAmount: initializeData.providerFee.providerFeeAmount,
v: initializeData.providerFee.v,
r: initializeData.providerFee.r,
s: initializeData.providerFee.s,
providerData: initializeData.providerFee.providerData,
validUntil: initializeData.providerFee.validUntil
}
// make the payment
const txidAsset = await datatoken.startOrder(
datatokenAddressAsset,
assert(
!('error' in providerInitializeComputeResults.algorithm),
'Cannot order algorithm'
)
algo.transferTxId = await handleOrder(
providerInitializeComputeResults.algorithm,
resolvedAlgoDdoWith1mTimeout.services[0].datatokenAddress,
consumerAccount,
computeConsumerAddress, // this is important because the c2d is the consumer, and user is the payer. Otherwise, c2d will have no access to the asset
0,
providerDatasetFees
computeConsumerAddress,
0
)
assert(txidAsset, 'Failed to order algo')
// start the compute job
for (let i = 0; i < providerInitializeComputeResults.datasets.length; i++) {
assets[i].transferTxId = await handleOrder(
providerInitializeComputeResults.datasets[i],
dtAddressArray[i],
consumerAccount,
computeConsumerAddress,
0
)
}
const computeJobs = await ProviderInstance.computeStart(
providerUrl,
web3,
consumerAccount,
computeEnv,
{
documentId: resolvedDDOAsset.id,
serviceId: resolvedDDOAsset.services[0].id,
transferTxId: txidAsset.transactionHash
},
{
documentId: resolvedDDOAlgo.id,
serviceId: resolvedDDOAlgo.services[0].id,
transferTxId: txidAlgo.transactionHash
}
assets[0],
algo
)
assert(computeJobs, 'Cannot start compute job')
computeJobId = computeJobs[0].jobId
@ -348,7 +473,7 @@ describe('Simple compute tests', async () => {
providerUrl,
consumerAccount,
computeJobId,
resolvedDDOAsset.id
resolvedDdoWith1mTimeout.id
)) as ComputeJob
assert(jobStatus, 'Cannot retrieve compute status!')
})