---
title: Writing Algorithms for Compute to Data
description: Learn how to write algorithms for use in Ocean Protocol's Compute-to-Data feature.
---
## Overview
An algorithm in the Ocean Protocol stack is another asset type, in addition to data sets. An algorithm for Compute to Data is composed of the following:

- the algorithm code
- a Docker image (base image + tag)
- an entrypoint
## Environment
When creating an algorithm asset in Ocean Protocol, the additional `algorithm` object needs to be included in its metadata service to define the Docker container environment:
```json
{
  "algorithm": {
    "container": {
      "entrypoint": "node $ALGO",
      "image": "node",
      "tag": "latest"
    }
  }
}
```
| Variable | Usage |
| --- | --- |
| `image` | The Docker image name the algorithm will run with. |
| `tag` | The Docker image tag that you are going to use. |
| `entrypoint` | The Docker entrypoint. `$ALGO` is a macro that gets replaced inside the compute job, depending on where your algorithm code is downloaded. |
When publishing an algorithm through the Ocean Market, these properties can be set via the publish UI.
## Environment Examples
Run an algorithm written in JavaScript/Node.js, based on Node.js v14:
```json
{
  "algorithm": {
    "container": {
      "entrypoint": "node $ALGO",
      "image": "node",
      "tag": "14"
    }
  }
}
```
Run an algorithm written in Python, based on Python v3.9:
```json
{
  "algorithm": {
    "container": {
      "entrypoint": "python3.9 $ALGO",
      "image": "python",
      "tag": "3.9.4-alpine3.13"
    }
  }
}
```
Be aware that your algorithm might need a lot of dependencies, so it's a lot faster to build your own Docker image with those dependencies pre-installed and publish your algorithm with that custom image. We also collect some example images.
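For example, once you have published a custom image to a container registry, the container object simply points at it. The image name and tag below are placeholders for your own published image, not a real one:

```json
{
  "algorithm": {
    "container": {
      "entrypoint": "python3.9 $ALGO",
      "image": "your-dockerhub-account/your-algo-image",
      "tag": "1.0.0"
    }
  }
}
```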
## Data Storage
As part of a compute job, every algorithm runs in a K8s pod with these volumes mounted:
| Path | Permissions | Usage |
| --- | --- | --- |
| `/data/inputs` | read | Storage for input data sets, accessible only to the algorithm running in the pod. |
| `/data/ddos` | read | Storage for all DDOs involved in the compute job (input data sets + algorithm). |
| `/data/outputs` | read/write | Storage for all of the algorithm's output files. They are uploaded to some form of cloud storage, and URLs are sent back to the consumer. |
| `/data/logs/` | read/write | All algorithm output (such as `print`, `console.log`, etc.) is stored in a file located in this folder. These files are stored and sent to the consumer as well. |
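As a quick orientation, the following minimal Python sketch shows how an algorithm typically interacts with these mounts; the output file name is illustrative, not required by Compute-to-Data:

```python
import os

INPUT_DIR = '/data/inputs'    # read-only: the input data sets
OUTPUT_DIR = '/data/outputs'  # read/write: files here are returned as job results

# Collect every input file mounted into the pod.
input_files = []
for root, _dirs, files in os.walk(INPUT_DIR):
    for name in files:
        input_files.append(os.path.join(root, name))

# Anything printed to stdout is captured under /data/logs/ for the consumer.
print(f'Found {len(input_files)} input file(s)')

# 'manifest.txt' is just an illustrative name for an output artifact.
with open(os.path.join(OUTPUT_DIR, 'manifest.txt'), 'w') as out:
    out.write('\n'.join(input_files))
```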
## Environment variables available to algorithms
For every algorithm pod, the Compute to Data environment provides the following environment variables:
| Variable | Usage |
| --- | --- |
| `DIDS` | A JSON-encoded array of the DIDs of the input data sets. |
| `TRANSFORMATION_DID` | The DID of the algorithm. |
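As the Python example below shows, `DIDS` arrives as a JSON-encoded string, so a minimal sketch for consuming both variables looks like this (error handling omitted):

```python
import json
import os

# DIDS holds a JSON-encoded array of input data set DIDs.
dids = json.loads(os.getenv('DIDS', '[]'))

# TRANSFORMATION_DID holds the DID of this algorithm asset.
algo_did = os.getenv('TRANSFORMATION_DID')

for did in dids:
    # Each DID's DDO is mounted at /data/ddos/<did> (see Data Storage above).
    print(f'Input data set {did}, DDO at /data/ddos/{did}')
print(f'Algorithm DID: {algo_did}')
```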
## Example: JavaScript/Node.js
The following is a simple JavaScript/Node.js algorithm that does a line count for ALL input data sets. The algorithm does not use any environment variables; instead, it scans the `/data/inputs` folder.
```js
const fs = require('fs')

const inputFolder = '/data/inputs'
const outputFolder = '/data/outputs'

async function countrows(file) {
  console.log('Start counting for ' + file)
  const fileBuffer = fs.readFileSync(file)
  const toString = fileBuffer.toString()
  const splitLines = toString.split('\n')
  const rows = splitLines.length - 1
  // Append "<file>,<rows>" to the job results file
  fs.appendFileSync(outputFolder + '/output.log', file + ',' + rows + '\r\n')
  console.log('Finished. We have ' + rows + ' lines')
}

async function processfolder(folder) {
  const files = fs.readdirSync(folder)
  for (let i = 0; i < files.length; i++) {
    const file = files[i]
    const fullpath = folder + '/' + file
    if (fs.statSync(fullpath).isDirectory()) {
      // Recurse into nested input folders
      await processfolder(fullpath)
    } else {
      await countrows(fullpath)
    }
  }
}

processfolder(inputFolder)
```
This snippet will create and expose the following files as compute job results to the consumer:

- `/data/outputs/output.log`
- `/data/logs/algo.log`
To run this, use the following container object:
```json
{
  "algorithm": {
    "container": {
      "entrypoint": "node $ALGO",
      "image": "node",
      "tag": "12"
    }
  }
}
```
## Example: Python
A more advanced line counter in Python, which relies on the environment variables and constructs a job object containing all the input files and DDOs:
```python
import json
import os


def get_job_details():
    """Reads in metadata information about assets used by the algo"""
    job = dict()
    # DIDS is a JSON-encoded array of input data set DIDs
    dids = os.getenv('DIDS', None)
    job['dids'] = json.loads(dids) if dids else None
    job['metadata'] = dict()
    job['files'] = dict()
    job['algo'] = dict()
    job['secret'] = os.getenv('secret', None)
    algo_did = os.getenv('TRANSFORMATION_DID', None)
    if job['dids'] is not None:
        for did in job['dids']:
            # Get the DDO from disk
            filename = '/data/ddos/' + did
            print(f'Reading json from {filename}')
            with open(filename) as json_file:
                ddo = json.load(json_file)
                # Search for the metadata service
                for service in ddo['service']:
                    if service['type'] == 'metadata':
                        job['files'][did] = list()
                        index = 0
                        for file in service['attributes']['main']['files']:
                            job['files'][did].append(
                                '/data/inputs/' + did + '/' + str(index))
                            index = index + 1
    if algo_did is not None:
        job['algo']['did'] = algo_did
        job['algo']['ddo_path'] = '/data/ddos/' + algo_did
    return job


def line_counter(job_details):
    """Executes the line counter based on inputs"""
    print('Starting compute job with the following input information:')
    print(json.dumps(job_details, sort_keys=True, indent=4))

    # Count the non-blank lines of the first file in the first DID
    first_did = job_details['dids'][0]
    filename = job_details['files'][first_did][0]
    non_blank_count = 0
    with open(filename) as infp:
        for line in infp:
            if line.strip():
                non_blank_count += 1
    print('number of non-blank lines found %d' % non_blank_count)

    # Write that number to /data/outputs to generate the algo output
    with open('/data/outputs/result', 'w') as f:
        f.write(str(non_blank_count))


if __name__ == '__main__':
    line_counter(get_job_details())
```
As with the JavaScript example, the `/data/outputs/result` file and the captured log output are returned to the consumer as compute job results. To run this algorithm, use the following container object:
```json
{
  "algorithm": {
    "container": {
      "entrypoint": "python3.6 $ALGO",
      "image": "oceanprotocol/algo_dockers",
      "tag": "python-sql"
    }
  }
}
```