mirror of
https://github.com/oceanprotocol-archive/squid-js.git
synced 2024-02-02 15:31:51 +01:00
fixed consume flow
This commit is contained in:
parent
181940a62c
commit
6d5be0db75
@ -1,5 +1,9 @@
|
||||
import Config from "../models/Config"
|
||||
import { File } from "../ddo/MetaData"
|
||||
import Account from "../ocean/Account"
|
||||
import WebServiceConnectorProvider from "../utils/WebServiceConnectorProvider"
|
||||
import Logger from '../utils/Logger'
|
||||
const { save } = require('save-file')
|
||||
|
||||
const apiPath = "/api/v1/brizo/services"
|
||||
|
||||
@ -32,7 +36,8 @@ export default class Brizo {
|
||||
serviceAgreementId: string,
|
||||
serviceDefinitionId: string,
|
||||
signature: string,
|
||||
consumerAddress: string): Promise<any> {
|
||||
consumerAddress: string,
|
||||
): Promise<any> {
|
||||
|
||||
const args = {
|
||||
did,
|
||||
@ -53,4 +58,44 @@ export default class Brizo {
|
||||
throw new Error("HTTP request failed")
|
||||
}
|
||||
}
|
||||
|
||||
public async consumeService(
|
||||
agreementId: string,
|
||||
serviceEndpoint: string,
|
||||
account: Account,
|
||||
files: File[],
|
||||
destination: string,
|
||||
): Promise<string> {
|
||||
const filesPromises = files
|
||||
.map(async ({url}, i) => {
|
||||
let consumeUrl = serviceEndpoint
|
||||
consumeUrl += `?url=${url}`
|
||||
consumeUrl += `&serviceAgreementId=${agreementId}`
|
||||
consumeUrl += `&consumerAddress=${account.getId()}`
|
||||
|
||||
try {
|
||||
await this.downloadFile(
|
||||
consumeUrl,
|
||||
url.split('/').pop() || `file-${i}`,
|
||||
destination,
|
||||
)
|
||||
} catch (e) {
|
||||
Logger.error("Error consuming assets")
|
||||
Logger.error(e)
|
||||
throw new Error("Error consuming assets")
|
||||
}
|
||||
})
|
||||
await Promise.all(filesPromises)
|
||||
return destination
|
||||
}
|
||||
|
||||
private async downloadFile(url: string, filename: string, destination?: string): Promise<string> {
|
||||
const path = `${destination}${filename}`
|
||||
const response = await WebServiceConnectorProvider
|
||||
.getConnector()
|
||||
.get(url)
|
||||
|
||||
await save(await response.buffer(), path)
|
||||
return path
|
||||
}
|
||||
}
|
||||
|
@ -53,6 +53,11 @@ export class DDO {
|
||||
this.service = (ddo && ddo.service) || []
|
||||
}
|
||||
|
||||
|
||||
public shortId(): string {
|
||||
return this.id.replace("did:op:", "")
|
||||
}
|
||||
|
||||
/**
|
||||
* Finds a service of a DDO by ID.
|
||||
* @param {string} serviceDefinitionId Service ID.
|
||||
|
@ -173,7 +173,7 @@ export default class Ocean {
|
||||
|
||||
/**
|
||||
* Creates a new service agreement.
|
||||
* @private
|
||||
* @deprecated Replace by [Ocean.assets.consume]{@link #OceanAssets.consume}
|
||||
* @param {string} did Decentralized ID.
|
||||
* @param {string} serviceDefinitionId Service definition ID.
|
||||
* @param {string} serviceAgreementId Service agreement ID.
|
||||
@ -181,6 +181,7 @@ export default class Ocean {
|
||||
* @param {Function} cb Callback executen when the access is granted.
|
||||
* @param {Account} consumer Consumer account.
|
||||
*/
|
||||
@deprecated("OceanAssets.consume")
|
||||
public async initializeServiceAgreement(
|
||||
did: string,
|
||||
serviceDefinitionId: string,
|
||||
@ -189,49 +190,7 @@ export default class Ocean {
|
||||
cb: (files: string[]) => void,
|
||||
consumer: Account,
|
||||
) {
|
||||
const d: DID = DID.parse(did)
|
||||
const ddo = await AquariusProvider.getAquarius().retrieveDDO(d)
|
||||
|
||||
const accessService = ddo.findServiceByType("Access")
|
||||
const metadataService = ddo.findServiceByType("Metadata")
|
||||
|
||||
const accessEvent: ContractEvent = EventListener.subscribe(
|
||||
accessService.conditions[1].contractName,
|
||||
accessService.conditions[1].events[1].name,
|
||||
{},
|
||||
)
|
||||
const filesPromise = new Promise((resolve) => {
|
||||
accessEvent.listenOnce(async () => {
|
||||
Logger.log("Awesome; got a AccessGranted Event. Let's download the asset files.")
|
||||
const contentUrls = await SecretStoreProvider
|
||||
.getSecretStore()
|
||||
.decryptDocument(d.getId(), (metadataService as any).metadata.base.contentUrls[0])
|
||||
const serviceUrl: string = accessService.serviceEndpoint
|
||||
Logger.log("Consuming asset files using service url: ", serviceUrl)
|
||||
const files = []
|
||||
|
||||
for (const cUrl of contentUrls) {
|
||||
let url: string = serviceUrl + `?url=${cUrl}`
|
||||
url = url + `&serviceAgreementId=${serviceAgreementId}`
|
||||
url = url + `&consumerAddress=${consumer.getId()}`
|
||||
files.push(url)
|
||||
}
|
||||
|
||||
cb(files)
|
||||
resolve(files)
|
||||
})
|
||||
})
|
||||
|
||||
await BrizoProvider
|
||||
.getBrizo()
|
||||
.initializeServiceAgreement(
|
||||
did,
|
||||
serviceAgreementId,
|
||||
serviceDefinitionId,
|
||||
serviceAgreementSignature,
|
||||
consumer.getId())
|
||||
|
||||
await filesPromise
|
||||
return await this.assets.consume(serviceDefinitionId, did, serviceDefinitionId, consumer)
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -70,7 +70,7 @@ export default class OceanAgreements {
|
||||
consumer: Account,
|
||||
): Promise<void> {
|
||||
|
||||
await BrizoProvider
|
||||
const result = await BrizoProvider
|
||||
.getBrizo()
|
||||
.initializeServiceAgreement(
|
||||
did,
|
||||
@ -79,6 +79,10 @@ export default class OceanAgreements {
|
||||
signature,
|
||||
consumer.getId(),
|
||||
)
|
||||
|
||||
if (!result.ok) {
|
||||
throw new Error("Error on initialize agreement: " + await result.text())
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -1,10 +1,11 @@
|
||||
import AquariusProvider from "../aquarius/AquariusProvider"
|
||||
import { SearchQuery } from "../aquarius/query/SearchQuery"
|
||||
import BrizoProvider from "../brizo/BrizoProvider"
|
||||
import ConfigProvider from "../ConfigProvider"
|
||||
import { Condition } from "../ddo/Condition"
|
||||
import { DDO } from "../ddo/DDO"
|
||||
import { MetaData } from "../ddo/MetaData"
|
||||
import { Service } from "../ddo/Service"
|
||||
import { Service, ServiceAuthorization } from "../ddo/Service"
|
||||
import Keeper from "../keeper/Keeper"
|
||||
import SecretStoreProvider from "../secretstore/SecretStoreProvider"
|
||||
import Account from "./Account"
|
||||
@ -12,6 +13,9 @@ import DID from "./DID"
|
||||
import OceanAgreements from "./OceanAgreements"
|
||||
import ServiceAgreementTemplate from "./ServiceAgreements/ServiceAgreementTemplate"
|
||||
import Access from "./ServiceAgreements/Templates/Access"
|
||||
import EventListener from "../keeper/EventListener"
|
||||
import ServiceAgreement from "./ServiceAgreements/ServiceAgreement"
|
||||
import Logger from '../utils/Logger'
|
||||
|
||||
/**
|
||||
* Assets submodule of Ocean Protocol.
|
||||
@ -177,6 +181,58 @@ export default class OceanAssets {
|
||||
return storedDdo
|
||||
}
|
||||
|
||||
public async consume (agreementId: string, did: string, serviceDefinitionId: string, consumerAccount: Account, resultPath: string): Promise<string>
|
||||
public async consume (agreementId: string, did: string, serviceDefinitionId: string, consumerAccount: Account): Promise<true>
|
||||
public async consume (
|
||||
agreementId: string,
|
||||
did: string,
|
||||
serviceDefinitionId: string,
|
||||
consumerAccount: Account,
|
||||
resultPath?: string,
|
||||
): Promise<string | true> {
|
||||
|
||||
const brizo = BrizoProvider.getBrizo()
|
||||
const ddo = await this.resolve(did)
|
||||
const {metadata} = ddo.findServiceByType('Metadata')
|
||||
|
||||
const authorizationService = ddo.findServiceByType('Authorization')
|
||||
const accessService = ddo.findServiceById(serviceDefinitionId)
|
||||
|
||||
const files = metadata.base.encryptedFiles
|
||||
|
||||
const {serviceEndpoint} = accessService
|
||||
|
||||
if (!serviceEndpoint) {
|
||||
throw new Error('Consume asset failed, service definition is missing the `serviceEndpoint`.')
|
||||
}
|
||||
|
||||
const secretStoreUrl = authorizationService.service === "SecretStore" && authorizationService.serviceEndpoint
|
||||
const secretStoreConfig = {
|
||||
secretStoreUri: secretStoreUrl,
|
||||
}
|
||||
|
||||
Logger.log("Decrypting files")
|
||||
const decryptedFiles = await SecretStoreProvider.getSecretStore(secretStoreConfig).decryptDocument(DID.parse(did).getId(), files)
|
||||
Logger.log("Files decrypted")
|
||||
|
||||
Logger.log("Consuming files")
|
||||
|
||||
resultPath = resultPath ? `${resultPath}/datafile.${ddo.shortId()}.${agreementId}/` : undefined
|
||||
await brizo.consumeService(
|
||||
agreementId,
|
||||
serviceEndpoint,
|
||||
consumerAccount,
|
||||
decryptedFiles,
|
||||
resultPath,
|
||||
)
|
||||
Logger.log("Files consumed")
|
||||
|
||||
if (resultPath) {
|
||||
return resultPath
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
/**
|
||||
* Start the purchase/order of an asset's service. Starts by signing the service agreement
|
||||
* then sends the request to the publisher via the service endpoint (Brizo http service).
|
||||
@ -193,9 +249,54 @@ export default class OceanAssets {
|
||||
|
||||
const oceanAreements = await OceanAgreements.getInstance()
|
||||
|
||||
Logger.log("Asking for agreement signature")
|
||||
const {agreementId, signature} = await oceanAreements.prepare(did, serviceDefinitionId, consumer)
|
||||
Logger.log(`Agreement ${agreementId} signed`)
|
||||
|
||||
const ddo = await this.resolve(did)
|
||||
|
||||
const paymentFlow = new Promise((resolve, reject) => {
|
||||
EventListener
|
||||
.subscribe(
|
||||
"ServiceExecutionAgreement",
|
||||
"AgreementInitialized",
|
||||
{agreementId: "0x" + agreementId},
|
||||
)
|
||||
.listenOnce(async (...args) => {
|
||||
Logger.log("Agreement initialized")
|
||||
const serviceAgreement = new ServiceAgreement("0x" + agreementId)
|
||||
const {metadata} = ddo.findServiceByType("Metadata")
|
||||
|
||||
Logger.log("Locking payment")
|
||||
const paid = await serviceAgreement.payAsset(ddo.shortId(), metadata.base.price, consumer)
|
||||
|
||||
if (paid) {
|
||||
Logger.log("Payment was OK")
|
||||
} else {
|
||||
Logger.error("Payment was KO")
|
||||
Logger.error("Agreement ID: ", agreementId)
|
||||
Logger.error("DID: ", ddo.id)
|
||||
reject("Error on payment")
|
||||
}
|
||||
})
|
||||
|
||||
EventListener
|
||||
.subscribe(
|
||||
"AccessConditions",
|
||||
"AccessGranted",
|
||||
{agreementId: "0x" + agreementId},
|
||||
)
|
||||
.listenOnce(async (...args) => {
|
||||
Logger.log("Access granted")
|
||||
resolve()
|
||||
})
|
||||
})
|
||||
|
||||
Logger.log("Sending agreement request")
|
||||
await oceanAreements.send(did, agreementId, serviceDefinitionId, signature, consumer)
|
||||
|
||||
await paymentFlow
|
||||
|
||||
return agreementId
|
||||
}
|
||||
|
||||
|
@ -15,8 +15,7 @@ import TemplateBase from "./Templates/TemplateBase"
|
||||
|
||||
export default class ServiceAgreementTemplate extends OceanBase {
|
||||
|
||||
private static generateConditionsKey(serviceAgreementTemplateId: string,
|
||||
methodReflection: MethodReflection): string {
|
||||
private static generateConditionsKey(serviceAgreementTemplateId: string, methodReflection: MethodReflection): string {
|
||||
const values = [
|
||||
{type: "bytes32", value: serviceAgreementTemplateId} as ValuePair,
|
||||
{type: "address", value: methodReflection.address} as ValuePair,
|
||||
@ -29,8 +28,7 @@ export default class ServiceAgreementTemplate extends OceanBase {
|
||||
super(template.id)
|
||||
}
|
||||
|
||||
public async register(templateOwnerAddress: string)
|
||||
: Promise<boolean> {
|
||||
public async register(templateOwnerAddress: string): Promise<boolean> {
|
||||
|
||||
const dependencyMatrix: number[] =
|
||||
await Promise.all(this.template.Methods.map(async (method: Method) => {
|
||||
|
Loading…
x
Reference in New Issue
Block a user