From 9bf34bd5e48a86d7ee3038203b865022d75514e3 Mon Sep 17 00:00:00 2001
From: Mike Cao
Date: Thu, 4 Jul 2024 20:30:15 -0700
Subject: [PATCH] Support Clickhouse insert without Kafka.

---
 src/lib/clickhouse.ts                            | 11 +++++++++--
 src/queries/analytics/eventData/saveEventData.ts |  8 +++++++-
 src/queries/analytics/events/saveEvent.ts        |  8 +++++++-
 3 files changed, 23 insertions(+), 4 deletions(-)

diff --git a/src/lib/clickhouse.ts b/src/lib/clickhouse.ts
index b6d0fc3f..cd057cd7 100644
--- a/src/lib/clickhouse.ts
+++ b/src/lib/clickhouse.ts
@@ -32,7 +32,7 @@ function getClient() {
   } = new URL(process.env.CLICKHOUSE_URL);
 
   const client = createClient({
-    host: `${protocol}//${hostname}:${port}`,
+    url: `${protocol}//${hostname}:${port}`,
     database: pathname.replace('/', ''),
     username: username,
     password,
@@ -136,7 +136,13 @@ async function rawQuery(
     format: 'JSONEachRow',
   });
 
-  return resultSet.json();
+  return resultSet.json() as T;
+}
+
+async function insert(table: string, values: any[]) {
+  await connect();
+
+  return clickhouse.insert({ table, values, format: 'JSONEachRow' });
 }
 
 async function findUnique(data: any[]) {
@@ -172,4 +178,5 @@ export default {
   findUnique,
   findFirst,
   rawQuery,
+  insert,
 };
diff --git a/src/queries/analytics/eventData/saveEventData.ts b/src/queries/analytics/eventData/saveEventData.ts
index 0ed3c8b0..222e8b73 100644
--- a/src/queries/analytics/eventData/saveEventData.ts
+++ b/src/queries/analytics/eventData/saveEventData.ts
@@ -3,6 +3,7 @@ import { DATA_TYPE } from 'lib/constants';
 import { uuid } from 'lib/crypto';
 import { CLICKHOUSE, PRISMA, runQuery } from 'lib/db';
 import { flattenJSON, getStringValue } from 'lib/data';
+import clickhouse from 'lib/clickhouse';
 import kafka from 'lib/kafka';
 import prisma from 'lib/prisma';
 import { DynamicData } from 'lib/types';
@@ -59,6 +60,7 @@ async function clickhouseQuery(data: {
 }) {
   const { websiteId, sessionId, eventId, urlPath, eventName, eventData, createdAt } = data;
 
+  const { insert } = clickhouse;
   const { getDateFormat, sendMessages } = kafka;
 
   const jsonKeys = flattenJSON(eventData);
@@ -79,7 +81,11 @@ async function clickhouseQuery(data: {
     };
   });
 
-  await sendMessages(messages, 'event_data');
+  if (kafka.enabled) {
+    await sendMessages(messages, 'event_data');
+  } else {
+    await insert('event_data', messages);
+  }
 
   return data;
 }
diff --git a/src/queries/analytics/events/saveEvent.ts b/src/queries/analytics/events/saveEvent.ts
index 25bcf9e7..9a5787a3 100644
--- a/src/queries/analytics/events/saveEvent.ts
+++ b/src/queries/analytics/events/saveEvent.ts
@@ -1,5 +1,6 @@
 import { EVENT_NAME_LENGTH, URL_LENGTH, EVENT_TYPE, PAGE_TITLE_LENGTH } from 'lib/constants';
 import { CLICKHOUSE, PRISMA, runQuery } from 'lib/db';
+import clickhouse from 'lib/clickhouse';
 import kafka from 'lib/kafka';
 import prisma from 'lib/prisma';
 import { uuid } from 'lib/crypto';
@@ -134,6 +135,7 @@ async function clickhouseQuery(data: {
     city,
     ...args
   } = data;
+  const { insert } = clickhouse;
   const { getDateFormat, sendMessage } = kafka;
   const eventId = uuid();
   const createdAt = getDateFormat(new Date());
@@ -164,7 +166,11 @@ async function clickhouseQuery(data: {
     created_at: createdAt,
   };
 
-  await sendMessage(message, 'event');
+  if (kafka.enabled) {
+    await sendMessage(message, 'event');
+  } else {
+    await insert('website_event', [message]);
+  }
 
   if (eventData) {
     await saveEventData({
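
Note (not part of the patch): the sketch below illustrates the direct-insert fallback this change adds, as a standalone snippet. It assumes the same @clickhouse/client package and CLICKHOUSE_URL variable that src/lib/clickhouse.ts uses; the 'website_event' table name comes from the patch, but the row fields shown are abbreviated and illustrative rather than the full column set built in saveEvent.ts.

// Sketch only: a standalone version of the Kafka-less path, assuming a
// @clickhouse/client version that accepts `url` (the patch switches the
// config key from `host` to `url`) and a reachable CLICKHOUSE_URL.
import { createClient } from '@clickhouse/client';

const client = createClient({ url: process.env.CLICKHOUSE_URL });

// Mirrors the insert() helper added in src/lib/clickhouse.ts: with the
// JSONEachRow format, each element of `values` becomes one inserted row.
async function insert(table: string, values: any[]) {
  return client.insert({ table, values, format: 'JSONEachRow' });
}

// Roughly what saveEvent.ts now does when kafka.enabled is false.
// The row below is abbreviated; the real message carries every
// website_event column that saveEvent.ts populates.
async function example() {
  await insert('website_event', [
    {
      website_id: '00000000-0000-0000-0000-000000000001', // illustrative id
      event_id: '00000000-0000-0000-0000-000000000002', // illustrative id
      event_type: 1,
      url_path: '/',
      created_at: '2024-07-04 20:30:15',
    },
  ]);
}

example().catch(console.error);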