From b79ed9dee695e14529c019e98e9572b1e07023ba Mon Sep 17 00:00:00 2001 From: Mike Cao Date: Tue, 9 Apr 2024 00:37:23 -0700 Subject: [PATCH] Updated session event data save. --- src/lib/types.ts | 2 +- .../analytics/eventData/saveEventData.ts | 34 +++---- .../analytics/sessions/saveSessionData.ts | 90 ++++++++++++++++--- 3 files changed, 97 insertions(+), 29 deletions(-) diff --git a/src/lib/types.ts b/src/lib/types.ts index 3470abe8..5c9a39c6 100644 --- a/src/lib/types.ts +++ b/src/lib/types.ts @@ -73,7 +73,7 @@ export interface FilterQueryResult { } export interface DynamicData { - [key: string]: number | string | DynamicData | number[] | string[] | DynamicData[]; + [key: string]: number | string | number[] | string[]; } export interface Auth { diff --git a/src/queries/analytics/eventData/saveEventData.ts b/src/queries/analytics/eventData/saveEventData.ts index c37dd896..e14c88d4 100644 --- a/src/queries/analytics/eventData/saveEventData.ts +++ b/src/queries/analytics/eventData/saveEventData.ts @@ -7,7 +7,7 @@ import kafka from 'lib/kafka'; import prisma from 'lib/prisma'; import { DynamicData } from 'lib/types'; -export async function saveEventData(args: { +export async function saveEventData(data: { websiteId: string; eventId: string; sessionId?: string; @@ -17,8 +17,8 @@ export async function saveEventData(args: { createdAt?: string; }) { return runQuery({ - [PRISMA]: () => relationalQuery(args), - [CLICKHOUSE]: () => clickhouseQuery(args), + [PRISMA]: () => relationalQuery(data), + [CLICKHOUSE]: () => clickhouseQuery(data), }); } @@ -63,19 +63,21 @@ async function clickhouseQuery(data: { const jsonKeys = flattenJSON(eventData); - const messages = jsonKeys.map(a => ({ - website_id: websiteId, - session_id: sessionId, - event_id: eventId, - url_path: urlPath, - event_name: eventName, - event_key: a.key, - string_value: getStringValue(a.value, a.dataType), - number_value: a.dataType === DATA_TYPE.number ? a.value : null, - date_value: a.dataType === DATA_TYPE.date ? getDateFormat(a.value) : null, - data_type: a.dataType, - created_at: createdAt, - })); + const messages = jsonKeys.map(({ key, value, dataType }) => { + return { + website_id: websiteId, + session_id: sessionId, + event_id: eventId, + url_path: urlPath, + event_name: eventName, + data_key: key, + data_type: dataType, + string_value: getStringValue(value, dataType), + number_value: dataType === DATA_TYPE.number ? value : null, + date_value: dataType === DATA_TYPE.date ? getDateFormat(value) : null, + created_at: createdAt, + }; + }); await sendMessages(messages, 'event_data'); diff --git a/src/queries/analytics/sessions/saveSessionData.ts b/src/queries/analytics/sessions/saveSessionData.ts index efd4898f..5fa4fe72 100644 --- a/src/queries/analytics/sessions/saveSessionData.ts +++ b/src/queries/analytics/sessions/saveSessionData.ts @@ -3,13 +3,27 @@ import { uuid } from 'lib/crypto'; import { flattenJSON, getStringValue } from 'lib/data'; import prisma from 'lib/prisma'; import { DynamicData } from 'lib/types'; +import { CLICKHOUSE, PRISMA, runQuery } from 'lib/db'; +import kafka from 'lib/kafka'; export async function saveSessionData(data: { websiteId: string; sessionId: string; sessionData: DynamicData; + createdAt?: string; }) { - const { client, transaction } = prisma; + return runQuery({ + [PRISMA]: () => relationalQuery(data), + [CLICKHOUSE]: () => clickhouseQuery(data), + }); +} + +export async function relationalQuery(data: { + websiteId: string; + sessionId: string; + sessionData: DynamicData; +}) { + const { client } = prisma; const { websiteId, sessionId, sessionData } = data; const jsonKeys = flattenJSON(sessionData); @@ -18,21 +32,73 @@ export async function saveSessionData(data: { id: uuid(), websiteId, sessionId, - key: a.key, + sessionKey: a.key, stringValue: getStringValue(a.value, a.dataType), numberValue: a.dataType === DATA_TYPE.number ? a.value : null, dateValue: a.dataType === DATA_TYPE.date ? new Date(a.value) : null, dataType: a.dataType, })); - return transaction([ - client.sessionData.deleteMany({ - where: { - sessionId, - }, - }), - client.sessionData.createMany({ - data: flattenedData as any, - }), - ]); + const existing = await client.sessionData.findMany({ + where: { + sessionId, + }, + select: { + id: true, + sessionId: true, + sessionKey: true, + }, + }); + + for (const data of flattenedData) { + const { sessionId, sessionKey, ...props } = data; + const record = existing.find(e => e.sessionId === sessionId && e.sessionKey === sessionKey); + + if (record) { + await client.sessionData.update({ + where: { + id: record.id, + }, + data: { + ...props, + }, + }); + } else { + await client.sessionData.create({ + data, + }); + } + } + + return flattenedData; +} + +async function clickhouseQuery(data: { + websiteId: string; + sessionId: string; + sessionData: DynamicData; + createdAt?: string; +}) { + const { websiteId, sessionId, sessionData, createdAt } = data; + + const { getDateFormat, sendMessages } = kafka; + + const jsonKeys = flattenJSON(sessionData); + + const messages = jsonKeys.map(({ key, value, dataType }) => { + return { + website_id: websiteId, + session_id: sessionId, + data_key: key, + data_type: dataType, + string_value: getStringValue(value, dataType), + number_value: dataType === DATA_TYPE.number ? value : null, + date_value: dataType === DATA_TYPE.date ? getDateFormat(value) : null, + created_at: createdAt, + }; + }); + + await sendMessages(messages, 'session_data'); + + return data; }