check in producer test logic

Francis Cao 2022-08-01 00:28:38 -07:00
parent 6f4824582d
commit d952214536
10 changed files with 338 additions and 17 deletions

kafka/eventProducer.js Normal file

@ -0,0 +1,76 @@
// import the Kafka client and partitioner helpers from the kafkajs library
const { Kafka, Partitioners } = require('kafkajs');

// the client ID tells Kafka who is producing the messages
const clientId = 'my-app';
// the list of brokers in the cluster
const brokers = ['localhost:9092', 'localhost:9093', 'localhost:9094'];
// the topic to which we want to write messages
const topic = 'event';

// initialize a new kafka client and create a producer from it;
// pinning the partitioner silences the kafkajs v2 partitioner warning
const kafka = new Kafka({ clientId, brokers });
const producer = kafka.producer({ createPartitioner: Partitioners.DefaultPartitioner });

// an async function that writes a batch of test messages every 4 ms
async function produce_event() {
  await producer.connect();
  let i = 0;
  // once the producer has connected, start an interval timer
  setInterval(async () => {
    try {
      // y and z are short random lowercase strings that stand in
      // for the url and event data of a fake analytics event
      const y = Math.random()
        .toString(36)
        .replace(/[^a-z]+/g, '')
        .slice(0, 5);
      const z = Math.random()
        .toString(36)
        .replace(/[^a-z]+/g, '')
        .slice(0, 5);
      // the payload; website_id carries the current value of `i`
      const x = {
        event_uuid: '00fea66e-a433-536d-a13d-2d873fab0a08',
        website_id: i,
        session_uuid: '00fea66e-a433-536d-a13d-2d873fab0a08',
        created_at: '2020-07-18 11:53:33',
        url: y,
        event_data: z,
      };
      // send five copies of the payload to the configured topic
      await producer.send({
        topic,
        messages: Array.from({ length: 5 }, () => ({
          key: 'my-key',
          value: JSON.stringify(x),
        })),
      });
      i++;
    } catch (err) {
      console.error('could not write message: ' + err);
    }
  }, 4);
}

module.exports = produce_event;
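
The Math.random().toString(36) chain used above (and in the other two producers) is just a quick way to manufacture a short random lowercase string. A representative evaluation, with the actual characters varying per call:

Math.random().toString(36)   // e.g. '0.4fzyo82mvyr'
  .replace(/[^a-z]+/g, '')   //      'fzyomvyr'
  .slice(0, 5);              //      'fzyom'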

kafka/pageviewProducer.js Normal file

@ -0,0 +1,75 @@
// import the Kafka client and partitioner helpers from the kafkajs library
const { Kafka, Partitioners } = require('kafkajs');

// the client ID tells Kafka who is producing the messages
const clientId = 'my-app';
// the list of brokers in the cluster
const brokers = ['localhost:9092', 'localhost:9093', 'localhost:9094'];
// the topic to which we want to write messages
const topic = 'pageview';

// initialize a new kafka client and create a producer from it;
// pinning the partitioner silences the kafkajs v2 partitioner warning
const kafka = new Kafka({ clientId, brokers });
const producer = kafka.producer({ createPartitioner: Partitioners.DefaultPartitioner });

// an async function that writes a batch of test messages every 4 ms
async function produce_pageview() {
  await producer.connect();
  let i = 0;
  // once the producer has connected, start an interval timer
  setInterval(async () => {
    try {
      // y and z are short random lowercase strings that stand in
      // for the url and referrer of a fake pageview
      const y = Math.random()
        .toString(36)
        .replace(/[^a-z]+/g, '')
        .slice(0, 5);
      const z = Math.random()
        .toString(36)
        .replace(/[^a-z]+/g, '')
        .slice(0, 5);
      // the payload; website_id carries the current value of `i`
      const x = {
        website_id: i,
        session_uuid: '00fea66e-a433-536d-a13d-2d873fab0a08',
        created_at: '2020-07-18 11:53:33',
        url: y,
        referrer: z,
      };
      // send five copies of the payload to the configured topic
      await producer.send({
        topic,
        messages: Array.from({ length: 5 }, () => ({
          key: 'my-key',
          value: JSON.stringify(x),
        })),
      });
      i++;
    } catch (err) {
      console.error('could not write message: ' + err);
    }
  }, 4);
}

module.exports = produce_pageview;

kafka/sessionProducer.js Normal file

@ -0,0 +1,79 @@
// import the Kafka client and partitioner helpers from the kafkajs library
const { Kafka, Partitioners } = require('kafkajs');

// the client ID tells Kafka who is producing the messages
const clientId = 'my-app';
// the list of brokers in the cluster
const brokers = ['localhost:9092', 'localhost:9093', 'localhost:9094'];
// the topic to which we want to write messages
const topic = 'session';

// initialize a new kafka client and create a producer from it;
// pinning the partitioner silences the kafkajs v2 partitioner warning
const kafka = new Kafka({ clientId, brokers });
const producer = kafka.producer({ createPartitioner: Partitioners.DefaultPartitioner });

// an async function that writes a batch of test messages every 4 ms
async function produce_session() {
  await producer.connect();
  let i = 0;
  // once the producer has connected, start an interval timer
  setInterval(async () => {
    try {
      // y and z are short random lowercase strings reused across the
      // string fields of a fake session
      const y = Math.random()
        .toString(36)
        .replace(/[^a-z]+/g, '')
        .slice(0, 5);
      const z = Math.random()
        .toString(36)
        .replace(/[^a-z]+/g, '')
        .slice(0, 5);
      // the payload; website_id carries the current value of `i`
      const x = {
        session_uuid: '00fea66e-a433-536d-a13d-2d873fab0a08',
        website_id: i,
        hostname: z,
        browser: y,
        os: z,
        device: y,
        screen: z,
        language: y,
        country: z,
      };
      // send five copies of the payload to the configured topic
      await producer.send({
        topic,
        messages: Array.from({ length: 5 }, () => ({
          key: 'my-key',
          value: JSON.stringify(x),
        })),
      });
      i++;
    } catch (err) {
      console.error('could not write message: ' + err);
    }
  }, 4);
}

module.exports = produce_session;

kafka/testrun.js Normal file

@ -0,0 +1,29 @@
const produce_session = require('./sessionProducer');
const produce_pageview = require('./pageviewProducer');
const produce_event = require('./eventProducer');
// start each producer and log any error it raises while spinning up
produce_pageview().catch(err => {
  console.error('error in produce_pageview: ', err);
});
produce_session().catch(err => {
  console.error('error in produce_session: ', err);
});
produce_event().catch(err => {
  console.error('error in produce_event: ', err);
});
// const { Kafka } = require('kafkajs')
// const KAFKA_URL="kafka://localhost:9092/";
// const KAFKA_BROKER="localhost:9092,localhost:9093,localhost:9094"
// const url = new URL(KAFKA_URL);
// const database = url.pathname.replace('/', '');
// var brokers = KAFKA_BROKER.split(',');
// console.log(url);
// console.log(database);
// console.log(brokers);
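
A note on running this: assuming the three brokers listed in the producer files are up (for example via a local docker-compose cluster), the whole test run is just:

node kafka/testrun.js

Each producer installs its own interval timer, so the script keeps streaming batches to the event, pageview, and session topics until the process is killed.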


@ -69,6 +69,7 @@ export const RELATIONAL = 'relational';
export const POSTGRESQL = 'postgresql';
export const MYSQL = 'mysql';
export const CLICKHOUSE = 'clickhouse';
export const KAFKA = 'kafka';
export const MYSQL_DATE_FORMATS = {
  minute: '%Y-%m-%d %H:%i:00',


@ -64,15 +64,45 @@ function getClickhouseClient() {
  });
}

function getKafkaClient() {
  if (!process.env.KAFKA_URL) {
    return null;
  }

  const { Kafka } = require('kafkajs');
  const url = new URL(process.env.KAFKA_URL);
  const brokers = process.env.KAFKA_BROKER.split(',');

  // no credentials in the url -> connect without auth
  if (url.username.length === 0 && url.password.length === 0) {
    return new Kafka({
      clientId: 'umami',
      brokers: brokers,
    });
  } else {
    // credentials present -> connect over SSL with SASL/PLAIN auth
    return new Kafka({
      clientId: 'umami',
      brokers: brokers,
      ssl: true,
      sasl: {
        mechanism: 'plain',
        username: url.username,
        password: url.password,
      },
    });
  }
}
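
// For reference, the WHATWG URL class is what splits credentials out of
// KAFKA_URL above; with an illustrative value (not the real config):
//   const url = new URL('kafka://user:secret@localhost/');
//   url.username; // 'user'
//   url.password; // 'secret'
// When both fields are empty strings, the client is built without the
// ssl/sasl options.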
const prisma = global.prisma || getPrismaClient(options);
const clickhouse = global.clickhouse || getClickhouseClient();
const kafka = global.kafka || getKafkaClient();

if (process.env.NODE_ENV !== 'production') {
  global.prisma = prisma;
  global.clickhouse = clickhouse;
  global.kafka = kafka;
}

export { prisma, clickhouse, kafka };
export function getDatabase() {
  const type =
@ -291,8 +321,24 @@ export async function runAnalyticsQuery(queries) {
  if (db === POSTGRESQL || db === MYSQL) {
    return queries[RELATIONAL]();
  }
  if (db === CLICKHOUSE) {
    return queries[CLICKHOUSE]();
  }
}
export async function kafkaProducer(params, topic) {
  // note: this creates and connects a fresh producer for every batch
  const producer = kafka.producer();
  await producer.connect();
  await producer.send({
    topic,
    messages: [
      {
        key: 'key',
        value: JSON.stringify(params),
      },
    ],
  });
}
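
One caveat worth flagging: kafkaProducer above connects a brand-new producer on every call, which is fine for a test commit. A minimal sketch of a reusable variant, assuming the same module-level kafka client (the sharedProducer and producerReady names are hypothetical, and a null check on kafka is omitted since getKafkaClient can return null):

const sharedProducer = kafka.producer();
let producerReady = null;

export async function kafkaProducer(params, topic) {
  // connect once, then reuse the same producer for every send
  producerReady = producerReady || sharedProducer.connect();
  await producerReady;
  await sharedProducer.send({
    topic,
    messages: [{ key: 'key', value: JSON.stringify(params) }],
  });
}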


@ -78,6 +78,7 @@
"is-localhost-ip": "^1.4.0",
"isbot": "^3.4.5",
"jose": "2.0.5",
"kafkajs": "^2.1.0",
"maxmind": "^4.3.6",
"moment-timezone": "^0.5.33",
"next": "^12.2.0",


@ -1,6 +1,8 @@
#DATABASE_URL="postgresql://umami:rainbow-unicorn-dog@5.161.101.112:5432/umami?schema=analytics&connection_limit=1"
DATABASE_URL="postgresql://umami:shhhthisissupersecret!@164.92.98.175:5432/umami?schema=umami"
ANALYTICS_URL="clickhouse://default:shhhthisissupersecret!@164.92.95.2:8123/umami_dev"
KAFKA_URL="kafka://username:password!@localhost/"
KAFKA_BROKER="localhost:9092,localhost:9093,localhost:9094"
HASH_SALT="mySalt!!~"
LOG_QUERY=1


@ -5,6 +5,7 @@ import {
  rawQueryClickhouse,
  runAnalyticsQuery,
  runQuery,
  kafkaProducer,
} from 'lib/db';
import { getSessionByUuid } from 'queries';
@ -33,23 +34,29 @@ async function clickhouseQuery(
  website_id,
  { session_uuid, hostname, browser, os, screen, language, country, device },
) {
  const params = {
    session_uuid,
    website_id,
    hostname,
    browser,
    os,
    device,
    screen,
    language,
    country: country ? country : null,
  };
  if (process.env.KAFKA_URL) {
    await kafkaProducer(params, 'session');
  } else {
    // positional values for $1..$9 in the insert below
    const paramsValue = Object.values(params);
    await rawQueryClickhouse(
      `insert into umami_dev.session (created_at, session_uuid, website_id, hostname, browser, os, device, screen, language, country)
      values (${getDateFormatClickhouse(new Date())}, $1, $2, $3, $4, $5, $6, $7, $8, $9);`,
      paramsValue,
    );
  }
  return getSessionByUuid(session_uuid);
}


@ -4249,6 +4249,11 @@ jsx-ast-utils@^3.3.1:
    array-includes "^3.1.5"
    object.assign "^4.1.2"

kafkajs@^2.1.0:
  version "2.1.0"
  resolved "https://registry.yarnpkg.com/kafkajs/-/kafkajs-2.1.0.tgz#32ede4e8080cc75586c5e4406eeb582fa73f7b1e"
  integrity sha512-6IYiOdGWvFPbSbVB+AV3feT+A7vzw5sXm7Ze4QTfP7FRNdY8pGcpiNPvD2lfgYFD8Dm9KbMgBgTt2mf8KaIkzw==

kind-of@^6.0.2, kind-of@^6.0.3:
  version "6.0.3"
  resolved "https://registry.yarnpkg.com/kind-of/-/kind-of-6.0.3.tgz#07c05034a6c349fa06e24fa35aa76db4580ce4dd"