From ee7a45e17ae8e0a74afa6dd0ca11791de709e4fd Mon Sep 17 00:00:00 2001 From: Alex Coseru Date: Wed, 9 Feb 2022 19:13:25 +0200 Subject: [PATCH] use compute environments (#1258) * use compute environments --- .github/workflows/ci.yml | 32 ++++++++++++++++++----- scripts/waitforcontracts.sh | 2 ++ src/@types/Compute.ts | 18 +++++++++++++ src/aquarius/Aquarius.ts | 13 ++-------- src/provider/Provider.ts | 39 +++++++++++++++++++++++++--- src/utils/General.ts | 8 ++++++ src/utils/index.ts | 1 + test/integration/ComputeFlow.test.ts | 32 +++++++++++++++++------ 8 files changed, 116 insertions(+), 29 deletions(-) create mode 100644 src/utils/General.ts diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 935420e6..7349cfe1 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -57,10 +57,17 @@ jobs: - name: Run Ganache with Barge working-directory: ${{ github.workspace }}/barge run: | - bash -x start_ocean.sh --no-aquarius --no-elasticsearch --no-provider --no-dashboard --skip-deploy 2>&1 > start_ocean.log & - cd .. + bash -x start_ocean.sh --no-aquarius --no-elasticsearch --no-provider --no-dashboard 2>&1 > start_ocean.log & - run: npm ci + - name: Wait for contracts deployment + working-directory: ${{ github.workspace }}/barge + run: | + for i in $(seq 1 250); do + sleep 5 + [ -f "$HOME/.ocean/ocean-contracts/artifacts/ready" ] && break + done + ls -la "$HOME/.ocean/ocean-contracts/artifacts/" - run: npm run build:metadata - run: npm run test:unit:cover - uses: actions/upload-artifact@v2 @@ -84,8 +91,6 @@ jobs: path: ~/.npm key: ${{ runner.os }}-test-integration-${{ env.cache-name }}-${{ hashFiles('**/package-lock.json') }} restore-keys: ${{ runner.os }}-test-integration-${{ env.cache-name }}- - - run: npm ci - - run: npm run build:metadata # Env var expansion workaround # https://docs.github.com/en/actions/reference/workflow-commands-for-github-actions#setting-an-environment-variable @@ -109,14 +114,29 @@ jobs: - name: Run Barge working-directory: ${{ github.workspace }}/barge + env: + OPERATOR_SERVICE_VERSION: compute_envs + OPERATOR_ENGINE_VERSION: compute_envs + PROVIDER_VERSION: compute_envs run: | bash -x start_ocean.sh --with-provider2 --no-dashboard --with-c2d 2>&1 > start_ocean.log & - cd .. && ./scripts/waitforcontracts.sh + + - run: npm ci + - run: npm run build:metadata + + - name: Wait for contracts deployment and C2D cluster to be ready + working-directory: ${{ github.workspace }}/barge + run: | + for i in $(seq 1 250); do + sleep 10 + [ -f "$HOME/.ocean/ocean-contracts/artifacts/ready" -a -f "$HOME/.ocean/ocean-c2d/ready" ] && break + done + - name: integration run: npm run test:integration:cover - name: docker logs - run: docker logs ocean_aquarius_1 && docker logs ocean_provider_1 && docker logs ocean_provider2_1 + run: docker logs ocean_aquarius_1 && docker logs ocean_provider_1 && docker logs ocean_provider2_1 && docker logs ocean_computetodata_1 if: ${{ failure() }} - name: Upload coverage uses: actions/upload-artifact@v2 diff --git a/scripts/waitforcontracts.sh b/scripts/waitforcontracts.sh index 537307fe..9a2f5740 100755 --- a/scripts/waitforcontracts.sh +++ b/scripts/waitforcontracts.sh @@ -2,6 +2,8 @@ while [ ! -f "$HOME/.ocean/ocean-contracts/artifacts/ready" ] || [ ! 
-f "$HOME/.ocean/ocean-c2d/ready" ]; do sleep 2 done +#wait for c2d +sleep 10 #cat "barge/start_ocean.log" ls -lh "${HOME}/.ocean/ocean-contracts/" ls -lh "${HOME}/.ocean/ocean-contracts/artifacts/" diff --git a/src/@types/Compute.ts b/src/@types/Compute.ts index 2a15b983..c175b115 100644 --- a/src/@types/Compute.ts +++ b/src/@types/Compute.ts @@ -2,6 +2,24 @@ import { Metadata, MetadataAlgorithm } from './DDO/Metadata' export type ComputeResultType = 'algorithmLog' | 'output' +export interface ComputeEnvironment { + id: string + cpuNumber: number + cpuType: string + gpuNumber: number + gpuType: string + ramGB: number + diskGB: number + priceMin: number + desc: string + currentJobs: number + maxJobs: number + consumerAddress: string + storageExpiry: number + maxJobDuration: number + lastSeen: number +} + export interface ComputeResult { filename: string filesize: number diff --git a/src/aquarius/Aquarius.ts b/src/aquarius/Aquarius.ts index 0fc47388..823010c2 100644 --- a/src/aquarius/Aquarius.ts +++ b/src/aquarius/Aquarius.ts @@ -1,4 +1,4 @@ -import { LoggerInstance } from '../utils' +import { LoggerInstance, sleep } from '../utils' import { Asset, DDO, ValidateMetadata } from '../@types/' import fetch from 'cross-fetch' export class Aquarius { @@ -39,15 +39,6 @@ export class Aquarius { } } - /** - * Simple blocking sleep function - */ - public sleep(ms: number) { - return new Promise((resolve) => { - setTimeout(resolve, ms) - }) - } - /** * Blocks until Aqua will cache the did (or the update for that did) or timeouts @@ -82,7 +73,7 @@ export class Aquarius { } catch (e) { // do nothing } - await this.sleep(1500) + await sleep(1500) tries++ } while (tries < 100) return null diff --git a/src/provider/Provider.ts b/src/provider/Provider.ts index 3c21f373..283647e1 100644 --- a/src/provider/Provider.ts +++ b/src/provider/Provider.ts @@ -6,6 +6,7 @@ import { ComputeOutput, ComputeAlgorithm, ComputeAsset, + ComputeEnvironment, ProviderInitialize } from '../@types/' import { noZeroX } from '../utils/ConversionTypeHelper' @@ -245,6 +246,36 @@ export class Provider { } } + /** Get Compute Environments + * @return {Promise} urlDetails + */ + public async getComputeEnvironments( + providerUri: string, + signal?: AbortSignal + ): Promise { + const providerEndpoints = await this.getEndpoints(providerUri) + const serviceEndpoints = await this.getServiceEndpoints( + providerUri, + providerEndpoints + ) + const path = this.getEndpointURL(serviceEndpoints, 'computeEnvironments')?.urlPath + if (!path) return null + try { + const response = await fetch(path, { + method: 'GET', + headers: { + 'Content-Type': 'application/json' + }, + signal: signal + }) + const envs: ComputeEnvironment[] = await response.json() + return envs + } catch (e) { + LoggerInstance.error(e.message) + return null + } + } + /** Initialize a service request. 
* @param {DDO | string} asset * @param {number} serviceIndex @@ -282,7 +313,7 @@ export class Provider { initializeUrl += `&consumerAddress=${consumerAddress}` if (userCustomParameters) initializeUrl += '&userdata=' + encodeURI(JSON.stringify(userCustomParameters)) - if (computeEnv) initializeUrl += '&computeEnv=' + encodeURI(computeEnv) + if (computeEnv) initializeUrl += '&environment=' + encodeURI(computeEnv) if (validUntil) initializeUrl += '&validUntil=' + validUntil try { const response = await fetch(initializeUrl, { @@ -348,6 +379,7 @@ export class Provider { /** Instruct the provider to start a compute job * @param {string} did * @param {string} consumerAddress + * @param {string} computeEnv * @param {ComputeAlgorithm} algorithm * @param {string} providerUri * @param {Web3} web3 @@ -389,7 +421,7 @@ export class Provider { payload.consumerAddress = consumerAddress payload.signature = signature payload.nonce = nonce - payload.computeEnv = computeEnv + payload.environment = computeEnv payload.dataset = dataset payload.algorithm = algorithm if (payload.additionalDatasets) payload.additionalDatasets = additionalDatasets @@ -526,7 +558,7 @@ export class Provider { : null let url = '?documentId=' + noZeroX(did) - url += `&consumerAddress=${consumerAddress}` + url += (consumerAddress && `&consumerAddress=${consumerAddress}`) || '' url += (jobId && `&jobId=${jobId}`) || '' if (!computeStatusUrl) return null @@ -538,7 +570,6 @@ export class Provider { }, signal: signal }) - if (response?.ok) { const params = await response.json() return params diff --git a/src/utils/General.ts b/src/utils/General.ts new file mode 100644 index 00000000..b24cdde3 --- /dev/null +++ b/src/utils/General.ts @@ -0,0 +1,8 @@ +/** + * Simple blocking sleep function + */ +export async function sleep(ms: number) { + return new Promise((resolve) => { + setTimeout(resolve, ms) + }) +} diff --git a/src/utils/index.ts b/src/utils/index.ts index b44f6204..4d43f0a4 100644 --- a/src/utils/index.ts +++ b/src/utils/index.ts @@ -7,3 +7,4 @@ export * from './DdoHelpers' export * from './Constants' export * from './SignatureUtils' export * from './TokenUtils' +export * from './General' diff --git a/test/integration/ComputeFlow.test.ts b/test/integration/ComputeFlow.test.ts index 44d808e5..cb62e670 100644 --- a/test/integration/ComputeFlow.test.ts +++ b/test/integration/ComputeFlow.test.ts @@ -4,7 +4,7 @@ import Aquarius from '../../src/aquarius/Aquarius' import { assert } from 'chai' import { NftFactory, NftCreateData } from '../../src/factories/index' import { Datatoken } from '../../src/tokens/Datatoken' -import { getHash } from '../../src/utils' +import { getHash, sleep } from '../../src/utils' import { Nft } from '../../src/tokens/NFT' import Web3 from 'web3' import { SHA256 } from 'crypto-js' @@ -236,13 +236,27 @@ describe('Simple compute tests', async () => { await datatoken.mint(datatokenAddressAsset, publisherAccount, '1', consumerAccount) await datatoken.mint(datatokenAddressAlgo, publisherAccount, '1', consumerAccount) + // get compute environments + const computeEnvs = await ProviderInstance.getComputeEnvironments(providerUrl) + assert(computeEnvs, 'No Compute environments found') + // we choose the first env + const computeEnv = computeEnvs[0].id + const computeConsumerAddress = computeEnvs[0].consumerAddress + // let's have 60 seconds of compute access + const mytime = new Date() + mytime.setMinutes(mytime.getMinutes() + 1) + const computeValidUntil = mytime.getTime() // initialize provider orders for algo const 
initializeDataAlgo = await ProviderInstance.initialize( resolvedDDOAlgo.id, resolvedDDOAlgo.services[0].id, 0, consumerAccount, - providerUrl + providerUrl, + null, + null, + computeEnv, + computeValidUntil ) const providerAlgoFees: ProviderFees = { providerFeeAddress: initializeDataAlgo.providerFee.providerFeeAddress, @@ -259,7 +273,7 @@ describe('Simple compute tests', async () => { const txidAlgo = await datatoken.startOrder( datatokenAddressAlgo, consumerAccount, - initializeDataAlgo.computeAddress, + computeConsumerAddress, // this is important because the c2d is the consumer, and user is the payer. Otherwise, c2d will have no access to the asset 0, providerAlgoFees ) @@ -277,8 +291,8 @@ describe('Simple compute tests', async () => { providerUrl, null, null, - 'env1', - providerValidUntil.getTime() + computeEnv, + computeValidUntil ) const providerDatasetFees: ProviderFees = { providerFeeAddress: initializeData.providerFee.providerFeeAddress, @@ -294,7 +308,7 @@ describe('Simple compute tests', async () => { const txidAsset = await datatoken.startOrder( datatokenAddressAsset, consumerAccount, - initializeDataAlgo.computeAddress, + computeConsumerAddress, // this is important because the c2d is the consumer, and user is the payer. Otherwise, c2d will have no access to the asset 0, providerDatasetFees ) @@ -304,7 +318,7 @@ describe('Simple compute tests', async () => { providerUrl, web3, consumerAccount, - 'env1', + computeEnv, { documentId: resolvedDDOAsset.id, serviceId: resolvedDDOAsset.services[0].id, @@ -320,7 +334,9 @@ describe('Simple compute tests', async () => { const jobStatus = await ProviderInstance.computeStatus( providerUrl, null, - computeJobs[0].jobId + computeJobs[0].jobId, + resolvedDDOAsset.id, + consumerAccount ) assert(jobStatus) })
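
Usage note (not part of the patch): the sketch below summarizes the compute-environments flow this change introduces, mirroring test/integration/ComputeFlow.test.ts. It is a minimal example rather than documented API: the import paths, the helper name runComputeFlow, and the passed-in Provider/Datatoken instances and dataset/algorithm descriptors are assumptions taken from the integration test's setup, and the argument order follows the calls made in that test.

import Web3 from 'web3'
import { Provider } from '../../src/provider/Provider'
import { Datatoken } from '../../src/tokens/Datatoken'
import {
  ComputeEnvironment,
  ComputeAsset,
  ComputeAlgorithm
} from '../../src/@types'

// Hypothetical helper mirroring the flow exercised in ComputeFlow.test.ts.
// All arguments are assumed to come from the same setup as that test
// (published dataset, resolved DDO, funded consumer account, etc.).
export async function runComputeFlow(
  provider: Provider,
  providerUrl: string,
  web3: Web3,
  consumerAccount: string,
  datatoken: Datatoken,
  datatokenAddressAsset: string,
  did: string,
  serviceId: string,
  dataset: ComputeAsset,
  algorithm: ComputeAlgorithm
) {
  // 1. Discover the compute environments exposed by the provider (new endpoint)
  const envs: ComputeEnvironment[] = await provider.getComputeEnvironments(providerUrl)
  // Pick the first environment, as the integration test does
  const computeEnv = envs[0].id
  // Orders must name the environment's consumer address: C2D is the consumer
  // and the user is the payer; otherwise C2D has no access to the ordered asset.
  const computeConsumerAddress = envs[0].consumerAddress

  // 2. Ask for, e.g., one minute of compute access
  const validUntil = new Date()
  validUntil.setMinutes(validUntil.getMinutes() + 1)

  // 3. Initialize the service, now passing the environment id and validity window
  const initializeData = await provider.initialize(
    did,
    serviceId,
    0,
    consumerAccount,
    providerUrl,
    null,
    null,
    computeEnv,
    validUntil.getTime()
  )

  // 4. Start the order with the environment's consumer address as consumer.
  //    The integration test copies the ProviderFees fields one by one; here the
  //    returned providerFee is passed through, assuming the shapes match.
  await datatoken.startOrder(
    datatokenAddressAsset,
    consumerAccount,
    computeConsumerAddress,
    0,
    initializeData.providerFee
  )

  // 5. Launch the job against the chosen environment and return the job list
  return provider.computeStart(
    providerUrl,
    web3,
    consumerAccount,
    computeEnv,
    dataset,
    algorithm
  )
}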