From 46b4b98d407cd3958d0eeadb94b59613ff59931b Mon Sep 17 00:00:00 2001 From: Brian Cao Date: Thu, 25 Aug 2022 11:07:47 -0700 Subject: [PATCH] update kafka messaging --- lib/db.js | 2 +- lib/{ => db}/kafka.js | 27 +++++++++++++++------- pages/api/heartbeat.js | 5 ++++ queries/analytics/event/saveEvent.js | 4 ++-- queries/analytics/pageview/savePageView.js | 4 ++-- queries/analytics/session/createSession.js | 4 ++-- 6 files changed, 31 insertions(+), 15 deletions(-) rename lib/{ => db}/kafka.js (69%) create mode 100644 pages/api/heartbeat.js diff --git a/lib/db.js b/lib/db.js index ff62da03..30fa79f6 100644 --- a/lib/db.js +++ b/lib/db.js @@ -2,7 +2,7 @@ import { PrismaClient } from '@prisma/client'; import { ClickHouse } from 'clickhouse'; import dateFormat from 'dateformat'; import chalk from 'chalk'; -import { getKafkaService } from './kafka'; +import { getKafkaService } from './db/kafka'; import { MYSQL, MYSQL_DATE_FORMATS, diff --git a/lib/kafka.js b/lib/db/kafka.js similarity index 69% rename from lib/kafka.js rename to lib/db/kafka.js index 988062b9..e2256d27 100644 --- a/lib/kafka.js +++ b/lib/db/kafka.js @@ -1,4 +1,4 @@ -import { Kafka } from 'kafkajs'; +import { Kafka, logLevel } from 'kafkajs'; import dateFormat from 'dateformat'; export function getKafkaClient() { @@ -14,6 +14,7 @@ export function getKafkaClient() { clientId: 'umami', brokers: brokers, connectionTimeout: 3000, + logLevel: logLevel.ERROR, }); } else { return new Kafka({ @@ -29,20 +30,29 @@ export function getKafkaClient() { }); } } - const kafka = global.kafka || getKafkaClient(); +let kafkaProducer = null; -if (process.env.NODE_ENV !== 'production') { - global.kafka = kafka; -} +(async () => { + kafkaProducer = global.kakfaProducer || (await getKafkaProducer()); -export { kafka }; + if (process.env.NODE_ENV !== 'production') { + global.kafka = kafka; + global.kakfaProducer = kafkaProducer; + } +})(); -export async function kafkaProducer(params, topic) { +export { kafka, kafkaProducer }; + +export async function getKafkaProducer() { const producer = kafka.producer(); await producer.connect(); - await producer.send({ + return producer; +} + +export async function sendKafkaMessage(params, topic) { + await kafkaProducer.send({ topic, messages: [ { @@ -50,6 +60,7 @@ export async function kafkaProducer(params, topic) { value: JSON.stringify(params), }, ], + acks: 0, }); } diff --git a/pages/api/heartbeat.js b/pages/api/heartbeat.js new file mode 100644 index 00000000..a53213ad --- /dev/null +++ b/pages/api/heartbeat.js @@ -0,0 +1,5 @@ +import { ok } from 'lib/response'; + +export default async (req, res) => { + return ok(res, 'nice'); +}; diff --git a/queries/analytics/event/saveEvent.js b/queries/analytics/event/saveEvent.js index 0786042c..ff4d5256 100644 --- a/queries/analytics/event/saveEvent.js +++ b/queries/analytics/event/saveEvent.js @@ -6,7 +6,7 @@ import { runAnalyticsQuery, runQuery, } from 'lib/db'; -import { kafkaProducer, getDateFormatKafka } from 'lib/kafka'; +import { sendKafkaMessage, getDateFormatKafka } from 'lib/db/kafka'; export async function saveEvent(...args) { return runAnalyticsQuery({ @@ -66,5 +66,5 @@ async function kafkaQuery(website_id, { event_uuid, session_uuid, url, event_nam event_name: event_name?.substr(0, 50), }; - await kafkaProducer(params, 'event'); + await sendKafkaMessage(params, 'event'); } diff --git a/queries/analytics/pageview/savePageView.js b/queries/analytics/pageview/savePageView.js index 0dd93b44..af0d88f6 100644 --- a/queries/analytics/pageview/savePageView.js +++ b/queries/analytics/pageview/savePageView.js @@ -6,7 +6,7 @@ import { runAnalyticsQuery, runQuery, } from 'lib/db'; -import { kafkaProducer, getDateFormatKafka } from 'lib/kafka'; +import { sendKafkaMessage, getDateFormatKafka } from 'lib/db/kafka'; export async function savePageView(...args) { return runAnalyticsQuery({ @@ -54,5 +54,5 @@ async function kafkaQuery(website_id, { session_uuid, url, referrer }) { referrer: referrer?.substr(0, URL_LENGTH), }; - await kafkaProducer(params, 'pageview'); + await sendKafkaMessage(params, 'pageview'); } diff --git a/queries/analytics/session/createSession.js b/queries/analytics/session/createSession.js index 5c4b982c..565d45a2 100644 --- a/queries/analytics/session/createSession.js +++ b/queries/analytics/session/createSession.js @@ -6,7 +6,7 @@ import { runAnalyticsQuery, runQuery, } from 'lib/db'; -import { kafkaProducer, getDateFormatKafka } from 'lib/kafka'; +import { sendKafkaMessage, getDateFormatKafka } from 'lib/db/kafka'; import { getSessionByUuid } from 'queries'; export async function createSession(...args) { @@ -73,7 +73,7 @@ async function kafkaQuery( country: country ? country : null, }; - await kafkaProducer(params, 'session'); + await sendKafkaMessage(params, 'session'); return getSessionByUuid(session_uuid); }