clickhouse inserts

This commit is contained in:
Brian Cao 2022-07-22 14:43:19 -07:00
parent 6ea2282f82
commit 65910c7348
13 changed files with 146 additions and 40 deletions

View File

@ -72,19 +72,6 @@ function initializeClickhouse() {
database,
},
});
// return new ClickHouse({
// url: 'http://164.92.95.2',
// port: 8123,
// basicAuth: {
// username: 'default',
// password: 'shhhthisissupersecret!',
// },
// format: 'json',
// config: {
// database: 'umami_dev',
// },
// });
}
const prisma = initializePrisma(options);
@ -243,7 +230,7 @@ export function getFilterQuery(table, filters = {}, params = []) {
return query.join('\n');
}
export function parseFilters(table, filters = {}, params = []) {
export function parseFilters(table, filters = {}, params = [], sessionKey = 'session_id') {
const { domain, url, event_url, referrer, os, browser, device, country, event_type } = filters;
const pageviewFilters = { domain, url, referrer };
@ -257,7 +244,7 @@ export function parseFilters(table, filters = {}, params = []) {
event: { event_type },
joinSession:
os || browser || device || country
? `inner join session on ${table}.session_id = session.session_id`
? `inner join session on ${table}.${sessionKey} = session.${sessionKey}`
: '',
pageviewQuery: getFilterQuery('pageview', pageviewFilters, params),
sessionQuery: getFilterQuery('session', sessionFilters, params),

View File

@ -37,6 +37,8 @@ export async function getSession(req) {
let session = await getSessionByUuid(session_uuid);
session = Array.isArray(session) && session[0] ? session[0] : null;
if (!session) {
try {
session = await createSession(website_id, {
@ -50,6 +52,8 @@ export async function getSession(req) {
device,
});
console.log(session);
if (!session) {
return null;
}
@ -65,5 +69,6 @@ export async function getSession(req) {
return {
website_id,
session_id,
session_uuid,
};
}

View File

@ -60,7 +60,7 @@ export default async (req, res) => {
await useSession(req, res);
const {
session: { website_id, session_id },
session: { website_id, session_id, session_uuid },
} = req;
const { type, payload } = getJsonBody(req);
@ -72,9 +72,9 @@ export default async (req, res) => {
}
if (type === 'pageview') {
await savePageView(website_id, session_id, url, referrer);
await savePageView(website_id, { session_id, session_uuid, url, referrer });
} else if (type === 'event') {
await saveEvent(website_id, session_id, url, event_type, event_value);
await saveEvent(website_id, { session_id, session_uuid, url, event_type, event_value });
} else {
return badRequest(res);
}

View File

@ -34,7 +34,7 @@ export default async (req, res) => {
device,
country,
}),
getPageviewStats(websiteId, startDate, endDate, tz, unit, 'distinct pageview.session_id', {
getPageviewStats(websiteId, startDate, endDate, tz, unit, 'distinct pageview.', {
url,
os,
browser,

View File

@ -1,8 +1,20 @@
import { prisma, runQuery } from 'lib/db';
import { CLICKHOUSE, RELATIONAL, URL_LENGTH } from 'lib/constants';
import {
getDateFormatClickhouse,
prisma,
rawQueryClickhouse,
runAnalyticsQuery,
runQuery,
} from 'lib/db';
import { URL_LENGTH } from 'lib/constants';
/**
 * Records an event through the analytics query dispatcher.
 *
 * Every argument is forwarded unchanged to both query implementations; the
 * two are handed to `runAnalyticsQuery` keyed by the RELATIONAL and
 * CLICKHOUSE constants (presumably it runs the one matching the configured
 * database — confirm in lib/db).
 *
 * @param {...*} args - Arguments passed through to the underlying query.
 * @returns {Promise<*>} Whatever `runAnalyticsQuery` resolves to.
 */
export async function saveEvent(...args) {
  const runRelational = () => relationalQuery(...args);
  const runClickhouse = () => clickhouseQuery(...args);

  return runAnalyticsQuery({
    [`${RELATIONAL}`]: runRelational,
    [`${CLICKHOUSE}`]: runClickhouse,
  });
}
export async function saveEvent(website_id, session_id, url, event_type, event_value) {
async function relationalQuery(website_id, { session_id, url, event_type, event_value }) {
return runQuery(
prisma.event.create({
data: {
@ -15,3 +27,20 @@ export async function saveEvent(website_id, session_id, url, event_type, event_v
}),
);
}
/**
 * Inserts one row into the ClickHouse `event` table.
 *
 * The table name is left unqualified so the insert targets whatever database
 * the ClickHouse client is configured with, instead of a hard-coded
 * development database.
 *
 * @param {*} website_id - Website the event belongs to.
 * @param {object} data
 * @param {string} data.session_uuid - UUID of the session that produced the event.
 * @param {string} [data.url] - Event URL; truncated to URL_LENGTH characters.
 * @param {string} [data.event_type] - Truncated to 50 characters.
 * @param {string} [data.event_value] - Truncated to 50 characters.
 * @returns {Promise<*>} Result of `rawQueryClickhouse`.
 */
async function clickhouseQuery(website_id, { session_uuid, url, event_type, event_value }) {
  // Order must match the $1..$5 placeholders in the statement below.
  // `substring` replaces the deprecated `substr`; with a start of 0 they are equivalent.
  const params = [
    website_id,
    session_uuid,
    url?.substring(0, URL_LENGTH),
    event_type?.substring(0, 50),
    event_value?.substring(0, 50),
  ];

  return rawQueryClickhouse(
    `
    insert into event (created_at, website_id, session_uuid, url, event_type, event_value)
    values (${getDateFormatClickhouse(new Date())}, $1, $2, $3, $4, $5);`,
    params,
  );
}

View File

@ -45,6 +45,7 @@ async function clickhouseQuery(website_id, start_at, end_at, field, table, filte
table,
filters,
params,
'session_uuid',
);
return rawQueryClickhouse(

View File

@ -26,6 +26,7 @@ async function relationalQuery(
unit = 'day',
count = '*',
filters = {},
sessionKey = 'session_id',
) {
const params = [website_id, start_at, end_at];
const { pageviewQuery, sessionQuery, joinSession } = parseFilters('pageview', filters, params);
@ -37,7 +38,7 @@ async function relationalQuery(
g.y as y
from
(select ${getDateQuery('pageview.created_at', unit, timezone)} t,
count(${count}) y
count(${count != '*' ? `${count}${sessionKey}` : count}) y
from pageview
${joinSession}
where pageview.website_id=$1
@ -59,9 +60,15 @@ async function clickhouseQuery(
unit = 'day',
count = '*',
filters = {},
sessionKey = 'session_uuid',
) {
const params = [website_id];
const { pageviewQuery, sessionQuery, joinSession } = parseFilters('pageview', filters, params);
const { pageviewQuery, sessionQuery, joinSession } = parseFilters(
'pageview',
filters,
params,
sessionKey,
);
return rawQueryClickhouse(
`
@ -71,7 +78,7 @@ async function clickhouseQuery(
from
(select
${getDateQueryClickhouse('created_at', unit, timezone)} t,
count(${count}) y
count(${count != '*' ? `${count}${sessionKey}` : count}) y
from pageview
${joinSession}
where pageview.website_id= $1

View File

@ -1,8 +1,20 @@
import { prisma, runQuery } from 'lib/db';
import { CLICKHOUSE, RELATIONAL, URL_LENGTH } from 'lib/constants';
import {
getDateFormatClickhouse,
prisma,
rawQueryClickhouse,
runAnalyticsQuery,
runQuery,
} from 'lib/db';
import { URL_LENGTH } from 'lib/constants';
/**
 * Records a page view through the analytics query dispatcher.
 *
 * Every argument is forwarded unchanged to both query implementations; the
 * two are handed to `runAnalyticsQuery` keyed by the RELATIONAL and
 * CLICKHOUSE constants (presumably it runs the one matching the configured
 * database — confirm in lib/db).
 *
 * @param {...*} args - Arguments passed through to the underlying query.
 * @returns {Promise<*>} Whatever `runAnalyticsQuery` resolves to.
 */
export async function savePageView(...args) {
  const runRelational = () => relationalQuery(...args);
  const runClickhouse = () => clickhouseQuery(...args);

  return runAnalyticsQuery({
    [`${RELATIONAL}`]: runRelational,
    [`${CLICKHOUSE}`]: runClickhouse,
  });
}
export async function savePageView(website_id, session_id, url, referrer) {
async function relationalQuery(website_id, { session_id, url, referrer }) {
return runQuery(
prisma.pageview.create({
data: {
@ -14,3 +26,19 @@ export async function savePageView(website_id, session_id, url, referrer) {
}),
);
}
/**
 * Inserts one row into the ClickHouse `pageview` table.
 *
 * The table name is left unqualified so the insert targets whatever database
 * the ClickHouse client is configured with, instead of a hard-coded
 * development database.
 *
 * @param {*} website_id - Website the page view belongs to.
 * @param {object} data
 * @param {string} data.session_uuid - UUID of the session that produced the view.
 * @param {string} [data.url] - Viewed URL; truncated to URL_LENGTH characters.
 * @param {string} [data.referrer] - Referrer; truncated to URL_LENGTH characters.
 * @returns {Promise<*>} Result of `rawQueryClickhouse`.
 */
async function clickhouseQuery(website_id, { session_uuid, url, referrer }) {
  // Order must match the $1..$4 placeholders in the statement below.
  // `substring` replaces the deprecated `substr`; with a start of 0 they are equivalent.
  const params = [
    website_id,
    session_uuid,
    url?.substring(0, URL_LENGTH),
    referrer?.substring(0, URL_LENGTH),
  ];

  return rawQueryClickhouse(
    `
    insert into pageview (created_at, website_id, session_uuid, url, referrer)
    values (${getDateFormatClickhouse(new Date())}, $1, $2, $3, $4);`,
    params,
  );
}

View File

@ -1,6 +1,21 @@
import { prisma, runQuery } from 'lib/db';
import { CLICKHOUSE, RELATIONAL } from 'lib/constants';
import {
getDateFormatClickhouse,
prisma,
rawQueryClickhouse,
runAnalyticsQuery,
runQuery,
} from 'lib/db';
import { getSessionByUuid } from 'queries';
export async function createSession(website_id, data) {
/**
 * Creates a session through the analytics query dispatcher.
 *
 * Every argument is forwarded unchanged to both query implementations; the
 * two are handed to `runAnalyticsQuery` keyed by the RELATIONAL and
 * CLICKHOUSE constants (presumably it runs the one matching the configured
 * database — confirm in lib/db).
 *
 * @param {...*} args - Arguments passed through to the underlying query.
 * @returns {Promise<*>} Whatever `runAnalyticsQuery` resolves to.
 */
export async function createSession(...args) {
  const runRelational = () => relationalQuery(...args);
  const runClickhouse = () => clickhouseQuery(...args);

  return runAnalyticsQuery({
    [`${RELATIONAL}`]: runRelational,
    [`${CLICKHOUSE}`]: runClickhouse,
  });
}
async function relationalQuery(website_id, data) {
return runQuery(
prisma.session.create({
data: {
@ -13,3 +28,28 @@ export async function createSession(website_id, data) {
}),
);
}
/**
 * Inserts one row into the ClickHouse `session` table, then looks the session
 * up again by UUID and returns that lookup's result.
 *
 * The table name is left unqualified so the insert targets whatever database
 * the ClickHouse client is configured with, instead of a hard-coded
 * development database.
 *
 * @param {*} website_id - Website the session belongs to.
 * @param {object} data
 * @param {string} data.session_uuid - UUID identifying the new session.
 * @param {string} [data.hostname]
 * @param {string} [data.browser]
 * @param {string} [data.os]
 * @param {string} [data.screen]
 * @param {string} [data.language]
 * @param {string} [data.country] - Any falsy value is stored as null.
 * @param {string} [data.device]
 * @returns {Promise<*>} Result of `getSessionByUuid(session_uuid)`.
 */
async function clickhouseQuery(
  website_id,
  { session_uuid, hostname, browser, os, screen, language, country, device },
) {
  // Order must match the $1..$9 placeholders in the statement below.
  const params = [
    session_uuid,
    website_id,
    hostname,
    browser,
    os,
    device,
    screen,
    language,
    country || null,
  ];

  await rawQueryClickhouse(
    `insert into session (created_at, session_uuid, website_id, hostname, browser, os, device, screen, language, country)
    values (${getDateFormatClickhouse(new Date())}, $1, $2, $3, $4, $5, $6, $7, $8, $9);`,
    params,
  );

  // The insert's own result is discarded; the stored row is read back instead.
  return getSessionByUuid(session_uuid);
}

View File

@ -24,7 +24,6 @@ async function clickhouseQuery(session_uuid) {
return rawQueryClickhouse(
`
select
session_id,
session_uuid,
website_id,
created_at,
@ -36,7 +35,7 @@ async function clickhouseQuery(session_uuid) {
"language",
country
from session
where session_id = $1
where session_uuid = $1
`,
params,
);

View File

@ -40,14 +40,19 @@ async function relationalQuery(website_id, start_at, end_at, field, filters = {}
async function clickhouseQuery(website_id, start_at, end_at, field, filters = {}) {
const params = [website_id];
const { pageviewQuery, sessionQuery, joinSession } = parseFilters('pageview', filters, params);
const { pageviewQuery, sessionQuery, joinSession } = parseFilters(
'pageview',
filters,
params,
'session_uuid',
);
return rawQueryClickhouse(
`
select ${field} x, count(*) y
from session as x
where x.session_id in (
select pageview.session_id
where x.session_uuid in (
select pageview.session_uuid
from pageview
${joinSession}
where pageview.website_id=$1

View File

@ -29,7 +29,7 @@ async function clickhouseQuery(website_id) {
return rawQueryClickhouse(
`
select count(distinct session_id) x
select count(distinct session_uuid) x
from pageview
where website_id = $1
and created_at >= ${getDateFormatClickhouse(subMinutes(new Date(), 5))}

View File

@ -53,17 +53,22 @@ async function relationalQuery(website_id, start_at, end_at, filters = {}) {
async function clickhouseQuery(website_id, start_at, end_at, filters = {}) {
const params = [website_id];
const { pageviewQuery, sessionQuery, joinSession } = parseFilters('pageview', filters, params);
const { pageviewQuery, sessionQuery, joinSession } = parseFilters(
'pageview',
filters,
params,
'session_uuid',
);
return rawQueryClickhouse(
`
select
sum(t.c) as "pageviews",
count(distinct t.session_id) as "uniques",
count(distinct t.session_uuid) as "uniques",
sum(if(t.c = 1, 1, 0)) as "bounces",
sum(if(max_time < min_time + interval 1 hour, max_time-min_time, 0)) as "totaltime"
from (
select pageview.session_id,
select pageview.session_uuid,
${getDateQueryClickhouse('pageview.created_at', 'day')} time_series,
count(*) c,
min(created_at) min_time,
@ -74,7 +79,7 @@ async function clickhouseQuery(website_id, start_at, end_at, filters = {}) {
and ${getBetweenDatesClickhouse('pageview.created_at', start_at, end_at)}
${pageviewQuery}
${sessionQuery}
group by pageview.session_id, time_series
group by pageview.session_uuid, time_series
) t;
`,
params,