move kafka logic into own file

Francis Cao 2022-08-12 09:21:43 -07:00
parent 34e9ab811b
commit 192dd218cb
3 changed files with 67 additions and 57 deletions

View File

@@ -95,6 +95,7 @@ CREATE TABLE event
     created_at DateTime('UTC'),
     url String,
-    event_name String
+    event_name String,
+    event_data String
 )
 engine = MergeTree PRIMARY KEY (event_uuid, created_at)
 ORDER BY (event_uuid, created_at)
@@ -106,7 +107,8 @@ CREATE TABLE event_queue (
     session_uuid UUID,
     created_at DateTime('UTC'),
     url String,
-    event_name String
+    event_name String,
+    event_data String
 )
 ENGINE = Kafka
 SETTINGS kafka_broker_list = '', -- input broker list
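
For orientation (not part of this commit): the SETTINGS clause is truncated above, and the broker list is deliberately left empty in the source. A ClickHouse Kafka-engine table needs at least kafka_broker_list, kafka_topic_list, kafka_group_name, and kafka_format, and consumed rows only reach the MergeTree event table through a materialized view. A minimal sketch, with hypothetical broker, topic, and consumer-group values:

    -- Sketch only: the broker, topic, and group values below are assumptions.
    ENGINE = Kafka
    SETTINGS kafka_broker_list = 'localhost:9092', -- hypothetical broker
             kafka_topic_list = 'event',           -- assumed topic name
             kafka_group_name = 'umami',           -- assumed consumer group
             kafka_format = 'JSONEachRow';         -- one JSON object per message

    -- The Kafka table is only a consumer; a materialized view is what actually
    -- moves each consumed row into the event table.
    CREATE MATERIALIZED VIEW event_queue_mv TO event AS
    SELECT * FROM event_queue;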

View File

@@ -1,8 +1,8 @@
 import { PrismaClient } from '@prisma/client';
 import { ClickHouse } from 'clickhouse';
-import { Kafka } from 'kafkajs';
 import dateFormat from 'dateformat';
 import chalk from 'chalk';
+import { getKafkaService } from './kafka';
 import {
   MYSQL,
   MYSQL_DATE_FORMATS,
@@ -67,44 +67,15 @@ function getClickhouseClient() {
   });
 }
 
-function getKafkaClient() {
-  if (!process.env.KAFKA_URL) {
-    return null;
-  }
-
-  const url = new URL(process.env.KAFKA_URL);
-  const brokers = process.env.KAFKA_BROKER.split(',');
-
-  if (url.username.length === 0 && url.password.length === 0) {
-    return new Kafka({
-      clientId: 'umami',
-      brokers: brokers,
-    });
-  } else {
-    return new Kafka({
-      clientId: 'umami',
-      brokers: brokers,
-      ssl: true,
-      sasl: {
-        mechanism: 'plain',
-        username: url.username,
-        password: url.password,
-      },
-    });
-  }
-}
-
 const prisma = global.prisma || getPrismaClient(options);
 const clickhouse = global.clickhouse || getClickhouseClient();
-const kafka = global.kafka || getKafkaClient();
 
 if (process.env.NODE_ENV !== 'production') {
   global.prisma = prisma;
   global.clickhouse = clickhouse;
-  global.kafka = kafka;
 }
 
-export { prisma, clickhouse, kafka };
+export { prisma, clickhouse };
 
 export function getDatabase() {
   const type =
export function getDatabase() {
const type =
@@ -132,12 +103,6 @@ export function getAnalyticsDatabase() {
   return type;
 }
 
-export function getKafkaService() {
-  const type = process.env.KAFKA_URL && process.env.KAFKA_URL.split(':')[0];
-  return type;
-}
-
 export function getDateStringQueryClickhouse(data, unit) {
   return `formatDateTime(${data}, '${CLICKHOUSE_DATE_FORMATS[unit]}')`;
 }
@@ -341,22 +306,3 @@ export async function runAnalyticsQuery(queries) {
     return queries[CLICKHOUSE]();
   }
 }
-
-export async function kafkaProducer(params, topic) {
-  const producer = kafka.producer();
-
-  await producer.connect();
-  await producer.send({
-    topic,
-    messages: [
-      {
-        key: 'key',
-        value: JSON.stringify(params),
-      },
-    ],
-  });
-}
-
-export function getDateFormatKafka(date) {
-  return dateFormat(date, 'UTC:yyyy-mm-dd HH:MM:ss');
-}

lib/kafka.js (new file, +62 lines)
View File

@@ -0,0 +1,62 @@
+import { Kafka } from 'kafkajs';
+import dateFormat from 'dateformat';
+
+// Builds a Kafka client from env vars, or returns null when Kafka is not configured.
+export function getKafkaClient() {
+  // Guard both vars: KAFKA_BROKER is dereferenced below, so a missing value
+  // would otherwise throw even when KAFKA_URL is set.
+  if (!process.env.KAFKA_URL || !process.env.KAFKA_BROKER) {
+    return null;
+  }
+
+  const url = new URL(process.env.KAFKA_URL);
+  const brokers = process.env.KAFKA_BROKER.split(',');
+
+  if (url.username.length === 0 && url.password.length === 0) {
+    // No credentials in the URL: connect without TLS/SASL.
+    return new Kafka({
+      clientId: 'umami',
+      brokers: brokers,
+    });
+  } else {
+    // Credentials present: connect over TLS with SASL/PLAIN auth.
+    return new Kafka({
+      clientId: 'umami',
+      brokers: brokers,
+      ssl: true,
+      sasl: {
+        mechanism: 'plain',
+        username: url.username,
+        password: url.password,
+      },
+    });
+  }
+}
+
+// Cache the client on `global` so dev hot reloads reuse a single connection.
+const kafka = global.kafka || getKafkaClient();
+
+if (process.env.NODE_ENV !== 'production') {
+  global.kafka = kafka;
+}
+
+export { kafka };
+
+export async function kafkaProducer(params, topic) {
+  const producer = kafka.producer();
+
+  await producer.connect();
+  await producer.send({
+    topic,
+    messages: [
+      {
+        key: 'key',
+        value: JSON.stringify(params),
+      },
+    ],
+  });
+  // Release the connection; each call creates a fresh producer.
+  await producer.disconnect();
+}
+
+// ClickHouse DateTime columns expect 'yyyy-mm-dd HH:MM:ss' in UTC.
+export function getDateFormatKafka(date) {
+  return dateFormat(date, 'UTC:yyyy-mm-dd HH:MM:ss');
+}
+
+// Returns the protocol of KAFKA_URL (e.g. 'kafka'), or a falsy value when unset.
+export function getKafkaService() {
+  const type = process.env.KAFKA_URL && process.env.KAFKA_URL.split(':')[0];
+  return type;
+}
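
For reference, a sketch of how a caller might use these exports when recording an event; the saveEventToQueue wrapper and the 'event' topic name are hypothetical and not part of this commit:

    // Hypothetical caller, not part of this commit. Assumes KAFKA_URL/KAFKA_BROKER
    // are set, so getKafkaClient() returned a real client rather than null.
    import { kafkaProducer, getDateFormatKafka, getKafkaService } from 'lib/kafka';

    export async function saveEventToQueue(event) {
      // getKafkaService() yields the KAFKA_URL protocol (e.g. 'kafka') or a falsy value.
      if (!getKafkaService()) {
        return;
      }

      await kafkaProducer(
        {
          ...event,
          // Format the timestamp the way the ClickHouse DateTime column expects.
          created_at: getDateFormatKafka(new Date()),
        },
        'event', // assumed to match the topic consumed by the event_queue table
      );
    }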