From 6d5be0db75789b6fe787098d43a48b6a282ad68c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pedro=20Guti=C3=A9rrez?= Date: Thu, 21 Feb 2019 17:58:54 +0100 Subject: [PATCH] fixed consume flow --- src/brizo/Brizo.ts | 47 +++++++- src/ddo/DDO.ts | 5 + src/ocean/Ocean.ts | 47 +------- src/ocean/OceanAgreements.ts | 6 +- src/ocean/OceanAssets.ts | 103 +++++++++++++++++- .../ServiceAgreementTemplate.ts | 6 +- 6 files changed, 163 insertions(+), 51 deletions(-) diff --git a/src/brizo/Brizo.ts b/src/brizo/Brizo.ts index b92ac60..23add94 100644 --- a/src/brizo/Brizo.ts +++ b/src/brizo/Brizo.ts @@ -1,5 +1,9 @@ import Config from "../models/Config" +import { File } from "../ddo/MetaData" +import Account from "../ocean/Account" import WebServiceConnectorProvider from "../utils/WebServiceConnectorProvider" +import Logger from '../utils/Logger' +const { save } = require('save-file') const apiPath = "/api/v1/brizo/services" @@ -32,7 +36,8 @@ export default class Brizo { serviceAgreementId: string, serviceDefinitionId: string, signature: string, - consumerAddress: string): Promise { + consumerAddress: string, + ): Promise { const args = { did, @@ -53,4 +58,44 @@ export default class Brizo { throw new Error("HTTP request failed") } } + + public async consumeService( + agreementId: string, + serviceEndpoint: string, + account: Account, + files: File[], + destination: string, + ): Promise { + const filesPromises = files + .map(async ({url}, i) => { + let consumeUrl = serviceEndpoint + consumeUrl += `?url=${url}` + consumeUrl += `&serviceAgreementId=${agreementId}` + consumeUrl += `&consumerAddress=${account.getId()}` + + try { + await this.downloadFile( + consumeUrl, + url.split('/').pop() || `file-${i}`, + destination, + ) + } catch (e) { + Logger.error("Error consuming assets") + Logger.error(e) + throw new Error("Error consuming assets") + } + }) + await Promise.all(filesPromises) + return destination + } + + private async downloadFile(url: string, filename: string, destination?: string): Promise { + const path = `${destination}${filename}` + const response = await WebServiceConnectorProvider + .getConnector() + .get(url) + + await save(await response.buffer(), path) + return path + } } diff --git a/src/ddo/DDO.ts b/src/ddo/DDO.ts index d7e470e..66a8f58 100644 --- a/src/ddo/DDO.ts +++ b/src/ddo/DDO.ts @@ -53,6 +53,11 @@ export class DDO { this.service = (ddo && ddo.service) || [] } + + public shortId(): string { + return this.id.replace("did:op:", "") + } + /** * Finds a service of a DDO by ID. * @param {string} serviceDefinitionId Service ID. diff --git a/src/ocean/Ocean.ts b/src/ocean/Ocean.ts index 72227da..14f0d91 100644 --- a/src/ocean/Ocean.ts +++ b/src/ocean/Ocean.ts @@ -173,7 +173,7 @@ export default class Ocean { /** * Creates a new service agreement. - * @private + * @deprecated Replace by [Ocean.assets.consume]{@link #OceanAssets.consume} * @param {string} did Decentralized ID. * @param {string} serviceDefinitionId Service definition ID. * @param {string} serviceAgreementId Service agreement ID. @@ -181,6 +181,7 @@ export default class Ocean { * @param {Function} cb Callback executen when the access is granted. * @param {Account} consumer Consumer account. */ + @deprecated("OceanAssets.consume") public async initializeServiceAgreement( did: string, serviceDefinitionId: string, @@ -189,49 +190,7 @@ export default class Ocean { cb: (files: string[]) => void, consumer: Account, ) { - const d: DID = DID.parse(did) - const ddo = await AquariusProvider.getAquarius().retrieveDDO(d) - - const accessService = ddo.findServiceByType("Access") - const metadataService = ddo.findServiceByType("Metadata") - - const accessEvent: ContractEvent = EventListener.subscribe( - accessService.conditions[1].contractName, - accessService.conditions[1].events[1].name, - {}, - ) - const filesPromise = new Promise((resolve) => { - accessEvent.listenOnce(async () => { - Logger.log("Awesome; got a AccessGranted Event. Let's download the asset files.") - const contentUrls = await SecretStoreProvider - .getSecretStore() - .decryptDocument(d.getId(), (metadataService as any).metadata.base.contentUrls[0]) - const serviceUrl: string = accessService.serviceEndpoint - Logger.log("Consuming asset files using service url: ", serviceUrl) - const files = [] - - for (const cUrl of contentUrls) { - let url: string = serviceUrl + `?url=${cUrl}` - url = url + `&serviceAgreementId=${serviceAgreementId}` - url = url + `&consumerAddress=${consumer.getId()}` - files.push(url) - } - - cb(files) - resolve(files) - }) - }) - - await BrizoProvider - .getBrizo() - .initializeServiceAgreement( - did, - serviceAgreementId, - serviceDefinitionId, - serviceAgreementSignature, - consumer.getId()) - - await filesPromise + return await this.assets.consume(serviceDefinitionId, did, serviceDefinitionId, consumer) } /** diff --git a/src/ocean/OceanAgreements.ts b/src/ocean/OceanAgreements.ts index cad58e7..45b257b 100644 --- a/src/ocean/OceanAgreements.ts +++ b/src/ocean/OceanAgreements.ts @@ -70,7 +70,7 @@ export default class OceanAgreements { consumer: Account, ): Promise { - await BrizoProvider + const result = await BrizoProvider .getBrizo() .initializeServiceAgreement( did, @@ -79,6 +79,10 @@ export default class OceanAgreements { signature, consumer.getId(), ) + + if (!result.ok) { + throw new Error("Error on initialize agreement: " + await result.text()) + } } /** diff --git a/src/ocean/OceanAssets.ts b/src/ocean/OceanAssets.ts index 2d0c654..ae71b85 100644 --- a/src/ocean/OceanAssets.ts +++ b/src/ocean/OceanAssets.ts @@ -1,10 +1,11 @@ import AquariusProvider from "../aquarius/AquariusProvider" import { SearchQuery } from "../aquarius/query/SearchQuery" import BrizoProvider from "../brizo/BrizoProvider" +import ConfigProvider from "../ConfigProvider" import { Condition } from "../ddo/Condition" import { DDO } from "../ddo/DDO" import { MetaData } from "../ddo/MetaData" -import { Service } from "../ddo/Service" +import { Service, ServiceAuthorization } from "../ddo/Service" import Keeper from "../keeper/Keeper" import SecretStoreProvider from "../secretstore/SecretStoreProvider" import Account from "./Account" @@ -12,6 +13,9 @@ import DID from "./DID" import OceanAgreements from "./OceanAgreements" import ServiceAgreementTemplate from "./ServiceAgreements/ServiceAgreementTemplate" import Access from "./ServiceAgreements/Templates/Access" +import EventListener from "../keeper/EventListener" +import ServiceAgreement from "./ServiceAgreements/ServiceAgreement" +import Logger from '../utils/Logger' /** * Assets submodule of Ocean Protocol. @@ -177,6 +181,58 @@ export default class OceanAssets { return storedDdo } + public async consume (agreementId: string, did: string, serviceDefinitionId: string, consumerAccount: Account, resultPath: string): Promise + public async consume (agreementId: string, did: string, serviceDefinitionId: string, consumerAccount: Account): Promise + public async consume ( + agreementId: string, + did: string, + serviceDefinitionId: string, + consumerAccount: Account, + resultPath?: string, + ): Promise { + + const brizo = BrizoProvider.getBrizo() + const ddo = await this.resolve(did) + const {metadata} = ddo.findServiceByType('Metadata') + + const authorizationService = ddo.findServiceByType('Authorization') + const accessService = ddo.findServiceById(serviceDefinitionId) + + const files = metadata.base.encryptedFiles + + const {serviceEndpoint} = accessService + + if (!serviceEndpoint) { + throw new Error('Consume asset failed, service definition is missing the `serviceEndpoint`.') + } + + const secretStoreUrl = authorizationService.service === "SecretStore" && authorizationService.serviceEndpoint + const secretStoreConfig = { + secretStoreUri: secretStoreUrl, + } + + Logger.log("Decrypting files") + const decryptedFiles = await SecretStoreProvider.getSecretStore(secretStoreConfig).decryptDocument(DID.parse(did).getId(), files) + Logger.log("Files decrypted") + + Logger.log("Consuming files") + + resultPath = resultPath ? `${resultPath}/datafile.${ddo.shortId()}.${agreementId}/` : undefined + await brizo.consumeService( + agreementId, + serviceEndpoint, + consumerAccount, + decryptedFiles, + resultPath, + ) + Logger.log("Files consumed") + + if (resultPath) { + return resultPath + } + return true + } + /** * Start the purchase/order of an asset's service. Starts by signing the service agreement * then sends the request to the publisher via the service endpoint (Brizo http service). @@ -193,9 +249,54 @@ export default class OceanAssets { const oceanAreements = await OceanAgreements.getInstance() + Logger.log("Asking for agreement signature") const {agreementId, signature} = await oceanAreements.prepare(did, serviceDefinitionId, consumer) + Logger.log(`Agreement ${agreementId} signed`) + + const ddo = await this.resolve(did) + + const paymentFlow = new Promise((resolve, reject) => { + EventListener + .subscribe( + "ServiceExecutionAgreement", + "AgreementInitialized", + {agreementId: "0x" + agreementId}, + ) + .listenOnce(async (...args) => { + Logger.log("Agreement initialized") + const serviceAgreement = new ServiceAgreement("0x" + agreementId) + const {metadata} = ddo.findServiceByType("Metadata") + + Logger.log("Locking payment") + const paid = await serviceAgreement.payAsset(ddo.shortId(), metadata.base.price, consumer) + + if (paid) { + Logger.log("Payment was OK") + } else { + Logger.error("Payment was KO") + Logger.error("Agreement ID: ", agreementId) + Logger.error("DID: ", ddo.id) + reject("Error on payment") + } + }) + + EventListener + .subscribe( + "AccessConditions", + "AccessGranted", + {agreementId: "0x" + agreementId}, + ) + .listenOnce(async (...args) => { + Logger.log("Access granted") + resolve() + }) + }) + + Logger.log("Sending agreement request") await oceanAreements.send(did, agreementId, serviceDefinitionId, signature, consumer) + await paymentFlow + return agreementId } diff --git a/src/ocean/ServiceAgreements/ServiceAgreementTemplate.ts b/src/ocean/ServiceAgreements/ServiceAgreementTemplate.ts index b8efa58..5f1be74 100644 --- a/src/ocean/ServiceAgreements/ServiceAgreementTemplate.ts +++ b/src/ocean/ServiceAgreements/ServiceAgreementTemplate.ts @@ -15,8 +15,7 @@ import TemplateBase from "./Templates/TemplateBase" export default class ServiceAgreementTemplate extends OceanBase { - private static generateConditionsKey(serviceAgreementTemplateId: string, - methodReflection: MethodReflection): string { + private static generateConditionsKey(serviceAgreementTemplateId: string, methodReflection: MethodReflection): string { const values = [ {type: "bytes32", value: serviceAgreementTemplateId} as ValuePair, {type: "address", value: methodReflection.address} as ValuePair, @@ -29,8 +28,7 @@ export default class ServiceAgreementTemplate extends OceanBase { super(template.id) } - public async register(templateOwnerAddress: string) - : Promise { + public async register(templateOwnerAddress: string): Promise { const dependencyMatrix: number[] = await Promise.all(this.template.Methods.map(async (method: Method) => {