This commit is contained in:
Mike Cao 2022-10-11 23:48:05 -07:00
commit c33729e185
12 changed files with 42 additions and 28 deletions

View File

@ -44,7 +44,7 @@ CREATE TABLE event_queue (
created_at DateTime('UTC')
)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'dev-01.umami.dev:9092,dev-01.umami.dev:9093,dev-01.umami.dev:9094', -- input broker list
SETTINGS kafka_broker_list = 'domain:9092,domain:9093,domain:9094', -- input broker list
kafka_topic_list = 'event',
kafka_group_name = 'event_consumer_group',
kafka_format = 'JSONEachRow',

View File

@ -61,7 +61,7 @@ function getDateFormat(date) {
}
function getCommaSeparatedStringFormat(data) {
return data.map(a => `'${a}'`).join(',');
return data.map(a => `'${a}'`).join(',') || '';
}
function getBetweenDates(field, start_at, end_at) {

View File

@ -39,7 +39,7 @@ export async function getSession(req) {
// Check database if does not exists in Redis
if (!websiteId) {
const website = await getWebsiteByUuid(websiteUuid);
websiteId = website ? website.websiteId : null;
websiteId = website ? website.id : null;
}
if (!websiteId || websiteId === DELETED) {
@ -47,7 +47,7 @@ export async function getSession(req) {
}
const { userAgent, browser, os, ip, country, device } = await getClientInfo(req, payload);
const sessionUuid = uuid(websiteId, hostname, ip, userAgent);
const sessionUuid = uuid(websiteUuid, hostname, ip, userAgent);
let sessionId = null;
let session = null;
@ -61,7 +61,7 @@ export async function getSession(req) {
// Check database if does not exists in Redis
if (!sessionId) {
session = await getSessionByUuid(sessionUuid);
sessionId = session ? session.sessionId : null;
sessionId = session ? session.id : null;
}
if (!sessionId) {
@ -97,7 +97,10 @@ export async function getSession(req) {
}
return {
websiteId: websiteId,
website: {
websiteId,
websiteUuid,
},
session,
};
}

View File

@ -59,7 +59,7 @@ export default async (req, res) => {
await useSession(req, res);
const {
session: { websiteId, session },
session: { website, session },
} = req;
const { type, payload } = getJsonBody(req);
@ -73,9 +73,9 @@ export default async (req, res) => {
const eventUuid = uuid();
if (type === 'pageview') {
await savePageView(websiteId, { session, url, referrer });
await savePageView(website, { session, url, referrer });
} else if (type === 'event') {
await saveEvent(websiteId, {
await saveEvent(website, {
session,
eventUuid,
url,
@ -87,7 +87,11 @@ export default async (req, res) => {
}
const token = createToken(
{ websiteId, sessionId: session.sessionId, sessionUuid: session.sessionUuid },
{
websiteId: website.websiteUuid,
sessionId: session.sessionId,
sessionUuid: session.sessionUuid,
},
secret(),
);

View File

@ -11,7 +11,7 @@ export default async (req, res) => {
const { userId } = req.auth;
const websites = await getUserWebsites(userId);
const ids = websites.map(({ id }) => id);
const ids = websites.map(({ websiteUuid }) => websiteUuid);
const token = createToken({ websites: ids }, secret());
const data = await getRealtimeData(ids, subMinutes(new Date(), 30));

View File

@ -13,7 +13,7 @@ function relationalQuery(websites, start_at) {
return prisma.client.event.findMany({
where: {
website: {
id: {
websiteUuid: {
in: websites,
},
},

View File

@ -10,7 +10,10 @@ export async function saveEvent(...args) {
});
}
async function relationalQuery(websiteId, { sessionId, url, eventName, eventData }) {
async function relationalQuery(
{ websiteId },
{ session: { id: sessionId }, url, eventName, eventData },
) {
const data = {
websiteId,
sessionId,
@ -32,7 +35,7 @@ async function relationalQuery(websiteId, { sessionId, url, eventName, eventData
}
async function clickhouseQuery(
websiteId,
{ websiteUuid: websiteId },
{ session: { country, sessionUuid, ...sessionArgs }, eventUuid, url, eventName, eventData },
) {
const { getDateFormat, sendMessage } = kafka;

View File

@ -13,7 +13,7 @@ async function relationalQuery(websites, start_at) {
return prisma.client.pageview.findMany({
where: {
website: {
id: {
websiteUuid: {
in: websites,
},
},
@ -25,9 +25,9 @@ async function relationalQuery(websites, start_at) {
}
async function clickhouseQuery(websites, start_at) {
const { getCommaSeparatedStringFormat } = clickhouse;
const { rawQuery, getCommaSeparatedStringFormat } = clickhouse;
return clickhouse.rawQuery(
return rawQuery(
`select
website_id,
session_id,

View File

@ -10,7 +10,7 @@ export async function savePageView(...args) {
});
}
async function relationalQuery(websiteId, { session: { sessionId }, url, referrer }) {
async function relationalQuery({ websiteId }, { session: { id: sessionId }, url, referrer }) {
return prisma.client.pageview.create({
data: {
websiteId,
@ -22,7 +22,7 @@ async function relationalQuery(websiteId, { session: { sessionId }, url, referre
}
async function clickhouseQuery(
websiteId,
{ websiteUuid: websiteId },
{ session: { country, sessionUuid, ...sessionArgs }, url, referrer },
) {
const { getDateFormat, sendMessage } = kafka;

View File

@ -18,7 +18,7 @@ async function relationalQuery(websiteId, data) {
...data,
},
select: {
sessionId: true,
id: true,
sessionUuid: true,
hostname: true,
browser: true,
@ -31,7 +31,7 @@ async function relationalQuery(websiteId, data) {
})
.then(async res => {
if (redis.client && res) {
await redis.client.set(`session:${res.sessionUuid}`, res.id);
await redis.client.set(`session:${res.sessionUuid}`, 1);
}
return res;

View File

@ -19,7 +19,7 @@ async function relationalQuery(sessionUuid) {
})
.then(async res => {
if (redis.client && res) {
await redis.client.set(`session:${res.sessionUuid}`, res.sessionId);
await redis.client.set(`session:${res.sessionUuid}`, 1);
}
return res;
@ -32,7 +32,7 @@ async function clickhouseQuery(sessionUuid) {
return rawQuery(
`select distinct
session_uuid,
session_id,
website_id,
created_at,
hostname,
@ -43,7 +43,7 @@ async function clickhouseQuery(sessionUuid) {
language,
country
from event
where session_uuid = $1`,
where session_id = $1`,
params,
)
.then(result => findFirst(result))

View File

@ -15,7 +15,7 @@ async function relationalQuery(websites, start_at) {
...(websites && websites.length > 0
? {
website: {
id: {
websiteUuid: {
in: websites,
},
},
@ -29,11 +29,11 @@ async function relationalQuery(websites, start_at) {
}
async function clickhouseQuery(websites, start_at) {
const { rawQuery, getDateFormat } = clickhouse;
const { rawQuery, getDateFormat, getCommaSeparatedStringFormat } = clickhouse;
return rawQuery(
`select distinct
session_uuid,
session_id,
website_id,
created_at,
hostname,
@ -44,7 +44,11 @@ async function clickhouseQuery(websites, start_at) {
language,
country
from event
where ${websites && websites.length > 0 ? `website_id in (${websites.join(',')})` : '0 = 0'}
where ${
websites && websites.length > 0
? `website_id in (${getCommaSeparatedStringFormat(websites)})`
: '0 = 0'
}
and created_at >= ${getDateFormat(start_at)}`,
);
}