add Clickhouse migration files. update saveEvent to include event_uuid

This commit is contained in:
Francis Cao 2022-08-04 20:15:21 -07:00
parent 1ffd5775f8
commit 1a84608e2b
3 changed files with 148 additions and 6 deletions

View File

@ -0,0 +1,126 @@
-- Create Pageview
CREATE TABLE pageview
(
website_id UInt32,
session_uuid UUID,
created_at DateTime,
url String,
referrer String
)
engine = MergeTree PRIMARY KEY (session_uuid, created_at)
ORDER BY (session_uuid, created_at)
SETTINGS index_granularity = 8192;
CREATE TABLE pageview_queue (
website_id UInt32,
session_uuid UUID,
created_at DateTime,
url String,
referrer String
)
ENGINE = Kafka
SETTINGS kafka_broker_list = '', -- input broker list
kafka_topic_list = 'pageview',
kafka_group_name = 'pageview_consumer_group',
kafka_format = 'JSONEachRow',
kafka_max_block_size = 1048576,
kafka_skip_broken_messages = 1;
CREATE MATERIALIZED VIEW pageview_queue_mv TO pageview AS
SELECT website_id,
session_uuid,
created_at,
url,
referrer
FROM umami.pageview_queue;
-- Create Session
CREATE TABLE session
(
session_uuid UUID,
website_id UInt32,
created_at DateTime,
hostname LowCardinality(String),
browser LowCardinality(String),
os LowCardinality(String),
device LowCardinality(String),
screen LowCardinality(String),
language LowCardinality(String),
country LowCardinality(String)
)
engine = MergeTree PRIMARY KEY (session_uuid, created_at)
ORDER BY (session_uuid, created_at)
SETTINGS index_granularity = 8192;
CREATE TABLE session_queue (
session_uuid UUID,
website_id UInt32,
created_at DateTime,
hostname LowCardinality(String),
browser LowCardinality(String),
os LowCardinality(String),
device LowCardinality(String),
screen LowCardinality(String),
language LowCardinality(String),
country LowCardinality(String)
)
ENGINE = Kafka
SETTINGS kafka_broker_list = '', -- input broker list
kafka_topic_list = 'session',
kafka_group_name = 'session_consumer_group',
kafka_format = 'JSONEachRow',
kafka_max_block_size = 1048576,
kafka_skip_broken_messages = 1;
CREATE MATERIALIZED VIEW session_queue_mv TO session AS
SELECT session_uuid,
website_id,
created_at,
hostname,
browser,
os,
device,
screen,
language,
country
FROM session_queue;
-- Create event
CREATE TABLE event
(
event_uuid UUID,
website_id UInt32,
session_uuid UUID,
created_at DateTime,
url String,
event_name String
)
engine = MergeTree PRIMARY KEY (event_uuid, created_at)
ORDER BY (event_uuid, created_at)
SETTINGS index_granularity = 8192;
CREATE TABLE event_queue (
event_uuid UUID,
website_id UInt32,
session_uuid UUID,
created_at DateTime,
url String,
event_name String
)
ENGINE = Kafka
SETTINGS kafka_broker_list = '', -- input broker list
kafka_topic_list = 'event',
kafka_group_name = 'event_consumer_group',
kafka_format = 'JSONEachRow',
kafka_max_block_size = 1048576,
kafka_skip_broken_messages = 1;
CREATE MATERIALIZED VIEW event_queue_mv TO event AS
SELECT event_uuid,
website_id,
session_uuid,
created_at,
url,
event_name
FROM event_queue;

View File

@ -7,6 +7,7 @@ import { getJsonBody, getIpAddress } from 'lib/request';
import { ok, send, badRequest, forbidden } from 'lib/response';
import { createToken } from 'lib/crypto';
import { removeTrailingSlash } from 'lib/url';
import { uuid } from 'lib/crypto';
export default async (req, res) => {
await useCors(req, res);
@ -71,10 +72,19 @@ export default async (req, res) => {
url = removeTrailingSlash(url);
}
const event_uuid = uuid(website_id, url, session_uuid, event_name);
if (type === 'pageview') {
await savePageView(website_id, { session_id, session_uuid, url, referrer });
} else if (type === 'event') {
await saveEvent(website_id, { session_id, session_uuid, url, event_name, event_data });
await saveEvent(website_id, {
event_uuid,
session_id,
session_uuid,
url,
event_name,
event_data,
});
} else {
return badRequest(res);
}

View File

@ -39,8 +39,14 @@ async function relationalQuery(website_id, { session_id, url, event_name, event_
);
}
async function clickhouseQuery(website_id, { session_uuid, url, event_name }) {
const params = [website_id, session_uuid, url?.substr(0, URL_LENGTH), event_name?.substr(0, 50)];
async function clickhouseQuery(website_id, { event_uuid, session_uuid, url, event_name }) {
const params = [
website_id,
event_uuid,
session_uuid,
url?.substr(0, URL_LENGTH),
event_name?.substr(0, 50),
];
return rawQueryClickhouse(
`
@ -50,13 +56,13 @@ async function clickhouseQuery(website_id, { session_uuid, url, event_name }) {
);
}
async function kafkaQuery(website_id, { session_uuid, url, event_type, event_value }) {
async function kafkaQuery(website_id, { event_uuid, session_uuid, url, event_name }) {
const params = {
event_uuid: event_uuid,
website_id: website_id,
session_uuid: session_uuid,
url: url?.substr(0, URL_LENGTH),
event_type: event_type?.substr(0, 50),
event_value: event_value?.substr(0, 50),
event_name: event_name?.substr(0, 50),
};
await kafkaProducer(params, 'event');