Updated Kafka loading process.

This commit is contained in:
Mike Cao 2022-09-22 10:36:23 -07:00
parent 0b163274ba
commit dfac7e1af5

View File

@ -8,16 +8,15 @@ const log = debug('umami:kafka');
function getClient() { function getClient() {
const { username, password } = new URL(process.env.KAFKA_URL); const { username, password } = new URL(process.env.KAFKA_URL);
const brokers = process.env.KAFKA_BROKER.split(','); const brokers = process.env.KAFKA_BROKER.split(',');
const fs = require('fs');
const ssl = const ssl =
username && password username && password
? { ? {
ssl: { ssl: {
checkServerIdentity: () => undefined, checkServerIdentity: () => undefined,
ca: [fs.readFileSync('./lib/cert/ca_cert.pem', 'utf-8')], ca: [process.env.CA_CERT],
key: fs.readFileSync('./lib/cert/client_key.pem', 'utf-8'), key: process.env.CLIENT_KEY,
cert: fs.readFileSync('./lib/cert/client_cert.pem', 'utf-8'), cert: process.env.CLIENT_CERT,
}, },
sasl: { sasl: {
mechanism: 'plain', mechanism: 'plain',
@ -62,6 +61,8 @@ function getDateFormat(date) {
} }
async function sendMessage(params, topic) { async function sendMessage(params, topic) {
await getKafka();
await producer.send({ await producer.send({
topic, topic,
messages: [ messages: [
@ -73,21 +74,25 @@ async function sendMessage(params, topic) {
}); });
} }
async function getKafka() {
if (!kafka) {
kafka = process.env.KAFKA_URL && process.env.KAFKA_BROKER && (global[KAFKA] || getClient());
if (kafka) {
producer = global[KAFKA_PRODUCER] || (await getProducer());
}
}
return kafka;
}
// Initialization // Initialization
let kafka; let kafka;
let producer; let producer;
(async () => {
kafka = process.env.KAFKA_URL && process.env.KAFKA_BROKER && (global[KAFKA] || getClient());
if (kafka) {
producer = global[KAFKA_PRODUCER] || (await getProducer());
}
})();
export default { export default {
client: kafka, client: kafka,
producer: producer, producer,
log, log,
getDateFormat, getDateFormat,
sendMessage, sendMessage,