1
0
mirror of https://github.com/oceanprotocol-archive/squid-js.git synced 2024-02-02 15:31:51 +01:00

add execution of compute jobs

This commit is contained in:
Matthias Kretschmann 2020-01-09 12:20:19 +01:00
parent ca67de1254
commit ae7768253e
Signed by: m
GPG Key ID: 606EEEF3C479A91F
4 changed files with 133 additions and 60 deletions

View File

@ -1,4 +1,4 @@
import { File } from '../ddo/MetaData'
import { File, MetaData } from '../ddo/MetaData'
import Account from '../ocean/Account'
import { noZeroX } from '../utils'
import { Instantiable, InstantiableConfig } from '../Instantiable.abstract'
@ -35,13 +35,8 @@ export class Brizo extends Instantiable {
return `${this.url}${apiPath}/publish`
}
public getComputeEndpoint(
pubKey: string,
serviceIndex: number,
_notUsed: string,
container: string
) {
return `${this.url}${apiPath}/compute`
public getComputeEndpoint() {
return `${this.url}${apiPath}/exec`
}
public async initializeServiceAgreement(
@ -78,12 +73,7 @@ export class Brizo extends Instantiable {
destination: string,
index: number = -1
): Promise<string> {
const signature =
(await account.getToken()) ||
(await this.ocean.utils.signature.signText(
noZeroX(agreementId),
account.getId()
))
const signature = await this.createSignature(account, agreementId)
const filesPromises = files
.filter((_, i) => index === -1 || i === index)
.map(async ({ index: i }) => {
@ -105,6 +95,59 @@ export class Brizo extends Instantiable {
return destination
}
public async executeService(
agreementId: string,
serviceEndpoint: string,
account: Account,
algorithmDid: string,
algorithm: string,
algorithmMeta?: MetaData
): Promise<string> {
const signature = await this.createSignature(account, agreementId)
let executeUrl = serviceEndpoint
executeUrl += `&signature=${signature}`
executeUrl += `&serviceAgreementId=${noZeroX(agreementId)}`
executeUrl += `&consumerAddress=${account.getId()}`
executeUrl += `&algorithmDID=${algorithmDid}`
executeUrl += `&algorithm=${algorithm}`
executeUrl += `&algorithmMeta=${algorithmMeta}`
const result: { workflowId: string } = await this.ocean.utils.fetch
.post(executeUrl, '')
.then((response: any) => {
if (response.ok) {
return response.json()
}
this.logger.error(
'Executing compute job failed:',
response.status,
response.statusText
)
return null
})
.catch(error => {
this.logger.error('Error executing compute job')
this.logger.error(error)
throw error
})
return result.workflowId
}
public async createSignature(account: Account, agreementId: string): Promise<string> {
const signature =
(await account.getToken()) ||
(await this.ocean.utils.signature.signText(
noZeroX(agreementId),
account.getId()
))
return signature
}
public async encrypt(
did: string,
signature: string,

View File

@ -1,25 +0,0 @@
export interface Provider {
type: string
description: string
environment: {
cluster: {
type: string
url: string
}
supportedContainers: {
image: string
tag: string
checksum: string
}[]
supportedServers: {
serverId: string
serverType: string
price: string
cpu: string
gpu: string
memory: string
disk: string
maxExecutionTime: number
}[]
}
}

View File

@ -1,14 +1,7 @@
import { MetaData } from './MetaData'
import { ServiceAgreementTemplate } from './ServiceAgreementTemplate'
import { Provider } from './ComputingProvider'
export type ServiceType =
| 'authorization'
| 'metadata'
| 'access'
| 'compute'
| 'computing'
| 'fitchainCompute'
export type ServiceType = 'authorization' | 'metadata' | 'access' | 'compute'
export interface ServiceCommon {
type: ServiceType
@ -47,15 +40,45 @@ export interface ServiceAccess extends ServiceCommon {
}
}
export interface ServiceComputing extends ServiceCommon {
type: 'computing'
export interface ServiceCompute extends ServiceCommon {
type: 'compute'
templateId?: string
provider?: Provider
attributes: {
main: {
creator: string
datePublished: string
price: string
timeout: number
provider?: ServiceComputeProvider
serviceAgreementTemplate?: ServiceAgreementTemplate
}
}
}
export interface ServiceCompute extends ServiceCommon {
templateId?: string
export interface ServiceComputeProvider {
type: string
description: string
environment: {
cluster: {
type: string
url: string
}
supportedContainers: {
image: string
tag: string
checksum: string
}[]
supportedServers: {
serverId: string
serverType: string
price: string
cpu: string
gpu: string
memory: string
disk: string
maxExecutionTime: number
}[]
}
}
export type Service<
@ -64,8 +87,6 @@ export type Service<
? ServiceAuthorization
: T extends 'metadata'
? ServiceMetadata
: T extends 'computing'
? ServiceComputing
: T extends 'access'
? ServiceAccess
: T extends 'compute'

View File

@ -206,6 +206,7 @@ export class OceanAssets extends Instantiable {
useSecretStore?: boolean
): Promise<string>
/* eslint-disable no-dupe-class-members */
public async consume(
agreementId: string,
did: string,
@ -276,19 +277,20 @@ export class OceanAssets extends Instantiable {
}
return true
}
/* eslint-enable no-dupe-class-members */
/**
* Start the purchase/order of an asset's service. Starts by signing the service agreement
* then sends the request to the publisher via the service endpoint (Brizo http service).
* @param {string} did Decentralized ID.
* @param {number} index Service index.
* @param {Account} consumer Consumer account.
* @param {Account} consumerAccount Consumer account.
* @return {Promise<string>} Returns Agreement ID
*/
public order(
did: string,
index: number,
consumer: Account
consumerAccount: Account
): SubscribablePromise<OrderProgressStep, string> {
return new SubscribablePromise(async observer => {
const oceanAgreements = this.ocean.agreements
@ -321,7 +323,7 @@ export class OceanAssets extends Instantiable {
const paid = await oceanAgreements.conditions.lockReward(
agreementId,
attributes.main.price,
consumer
consumerAccount
)
observer.next(OrderProgressStep.LockedPayment)
@ -347,8 +349,8 @@ export class OceanAssets extends Instantiable {
agreementId,
index,
undefined,
consumer,
consumer
consumerAccount,
consumerAccount
)
this.logger.log('Agreement created')
@ -362,6 +364,38 @@ export class OceanAssets extends Instantiable {
})
}
/**
* Start the execution of a compute job.
* @param {string} agreementId ID of the agreement.
* @param {DDO} computeDdo DDO of the compute asset.
* @param {Account} consumerAccount Consumer account.
* @param {string} algorithmDid The asset DID (of type `algorithm`) which consist of `did:op:` and the `assetId` hex str (without `0x` prefix).
* @param {string} algorithm The text of the algorithm to run in the compute job (e.g. a jupyter notebook)
* @param {MetaData} algorithmMeta Metadata about the algorithm being run if `algorithm` is being used. This is ignored when `algorithmDID` is specified.
* @return {Promise<string>} Returns Workflow ID
*/
public async execute(
agreementId: string,
computeDdo: DDO,
consumerAccount: Account,
algorithmDid: string,
algorithm: string,
algorithmMeta?: MetaData
): Promise<string> {
const { serviceEndpoint } = computeDdo.findServiceByType('compute')
const workflowId = await this.ocean.brizo.executeService(
agreementId,
serviceEndpoint,
consumerAccount,
algorithmDid,
algorithm,
algorithmMeta
)
return workflowId
}
/**
* Returns the owner of a asset.
* @param {string} did Decentralized ID.