From 192dd218cb16f4a3c748880ca2c25e3ddbbf67dd Mon Sep 17 00:00:00 2001 From: Francis Cao Date: Fri, 12 Aug 2022 09:21:43 -0700 Subject: [PATCH] move kafka logic into own file --- .../migrations/01_init/migration.sql | 4 +- lib/db.js | 58 +---------------- lib/kafka.js | 62 +++++++++++++++++++ 3 files changed, 67 insertions(+), 57 deletions(-) create mode 100644 lib/kafka.js diff --git a/db/clickhouse/migrations/01_init/migration.sql b/db/clickhouse/migrations/01_init/migration.sql index e6c39aa2..b155eb87 100644 --- a/db/clickhouse/migrations/01_init/migration.sql +++ b/db/clickhouse/migrations/01_init/migration.sql @@ -95,6 +95,7 @@ CREATE TABLE event created_at DateTime('UTC'), url String, event_name String + event_data JSONB, ) engine = MergeTree PRIMARY KEY (event_uuid, created_at) ORDER BY (event_uuid, created_at) @@ -106,7 +107,8 @@ CREATE TABLE event_queue ( session_uuid UUID, created_at DateTime('UTC'), url String, - event_name String + event_name String, + event_data String, ) ENGINE = Kafka SETTINGS kafka_broker_list = '', -- input broker list diff --git a/lib/db.js b/lib/db.js index 04f368dc..ff62da03 100644 --- a/lib/db.js +++ b/lib/db.js @@ -1,8 +1,8 @@ import { PrismaClient } from '@prisma/client'; import { ClickHouse } from 'clickhouse'; -import { Kafka } from 'kafkajs'; import dateFormat from 'dateformat'; import chalk from 'chalk'; +import { getKafkaService } from './kafka'; import { MYSQL, MYSQL_DATE_FORMATS, @@ -67,44 +67,15 @@ function getClickhouseClient() { }); } -function getKafkaClient() { - if (!process.env.KAFKA_URL) { - return null; - } - - const url = new URL(process.env.KAFKA_URL); - const brokers = process.env.KAFKA_BROKER.split(','); - - if (url.username.length === 0 && url.password.length === 0) { - return new Kafka({ - clientId: 'umami', - brokers: brokers, - }); - } else { - return new Kafka({ - clientId: 'umami', - brokers: brokers, - ssl: true, - sasl: { - mechanism: 'plain', - username: url.username, - password: url.password, - }, - }); - } -} - const prisma = global.prisma || getPrismaClient(options); const clickhouse = global.clickhouse || getClickhouseClient(); -const kafka = global.kafka || getKafkaClient(); if (process.env.NODE_ENV !== 'production') { global.prisma = prisma; global.clickhouse = clickhouse; - global.kafka = kafka; } -export { prisma, clickhouse, kafka }; +export { prisma, clickhouse }; export function getDatabase() { const type = @@ -132,12 +103,6 @@ export function getAnalyticsDatabase() { return type; } -export function getKafkaService() { - const type = process.env.KAFKA_URL && process.env.KAFKA_URL.split(':')[0]; - - return type; -} - export function getDateStringQueryClickhouse(data, unit) { return `formatDateTime(${data}, '${CLICKHOUSE_DATE_FORMATS[unit]}')`; } @@ -341,22 +306,3 @@ export async function runAnalyticsQuery(queries) { return queries[CLICKHOUSE](); } } - -export async function kafkaProducer(params, topic) { - const producer = kafka.producer(); - await producer.connect(); - - await producer.send({ - topic, - messages: [ - { - key: 'key', - value: JSON.stringify(params), - }, - ], - }); -} - -export function getDateFormatKafka(date) { - return dateFormat(date, 'UTC:yyyy-mm-dd HH:MM:ss'); -} diff --git a/lib/kafka.js b/lib/kafka.js new file mode 100644 index 00000000..475109bd --- /dev/null +++ b/lib/kafka.js @@ -0,0 +1,62 @@ +import { Kafka } from 'kafkajs'; +import dateFormat from 'dateformat'; + +export function getKafkaClient() { + if (!process.env.KAFKA_URL) { + return null; + } + + const url = new URL(process.env.KAFKA_URL); + const brokers = process.env.KAFKA_BROKER.split(','); + + if (url.username.length === 0 && url.password.length === 0) { + return new Kafka({ + clientId: 'umami', + brokers: brokers, + }); + } else { + return new Kafka({ + clientId: 'umami', + brokers: brokers, + ssl: true, + sasl: { + mechanism: 'plain', + username: url.username, + password: url.password, + }, + }); + } +} + +const kafka = global.kafka || getKafkaClient(); + +if (process.env.NODE_ENV !== 'production') { + global.kafka = kafka; +} + +export { kafka }; + +export async function kafkaProducer(params, topic) { + const producer = kafka.producer(); + await producer.connect(); + + await producer.send({ + topic, + messages: [ + { + key: 'key', + value: JSON.stringify(params), + }, + ], + }); +} + +export function getDateFormatKafka(date) { + return dateFormat(date, 'UTC:yyyy-mm-dd HH:MM:ss'); +} + +export function getKafkaService() { + const type = process.env.KAFKA_URL && process.env.KAFKA_URL.split(':')[0]; + + return type; +}