market/src/@utils/aquarius.ts

481 lines
12 KiB
TypeScript

import { Asset, LoggerInstance } from '@oceanprotocol/lib'
import { AssetSelectionAsset } from '@shared/FormFields/AssetSelection'
import axios, { CancelToken, AxiosResponse } from 'axios'
import { OrdersData_orders as OrdersData } from '../@types/subgraph/OrdersData'
import { metadataCacheUri, v3MetadataCacheUri } from '../../app.config'
import {
SortDirectionOptions,
SortTermOptions
} from '../@types/aquarius/SearchQuery'
import { transformAssetToAssetSelection } from './assetConvertor'
export interface UserSales {
id: string
totalSales: number
}
export const MAXIMUM_NUMBER_OF_PAGES_WITH_RESULTS = 476
export function escapeEsReservedCharacters(value: string): string {
// eslint-disable-next-line no-useless-escape
const pattern = /([\!\*\+\-\=\<\>\&\|\(\)\[\]\{\}\^\~\?\:\\/"])/g
return value.replace(pattern, '\\$1')
}
/**
* @param filterField the name of the actual field from the ddo schema e.g. 'id','service.attributes.main.type'
* @param value the value of the filter
* @returns json structure of the es filter
*/
export function getFilterTerm(
filterField: string,
value: string | number | boolean | number[] | string[]
): FilterTerm {
const isArray = Array.isArray(value)
return {
[isArray ? 'terms' : 'term']: {
[filterField]: value
}
}
}
export function generateBaseQuery(
baseQueryParams: BaseQueryParams
): SearchQuery {
const generatedQuery = {
from: baseQueryParams.esPaginationOptions?.from || 0,
size: baseQueryParams.esPaginationOptions?.size || 1000,
query: {
bool: {
...baseQueryParams.nestedQuery,
filter: [
...(baseQueryParams.filters || []),
getFilterTerm('chainId', baseQueryParams.chainIds),
getFilterTerm('_index', 'aquarius'),
...(baseQueryParams.ignorePurgatory
? []
: [getFilterTerm('purgatory.state', false)])
]
}
}
} as SearchQuery
if (baseQueryParams.aggs !== undefined) {
generatedQuery.aggs = baseQueryParams.aggs
}
if (baseQueryParams.sortOptions !== undefined)
generatedQuery.sort = {
[baseQueryParams.sortOptions.sortBy]:
baseQueryParams.sortOptions.sortDirection ||
SortDirectionOptions.Descending
}
return generatedQuery
}
export function transformQueryResult(
queryResult: SearchResponse,
from = 0,
size = 21
): PagedAssets {
const result: PagedAssets = {
results: [],
page: 0,
totalPages: 0,
totalResults: 0,
aggregations: []
}
result.results = (queryResult.hits.hits || []).map(
(hit) => hit._source as Asset
)
result.aggregations = queryResult.aggregations
result.totalResults = queryResult.hits.total.value
result.totalPages =
result.totalResults / size < 1
? Math.floor(result.totalResults / size)
: Math.ceil(result.totalResults / size)
result.page = from ? from / size + 1 : 1
return result
}
export async function queryMetadata(
query: SearchQuery,
cancelToken: CancelToken
): Promise<PagedAssets> {
try {
const response: AxiosResponse<SearchResponse> = await axios.post(
`${metadataCacheUri}/api/aquarius/assets/query`,
{ ...query },
{ cancelToken }
)
if (!response || response.status !== 200 || !response.data) return
return transformQueryResult(response.data, query.from, query.size)
} catch (error) {
if (axios.isCancel(error)) {
LoggerInstance.log(error.message)
} else {
LoggerInstance.error(error.message)
}
}
}
export async function retrieveAsset(
did: string,
cancelToken: CancelToken
): Promise<Asset> {
try {
const response: AxiosResponse<Asset> = await axios.get(
`${metadataCacheUri}/api/aquarius/assets/ddo/${did}`,
{ cancelToken }
)
if (!response || response.status !== 200 || !response.data) return
const data = { ...response.data }
return data
} catch (error) {
if (axios.isCancel(error)) {
LoggerInstance.log(error.message)
} else {
LoggerInstance.error(error.message)
}
}
}
export async function checkV3Asset(
did: string,
cancelToken: CancelToken
): Promise<boolean> {
try {
const response: AxiosResponse<Asset> = await axios.get(
`${v3MetadataCacheUri}/api/v1/aquarius/assets/ddo/${did}`,
{ cancelToken }
)
if (!response || response.status !== 200 || !response.data) return false
return true
} catch (error) {
if (axios.isCancel(error)) {
LoggerInstance.log(error.message)
} else {
LoggerInstance.error(error.message)
}
return false
}
}
export async function getAssetsNames(
didList: string[],
cancelToken: CancelToken
): Promise<Record<string, string>> {
try {
const response: AxiosResponse<Record<string, string>> = await axios.post(
`${metadataCacheUri}/api/aquarius/assets/names`,
{ didList },
{ cancelToken }
)
if (!response || response.status !== 200 || !response.data) return
return response.data
} catch (error) {
if (axios.isCancel(error)) {
LoggerInstance.log(error.message)
} else {
LoggerInstance.error(error.message)
}
}
}
export async function getAssetsFromDidList(
didList: string[],
chainIds: number[],
cancelToken: CancelToken
): Promise<PagedAssets> {
try {
if (!(didList.length > 0)) return
const baseParams = {
chainIds,
filters: [getFilterTerm('_id', didList)],
ignorePurgatory: true
} as BaseQueryParams
const query = generateBaseQuery(baseParams)
const queryResult = await queryMetadata(query, cancelToken)
return queryResult
} catch (error) {
LoggerInstance.error(error.message)
}
}
export async function getAssetsFromDtList(
dtList: string[],
chainIds: number[],
cancelToken: CancelToken
): Promise<Asset[]> {
try {
if (!(dtList.length > 0)) return
const baseParams = {
chainIds,
filters: [getFilterTerm('services.datatokenAddress', dtList)],
ignorePurgatory: true
} as BaseQueryParams
const query = generateBaseQuery(baseParams)
const queryResult = await queryMetadata(query, cancelToken)
return queryResult?.results
} catch (error) {
LoggerInstance.error(error.message)
}
}
export async function retrieveDDOListByDIDs(
didList: string[],
chainIds: number[],
cancelToken: CancelToken
): Promise<Asset[]> {
if (didList?.length === 0 || chainIds?.length === 0) return []
try {
const orderedDDOListByDIDList: Asset[] = []
const baseQueryparams = {
chainIds,
filters: [getFilterTerm('_id', didList)],
ignorePurgatory: true
} as BaseQueryParams
const query = generateBaseQuery(baseQueryparams)
const result = await queryMetadata(query, cancelToken)
didList.forEach((did: string) => {
const ddo = result.results.find((ddo: Asset) => ddo.id === did)
if (ddo) orderedDDOListByDIDList.push(ddo)
})
return orderedDDOListByDIDList
} catch (error) {
LoggerInstance.error(error.message)
}
}
export async function getAlgorithmDatasetsForCompute(
algorithmId: string,
datasetProviderUri: string,
datasetChainId?: number,
cancelToken?: CancelToken
): Promise<AssetSelectionAsset[]> {
const baseQueryParams = {
chainIds: [datasetChainId],
nestedQuery: {
must: {
match: {
'services.compute.publisherTrustedAlgorithms.did': {
query: escapeEsReservedCharacters(algorithmId)
}
}
}
},
sortOptions: {
sortBy: SortTermOptions.Created,
sortDirection: SortDirectionOptions.Descending
}
} as BaseQueryParams
const query = generateBaseQuery(baseQueryParams)
const computeDatasets = await queryMetadata(query, cancelToken)
if (computeDatasets.totalResults === 0) return []
const datasets = await transformAssetToAssetSelection(
datasetProviderUri,
computeDatasets.results,
[]
)
return datasets
}
export async function getPublishedAssets(
accountId: string,
chainIds: number[],
cancelToken: CancelToken,
page?: number,
type?: string,
accesType?: string
): Promise<PagedAssets> {
if (!accountId) return
const filters: FilterTerm[] = []
filters.push(getFilterTerm('nft.owner', accountId.toLowerCase()))
accesType !== undefined &&
filters.push(getFilterTerm('services.type', accesType))
type !== undefined && filters.push(getFilterTerm('metadata.type', type))
const baseQueryParams = {
chainIds,
filters,
sortOptions: {
sortBy: SortTermOptions.Created,
sortDirection: SortDirectionOptions.Descending
},
aggs: {
totalOrders: {
sum: {
field: SortTermOptions.Stats
}
}
},
ignorePurgatory: true,
esPaginationOptions: {
from: (Number(page) - 1 || 0) * 9,
size: 9
}
} as BaseQueryParams
const query = generateBaseQuery(baseQueryParams)
try {
const result = await queryMetadata(query, cancelToken)
return result
} catch (error) {
if (axios.isCancel(error)) {
LoggerInstance.log(error.message)
} else {
LoggerInstance.error(error.message)
}
}
}
export async function getTopPublishers(
chainIds: number[],
cancelToken: CancelToken,
page?: number,
type?: string,
accesType?: string
): Promise<PagedAssets> {
const filters: FilterTerm[] = []
accesType !== undefined &&
filters.push(getFilterTerm('services.type', accesType))
type !== undefined && filters.push(getFilterTerm('metadata.type', type))
const baseQueryParams = {
chainIds,
filters,
sortOptions: {
sortBy: SortTermOptions.Created,
sortDirection: SortDirectionOptions.Descending
},
aggs: {
topPublishers: {
terms: {
field: 'nft.owner.keyword',
order: { totalSales: 'desc' }
},
aggs: {
totalSales: {
sum: {
field: SortTermOptions.Stats
}
}
}
}
},
esPaginationOptions: {
from: (Number(page) - 1 || 0) * 9,
size: 9
}
} as BaseQueryParams
const query = generateBaseQuery(baseQueryParams)
try {
const result = await queryMetadata(query, cancelToken)
return result
} catch (error) {
if (axios.isCancel(error)) {
LoggerInstance.log(error.message)
} else {
LoggerInstance.error(error.message)
}
}
}
export async function getTopAssetsPublishers(
chainIds: number[],
nrItems = 9
): Promise<UserSales[]> {
const publishers: UserSales[] = []
const result = await getTopPublishers(chainIds, null)
const { topPublishers } = result.aggregations
for (let i = 0; i < topPublishers.buckets.length; i++) {
publishers.push({
id: topPublishers.buckets[i].key,
totalSales: parseInt(topPublishers.buckets[i].totalSales.value)
})
}
publishers.sort((a, b) => b.totalSales - a.totalSales)
return publishers.slice(0, nrItems)
}
export async function getUserSales(
accountId: string,
chainIds: number[]
): Promise<number> {
try {
const result = await getPublishedAssets(accountId, chainIds, null)
const { totalOrders } = result.aggregations
return totalOrders.value
} catch (error) {
LoggerInstance.error('Error getUserSales', error.message)
}
}
export async function getDownloadAssets(
dtList: string[],
tokenOrders: OrdersData[],
chainIds: number[],
cancelToken: CancelToken
): Promise<DownloadedAsset[]> {
const baseQueryparams = {
chainIds,
filters: [
getFilterTerm('services.datatokenAddress', dtList),
getFilterTerm('services.type', 'access')
]
} as BaseQueryParams
const query = generateBaseQuery(baseQueryparams)
try {
const result = await queryMetadata(query, cancelToken)
const downloadedAssets: DownloadedAsset[] = result.results
.map((asset) => {
const order = tokenOrders.find(
({ datatoken }) =>
datatoken?.address.toLowerCase() ===
asset.services[0].datatokenAddress.toLowerCase()
)
return {
asset,
networkId: asset.chainId,
dtSymbol: order?.datatoken?.symbol,
timestamp: order?.createdTimestamp
}
})
.sort((a, b) => b.timestamp - a.timestamp)
return downloadedAssets
} catch (error) {
if (axios.isCancel(error)) {
LoggerInstance.log(error.message)
} else {
LoggerInstance.error(error.message)
}
}
}