mirror of https://github.com/oceanprotocol/docs.git synced 2024-11-01 15:55:34 +01:00
docs/developers/ocean.py/technical-details.md
2023-06-09 13:10:40 +00:00

description
Technical details about most used ocean.py functions

Ocean Instance Tech Details

At the beginning of most flows, we create an ocean object, which is an instance of class Ocean. It exposes useful information, including the following.

Constructor

  • __init__(self, config_dict: Dict, data_provider: Optional[Type] = None)

The Ocean class is the entry point into Ocean Protocol.

To initialize an Ocean object, you must provide config_dict, which is a dict instance, and optionally a data provider class such as DataServiceProvider.

Parameters

  • config_dict: dict, mandatory. It contains the configuration in dictionary format.
  • data_provider: Optional[Type], optional, with a default value of None. If it is not provided, the constructor falls back to the DataServiceProvider class.

Returns

None

Defined in

ocean/ocean.py

Source code

{% code overflow="wrap" %}

class Ocean:
    """The Ocean class is the entry point into Ocean Protocol."""

    @enforce_types
    def __init__(self, config_dict: Dict, data_provider: Optional[Type] = None) -> None:
        """Initialize Ocean class.

        Usage: Make a new Ocean instance

        `ocean = Ocean({...})`

        This class provides the main top-level functions in ocean protocol:
        1. Publish assets metadata and associated services
            - Each asset is assigned a unique DID and a DID Document (DDO)
            - The DDO contains the asset's services including the metadata
            - The DID is registered on-chain with a URL of the metadata store
              to retrieve the DDO from

            `ddo = ocean.assets.create(metadata, publisher_wallet)`

        2. Discover/Search ddos via the current configured metadata store (Aquarius)

            - Usage:
            `ddos_list = ocean.assets.search('search text')`

        An instance of Ocean is parameterized by a `Config` instance.

        :param config_dict: variable definitions
        :param data_provider: `DataServiceProvider` instance
        """
        config_errors = {}
        for key, value in config_defaults.items():
            if key not in config_dict:
                config_errors[key] = "required"
                continue

            if not isinstance(config_dict[key], type(value)):
                config_errors[key] = f"must be {type(value).__name__}"

        if config_errors:
            raise Exception(json.dumps(config_errors))

        self.config_dict = config_dict

        network_name = config_dict["NETWORK_NAME"]
        check_network(network_name)

        if not data_provider:
            data_provider = DataServiceProvider

        self.assets = OceanAssets(self.config_dict, data_provider)
        self.compute = OceanCompute(self.config_dict, data_provider)

        logger.debug("Ocean instance initialized: ")

{% endcode %}
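The validation loop at the top of the constructor can be tried in isolation. The sketch below reproduces that loop as a standalone function, assuming a much smaller defaults dictionary than the real one: `EXAMPLE_DEFAULTS` and `find_config_errors` are hypothetical names used only for illustration, and the default values are placeholders.

```python
import json
from typing import Any, Dict

# Hypothetical stand-in for ocean.py's config_defaults; the real dict has more keys.
EXAMPLE_DEFAULTS: Dict[str, Any] = {
    "NETWORK_NAME": "development",
    "METADATA_CACHE_URI": "http://localhost:5000",
}


def find_config_errors(
    config_dict: Dict[str, Any], defaults: Dict[str, Any]
) -> Dict[str, str]:
    """Return {key: problem} for every missing or wrongly typed key."""
    errors: Dict[str, str] = {}
    for key, default_value in defaults.items():
        if key not in config_dict:
            errors[key] = "required"
            continue
        # The expected type is taken from the default value, as in the constructor.
        if not isinstance(config_dict[key], type(default_value)):
            errors[key] = f"must be {type(default_value).__name__}"
    return errors


# NETWORK_NAME has the wrong type and METADATA_CACHE_URI is missing.
errors = find_config_errors({"NETWORK_NAME": 5}, EXAMPLE_DEFAULTS)
print(json.dumps(errors))
```

In the constructor, a non-empty error dictionary is serialized with `json.dumps` and raised as an `Exception`, so every problem is reported at once rather than one key at a time.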

Config Getter

  • config(self) -> dict

It is a helper property for retrieving the user's configuration for ocean.py.
It is accessed on an Ocean object and returns a Python dictionary.

Returns

dict

Configuration fields as dictionary.

Defined in

ocean/ocean.py

Source code
    @property
    @enforce_types
    def config(self) -> dict:  # alias for config_dict
        return self.config_dict

OCEAN Token Address

  • OCEAN_address(self) -> str

It is a helper property for retrieving the OCEAN token address.
It is accessed on an Ocean object and returns the address as a string.

Returns

str

OCEAN token address for that network.

Defined in

ocean/ocean.py

Source code
    @property
    @enforce_types
    def OCEAN_address(self) -> str:
        return get_ocean_token_address(self.config)

get_ocean_token_address is a utility function which reads the address from the address.json file:

{% code overflow="wrap" %}

@enforce_types
def get_ocean_token_address(config_dict: dict) -> str:
    """Returns the Ocean token address for given network or web3 instance
    Requires either network name or web3 instance.
    """
    addresses = get_contracts_addresses(config_dict)

    return Web3.toChecksumAddress(addresses.get("Ocean").lower()) if addresses else None

{% endcode %}

OCEAN Token Object

  • OCEAN_token(self) -> DatatokenBase
  • OCEAN(self) -> DatatokenBase as alias for the above option

It is a helper property for retrieving the OCEAN token object (Datatoken class).
It is accessed on an Ocean object and returns the OCEAN datatoken.

Returns

DatatokenBase

OCEAN token as DatatokenBase object.

Defined in

ocean/ocean.py

Source code
    @property
    @enforce_types
    def OCEAN_token(self) -> DatatokenBase:
        return DatatokenBase.get_typed(self.config, self.OCEAN_address)

    @property
    @enforce_types
    def OCEAN(self):  # alias for OCEAN_token
        return self.OCEAN_token

Data NFT Factory

  • data_nft_factory(self) -> DataNFTFactoryContract

It is a property for getting the Data NFT Factory object for the singleton smart contract.
It is accessed on an Ocean object and returns the DataNFTFactoryContract instance.

Returns

DataNFTFactoryContract

Data NFT Factory contract object, which exposes in Python all the functionality available from the smart contract.

Defined in

ocean/ocean.py

Source code

{% code overflow="wrap" %}

    @property
    @enforce_types
    def data_nft_factory(self) -> DataNFTFactoryContract:
        return DataNFTFactoryContract(self.config, self._addr("ERC721Factory"))

{% endcode %}

Dispenser

  • dispenser(self) -> Dispenser

The dispenser acts as a faucet for free data.
It is a property for getting the Dispenser object for the singleton smart contract.
It is accessed on an Ocean object and returns the Dispenser instance.

Returns

Dispenser

Dispenser contract object, which exposes in Python all the functionality available from the smart contract.

Defined in

ocean/ocean.py

Source code
    @property
    @enforce_types
    def dispenser(self) -> Dispenser:
        return Dispenser(self.config, self._addr("Dispenser"))

Fixed Rate Exchange

  • fixed_rate_exchange(self) -> FixedRateExchange

The exchange is used for priced data.
It is a property for getting the FixedRateExchange object for the singleton smart contract.
It is accessed on an Ocean object and returns the FixedRateExchange instance.

Returns

FixedRateExchange

Fixed Rate Exchange contract object, which exposes in Python all the functionality available from the smart contract.

Defined in

ocean/ocean.py

Source code
    @property
    @enforce_types
    def fixed_rate_exchange(self) -> FixedRateExchange:
        return FixedRateExchange(self.config, self._addr("FixedPrice"))

NFT Token Getter

  • get_nft_token(self, token_address: str) -> DataNFT

It is a getter for a specific data NFT object based on its checksummed address.
It is called on an Ocean object and returns the DataNFT instance for the token_address string passed as parameter.

Parameters

  • token_address - string with the checksummed address of the data NFT that you are looking for.

Returns

DataNFT

Data NFT object, which exposes in Python all the functionality available for the ERC721 template.

Defined in

ocean/ocean.py

Source code
    @enforce_types
    def get_nft_token(self, token_address: str) -> DataNFT:
        """
        :param token_address: Token contract address, str
        :return: `DataNFT` instance
        """
        return DataNFT(self.config, token_address)

Datatoken Getter

  • get_datatoken(self, token_address: str) -> DatatokenBase

It is a getter for a specific datatoken object based on its checksummed address.
It is called on an Ocean object with a token_address string as parameter and returns a DatatokenBase instance whose concrete type depends on the datatoken's template index.

Parameters

  • token_address - string with the checksummed address of the datatoken that you are looking for.

Returns

DatatokenBase

Datatoken object, which exposes in Python all the functionality available for the ERC20 templates.

Defined in

ocean/ocean.py

Source code
    @enforce_types
    def get_datatoken(self, token_address: str) -> DatatokenBase:
        """
        :param token_address: Token contract address, str
        :return: `Datatoken1` or `Datatoken2` instance
        """
        return DatatokenBase.get_typed(self.config, token_address)

User Orders Getter

  • get_user_orders(self, address: str, datatoken: str) -> List[AttributeDict]

Returns the list of orders that were made by a certain user on a specific datatoken.

It can be called within Ocean class.

Parameters

  • address - wallet address of that user
  • datatoken - datatoken address

Returns

List[AttributeDict]

List of all the orders on that datatoken done by the specified user.

Defined in

ocean/ocean.py

Source code

{% code overflow="wrap" %}

    @enforce_types
    def get_user_orders(self, address: str, datatoken: str) -> List[AttributeDict]:
        """
        :return: List of orders `[Order]`
        """
        dt = DatatokenBase.get_typed(self.config_dict, datatoken)
        _orders = []
        for log in dt.get_start_order_logs(address):
            a = dict(log.args.items())
            a["amount"] = int(log.args.amount)
            a["address"] = log.address
            a["transactionHash"] = log.transactionHash
            a = AttributeDict(a.items())

            _orders.append(a)

        return _orders

{% endcode %}
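The loop in get_user_orders copies each event's arguments into a new mapping and adds the contract address and transaction hash. A minimal standalone sketch of that flattening step follows, assuming plain dictionaries and a `SimpleNamespace` in place of the web3 log objects and `AttributeDict`; `flatten_order_logs` and the sample log are hypothetical.

```python
from types import SimpleNamespace
from typing import Any, Dict, List


def flatten_order_logs(logs: List[Any]) -> List[Dict[str, Any]]:
    """Copy each log's event args and add address and transactionHash."""
    orders = []
    for log in logs:
        order = dict(log.args)                      # event arguments
        order["amount"] = int(log.args["amount"])   # normalize the amount to int
        order["address"] = log.address              # contract that emitted the event
        order["transactionHash"] = log.transactionHash
        orders.append(order)
    return orders


# Hypothetical log entry; real entries come from dt.get_start_order_logs(address).
fake_log = SimpleNamespace(
    args={"consumer": "0xConsumer", "amount": "1000000000000000000", "serviceIndex": 0},
    address="0xDatatoken",
    transactionHash="0xabc",
)
orders = flatten_order_logs([fake_log])
print(orders[0]["amount"])
```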

Provider Fees

  • retrieve_provider_fees( self, ddo: DDO, access_service: Service, publisher_wallet ) -> dict

Calls Provider to compute provider fees as dictionary for access service.

Parameters

  • ddo - the DDO object of the data asset
  • access_service - Service instance for the service that needs the provider fees
  • publisher_wallet - Wallet instance of the user that wants to retrieve the provider fees

Returns

dict

A dictionary which contains the following keys (providerFeeAddress, providerFeeToken, providerFeeAmount, providerData, v, r, s, validUntil).

Defined in

ocean/ocean.py

Source code

{% code overflow="wrap" %}

    @enforce_types
    def retrieve_provider_fees(
        self, ddo: DDO, access_service: Service, publisher_wallet
    ) -> dict:

        initialize_response = DataServiceProvider.initialize(
            ddo.did, access_service, consumer_address=publisher_wallet.address
        )
        initialize_data = initialize_response.json()
        provider_fees = initialize_data["providerFee"]

        return provider_fees

{% endcode %}
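The method reads the `providerFee` entry from the JSON body of the initialize response. The sketch below shows that extraction on a hand-written response body and checks for the keys listed above. It is only an illustration: `extract_provider_fees` is a hypothetical helper and every value in the sample is a placeholder, not real Provider output.

```python
import json

# Keys that this page lists for the provider fees dictionary.
EXPECTED_FEE_KEYS = {
    "providerFeeAddress", "providerFeeToken", "providerFeeAmount",
    "providerData", "v", "r", "s", "validUntil",
}


def extract_provider_fees(response_text: str) -> dict:
    """Pull the providerFee entry out of an initialize response body."""
    data = json.loads(response_text)
    fees = data["providerFee"]
    missing = EXPECTED_FEE_KEYS - set(fees)
    if missing:
        raise KeyError(f"provider fee is missing keys: {sorted(missing)}")
    return fees


# Placeholder response body, for illustration only.
sample_body = json.dumps({
    "providerFee": {
        "providerFeeAddress": "0xProvider",
        "providerFeeToken": "0xToken",
        "providerFeeAmount": "0",
        "providerData": "0x",
        "v": 27,
        "r": "0x01",
        "s": "0x02",
        "validUntil": 0,
    }
})
fees = extract_provider_fees(sample_body)
print(sorted(fees))
```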

Compute Provider Fees

  • retrieve_provider_fees_for_compute(self, datasets: List[ComputeInput], algorithm_data: Union[ComputeInput, AlgorithmMetadata], consumer_address: str, compute_environment: str, valid_until: int) -> dict

Calls Provider to generate provider fees as dictionary for compute service.

Parameters

  • datasets - list of ComputeInput objects which contain the data assets
  • algorithm_data - the data needed for the algorithm; it can be either a ComputeInput object or just the algorithm metadata as AlgorithmMetadata
  • consumer_address - address of the compute consumer wallet which is requesting the provider fees
  • compute_environment - ID provided by the compute environment, as a string
  • valid_until - UNIX timestamp in milliseconds until which the provider fees for the compute service are valid.

Returns

dict

A dictionary which contains the following keys (providerFeeAddress, providerFeeToken, providerFeeAmount, providerData, v, r, s, validUntil).

Defined in

ocean/ocean.py

Source code

{% code overflow="wrap" %}

    @enforce_types
    def retrieve_provider_fees_for_compute(
        self,
        datasets: List[ComputeInput],
        algorithm_data: Union[ComputeInput, AlgorithmMetadata],
        consumer_address: str,
        compute_environment: str,
        valid_until: int,
    ) -> dict:

        initialize_compute_response = DataServiceProvider.initialize_compute(
            [x.as_dictionary() for x in datasets],
            algorithm_data.as_dictionary(),
            datasets[0].service.service_endpoint,
            consumer_address,
            compute_environment,
            valid_until,
        )

        return initialize_compute_response.json()

{% endcode %}

ocean.assets.create_url_asset( self, name: str, url: str, publisher_wallet, wait_for_aqua: bool = True ) -> tuple

It is one of the most used functions in the READMEs.

Creates asset of type "dataset", having UrlFiles, with good defaults.

It can be called after instantiating Ocean object.

Params:

  1. name - name of the asset, string
  2. url - url that is stored in the asset, string
  3. publisher_wallet - wallet of the asset publisher/owner, Brownie account
  4. wait_for_aqua - boolean, True by default. Waiting for Aquarius to fetch the asset takes additional time, but keep the default value if you want to be sure that your asset is indexed.

Return:

A tuple which contains the data NFT, datatoken and the data asset.

{% code overflow="wrap" %}

    @enforce_types
    def create_url_asset(
        self, name: str, url: str, publisher_wallet, wait_for_aqua: bool = True
    ) -> tuple:
        """Create asset of type "data", having UrlFiles, with good defaults"""
        metadata = self._default_metadata(name, publisher_wallet)
        files = [UrlFile(url)]
        return self._create_1dt(metadata, files, publisher_wallet, wait_for_aqua)

{% endcode %}

Ocean Compute

ocean.compute.start( self, consumer_wallet, dataset: ComputeInput, compute_environment: str, algorithm: Optional[ComputeInput] = None, algorithm_meta: Optional[AlgorithmMetadata] = None, algorithm_algocustomdata: Optional[dict] = None, additional_datasets: List[ComputeInput] = []) -> str

Starts a compute job.

It can be called within Ocean Compute class.

Params:

  • consumer_wallet - the Brownie account of the consumer who pays for and starts the compute job.
  • dataset - ComputeInput object, which must include the DDO and the service.
  • compute_environment - string that represents the ID of the chosen C2D environment.
  • additional_datasets - list of ComputeInput objects for additional datasets, used when starting a compute job on multiple datasets.

Optional params:

  • algorithm - ComputeInput object, which must include the DDO and the service for the algorithm.
  • algorithm_meta - AlgorithmMetadata object, for when you provide just the algorithm metadata.
  • algorithm_algocustomdata - additional user data for the algorithm, as a dictionary.

Return:

Returns a string type job ID.

    @enforce_types
    def start(
        self,
        consumer_wallet,
        dataset: ComputeInput,
        compute_environment: str,
        algorithm: Optional[ComputeInput] = None,
        algorithm_meta: Optional[AlgorithmMetadata] = None,
        algorithm_algocustomdata: Optional[dict] = None,
        additional_datasets: List[ComputeInput] = [],
    ) -> str:
        metadata_cache_uri = self._config_dict.get("METADATA_CACHE_URI")
        ddo = Aquarius.get_instance(metadata_cache_uri).get_ddo(dataset.did)
        service = ddo.get_service_by_id(dataset.service_id)
        assert (
            ServiceTypes.CLOUD_COMPUTE == service.type
        ), "service at serviceId is not of type compute service."

        consumable_result = is_consumable(
            ddo,
            service,
            {"type": "address", "value": consumer_wallet.address},
            with_connectivity_check=True,
        )
        if consumable_result != ConsumableCodes.OK:
            raise AssetNotConsumable(consumable_result)

        # Start compute job
        job_info = self._data_provider.start_compute_job(
            dataset_compute_service=service,
            consumer=consumer_wallet,
            dataset=dataset,
            compute_environment=compute_environment,
            algorithm=algorithm,
            algorithm_meta=algorithm_meta,
            algorithm_custom_data=algorithm_algocustomdata,
            input_datasets=additional_datasets,
        )
        return job_info["jobId"]
ocean.compute.status(self, ddo: DDO, service: Service, job_id: str, wallet) -> Dict[str, Any]

Gets status of the compute job.

It can be called within Ocean Compute class.

Params:

  • ddo - DDO offering the compute service of this job
  • service - Service object of compute
  • job_id - ID of the compute job
  • wallet - Brownie account which initiated the compute job

Return:

A dictionary which contains the status for an existing compute job, keys are (ok, status, statusText).

{% code overflow="wrap" %}

    @enforce_types
    def status(self, ddo: DDO, service: Service, job_id: str, wallet) -> Dict[str, Any]:
        """
        Gets job status.

        :param ddo: DDO offering the compute service of this job
        :param service: compute service of this job
        :param job_id: str id of the compute job
        :param wallet: Wallet instance
        :return: dict the status for an existing compute job, keys are (ok, status, statusText)
        """
        job_info = self._data_provider.compute_job_status(
            ddo.did, job_id, service, wallet
        )
        job_info.update({"ok": job_info.get("status") not in (31, 32, None)})

        return job_info

{% endcode %}
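The `ok` key is not sent by the Provider; it is derived locally from `status`. A minimal sketch of that derivation, assuming only what the source shows (codes 31 and 32, or a missing status, count as not ok); `add_ok_flag` and the sample payloads are hypothetical.

```python
from typing import Any, Dict

# Status values that the source treats as "not ok", including a missing status.
NOT_OK_STATUSES = (31, 32, None)


def add_ok_flag(job_info: Dict[str, Any]) -> Dict[str, Any]:
    """Return a copy of job_info with the derived boolean 'ok' key."""
    info = dict(job_info)
    info["ok"] = info.get("status") not in NOT_OK_STATUSES
    return info


# Placeholder payloads; real ones come from the Provider.
print(add_ok_flag({"status": 70, "statusText": "placeholder text"})["ok"])
print(add_ok_flag({"status": 31})["ok"])
print(add_ok_flag({})["ok"])
```

Because a missing `status` maps to `None`, an empty or malformed response is reported as not ok rather than raising.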

ocean.compute.result( self, ddo: DDO, service: Service, job_id: str, index: int, wallet ) -> Dict[str, Any]

Gets compute job result.

It can be called within Ocean Compute class.

Params:

  • ddo - DDO offering the compute service of this job
  • service - Service object of compute
  • job_id - ID of the compute job
  • index - compute result index
  • wallet - Brownie account which initiated the compute job

Return:

A dictionary which contains the results/logs urls for an existing compute job, keys are (did, urls, logs).

{% code overflow="wrap" %}

    @enforce_types
    def result(
        self, ddo: DDO, service: Service, job_id: str, index: int, wallet
    ) -> Dict[str, Any]:
        """
        Gets job result.

        :param ddo: DDO offering the compute service of this job
        :param service: compute service of this job
        :param job_id: str id of the compute job
        :param index: compute result index
        :param wallet: Wallet instance
        :return: dict the results/logs urls for an existing compute job, keys are (did, urls, logs)
        """
        result = self._data_provider.compute_job_result(job_id, index, service, wallet)

        return result

{% endcode %}

ocean.compute.compute_job_result_logs( self, ddo: DDO, service: Service, job_id: str, wallet, log_type="output" ) -> Dict[str, Any]

Gets job output if exists.

It can be called within Ocean Compute class.

Params:

  • ddo - DDO offering the compute service of this job
  • service - Service object of compute
  • job_id - ID of the compute job
  • wallet - Brownie account which initiated the compute job
  • log_type - string which selects what kind of logs to display. Default "output"

Return:

A dictionary which includes the results/logs urls for an existing compute job, keys are (did, urls, logs).

{% code overflow="wrap" %}

    @enforce_types
    def compute_job_result_logs(
        self,
        ddo: DDO,
        service: Service,
        job_id: str,
        wallet,
        log_type="output",
    ) -> Dict[str, Any]:
        """
        Gets job output if exists.

        :param ddo: DDO offering the compute service of this job
        :param service: compute service of this job
        :param job_id: str id of the compute job
        :param wallet: Wallet instance
        :return: dict the results/logs urls for an existing compute job, keys are (did, urls, logs)
        """
        result = self._data_provider.compute_job_result_logs(
            ddo, job_id, service, wallet, log_type
        )

        return result

{% endcode %}

ocean.compute.stop(self, ddo: DDO, service: Service, job_id: str, wallet) -> Dict[str, Any]

Attempts to stop the running compute job.

It can be called within Ocean Compute class.

Params:

  • ddo - DDO offering the compute service of this job
  • service - Service object of compute
  • job_id - ID of the compute job
  • wallet - Brownie account which initiated the compute job

Return:

A dictionary which contains the status for the stopped compute job, keys are (ok, status, statusText).

{% code overflow="wrap" %}

    @enforce_types
    def stop(self, ddo: DDO, service: Service, job_id: str, wallet) -> Dict[str, Any]:
        """
        Attempt to stop the running compute job.

        :param ddo: DDO offering the compute service of this job
        :param job_id: str id of the compute job
        :param wallet: Wallet instance
        :return: dict the status for the stopped compute job, keys are (ok, status, statusText)
        """
        job_info = self._data_provider.stop_compute_job(
            ddo.did, job_id, service, wallet
        )
        job_info.update({"ok": job_info.get("status") not in (31, 32, None)})
        return job_info

{% endcode %}

ocean.compute.get_c2d_environments(self, service_endpoint: str, chain_id: int)

Get list of compute environments.

It can be called within Ocean Compute class.

Params:

  • service_endpoint - string Provider URL that is stored in compute service.
  • chain_id - using Provider multichain, chain_id is required to specify the network for your environment. It has int type.

Return:

A list of objects containing information about each compute environment. For each compute environment, these are the following keys: (id, feeToken, priceMin, consumerAddress, lastSeen, namespace, status).

{% code overflow="wrap" %}

    @enforce_types
    def get_c2d_environments(self, service_endpoint: str, chain_id: int):
        return DataServiceProvider.get_c2d_environments(service_endpoint, chain_id)

{% endcode %}

ocean.compute.get_free_c2d_environment(self, service_endpoint: str, chain_id)

Gets the first free compute environment.

Note that not all Providers offer free environments (priceMin = 0).

It can be called within Ocean Compute class.

Params:

  • service_endpoint - string Provider URL that is stored in compute service.
  • chain_id - using Provider multichain, chain_id is required to specify the network for your environment. It has int type.

Return:

An object containing information about the first compute environment whose priceMin is 0. It has the following keys: (id, feeToken, priceMin, consumerAddress, lastSeen, namespace, status).

{% code overflow="wrap" %}

    @enforce_types
    def get_free_c2d_environment(self, service_endpoint: str, chain_id):
        environments = self.get_c2d_environments(service_endpoint, chain_id)
        return next(env for env in environments if float(env["priceMin"]) == float(0))

{% endcode %}
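As written, `next(...)` is called without a default, so it raises `StopIteration` when the Provider has no free environment. The sketch below shows a defensive variant of the same selection that returns `None` instead. `first_free_environment` is a hypothetical helper and the environment entries are placeholders that carry only two of the documented keys.

```python
from typing import Any, Dict, List, Optional


def first_free_environment(
    environments: List[Dict[str, Any]]
) -> Optional[Dict[str, Any]]:
    """Return the first environment whose priceMin is 0, or None if there is none."""
    return next(
        (env for env in environments if float(env["priceMin"]) == 0.0),
        None,  # default value, so an empty match does not raise StopIteration
    )


# Placeholder environments; real ones come from get_c2d_environments().
environments = [
    {"id": "paid-env", "priceMin": "0.5"},
    {"id": "free-env", "priceMin": 0},
]
free_env = first_free_environment(environments)
print(free_env["id"] if free_env else "no free environment")
```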

Datatoken Interface

Dispenser utils:

datatoken.create_dispenser(self, tx_dict: dict, max_tokens: Optional[Union[int, str]] = None, max_balance: Optional[Union[int, str]] = None, with_mint: Optional[bool] = True)

Through a datatoken, you can deploy a new dispenser, which is used for creating free assets because it behaves like a faucet.

It is implemented in DatatokenBase, inherited by Datatoken2, so it can be called within both instances.

Each parameter has the following meaning:

  1. tx_dict - the configuration dictionary for that specific transaction. For development we usually include just the from wallet, but for remote networks you can also provide gas fees, the required confirmations for that block, etc. For more info, check the Brownie docs.
  2. max_tokens - maximum amount of tokens to dispense in wei. The default is a large number.
  3. max_balance - maximum balance of requester in wei. The default is a large number.
  4. with_mint - boolean, True by default; if True, the dispenser is allowed to act as a minter.

The return value is a hex string which denotes the transaction hash of the dispenser deployment. If the dispenser is already active, the method returns None without sending a transaction.

    @enforce_types
    def create_dispenser(
        self,
        tx_dict: dict,
        max_tokens: Optional[Union[int, str]] = None,
        max_balance: Optional[Union[int, str]] = None,
        with_mint: Optional[bool] = True,
    ):
        """
        For this datatoken, create a dispenser faucet for free tokens.

        This wraps the smart contract method Datatoken.createDispenser()
          with a simpler interface.

        :param: max_tokens - max # tokens to dispense, in wei
        :param: max_balance - max balance of requester
        :tx_dict: e.g. {"from": alice_wallet}
        :return: tx
        """
        # already created, so nothing to do
        if self.dispenser_status().active:
            return

        # set max_tokens, max_balance if needed
        max_tokens = max_tokens or MAX_UINT256
        max_balance = max_balance or MAX_UINT256

        # args for contract tx
        dispenser_addr = get_address_of_type(self.config_dict, "Dispenser")
        with_mint = with_mint  # True -> can always mint more
        allowed_swapper = ZERO_ADDRESS  # 0 -> so anyone can call dispense

        # do contract tx
        tx = self.createDispenser(
            dispenser_addr,
            max_tokens,
            max_balance,
            with_mint,
            allowed_swapper,
            tx_dict,
        )
        return tx
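
The "large number" default comes from the `value or MAX_UINT256` pattern in the code above. The sketch below isolates that pattern, assuming `MAX_UINT256` equals 2**256 - 1 as its name suggests; `resolve_cap` is a hypothetical helper.

```python
from typing import Optional, Union

# Assumption: the largest value a Solidity uint256 can hold.
MAX_UINT256 = 2**256 - 1


def resolve_cap(value: Optional[Union[int, str]]) -> Union[int, str]:
    """Mirror `value or MAX_UINT256`: any falsy value falls back to the maximum."""
    return value or MAX_UINT256


print(resolve_cap(None) == MAX_UINT256)
print(resolve_cap(10**18))
print(resolve_cap("50 ether"))
```

One consequence of using `or` is that an explicit `0` is also falsy, so passing `max_tokens=0` yields the maximum rather than a zero cap.
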
datatoken.dispense(self, amount: Union[int, str], tx_dict: dict)

This function dispenses free datatokens through the dispenser faucet, for a user who wants to start an order.

It is implemented in DatatokenBase, so it can be called on any Datatoken instance.

Each parameter has the following meaning:

  1. amount - amount of datatokens to be dispensed in wei (int or string format)
  2. tx_dict - the configuration dictionary for that specific transaction. For development we usually include just the from wallet, but for remote networks you can also provide gas fees, the required confirmations for that block, etc. For more info, check the Brownie docs.

The return value is a hex string which denotes the transaction hash of the dispense call and serves as proof that the datatokens were dispensed.

    @enforce_types
    def dispense(self, amount: Union[int, str], tx_dict: dict):
        """
        Dispense free tokens via the dispenser faucet.

        :param: amount - number of tokens to dispense, in wei
        :tx_dict: e.g. {"from": alice_wallet}
        :return: tx
        """
        # args for contract tx
        datatoken_addr = self.address
        from_addr = (
            tx_dict["from"].address
            if hasattr(tx_dict["from"], "address")
            else tx_dict["from"]
        )

        # do contract tx
        tx = self._ocean_dispenser().dispense(
            datatoken_addr, amount, from_addr, tx_dict
        )
        return tx
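
The `from` entry of `tx_dict` may hold either an account object or a plain address string, which is why the code checks for an `address` attribute. A minimal sketch of that resolution follows; `sender_address` is a hypothetical helper and the `SimpleNamespace` stands in for a Brownie account.

```python
from types import SimpleNamespace
from typing import Any, Dict


def sender_address(tx_dict: Dict[str, Any]) -> str:
    """Return the sender address whether 'from' is an account object or a string."""
    sender = tx_dict["from"]
    return sender.address if hasattr(sender, "address") else sender


# Stand-in for a wallet/account object that exposes .address
alice_wallet = SimpleNamespace(address="0xAlice")

print(sender_address({"from": alice_wallet}))
print(sender_address({"from": "0xAlice"}))
```
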
datatoken.dispense_and_order(self, consumer: str, service_index: int, provider_fees: dict, transaction_parameters: dict, consume_market_fees=None)

This function starts an order and, if the buyer holds less than 1 datatoken, first dispenses 1 datatoken to the buyer from the dispenser.

It is implemented in Datatoken2, so it can be called within Datatoken2 class (using the enterprise template).

Each parameter has the following meaning:

  1. consumer - address of the consumer wallet that needs funding
  2. service_index - index of the service, as int, for which start_order is then called.
  3. provider_fees - dictionary with the provider fees for the order.
  4. transaction_parameters - the configuration dictionary for that specific transaction. For development we usually include just the from wallet, but for remote networks you can also provide gas fees, the required confirmations for that block, etc. For more info, check the Brownie docs.
  5. consume_market_fees - TokenFeeInfo object which contains the consume market fee amount, address & token address. If it is not explicitly specified, an empty TokenFeeInfo object is used.

The return value is a hex string which denotes the transaction hash of the started order.

    def dispense_and_order(
        self,
        consumer: str,
        service_index: int,
        provider_fees: dict,
        transaction_parameters: dict,
        consume_market_fees=None,
    ) -> str:
        if not consume_market_fees:
            consume_market_fees = TokenFeeInfo()

        buyer_addr = (
            transaction_parameters["from"].address
            if hasattr(transaction_parameters["from"], "address")
            else transaction_parameters["from"]
        )

        bal = from_wei(self.balanceOf(buyer_addr))
        if bal < 1.0:
            dispenser_addr = get_address_of_type(self.config_dict, "Dispenser")
            from ocean_lib.models.dispenser import Dispenser  # isort: skip

            dispenser = Dispenser(self.config_dict, dispenser_addr)

            # catch key failure modes
            st = dispenser.status(self.address)
            active, allowedSwapper = st[0], st[6]
            if not active:
                raise ValueError("No active dispenser for datatoken")
            if allowedSwapper not in [ZERO_ADDRESS, buyer_addr]:
                raise ValueError(f"Not allowed. allowedSwapper={allowedSwapper}")

            # Try to dispense. If other issues, they'll pop out
            dispenser.dispense(
                self.address, "1 ether", buyer_addr, transaction_parameters
            )

        return self.start_order(
            consumer=ContractBase.to_checksum_address(consumer),
            service_index=service_index,
            provider_fees=provider_fees,
            consume_market_fees=consume_market_fees,
            transaction_parameters=transaction_parameters,
        )
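
Before dispensing, the code above inspects two entries of the status tuple: index 0 (active) and index 6 (allowedSwapper). The sketch below isolates those checks so they can be exercised without a chain. `check_dispensable` is a hypothetical helper and the status tuples are made-up values in the documented field order.

```python
ZERO_ADDRESS = "0x0000000000000000000000000000000000000000"


def check_dispensable(status: tuple, buyer_addr: str) -> None:
    """Raise ValueError if the dispenser cannot serve this buyer."""
    active, allowed_swapper = status[0], status[6]
    if not active:
        raise ValueError("No active dispenser for datatoken")
    # ZERO_ADDRESS means anyone may dispense; otherwise only the named swapper.
    if allowed_swapper not in (ZERO_ADDRESS, buyer_addr):
        raise ValueError(f"Not allowed. allowedSwapper={allowed_swapper}")


# Made-up tuples: (active, owner, isMinter, maxTokens, maxBalance, balance, allowedSwapper)
open_status = (True, "0xOwner", True, 10, 10, 0, ZERO_ADDRESS)
inactive_status = (False, "0xOwner", True, 10, 10, 0, ZERO_ADDRESS)

check_dispensable(open_status, "0xBuyer")  # passes silently
try:
    check_dispensable(inactive_status, "0xBuyer")
except ValueError as err:
    print(err)
```
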
datatoken.dispenser_status(self) -> DispenserStatus

Returns a DispenserStatus object returned from Dispenser.sol::status(dt_addr) which is composed of:

  • bool active
  • address owner
  • bool isMinter
  • uint256 maxTokens
  • uint256 maxBalance
  • uint256 balance
  • address allowedSwapper

These are the Solidity return values & types; in Python, a uint256 maps to an int and an address to a string.

For tips & tricks, check this section from the README.

It is implemented in DatatokenBase, inherited by Datatoken2, so it can be called within both instances.

    @enforce_types
    def dispenser_status(self):
        """:return: DispenserStatus object"""
        # import here to avoid circular import
        from ocean_lib.models.dispenser import DispenserStatus

        status_tup = self._ocean_dispenser().status(self.address)
        return DispenserStatus(status_tup)
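
To make the field order concrete, the sketch below wraps a 7-element status tuple in a named tuple that follows the list above. It is not the library's `DispenserStatus` class: `DispenserStatusSketch`, its snake_case field names, and the sample values are all assumptions made for illustration.

```python
from typing import NamedTuple


class DispenserStatusSketch(NamedTuple):
    """Hypothetical stand-in; fields follow the Solidity return order."""
    active: bool
    owner: str
    is_minter: bool
    max_tokens: int       # uint256 in Solidity, int in Python
    max_balance: int      # uint256 in Solidity, int in Python
    balance: int          # uint256 in Solidity, int in Python
    allowed_swapper: str  # address in Solidity, str in Python


# Made-up status tuple, in the order listed above.
status_tup = (True, "0xOwner", True, 50 * 10**18, 100 * 10**18, 0,
              "0x0000000000000000000000000000000000000000")
status = DispenserStatusSketch(*status_tup)
print(status.active, status.max_tokens)
```
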

Fixed Rate Exchange utils:

datatoken.create_exchange(self, rate: Union[int, str], base_token_addr: str, tx_dict: dict, owner_addr: Optional[str] = None, publish_market_fee_collector: Optional[str] = None, publish_market_fee: Union[int, str] = 0, with_mint: bool = False, allowed_swapper: str = ZERO_ADDRESS, full_info: bool = False) -> Union[OneExchange, tuple]

It is implemented in DatatokenBase, inherited by Datatoken2, so it can be called within both instances.

For this datatoken, create a single fixed-rate exchange (OneExchange).

This wraps the smart contract method Datatoken.createFixedRate() with a simpler interface.

Main params:

  • rate - how many base tokens does 1 datatoken cost? In wei or string
  • base_token_addr - e.g. OCEAN address
  • tx_dict - the configuration dictionary for that specific transaction. For development we usually include just the from wallet, but for remote networks you can also provide gas fees, the required confirmations for that block, etc. For more info, check the Brownie docs.

Optional params, with good defaults:

  • owner_addr - owner of the datatoken
  • publish_market_fee_collector - fee going to publish market address
  • publish_market_fee - in wei or string, e.g. int(1e15) or "0.001 ether"
  • with_mint - should the exchange mint datatokens as needed (True), or do they need to be supplied/allowed by participants like base token (False)?
  • allowed_swapper - if ZERO_ADDRESS, anyone can swap
  • full_info - return just OneExchange, or (OneExchange, <other info>)

Return

  • exchange - OneExchange
  • tx_receipt - returned only if full_info is True

{% code overflow="wrap" %}

    @enforce_types
    def create_exchange(
        self,
        rate: Union[int, str],
        base_token_addr: str,
        tx_dict: dict,
        owner_addr: Optional[str] = None,
        publish_market_fee_collector: Optional[str] = None,
        publish_market_fee: Union[int, str] = 0,
        with_mint: bool = False,
        allowed_swapper: str = ZERO_ADDRESS,
        full_info: bool = False,
    ) -> Union[OneExchange, tuple]:
    
        # import now, to avoid circular import
        from ocean_lib.models.fixed_rate_exchange import OneExchange

        FRE_addr = get_address_of_type(self.config_dict, "FixedPrice")
        from_addr = (
            tx_dict["from"].address
            if hasattr(tx_dict["from"], "address")
            else tx_dict["from"]
        )
        BT = Datatoken(self.config_dict, base_token_addr)
        owner_addr = owner_addr or from_addr
        publish_market_fee_collector = publish_market_fee_collector or from_addr

        tx = self.contract.createFixedRate(
            checksum_addr(FRE_addr),
            [
                checksum_addr(BT.address),
                checksum_addr(owner_addr),
                checksum_addr(publish_market_fee_collector),
                checksum_addr(allowed_swapper),
            ],
            [
                BT.decimals(),
                self.decimals(),
                rate,
                publish_market_fee,
                with_mint,
            ],
            tx_dict,
        )

        exchange_id = tx.events["NewFixedRate"]["exchangeId"]
        FRE = self._FRE()
        exchange = OneExchange(FRE, exchange_id)
        if full_info:
            return (exchange, tx)
        return exchange

{% endcode %}
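
In the source above, `owner_addr` and `publish_market_fee_collector` fall back to the sender of the transaction, and `tx_dict["from"]` may be either a wallet object or a plain address string. A minimal standalone sketch of that fallback logic follows; `resolve_from_addr`, `resolve_defaults`, and the placeholder addresses are hypothetical names for illustration.

```python
from types import SimpleNamespace
from typing import Optional, Tuple


def resolve_from_addr(tx_dict: dict) -> str:
    """Accept either a wallet-like object with .address or a plain address."""
    sender = tx_dict["from"]
    return sender.address if hasattr(sender, "address") else sender


def resolve_defaults(
    tx_dict: dict,
    owner_addr: Optional[str] = None,
    fee_collector: Optional[str] = None,
) -> Tuple[str, str]:
    """Fall back to the sender when owner or fee collector is not given."""
    from_addr = resolve_from_addr(tx_dict)
    return (owner_addr or from_addr, fee_collector or from_addr)


wallet = SimpleNamespace(address="0xAlice")  # stand-in for a wallet object
print(resolve_defaults({"from": wallet}))
print(resolve_defaults({"from": "0xAlice"}, owner_addr="0xBob"))
```

With only the from wallet supplied, the sender ends up as both exchange owner and publish-market fee collector.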

datatoken.buy_DT_and_order(self, consumer: str, service_index: int, provider_fees: dict, exchange: Any, transaction_parameters: dict, consume_market_fees=None) -> str

This function buys 1 datatoken from the given fixed-rate exchange and then starts an order, for a user who wants to start an order but does not yet hold the datatoken.

It is implemented in the Datatoken class and inherited by the Datatoken2 class.

Each parameter has the following meaning:

  1. consumer - address of the consumer wallet that needs funding
  2. service_index - index (int) of the service for which start_order will be called.
  3. provider_fees - dictionary of provider fees, passed on to start_order.
  4. exchange - the exchange to buy from: either a OneExchange object or an exchange id, which is wrapped in a OneExchange.
  5. transaction_parameters - the configuration dictionary for this transaction. For local development it usually contains only the from wallet; on remote networks you can also set gas fees, the number of required block confirmations, and so on. For more info, check the Brownie docs.
  6. consume_market_fees - TokenFeeInfo object which contains the consume market fee amount, address & token address. If it is not specified, an empty TokenFeeInfo object is used.

The return value is the transaction hash as a hex string, which serves as proof that the order was started.
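
The two-step flow (buy one datatoken, then start the order) can be sketched without any chain access. `FakeExchange` and `FakeDatatoken` are hypothetical stand-ins that only record calls. They are not ocean.py classes, and the signatures are simplified.

```python
ONE_DATATOKEN_WEI = 10**18  # 1 datatoken, assuming 18 decimals


class FakeExchange:
    """Hypothetical stand-in for OneExchange; records calls only."""

    def __init__(self, calls: list):
        self.calls = calls

    def buy_DT(self, datatoken_amt: int, tx_dict: dict) -> None:
        self.calls.append(("buy_DT", datatoken_amt))


class FakeDatatoken:
    """Hypothetical stand-in for a datatoken; no chain access."""

    def __init__(self, calls: list):
        self.calls = calls

    def start_order(self, consumer, service_index, provider_fees, tx_dict) -> str:
        self.calls.append(("start_order", consumer, service_index))
        return "0xfake_order_tx"

    def buy_DT_and_order(
        self, consumer, service_index, provider_fees, exchange, tx_dict
    ) -> str:
        # Step 1: acquire exactly one datatoken from the exchange.
        exchange.buy_DT(datatoken_amt=ONE_DATATOKEN_WEI, tx_dict=tx_dict)
        # Step 2: start the order and return its transaction hash.
        return self.start_order(consumer, service_index, provider_fees, tx_dict)


calls = []
tx_hash = FakeDatatoken(calls).buy_DT_and_order(
    "0xConsumer", 0, {}, FakeExchange(calls), {"from": "0xConsumer"}
)
print(calls)
```

The recorded call order shows that the purchase always precedes the order, and that the same transaction parameters are reused for both steps.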

    @enforce_types
    def buy_DT_and_order(
        self,
        consumer: str,
        service_index: int,
        provider_fees: dict,
        exchange: Any,
        transaction_parameters: dict,
        consume_market_fees=None,
    ) -> str:
        fre_address = get_address_of_type(self.config_dict, "FixedPrice")

        # import now, to avoid circular import
        from ocean_lib.models.fixed_rate_exchange import OneExchange

        if not consume_market_fees:
            consume_market_fees = TokenFeeInfo()

        if not isinstance(exchange, OneExchange):
            exchange = OneExchange(fre_address, exchange)

        exchange.buy_DT(
            datatoken_amt=to_wei(1),
            consume_market_fee_addr=consume_market_fees.address,
            consume_market_fee=consume_market_fees.amount,
            tx_dict=transaction_parameters,
        )

        return self.start_order(
            consumer=ContractBase.to_checksum_address(consumer),
            service_index=service_index,
            provider_fees=provider_fees,
            consume_market_fees=consume_market_fees,
            transaction_parameters=transaction_parameters,
        )

datatoken.get_exchanges(self) -> list

Returns List[OneExchange] - all the exchanges for this datatoken.

It is implemented in DatatokenBase and inherited by Datatoken2, so it can be called on instances of either class.

{% code overflow="wrap" %}

    @enforce_types
    def get_exchanges(self) -> list:
        """return List[OneExchange] - all the exchanges for this datatoken"""
        # import now, to avoid circular import
        from ocean_lib.models.fixed_rate_exchange import OneExchange

        FRE = self._FRE()
        addrs_and_exchange_ids = self.getFixedRates()
        exchanges = [
            OneExchange(FRE, exchange_id) for _, exchange_id in addrs_and_exchange_ids
        ]
        return exchanges

{% endcode %}

Orders:

datatoken.start_order(self, consumer: str, service_index: int, provider_fees: dict, transaction_parameters: dict, consume_market_fees=None) -> str

Starts an order for this datatoken.

It is implemented in the Datatoken class and inherited by the Datatoken2 class.

Each parameter has the following meaning:

  1. consumer - address of the consumer wallet that needs funding
  2. service_index - index (int) of the service for which the order is started.
  3. provider_fees - dictionary of provider fees, as generated by a call to the Provider's initialize endpoint.
  4. transaction_parameters - the configuration dictionary for this transaction. For local development it usually contains only the from wallet; on remote networks you can also set gas fees, the number of required block confirmations, and so on. For more info, check the Brownie docs.
  5. consume_market_fees - TokenFeeInfo object which contains the consume market fee amount, address & token address. If it is not specified, an empty TokenFeeInfo object is used.

The return value is the transaction hash as a hex string, which serves as proof that the order was started.
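
start_order hands the provider fees to the contract as a positional tuple, so every key must be present and in a fixed order. A minimal sketch of that unpacking follows, using the key names from the source. `provider_fees_to_tuple` is a hypothetical helper, the sample values are placeholders, and the address checksumming done by the real method is omitted here.

```python
PROVIDER_FEE_KEYS = (
    "providerFeeAddress",
    "providerFeeToken",
    "providerFeeAmount",
    "v",
    "r",
    "s",
    "validUntil",
    "providerData",
)


def provider_fees_to_tuple(provider_fees: dict) -> tuple:
    """Hypothetical helper: order the fee fields positionally."""
    missing = [key for key in PROVIDER_FEE_KEYS if key not in provider_fees]
    if missing:
        raise KeyError(f"provider_fees is missing: {missing}")
    values = [provider_fees[key] for key in PROVIDER_FEE_KEYS]
    values[2] = int(values[2])  # the amount is cast to int, as in the source
    return tuple(values)


sample_fees = {  # placeholder values, not real Provider output
    "providerFeeAddress": "0xProvider",
    "providerFeeToken": "0xToken",
    "providerFeeAmount": "0",
    "v": 27,
    "r": "0x01",
    "s": "0x02",
    "validUntil": 0,
    "providerData": "0x",
}
print(provider_fees_to_tuple(sample_fees))
```

Checking for missing keys up front gives a clearer error than a bare KeyError raised partway through building the contract call.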

    @enforce_types
    def start_order(
        self,
        consumer: str,
        service_index: int,
        provider_fees: dict,
        transaction_parameters: dict,
        consume_market_fees=None,
    ) -> str:

        if not consume_market_fees:
            consume_market_fees = TokenFeeInfo()

        return self.contract.startOrder(
            checksum_addr(consumer),
            service_index,
            (
                checksum_addr(provider_fees["providerFeeAddress"]),
                checksum_addr(provider_fees["providerFeeToken"]),
                int(provider_fees["providerFeeAmount"]),
                provider_fees["v"],
                provider_fees["r"],
                provider_fees["s"],
                provider_fees["validUntil"],
                provider_fees["providerData"],
            ),
            consume_market_fees.to_tuple(),
            transaction_parameters,
        )

datatoken.reuse_order(self, order_tx_id: Union[str, bytes], provider_fees: dict, transaction_parameters: dict) -> str

Reuses a previous order for this datatoken.

It is implemented in the Datatoken class and inherited by the Datatoken2 class.

Each parameter has the following meaning:

  1. order_tx_id - transaction hash of a previous order, as a string or bytes.
  2. provider_fees - dictionary of provider fees, as generated by a call to the Provider's initialize endpoint.
  3. transaction_parameters - the configuration dictionary for this transaction. For local development it usually contains only the from wallet; on remote networks you can also set gas fees, the number of required block confirmations, and so on. For more info, check the Brownie docs.

The return value is the transaction hash as a hex string, which serves as proof that the order was reused.
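
Since order_tx_id may be a string or bytes, it can help to see how the two forms relate. The sketch below is illustration only: `tx_id_to_bytes` is a hypothetical helper, and reuse_order itself passes order_tx_id to the contract without any such conversion. It assumes the string form is hex, optionally prefixed with `0x`.

```python
from typing import Union


def tx_id_to_bytes(order_tx_id: Union[str, bytes]) -> bytes:
    """Hypothetical helper: normalize a transaction hash to raw bytes."""
    if isinstance(order_tx_id, bytes):
        return order_tx_id
    has_prefix = order_tx_id.startswith(("0x", "0X"))
    hex_part = order_tx_id[2:] if has_prefix else order_tx_id
    return bytes.fromhex(hex_part)


as_str = "0x" + "ab" * 32  # placeholder hash: 32 bytes written as 64 hex digits
as_bytes = tx_id_to_bytes(as_str)
print(len(as_bytes))
```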

    @enforce_types
    def reuse_order(
        self,
        order_tx_id: Union[str, bytes],
        provider_fees: dict,
        transaction_parameters: dict,
    ) -> str:
        return self.contract.reuseOrder(
            order_tx_id,
            (
                checksum_addr(provider_fees["providerFeeAddress"]),
                checksum_addr(provider_fees["providerFeeToken"]),
                int(provider_fees["providerFeeAmount"]),
                provider_fees["v"],
                provider_fees["r"],
                provider_fees["s"],
                provider_fees["validUntil"],
                provider_fees["providerData"],
            ),
            transaction_parameters,
        )