diff --git a/lib/db.js b/lib/db.js index 34017aef..8d8091fa 100644 --- a/lib/db.js +++ b/lib/db.js @@ -1,5 +1,6 @@ import { PrismaClient } from '@prisma/client'; import { ClickHouse } from 'clickhouse'; +import { Kafka } from 'kafkajs'; import chalk from 'chalk'; import { MYSQL, @@ -9,6 +10,7 @@ import { CLICKHOUSE, RELATIONAL, FILTER_IGNORED, + KAFKA, } from 'lib/constants'; import moment from 'moment-timezone'; import { CLICKHOUSE_DATE_FORMATS } from './constants'; @@ -69,7 +71,6 @@ function getKafkaClient() { return null; } - const { Kafka } = require('kafkajs'); const url = new URL(process.env.KAFKA_URL); const brokers = process.env.KAFKA_BROKER.split(','); @@ -117,9 +118,7 @@ export function getDatabase() { } export function getAnalyticsDatabase() { - const type = - process.env.ANALYTICS_TYPE || - (process.env.ANALYTICS_URL && process.env.ANALYTICS_URL.split(':')[0]); + const type = process.env.ANALYTICS_URL && process.env.ANALYTICS_URL.split(':')[0]; if (type === 'postgres') { return POSTGRESQL; @@ -132,6 +131,12 @@ export function getAnalyticsDatabase() { return type; } +export function getKafkaService() { + const type = process.env.KAFKA_URL && process.env.KAFKA_URL.split(':')[0]; + + return type; +} + export function getDateStringQueryClickhouse(data, unit) { return `formatDateTime(${data}, '${CLICKHOUSE_DATE_FORMATS[unit]}')`; } @@ -321,9 +326,12 @@ export async function runAnalyticsQuery(queries) { if (db === POSTGRESQL || db === MYSQL) { return queries[RELATIONAL](); } - 59; if (db === CLICKHOUSE) { + const kafka = getKafkaService(); + if (kafka === KAFKA && queries[KAFKA]) { + return queries[KAFKA](); + } return queries[CLICKHOUSE](); } } diff --git a/queries/analytics/event/saveEvent.js b/queries/analytics/event/saveEvent.js index 41c068c3..47659f64 100644 --- a/queries/analytics/event/saveEvent.js +++ b/queries/analytics/event/saveEvent.js @@ -1,16 +1,18 @@ -import { CLICKHOUSE, RELATIONAL, URL_LENGTH } from 'lib/constants'; +import { CLICKHOUSE, RELATIONAL, KAFKA, URL_LENGTH } from 'lib/constants'; import { getDateFormatClickhouse, prisma, rawQueryClickhouse, runAnalyticsQuery, runQuery, + kafkaProducer, } from 'lib/db'; export async function saveEvent(...args) { return runAnalyticsQuery({ [RELATIONAL]: () => relationalQuery(...args), [CLICKHOUSE]: () => clickhouseQuery(...args), + [KAFKA]: () => kafkaQuery(...args), }); } @@ -44,3 +46,15 @@ async function clickhouseQuery(website_id, { session_uuid, url, event_type, even params, ); } + +async function kafkaQuery(website_id, { session_uuid, url, event_type, event_value }) { + const params = { + website_id: website_id, + session_uuid: session_uuid, + url: url?.substr(0, URL_LENGTH), + event_type: event_type?.substr(0, 50), + event_value: event_value?.substr(0, 50), + }; + + await kafkaProducer(params, 'event'); +} diff --git a/queries/analytics/pageview/savePageView.js b/queries/analytics/pageview/savePageView.js index c165b56d..2eb45606 100644 --- a/queries/analytics/pageview/savePageView.js +++ b/queries/analytics/pageview/savePageView.js @@ -1,16 +1,18 @@ -import { CLICKHOUSE, RELATIONAL, URL_LENGTH } from 'lib/constants'; +import { CLICKHOUSE, RELATIONAL, KAFKA, URL_LENGTH } from 'lib/constants'; import { getDateFormatClickhouse, prisma, rawQueryClickhouse, runAnalyticsQuery, runQuery, + kafkaProducer, } from 'lib/db'; export async function savePageView(...args) { return runAnalyticsQuery({ [RELATIONAL]: () => relationalQuery(...args), [CLICKHOUSE]: () => clickhouseQuery(...args), + [KAFKA]: () => kafkaQuery(...args), }); } @@ -42,3 +44,14 @@ async function clickhouseQuery(website_id, { session_uuid, url, referrer }) { params, ); } + +async function kafkaQuery(website_id, { session_uuid, url, referrer }) { + const params = { + website_id: website_id, + session_uuid: session_uuid, + url: url?.substr(0, URL_LENGTH), + referrer: referrer?.substr(0, URL_LENGTH), + }; + + await kafkaProducer(params, 'pageview'); +} diff --git a/queries/analytics/session/createSession.js b/queries/analytics/session/createSession.js index f5942712..069d123f 100644 --- a/queries/analytics/session/createSession.js +++ b/queries/analytics/session/createSession.js @@ -1,4 +1,4 @@ -import { CLICKHOUSE, RELATIONAL } from 'lib/constants'; +import { CLICKHOUSE, RELATIONAL, KAFKA } from 'lib/constants'; import { getDateFormatClickhouse, prisma, @@ -13,6 +13,7 @@ export async function createSession(...args) { return runAnalyticsQuery({ [RELATIONAL]: () => relationalQuery(...args), [CLICKHOUSE]: () => clickhouseQuery(...args), + [KAFKA]: () => kafkaQuery(...args), }); } @@ -33,6 +34,31 @@ async function relationalQuery(website_id, data) { async function clickhouseQuery( website_id, { session_uuid, hostname, browser, os, screen, language, country, device }, +) { + const params = [ + session_uuid, + website_id, + hostname, + browser, + os, + device, + screen, + language, + country ? country : null, + ]; + + await rawQueryClickhouse( + `insert into umami_dev.session (created_at, session_uuid, website_id, hostname, browser, os, device, screen, language, country) + values (${getDateFormatClickhouse(new Date())}, $1, $2, $3, $4, $5, $6, $7, $8, $9);`, + params, + ); + + return getSessionByUuid(session_uuid); +} + +async function kafkaQuery( + website_id, + { session_uuid, hostname, browser, os, screen, language, country, device }, ) { const params = { session_uuid: session_uuid, @@ -46,17 +72,7 @@ async function clickhouseQuery( country: country ? country : null, }; - if (process.env.KAFKA_URL) { - await kafkaProducer(params, 'session'); - } else { - const paramsValue = Object.keys(params); - - await rawQueryClickhouse( - `insert into umami_dev.session (created_at, session_uuid, website_id, hostname, browser, os, device, screen, language, country) - values (${getDateFormatClickhouse(new Date())}, $1, $2, $3, $4, $5, $6, $7, $8, $9);`, - paramsValue, - ); - } + await kafkaProducer(params, 'session'); return getSessionByUuid(session_uuid); }