mirror of
https://github.com/kremalicious/umami.git
synced 2025-02-01 12:29:35 +01:00
Support Clickhouse insert without Kafka.
This commit is contained in:
parent
d501410a63
commit
9bf34bd5e4
@ -32,7 +32,7 @@ function getClient() {
|
|||||||
} = new URL(process.env.CLICKHOUSE_URL);
|
} = new URL(process.env.CLICKHOUSE_URL);
|
||||||
|
|
||||||
const client = createClient({
|
const client = createClient({
|
||||||
host: `${protocol}//${hostname}:${port}`,
|
url: `${protocol}//${hostname}:${port}`,
|
||||||
database: pathname.replace('/', ''),
|
database: pathname.replace('/', ''),
|
||||||
username: username,
|
username: username,
|
||||||
password,
|
password,
|
||||||
@ -136,7 +136,13 @@ async function rawQuery<T = unknown>(
|
|||||||
format: 'JSONEachRow',
|
format: 'JSONEachRow',
|
||||||
});
|
});
|
||||||
|
|
||||||
return resultSet.json();
|
return resultSet.json() as T;
|
||||||
|
}
|
||||||
|
|
||||||
|
async function insert(table: string, values: any[]) {
|
||||||
|
await connect();
|
||||||
|
|
||||||
|
return clickhouse.insert({ table, values, format: 'JSONEachRow' });
|
||||||
}
|
}
|
||||||
|
|
||||||
async function findUnique(data: any[]) {
|
async function findUnique(data: any[]) {
|
||||||
@ -172,4 +178,5 @@ export default {
|
|||||||
findUnique,
|
findUnique,
|
||||||
findFirst,
|
findFirst,
|
||||||
rawQuery,
|
rawQuery,
|
||||||
|
insert,
|
||||||
};
|
};
|
||||||
|
@ -3,6 +3,7 @@ import { DATA_TYPE } from 'lib/constants';
|
|||||||
import { uuid } from 'lib/crypto';
|
import { uuid } from 'lib/crypto';
|
||||||
import { CLICKHOUSE, PRISMA, runQuery } from 'lib/db';
|
import { CLICKHOUSE, PRISMA, runQuery } from 'lib/db';
|
||||||
import { flattenJSON, getStringValue } from 'lib/data';
|
import { flattenJSON, getStringValue } from 'lib/data';
|
||||||
|
import clickhouse from 'lib/clickhouse';
|
||||||
import kafka from 'lib/kafka';
|
import kafka from 'lib/kafka';
|
||||||
import prisma from 'lib/prisma';
|
import prisma from 'lib/prisma';
|
||||||
import { DynamicData } from 'lib/types';
|
import { DynamicData } from 'lib/types';
|
||||||
@ -59,6 +60,7 @@ async function clickhouseQuery(data: {
|
|||||||
}) {
|
}) {
|
||||||
const { websiteId, sessionId, eventId, urlPath, eventName, eventData, createdAt } = data;
|
const { websiteId, sessionId, eventId, urlPath, eventName, eventData, createdAt } = data;
|
||||||
|
|
||||||
|
const { insert } = clickhouse;
|
||||||
const { getDateFormat, sendMessages } = kafka;
|
const { getDateFormat, sendMessages } = kafka;
|
||||||
|
|
||||||
const jsonKeys = flattenJSON(eventData);
|
const jsonKeys = flattenJSON(eventData);
|
||||||
@ -79,7 +81,11 @@ async function clickhouseQuery(data: {
|
|||||||
};
|
};
|
||||||
});
|
});
|
||||||
|
|
||||||
await sendMessages(messages, 'event_data');
|
if (kafka.enabled) {
|
||||||
|
await sendMessages(messages, 'event_data');
|
||||||
|
} else {
|
||||||
|
await insert('event_data', messages);
|
||||||
|
}
|
||||||
|
|
||||||
return data;
|
return data;
|
||||||
}
|
}
|
||||||
|
@ -1,5 +1,6 @@
|
|||||||
import { EVENT_NAME_LENGTH, URL_LENGTH, EVENT_TYPE, PAGE_TITLE_LENGTH } from 'lib/constants';
|
import { EVENT_NAME_LENGTH, URL_LENGTH, EVENT_TYPE, PAGE_TITLE_LENGTH } from 'lib/constants';
|
||||||
import { CLICKHOUSE, PRISMA, runQuery } from 'lib/db';
|
import { CLICKHOUSE, PRISMA, runQuery } from 'lib/db';
|
||||||
|
import clickhouse from 'lib/clickhouse';
|
||||||
import kafka from 'lib/kafka';
|
import kafka from 'lib/kafka';
|
||||||
import prisma from 'lib/prisma';
|
import prisma from 'lib/prisma';
|
||||||
import { uuid } from 'lib/crypto';
|
import { uuid } from 'lib/crypto';
|
||||||
@ -134,6 +135,7 @@ async function clickhouseQuery(data: {
|
|||||||
city,
|
city,
|
||||||
...args
|
...args
|
||||||
} = data;
|
} = data;
|
||||||
|
const { insert } = clickhouse;
|
||||||
const { getDateFormat, sendMessage } = kafka;
|
const { getDateFormat, sendMessage } = kafka;
|
||||||
const eventId = uuid();
|
const eventId = uuid();
|
||||||
const createdAt = getDateFormat(new Date());
|
const createdAt = getDateFormat(new Date());
|
||||||
@ -164,7 +166,11 @@ async function clickhouseQuery(data: {
|
|||||||
created_at: createdAt,
|
created_at: createdAt,
|
||||||
};
|
};
|
||||||
|
|
||||||
await sendMessage(message, 'event');
|
if (kafka.enabled) {
|
||||||
|
await sendMessage(message, 'event');
|
||||||
|
} else {
|
||||||
|
await insert('website_event', [message]);
|
||||||
|
}
|
||||||
|
|
||||||
if (eventData) {
|
if (eventData) {
|
||||||
await saveEventData({
|
await saveEventData({
|
||||||
|
Loading…
Reference in New Issue
Block a user