From d952214536cdd863c9eca0e3505216057b6e7ebe Mon Sep 17 00:00:00 2001
From: Francis Cao
Date: Mon, 1 Aug 2022 00:28:38 -0700
Subject: [PATCH] check in producer test logic

---
 kafka/eventProducer.js                     | 54 ++++++++++++++++++
 kafka/pageviewProducer.js                  | 53 ++++++++++++++++++
 kafka/sessionProducer.js                   | 57 +++++++++++++++++++
 kafka/testrun.js                           | 16 ++++++
 lib/constants.js                           |  1 +
 lib/db.js                                  | 48 ++++++++++++++-
 package.json                               |  1 +
 process.env                                |  2 +
 queries/analytics/session/createSession.js | 39 +++++++------
 yarn.lock                                  |  5 ++
 10 files changed, 259 insertions(+), 17 deletions(-)
 create mode 100644 kafka/eventProducer.js
 create mode 100644 kafka/pageviewProducer.js
 create mode 100644 kafka/sessionProducer.js
 create mode 100644 kafka/testrun.js

diff --git a/kafka/eventProducer.js b/kafka/eventProducer.js
new file mode 100644
index 00000000..065ae282
--- /dev/null
+++ b/kafka/eventProducer.js
@@ -0,0 +1,54 @@
+// import the Kafka client and partitioners from the kafkajs library
+const { Kafka, Partitioners } = require('kafkajs');
+
+// the client ID tells Kafka who is producing the messages
+const clientId = 'my-app';
+// the list of brokers in the cluster
+const brokers = ['localhost:9092', 'localhost:9093', 'localhost:9094'];
+// the topic to which we want to write messages
+const topic = 'event';
+
+// initialize a new Kafka client and create a producer from it,
+// pinning the default partitioner to silence the kafkajs v2 warning
+const kafka = new Kafka({ clientId, brokers });
+const producer = kafka.producer({ createPartitioner: Partitioners.DefaultPartitioner });
+
+// an async function that writes a batch of test messages every 4 ms
+async function produce_event() {
+  await producer.connect();
+  let i = 0;
+
+  // after the producer has connected, start an interval timer
+  setInterval(async () => {
+    try {
+      // build a random event payload; `i` stands in for the website id
+      const y = Math.random()
+        .toString(36)
+        .replace(/[^a-z]+/g, '')
+        .slice(0, 5);
+      const z = Math.random()
+        .toString(36)
+        .replace(/[^a-z]+/g, '')
+        .slice(0, 5);
+      const x = {
+        event_uuid: '00fea66e-a433-536d-a13d-2d873fab0a08',
+        website_id: i,
+        session_uuid: '00fea66e-a433-536d-a13d-2d873fab0a08',
+        created_at: '2020-07-18 11:53:33',
+        url: y,
+        event_data: z,
+      };
+
+      // send the same payload five times per tick to generate load
+      await producer.send({
+        topic,
+        messages: Array(5).fill({ key: 'my-key', value: JSON.stringify(x) }),
+      });
+      i++;
+    } catch (err) {
+      console.error('could not write message ' + err);
+    }
+  }, 4);
+}
+
+module.exports = produce_event;
diff --git a/kafka/pageviewProducer.js b/kafka/pageviewProducer.js
new file mode 100644
index 00000000..16551b4c
--- /dev/null
+++ b/kafka/pageviewProducer.js
@@ -0,0 +1,53 @@
+// import the Kafka client and partitioners from the kafkajs library
+const { Kafka, Partitioners } = require('kafkajs');
+
+// the client ID tells Kafka who is producing the messages
+const clientId = 'my-app';
+// the list of brokers in the cluster
+const brokers = ['localhost:9092', 'localhost:9093', 'localhost:9094'];
+// the topic to which we want to write messages
+const topic = 'pageview';
+
+// initialize a new Kafka client and create a producer from it,
+// pinning the default partitioner to silence the kafkajs v2 warning
+const kafka = new Kafka({ clientId, brokers });
+const producer = kafka.producer({ createPartitioner: Partitioners.DefaultPartitioner });
+
+// an async function that writes a batch of test messages every 4 ms
+async function produce_pageview() {
+  await producer.connect();
+  let i = 0;
+
+  // after the producer has connected, start an interval timer
+  setInterval(async () => {
+    try {
+      // build a random pageview payload; `i` stands in for the website id
+      const y = Math.random()
+        .toString(36)
+        .replace(/[^a-z]+/g, '')
+        .slice(0, 5);
+      const z = Math.random()
+        .toString(36)
+        .replace(/[^a-z]+/g, '')
+        .slice(0, 5);
+      const x = {
+        website_id: i,
+        session_uuid: '00fea66e-a433-536d-a13d-2d873fab0a08',
+        created_at: '2020-07-18 11:53:33',
+        url: y,
+        referrer: z,
+      };
+
+      // send the same payload five times per tick to generate load
+      await producer.send({
+        topic,
+        messages: Array(5).fill({ key: 'my-key', value: JSON.stringify(x) }),
+      });
+      i++;
+    } catch (err) {
+      console.error('could not write message ' + err);
+    }
+  }, 4);
+}
+
+module.exports = produce_pageview;
diff --git a/kafka/sessionProducer.js b/kafka/sessionProducer.js
new file mode 100644
index 00000000..ebc15b93
--- /dev/null
+++ b/kafka/sessionProducer.js
@@ -0,0 +1,57 @@
+// import the Kafka client and partitioners from the kafkajs library
+const { Kafka, Partitioners } = require('kafkajs');
+
+// the client ID tells Kafka who is producing the messages
+const clientId = 'my-app';
+// the list of brokers in the cluster
+const brokers = ['localhost:9092', 'localhost:9093', 'localhost:9094'];
+// the topic to which we want to write messages
+const topic = 'session';
+
+// initialize a new Kafka client and create a producer from it,
+// pinning the default partitioner to silence the kafkajs v2 warning
+const kafka = new Kafka({ clientId, brokers });
+const producer = kafka.producer({ createPartitioner: Partitioners.DefaultPartitioner });
+
+// an async function that writes a batch of test messages every 4 ms
+async function produce_session() {
+  await producer.connect();
+  let i = 0;
+
+  // after the producer has connected, start an interval timer
+  setInterval(async () => {
+    try {
+      // build a random session payload; `i` stands in for the website id
+      const y = Math.random()
+        .toString(36)
+        .replace(/[^a-z]+/g, '')
+        .slice(0, 5);
+      const z = Math.random()
+        .toString(36)
+        .replace(/[^a-z]+/g, '')
+        .slice(0, 5);
+      const x = {
+        session_uuid: '00fea66e-a433-536d-a13d-2d873fab0a08',
+        website_id: i,
+        hostname: z,
+        browser: y,
+        os: z,
+        device: y,
+        screen: z,
+        language: y,
+        country: z,
+      };
+
+      // send the same payload five times per tick to generate load
+      await producer.send({
+        topic,
+        messages: Array(5).fill({ key: 'my-key', value: JSON.stringify(x) }),
+      });
+      i++;
+    } catch (err) {
+      console.error('could not write message ' + err);
+    }
+  }, 4);
+}
+
+module.exports = produce_session;
diff --git a/kafka/testrun.js b/kafka/testrun.js
new file mode 100644
index 00000000..3e4f540f
--- /dev/null
+++ b/kafka/testrun.js
@@ -0,0 +1,16 @@
+const produce_session = require('./sessionProducer');
+const produce_pageview = require('./pageviewProducer');
+const produce_event = require('./eventProducer');
+
+// call each `produce` function and log an error if it occurs
+produce_pageview().catch(err => {
+  console.error('error in produce_pageview: ', err);
+});
+
+produce_session().catch(err => {
+  console.error('error in produce_session: ', err);
+});
+
+produce_event().catch(err => {
+  console.error('error in produce_event: ', err);
+});
diff --git a/lib/constants.js b/lib/constants.js
index f971ee02..f9cdec53 100644
--- a/lib/constants.js
+++ b/lib/constants.js
@@ -69,6 +69,7 @@ export const RELATIONAL = 'relational';
 export const POSTGRESQL = 'postgresql';
 export const MYSQL = 'mysql';
 export const CLICKHOUSE = 'clickhouse';
+export const KAFKA = 'kafka';
 
 export const MYSQL_DATE_FORMATS = {
   minute: '%Y-%m-%d %H:%i:00',
diff --git a/lib/db.js b/lib/db.js
index cb9237bb..34017aef 100644
--- a/lib/db.js
+++ b/lib/db.js
@@ -64,15 +64,45 @@ function getClickhouseClient() {
   });
 }
 
+function getKafkaClient() {
+  if (!process.env.KAFKA_URL || !process.env.KAFKA_BROKER) {
+    return null;
+  }
+
+  const { Kafka } = require('kafkajs');
+  const url = new URL(process.env.KAFKA_URL);
+  const brokers = process.env.KAFKA_BROKER.split(',');
+
+  if (url.username.length === 0 && url.password.length === 0) {
+    return new Kafka({
+      clientId: 'umami',
+      brokers: brokers,
+    });
+  } else {
+    return new Kafka({
+      clientId: 'umami',
+      brokers: brokers,
+      ssl: true,
+      sasl: {
+        mechanism: 'plain',
+        username: url.username,
+        password: url.password,
+      },
+    });
+  }
+}
+
 const prisma = global.prisma || getPrismaClient(options);
 const clickhouse = global.clickhouse || getClickhouseClient();
+const kafka = global.kafka || getKafkaClient();
 
 if (process.env.NODE_ENV !== 'production') {
   global.prisma = prisma;
   global.clickhouse = clickhouse;
+  global.kafka = kafka;
 }
 
-export { prisma, clickhouse };
+export { prisma, clickhouse, kafka };
 
 export function getDatabase() {
   const type =
@@ -291,8 +321,24 @@ export async function runAnalyticsQuery(queries) {
   if (db === POSTGRESQL || db === MYSQL) {
     return queries[RELATIONAL]();
   }
 
   if (db === CLICKHOUSE) {
     return queries[CLICKHOUSE]();
   }
 }
+
+// test helper: connects a fresh producer on every call and sends one message
+export async function kafkaProducer(params, topic) {
+  const producer = kafka.producer();
+  await producer.connect();
+
+  await producer.send({
+    topic,
+    messages: [
+      {
+        key: 'key',
+        value: JSON.stringify(params),
+      },
+    ],
+  });
+}
diff --git a/package.json b/package.json
index 5ddab6e7..58968945 100644
--- a/package.json
+++ b/package.json
@@ -78,6 +78,7 @@
     "is-localhost-ip": "^1.4.0",
     "isbot": "^3.4.5",
     "jose": "2.0.5",
+    "kafkajs": "^2.1.0",
     "maxmind": "^4.3.6",
     "moment-timezone": "^0.5.33",
     "next": "^12.2.0",
diff --git a/process.env b/process.env
index d33ca31b..cc7da3f0 100644
--- a/process.env
+++ b/process.env
@@ -1,6 +1,8 @@
 #DATABASE_URL="postgresql://umami:rainbow-unicorn-dog@5.161.101.112:5432/umami?schema=analytics&connection_limit=1"
 DATABASE_URL="postgresql://umami:shhhthisissupersecret!@164.92.98.175:5432/umami?schema=umami"
 ANALYTICS_URL="clickhouse://default:shhhthisissupersecret!@164.92.95.2:8123/umami_dev"
+KAFKA_URL="kafka://username:password!@localhost/"
+KAFKA_BROKER="localhost:9092,localhost:9093,localhost:9094"
 
 HASH_SALT="mySalt!!~"
 LOG_QUERY=1
diff --git a/queries/analytics/session/createSession.js b/queries/analytics/session/createSession.js
index 1af95fe6..f5942712 100644
--- a/queries/analytics/session/createSession.js
+++ b/queries/analytics/session/createSession.js
@@ -5,6 +5,7 @@ import {
   rawQueryClickhouse,
   runAnalyticsQuery,
   runQuery,
+  kafkaProducer,
 } from 'lib/db';
 import { getSessionByUuid } from 'queries';
 
@@ -33,23 +34,29 @@ async function clickhouseQuery(
   website_id,
   { session_uuid, hostname, browser, os, screen, language, country, device },
 ) {
-  const params = [
-    session_uuid,
-    website_id,
-    hostname,
-    browser,
-    os,
-    device,
-    screen,
-    language,
-    country ? country : null,
-  ];
+  const params = {
+    session_uuid: session_uuid,
+    website_id: website_id,
+    hostname: hostname,
+    browser: browser,
+    os: os,
+    device: device,
+    screen: screen,
+    language: language,
+    country: country ? country : null,
+  };
 
-  await rawQueryClickhouse(
-    `insert into umami_dev.session (created_at, session_uuid, website_id, hostname, browser, os, device, screen, language, country)
-    values (${getDateFormatClickhouse(new Date())}, $1, $2, $3, $4, $5, $6, $7, $8, $9);`,
-    params,
-  );
+  if (process.env.KAFKA_URL) {
+    await kafkaProducer(params, 'session');
+  } else {
+    const paramsValue = Object.values(params);
+
+    await rawQueryClickhouse(
+      `insert into umami_dev.session (created_at, session_uuid, website_id, hostname, browser, os, device, screen, language, country)
+      values (${getDateFormatClickhouse(new Date())}, $1, $2, $3, $4, $5, $6, $7, $8, $9);`,
+      paramsValue,
+    );
+  }
 
   return getSessionByUuid(session_uuid);
 }
diff --git a/yarn.lock b/yarn.lock
index 144118b7..17073ea0 100644
--- a/yarn.lock
+++ b/yarn.lock
@@ -4249,6 +4249,11 @@ jsx-ast-utils@^3.3.1:
     array-includes "^3.1.5"
     object.assign "^4.1.2"
 
+kafkajs@^2.1.0:
+  version "2.1.0"
+  resolved "https://registry.yarnpkg.com/kafkajs/-/kafkajs-2.1.0.tgz#32ede4e8080cc75586c5e4406eeb582fa73f7b1e"
+  integrity sha512-6IYiOdGWvFPbSbVB+AV3feT+A7vzw5sXm7Ze4QTfP7FRNdY8pGcpiNPvD2lfgYFD8Dm9KbMgBgTt2mf8KaIkzw==
+
 kind-of@^6.0.2, kind-of@^6.0.3:
   version "6.0.3"
   resolved "https://registry.yarnpkg.com/kind-of/-/kind-of-6.0.3.tgz#07c05034a6c349fa06e24fa35aa76db4580ce4dd"
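
Note (not part of the patch): with the brokers from process.env running locally, `node kafka/testrun.js` should start all three producers. To verify what they write, a minimal kafkajs consumer sketch follows; it is not in this patch, and the file name, client id, and group id are assumptions. Only the topic names ('event', 'pageview', 'session') come from the producers above.

// kafka/consumerCheck.js -- hypothetical helper, not part of the patch.
// Subscribes to the three topics the producers write to and logs each
// message so the payloads can be eyeballed during a test run.
const { Kafka } = require('kafkajs');

const kafka = new Kafka({
  clientId: 'my-app-check', // assumed client id; any string works
  brokers: ['localhost:9092', 'localhost:9093', 'localhost:9094'],
});

async function consumeAll() {
  // assumed consumer group id
  const consumer = kafka.consumer({ groupId: 'producer-test-check' });
  await consumer.connect();

  // topic names taken from the producers in this patch
  for (const topic of ['event', 'pageview', 'session']) {
    await consumer.subscribe({ topic, fromBeginning: true });
  }

  // log every message as `topic[partition] key: value`
  await consumer.run({
    eachMessage: async ({ topic, partition, message }) => {
      console.log(`${topic}[${partition}] ${message.key}: ${message.value.toString()}`);
    },
  });
}

consumeAll().catch(err => {
  console.error('error in consumeAll: ', err);
});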