import { Asset, LoggerInstance } from '@oceanprotocol/lib' import { AssetSelectionAsset } from '@shared/FormFields/AssetSelection' import axios, { CancelToken, AxiosResponse } from 'axios' import { OrdersData_orders as OrdersData } from '../@types/subgraph/OrdersData' import { metadataCacheUri, v3MetadataCacheUri } from '../../app.config' import { SortDirectionOptions, SortTermOptions } from '../@types/aquarius/SearchQuery' import { transformAssetToAssetSelection } from './assetConvertor' export const MAXIMUM_NUMBER_OF_PAGES_WITH_RESULTS = 476 export function escapeEsReservedCharacters(value: string): string { // eslint-disable-next-line no-useless-escape const pattern = /([\!\*\+\-\=\<\>\&\|\(\)\[\]\{\}\^\~\?\:\\/"])/g return value.replace(pattern, '\\$1') } /** * @param filterField the name of the actual field from the ddo schema e.g. 'id','service.attributes.main.type' * @param value the value of the filter * @returns json structure of the es filter */ export function getFilterTerm( filterField: string, value: string | number | boolean | number[] | string[] ): FilterTerm { const isArray = Array.isArray(value) return { [isArray ? 'terms' : 'term']: { [filterField]: value } } } export function generateBaseQuery( baseQueryParams: BaseQueryParams ): SearchQuery { const generatedQuery = { from: baseQueryParams.esPaginationOptions?.from || 0, size: baseQueryParams.esPaginationOptions?.size || 1000, query: { bool: { ...baseQueryParams.nestedQuery, filter: [ ...(baseQueryParams.filters || []), getFilterTerm('chainId', baseQueryParams.chainIds), getFilterTerm('_index', 'aquarius'), ...(baseQueryParams.ignorePurgatory ? [] : [getFilterTerm('purgatory.state', false)]) ] } } } as SearchQuery if (baseQueryParams.aggs !== undefined) { generatedQuery.aggs = baseQueryParams.aggs } if (baseQueryParams.sortOptions !== undefined) generatedQuery.sort = { [baseQueryParams.sortOptions.sortBy]: baseQueryParams.sortOptions.sortDirection || SortDirectionOptions.Descending } return generatedQuery } export function transformQueryResult( queryResult: SearchResponse, from = 0, size = 21 ): PagedAssets { const result: PagedAssets = { results: [], page: 0, totalPages: 0, totalResults: 0, aggregations: [] } result.results = (queryResult.hits.hits || []).map( (hit) => hit._source as Asset ) result.aggregations = queryResult.aggregations result.totalResults = queryResult.hits.total.value result.totalPages = result.totalResults / size < 1 ? Math.floor(result.totalResults / size) : Math.ceil(result.totalResults / size) result.page = from ? from / size + 1 : 1 return result } export async function queryMetadata( query: SearchQuery, cancelToken: CancelToken ): Promise { try { const response: AxiosResponse = await axios.post( `${metadataCacheUri}/api/aquarius/assets/query`, { ...query }, { cancelToken } ) if (!response || response.status !== 200 || !response.data) return return transformQueryResult(response.data, query.from, query.size) } catch (error) { if (axios.isCancel(error)) { LoggerInstance.log(error.message) } else { LoggerInstance.error(error.message) } } } export async function retrieveAsset( did: string, cancelToken: CancelToken ): Promise { try { const response: AxiosResponse = await axios.get( `${metadataCacheUri}/api/aquarius/assets/ddo/${did}`, { cancelToken } ) if (!response || response.status !== 200 || !response.data) return const data = { ...response.data } return data } catch (error) { if (axios.isCancel(error)) { LoggerInstance.log(error.message) } else { LoggerInstance.error(error.message) } } } export async function checkV3Asset( did: string, cancelToken: CancelToken ): Promise { try { const response: AxiosResponse = await axios.get( `${v3MetadataCacheUri}/api/v1/aquarius/assets/ddo/${did}`, { cancelToken } ) if (!response || response.status !== 200 || !response.data) return false return true } catch (error) { if (axios.isCancel(error)) { LoggerInstance.log(error.message) } else { LoggerInstance.error(error.message) } return false } } export async function getAssetsNames( didList: string[], cancelToken: CancelToken ): Promise> { try { const response: AxiosResponse> = await axios.post( `${metadataCacheUri}/api/aquarius/assets/names`, { didList }, { cancelToken } ) if (!response || response.status !== 200 || !response.data) return return response.data } catch (error) { if (axios.isCancel(error)) { LoggerInstance.log(error.message) } else { LoggerInstance.error(error.message) } } } export async function getAssetsFromDidList( didList: string[], chainIds: number[], cancelToken: CancelToken ): Promise { try { if (!(didList.length > 0)) return const baseParams = { chainIds, filters: [getFilterTerm('_id', didList)], ignorePurgatory: true } as BaseQueryParams const query = generateBaseQuery(baseParams) const queryResult = await queryMetadata(query, cancelToken) return queryResult } catch (error) { LoggerInstance.error(error.message) } } export async function getAssetsFromDtList( dtList: string[], chainIds: number[], cancelToken: CancelToken ): Promise { try { if (!(dtList.length > 0)) return const baseParams = { chainIds, filters: [getFilterTerm('services.datatokenAddress', dtList)], ignorePurgatory: true } as BaseQueryParams const query = generateBaseQuery(baseParams) const queryResult = await queryMetadata(query, cancelToken) return queryResult?.results } catch (error) { LoggerInstance.error(error.message) } } export async function retrieveDDOListByDIDs( didList: string[], chainIds: number[], cancelToken: CancelToken ): Promise { if (didList?.length === 0 || chainIds?.length === 0) return [] try { const orderedDDOListByDIDList: Asset[] = [] const baseQueryparams = { chainIds, filters: [getFilterTerm('_id', didList)], ignorePurgatory: true } as BaseQueryParams const query = generateBaseQuery(baseQueryparams) const result = await queryMetadata(query, cancelToken) didList.forEach((did: string) => { const ddo = result.results.find((ddo: Asset) => ddo.id === did) if (ddo) orderedDDOListByDIDList.push(ddo) }) return orderedDDOListByDIDList } catch (error) { LoggerInstance.error(error.message) } } export async function getAlgorithmDatasetsForCompute( algorithmId: string, datasetProviderUri: string, datasetChainId?: number, cancelToken?: CancelToken ): Promise { const baseQueryParams = { chainIds: [datasetChainId], nestedQuery: { must: { match: { 'services.compute.publisherTrustedAlgorithms.did': { query: escapeEsReservedCharacters(algorithmId) } } } }, sortOptions: { sortBy: SortTermOptions.Created, sortDirection: SortDirectionOptions.Descending } } as BaseQueryParams const query = generateBaseQuery(baseQueryParams) const computeDatasets = await queryMetadata(query, cancelToken) if (computeDatasets.totalResults === 0) return [] const datasets = await transformAssetToAssetSelection( datasetProviderUri, computeDatasets.results, [] ) return datasets } export async function getPublishedAssets( accountId: string, chainIds: number[], cancelToken: CancelToken, page?: number, type?: string, accesType?: string ): Promise { if (!accountId) return const filters: FilterTerm[] = [] filters.push(getFilterTerm('nft.owner', accountId.toLowerCase())) accesType !== undefined && filters.push(getFilterTerm('services.type', accesType)) type !== undefined && filters.push(getFilterTerm('metadata.type', type)) const baseQueryParams = { chainIds, filters, sortOptions: { sortBy: SortTermOptions.Created, sortDirection: SortDirectionOptions.Descending }, aggs: { totalOrders: { sum: { field: SortTermOptions.Stats } } }, ignorePurgatory: true, esPaginationOptions: { from: (Number(page) - 1 || 0) * 9, size: 9 } } as BaseQueryParams const query = generateBaseQuery(baseQueryParams) try { const result = await queryMetadata(query, cancelToken) return result } catch (error) { if (axios.isCancel(error)) { LoggerInstance.log(error.message) } else { LoggerInstance.error(error.message) } } } export async function getTopPublishers( chainIds: number[], cancelToken: CancelToken, page?: number, type?: string, accesType?: string ): Promise { const filters: FilterTerm[] = [] accesType !== undefined && filters.push(getFilterTerm('services.type', accesType)) type !== undefined && filters.push(getFilterTerm('metadata.type', type)) const baseQueryParams = { chainIds, filters, sortOptions: { sortBy: SortTermOptions.Created, sortDirection: SortDirectionOptions.Descending }, aggs: { topPublishers: { terms: { field: 'nft.owner.keyword', order: { totalSales: 'desc' } }, aggs: { totalSales: { sum: { field: SortTermOptions.Stats } } } } }, esPaginationOptions: { from: (Number(page) - 1 || 0) * 9, size: 9 } } as BaseQueryParams const query = generateBaseQuery(baseQueryParams) try { const result = await queryMetadata(query, cancelToken) return result } catch (error) { if (axios.isCancel(error)) { LoggerInstance.log(error.message) } else { LoggerInstance.error(error.message) } } } export async function getDownloadAssets( dtList: string[], tokenOrders: OrdersData[], chainIds: number[], cancelToken: CancelToken ): Promise { const baseQueryparams = { chainIds, filters: [ getFilterTerm('services.datatokenAddress', dtList), getFilterTerm('services.type', 'access') ] } as BaseQueryParams const query = generateBaseQuery(baseQueryparams) try { const result = await queryMetadata(query, cancelToken) const downloadedAssets: DownloadedAsset[] = result.results .map((asset) => { const order = tokenOrders.find( ({ datatoken }) => datatoken?.address.toLowerCase() === asset.services[0].datatokenAddress.toLowerCase() ) return { asset, networkId: asset.chainId, dtSymbol: order?.datatoken?.symbol, timestamp: order?.createdTimestamp } }) .sort((a, b) => b.timestamp - a.timestamp) return downloadedAssets } catch (error) { if (axios.isCancel(error)) { LoggerInstance.log(error.message) } else { LoggerInstance.error(error.message) } } }