Updated session event data save.

This commit is contained in:
Mike Cao 2024-04-09 00:37:23 -07:00
parent 0a4eb05196
commit b79ed9dee6
3 changed files with 97 additions and 29 deletions

View File

@ -73,7 +73,7 @@ export interface FilterQueryResult<T> {
} }
export interface DynamicData { export interface DynamicData {
[key: string]: number | string | DynamicData | number[] | string[] | DynamicData[]; [key: string]: number | string | number[] | string[];
} }
export interface Auth { export interface Auth {

View File

@ -7,7 +7,7 @@ import kafka from 'lib/kafka';
import prisma from 'lib/prisma'; import prisma from 'lib/prisma';
import { DynamicData } from 'lib/types'; import { DynamicData } from 'lib/types';
export async function saveEventData(args: { export async function saveEventData(data: {
websiteId: string; websiteId: string;
eventId: string; eventId: string;
sessionId?: string; sessionId?: string;
@ -17,8 +17,8 @@ export async function saveEventData(args: {
createdAt?: string; createdAt?: string;
}) { }) {
return runQuery({ return runQuery({
[PRISMA]: () => relationalQuery(args), [PRISMA]: () => relationalQuery(data),
[CLICKHOUSE]: () => clickhouseQuery(args), [CLICKHOUSE]: () => clickhouseQuery(data),
}); });
} }
@ -63,19 +63,21 @@ async function clickhouseQuery(data: {
const jsonKeys = flattenJSON(eventData); const jsonKeys = flattenJSON(eventData);
const messages = jsonKeys.map(a => ({ const messages = jsonKeys.map(({ key, value, dataType }) => {
website_id: websiteId, return {
session_id: sessionId, website_id: websiteId,
event_id: eventId, session_id: sessionId,
url_path: urlPath, event_id: eventId,
event_name: eventName, url_path: urlPath,
event_key: a.key, event_name: eventName,
string_value: getStringValue(a.value, a.dataType), data_key: key,
number_value: a.dataType === DATA_TYPE.number ? a.value : null, data_type: dataType,
date_value: a.dataType === DATA_TYPE.date ? getDateFormat(a.value) : null, string_value: getStringValue(value, dataType),
data_type: a.dataType, number_value: dataType === DATA_TYPE.number ? value : null,
created_at: createdAt, date_value: dataType === DATA_TYPE.date ? getDateFormat(value) : null,
})); created_at: createdAt,
};
});
await sendMessages(messages, 'event_data'); await sendMessages(messages, 'event_data');

View File

@ -3,13 +3,27 @@ import { uuid } from 'lib/crypto';
import { flattenJSON, getStringValue } from 'lib/data'; import { flattenJSON, getStringValue } from 'lib/data';
import prisma from 'lib/prisma'; import prisma from 'lib/prisma';
import { DynamicData } from 'lib/types'; import { DynamicData } from 'lib/types';
import { CLICKHOUSE, PRISMA, runQuery } from 'lib/db';
import kafka from 'lib/kafka';
export async function saveSessionData(data: { export async function saveSessionData(data: {
websiteId: string; websiteId: string;
sessionId: string; sessionId: string;
sessionData: DynamicData; sessionData: DynamicData;
createdAt?: string;
}) { }) {
const { client, transaction } = prisma; return runQuery({
[PRISMA]: () => relationalQuery(data),
[CLICKHOUSE]: () => clickhouseQuery(data),
});
}
export async function relationalQuery(data: {
websiteId: string;
sessionId: string;
sessionData: DynamicData;
}) {
const { client } = prisma;
const { websiteId, sessionId, sessionData } = data; const { websiteId, sessionId, sessionData } = data;
const jsonKeys = flattenJSON(sessionData); const jsonKeys = flattenJSON(sessionData);
@ -18,21 +32,73 @@ export async function saveSessionData(data: {
id: uuid(), id: uuid(),
websiteId, websiteId,
sessionId, sessionId,
key: a.key, sessionKey: a.key,
stringValue: getStringValue(a.value, a.dataType), stringValue: getStringValue(a.value, a.dataType),
numberValue: a.dataType === DATA_TYPE.number ? a.value : null, numberValue: a.dataType === DATA_TYPE.number ? a.value : null,
dateValue: a.dataType === DATA_TYPE.date ? new Date(a.value) : null, dateValue: a.dataType === DATA_TYPE.date ? new Date(a.value) : null,
dataType: a.dataType, dataType: a.dataType,
})); }));
return transaction([ const existing = await client.sessionData.findMany({
client.sessionData.deleteMany({ where: {
where: { sessionId,
sessionId, },
}, select: {
}), id: true,
client.sessionData.createMany({ sessionId: true,
data: flattenedData as any, sessionKey: true,
}), },
]); });
for (const data of flattenedData) {
const { sessionId, sessionKey, ...props } = data;
const record = existing.find(e => e.sessionId === sessionId && e.sessionKey === sessionKey);
if (record) {
await client.sessionData.update({
where: {
id: record.id,
},
data: {
...props,
},
});
} else {
await client.sessionData.create({
data,
});
}
}
return flattenedData;
}
async function clickhouseQuery(data: {
websiteId: string;
sessionId: string;
sessionData: DynamicData;
createdAt?: string;
}) {
const { websiteId, sessionId, sessionData, createdAt } = data;
const { getDateFormat, sendMessages } = kafka;
const jsonKeys = flattenJSON(sessionData);
const messages = jsonKeys.map(({ key, value, dataType }) => {
return {
website_id: websiteId,
session_id: sessionId,
data_key: key,
data_type: dataType,
string_value: getStringValue(value, dataType),
number_value: dataType === DATA_TYPE.number ? value : null,
date_value: dataType === DATA_TYPE.date ? getDateFormat(value) : null,
created_at: createdAt,
};
});
await sendMessages(messages, 'session_data');
return data;
} }