update kafka messaging

This commit is contained in:
Brian Cao 2022-08-25 11:07:47 -07:00
parent 9fea2fc77c
commit 46b4b98d40
6 changed files with 31 additions and 15 deletions

View File

@ -2,7 +2,7 @@ import { PrismaClient } from '@prisma/client';
import { ClickHouse } from 'clickhouse'; import { ClickHouse } from 'clickhouse';
import dateFormat from 'dateformat'; import dateFormat from 'dateformat';
import chalk from 'chalk'; import chalk from 'chalk';
import { getKafkaService } from './kafka'; import { getKafkaService } from './db/kafka';
import { import {
MYSQL, MYSQL,
MYSQL_DATE_FORMATS, MYSQL_DATE_FORMATS,

View File

@ -1,4 +1,4 @@
import { Kafka } from 'kafkajs'; import { Kafka, logLevel } from 'kafkajs';
import dateFormat from 'dateformat'; import dateFormat from 'dateformat';
export function getKafkaClient() { export function getKafkaClient() {
@ -14,6 +14,7 @@ export function getKafkaClient() {
clientId: 'umami', clientId: 'umami',
brokers: brokers, brokers: brokers,
connectionTimeout: 3000, connectionTimeout: 3000,
logLevel: logLevel.ERROR,
}); });
} else { } else {
return new Kafka({ return new Kafka({
@ -29,20 +30,29 @@ export function getKafkaClient() {
}); });
} }
} }
const kafka = global.kafka || getKafkaClient(); const kafka = global.kafka || getKafkaClient();
let kafkaProducer = null;
if (process.env.NODE_ENV !== 'production') { (async () => {
global.kafka = kafka; kafkaProducer = global.kakfaProducer || (await getKafkaProducer());
}
export { kafka }; if (process.env.NODE_ENV !== 'production') {
global.kafka = kafka;
global.kakfaProducer = kafkaProducer;
}
})();
export async function kafkaProducer(params, topic) { export { kafka, kafkaProducer };
export async function getKafkaProducer() {
const producer = kafka.producer(); const producer = kafka.producer();
await producer.connect(); await producer.connect();
await producer.send({ return producer;
}
export async function sendKafkaMessage(params, topic) {
await kafkaProducer.send({
topic, topic,
messages: [ messages: [
{ {
@ -50,6 +60,7 @@ export async function kafkaProducer(params, topic) {
value: JSON.stringify(params), value: JSON.stringify(params),
}, },
], ],
acks: 0,
}); });
} }

5
pages/api/heartbeat.js Normal file
View File

@ -0,0 +1,5 @@
import { ok } from 'lib/response';
export default async (req, res) => {
return ok(res, 'nice');
};

View File

@ -6,7 +6,7 @@ import {
runAnalyticsQuery, runAnalyticsQuery,
runQuery, runQuery,
} from 'lib/db'; } from 'lib/db';
import { kafkaProducer, getDateFormatKafka } from 'lib/kafka'; import { sendKafkaMessage, getDateFormatKafka } from 'lib/db/kafka';
export async function saveEvent(...args) { export async function saveEvent(...args) {
return runAnalyticsQuery({ return runAnalyticsQuery({
@ -66,5 +66,5 @@ async function kafkaQuery(website_id, { event_uuid, session_uuid, url, event_nam
event_name: event_name?.substr(0, 50), event_name: event_name?.substr(0, 50),
}; };
await kafkaProducer(params, 'event'); await sendKafkaMessage(params, 'event');
} }

View File

@ -6,7 +6,7 @@ import {
runAnalyticsQuery, runAnalyticsQuery,
runQuery, runQuery,
} from 'lib/db'; } from 'lib/db';
import { kafkaProducer, getDateFormatKafka } from 'lib/kafka'; import { sendKafkaMessage, getDateFormatKafka } from 'lib/db/kafka';
export async function savePageView(...args) { export async function savePageView(...args) {
return runAnalyticsQuery({ return runAnalyticsQuery({
@ -54,5 +54,5 @@ async function kafkaQuery(website_id, { session_uuid, url, referrer }) {
referrer: referrer?.substr(0, URL_LENGTH), referrer: referrer?.substr(0, URL_LENGTH),
}; };
await kafkaProducer(params, 'pageview'); await sendKafkaMessage(params, 'pageview');
} }

View File

@ -6,7 +6,7 @@ import {
runAnalyticsQuery, runAnalyticsQuery,
runQuery, runQuery,
} from 'lib/db'; } from 'lib/db';
import { kafkaProducer, getDateFormatKafka } from 'lib/kafka'; import { sendKafkaMessage, getDateFormatKafka } from 'lib/db/kafka';
import { getSessionByUuid } from 'queries'; import { getSessionByUuid } from 'queries';
export async function createSession(...args) { export async function createSession(...args) {
@ -73,7 +73,7 @@ async function kafkaQuery(
country: country ? country : null, country: country ? country : null,
}; };
await kafkaProducer(params, 'session'); await sendKafkaMessage(params, 'session');
return getSessionByUuid(session_uuid); return getSessionByUuid(session_uuid);
} }