Updated realtime data fetch.

This commit is contained in:
Mike Cao 2024-08-20 23:58:20 -07:00
parent 04e0b33622
commit e35c11c3d6
9 changed files with 126 additions and 87 deletions

View File

@ -10,7 +10,6 @@ import { safeDecodeURI } from 'next-basics';
import { useContext, useMemo, useState } from 'react';
import { Icon, SearchField, StatusLight, Text } from 'react-basics';
import { FixedSizeList } from 'react-window';
import thenby from 'thenby';
import { WebsiteContext } from '../WebsiteProvider';
import styles from './RealtimeLog.module.css';
@ -141,15 +140,7 @@ export function RealtimeLog({ data }: { data: RealtimeData }) {
return [];
}
const { events, visitors } = data;
let logs = [
...events.map(e => ({
__type: e.eventName ? TYPE_EVENT : TYPE_PAGEVIEW,
...e,
})),
...visitors.map(v => ({ __type: TYPE_SESSION, ...v })),
].sort(thenby.firstBy('createdAt', -1));
let logs = data.events;
if (search) {
logs = logs.filter(({ eventName, urlPath, browser, os, country, device }) => {

View File

@ -1,15 +1,13 @@
import { RealtimeData } from 'lib/types';
import { useApi } from './useApi';
import { REALTIME_INTERVAL } from 'lib/constants';
import { useTimezone } from '../useTimezone';
export function useRealtime(websiteId: string) {
const { get, useQuery } = useApi();
const { timezone } = useTimezone();
const { data, isLoading, error } = useQuery<RealtimeData>({
queryKey: ['realtime', websiteId],
queryFn: async () => {
return get(`/realtime/${websiteId}`, { timezone });
return get(`/realtime/${websiteId}`);
},
enabled: !!websiteId,
refetchInterval: REALTIME_INTERVAL,

View File

@ -187,7 +187,10 @@ async function rawQuery<T = unknown>(
query: query,
query_params: params,
format: 'JSONEachRow',
clickhouse_settings: { output_format_json_quote_64bit_integers: 0 },
clickhouse_settings: {
date_time_output_format: 'iso',
output_format_json_quote_64bit_integers: 0,
},
});
return (await resultSet.json()) as T;

View File

@ -10,13 +10,11 @@ import { REALTIME_RANGE } from 'lib/constants';
export interface RealtimeRequestQuery {
websiteId: string;
timezone: string;
}
const schema = {
GET: yup.object().shape({
websiteId: yup.string().uuid().required(),
timezone: yup.string().required(),
}),
};
@ -25,7 +23,7 @@ export default async (req: NextApiRequestQueryBody<RealtimeRequestQuery>, res: N
await useValidate(schema, req, res);
if (req.method === 'GET') {
const { websiteId, timezone } = req.query;
const { websiteId } = req.query;
if (!(await canViewWebsite(req.auth, websiteId))) {
return unauthorized(res);
@ -33,7 +31,7 @@ export default async (req: NextApiRequestQueryBody<RealtimeRequestQuery>, res: N
const startDate = subMinutes(startOfMinute(new Date()), REALTIME_RANGE);
const data = await getRealtimeData(websiteId, { startDate, timezone });
const data = await getRealtimeData(websiteId, { startDate });
return ok(res, data);
}

View File

@ -0,0 +1,67 @@
import prisma from 'lib/prisma';
import clickhouse from 'lib/clickhouse';
import { runQuery, CLICKHOUSE, PRISMA } from 'lib/db';
import { QueryFilters } from 'lib/types';
export async function getRealtimeActivity(...args: [websiteId: string, filters: QueryFilters]) {
return runQuery({
[PRISMA]: () => relationalQuery(...args),
[CLICKHOUSE]: () => clickhouseQuery(...args),
});
}
async function relationalQuery(websiteId: string, filters: QueryFilters) {
const { rawQuery, parseFilters } = prisma;
const { params, filterQuery, dateQuery } = await parseFilters(websiteId, filters);
return rawQuery(
`
select
website_event.session_id as sessionId,
website_event.event_name as eventName,
website_event.created_at as createdAt,
session.browser,
session.os,
session.device,
session.country,
session.url_path as urlPath,
session.referrer_domain as referrerDomain
from website_event
inner join session
on session.session_id = website_event.session_id
where website_id = {{websiteId::uuid}}
${filterQuery}
${dateQuery}
order by created_at asc
limit 100
`,
params,
);
}
async function clickhouseQuery(websiteId: string, filters: QueryFilters): Promise<{ x: number }> {
const { rawQuery, parseFilters } = clickhouse;
const { params, filterQuery, dateQuery } = await parseFilters(websiteId, filters);
return rawQuery(
`
select
session_id as sessionId,
event_name as eventName,
created_at as createdAt,
browser,
os,
device,
country,
url_path as urlPath,
referrer_domain as referrerDomain
from website_event
where website_id = {websiteId:UUID}
${filterQuery}
${dateQuery}
order by createdAt asc
limit 100
`,
params,
);
}

View File

@ -1,11 +1,4 @@
import {
getWebsiteSessions,
getWebsiteEvents,
getPageviewStats,
getSessionStats,
} from 'queries/index';
const MAX_SIZE = 50;
import { getRealtimeActivity, getPageviewStats, getSessionStats } from 'queries/index';
function increment(data: object, key: string) {
if (key) {
@ -17,61 +10,47 @@ function increment(data: object, key: string) {
}
}
export async function getRealtimeData(
websiteId: string,
criteria: { startDate: Date; timezone: string },
) {
const { startDate, timezone } = criteria;
const filters = { startDate, endDate: new Date(), unit: 'minute', timezone };
const [events, sessions, pageviews, sessionviews] = await Promise.all([
getWebsiteEvents(websiteId, { startDate, timezone }, { pageSize: 10000 }),
getWebsiteSessions(websiteId, { startDate, timezone }, { pageSize: 10000 }),
export async function getRealtimeData(websiteId: string, criteria: { startDate: Date }) {
const { startDate } = criteria;
const filters = { startDate, endDate: new Date(), unit: 'minute' };
const [activity, pageviews, sessions] = await Promise.all([
getRealtimeActivity(websiteId, filters),
getPageviewStats(websiteId, filters),
getSessionStats(websiteId, filters),
]);
const uniques = new Set();
const sessionStats = sessions.data.reduce(
(obj: { visitors: any; countries: any }, session: { id: any; country: any }) => {
const { countries, visitors } = obj;
const { id, country } = session;
const { countries, urls, referrers, events } = activity.reduce(
(
obj: { countries: any; urls: any; referrers: any; events: any },
event: {
sessionId: string;
urlPath: string;
referrerDomain: string;
country: string;
eventName: string;
},
) => {
const { countries, urls, referrers, events } = obj;
const { sessionId, urlPath, referrerDomain, country, eventName } = event;
if (!uniques.has(id)) {
uniques.add(id);
if (!uniques.has(sessionId)) {
uniques.add(sessionId);
increment(countries, country);
if (visitors.length < MAX_SIZE) {
visitors.push(session);
}
events.push({ __type: 'session', ...event });
}
increment(urls, urlPath);
increment(referrers, referrerDomain);
events.push({ __type: eventName ? 'event' : 'pageview', ...event });
return obj;
},
{
countries: {},
visitors: [],
},
);
const eventStats = events.data.reduce(
(
obj: { urls: any; referrers: any; events: any },
event: { urlPath: any; referrerDomain: any },
) => {
const { urls, referrers, events } = obj;
const { urlPath, referrerDomain } = event;
increment(urls, urlPath);
increment(referrers, referrerDomain);
if (events.length < MAX_SIZE) {
events.push(event);
}
return obj;
},
{
urls: {},
referrers: {},
events: [],
@ -79,17 +58,19 @@ export async function getRealtimeData(
);
return {
...sessionStats,
...eventStats,
countries,
urls,
referrers,
events: events.reverse(),
series: {
views: pageviews,
visitors: sessionviews,
visitors: sessions,
},
totals: {
views: events.data.filter(e => !e.eventName).length,
visitors: uniques.size,
events: events.data.filter(e => e.eventName).length,
countries: Object.keys(sessionStats.countries).length,
views: pageviews.reduce((sum: number, { y }: { y: number }) => sum + y, 0),
visitors: sessions.reduce((sum: number, { y }: { y: number }) => sum + y, 0),
events: activity.filter(e => e.eventName).length,
countries: Object.keys(countries).length,
},
timestamp: Date.now(),
};

View File

@ -40,8 +40,8 @@ async function clickhouseQuery(
websiteId: string,
filters: QueryFilters,
): Promise<{ x: string; y: number }[]> {
const { timezone = 'UTC', unit = 'day' } = filters;
const { parseFilters, rawQuery, getDateStringSQL, getDateSQL } = clickhouse;
const { unit = 'day' } = filters;
const { parseFilters, rawQuery } = clickhouse;
const { filterQuery, params } = await parseFilters(websiteId, {
...filters,
eventType: EVENT_TYPE.pageView,
@ -52,11 +52,11 @@ async function clickhouseQuery(
if (EVENT_COLUMNS.some(item => Object.keys(filters).includes(item)) || unit === 'minute') {
sql = `
select
${getDateStringSQL('g.t', unit)} as x,
g.t as x,
g.y as y
from (
select
${getDateSQL('created_at', unit, timezone)} as t,
date_trunc('${unit}', created_at) as t,
count(*) as y
from website_event
where website_id = {websiteId:UUID}
@ -70,11 +70,11 @@ async function clickhouseQuery(
} else {
sql = `
select
${getDateStringSQL('g.t', unit)} as x,
g.t as x,
g.y as y
from (
select
${getDateSQL('created_at', unit, timezone)} as t,
date_trunc('${unit}', created_at) as t,
sum(views)as y
from website_event_stats_hourly website_event
where website_id = {websiteId:UUID}

View File

@ -40,8 +40,8 @@ async function clickhouseQuery(
websiteId: string,
filters: QueryFilters,
): Promise<{ x: string; y: number }[]> {
const { timezone = 'UTC', unit = 'day' } = filters;
const { parseFilters, rawQuery, getDateStringSQL, getDateSQL } = clickhouse;
const { unit = 'day' } = filters;
const { parseFilters, rawQuery } = clickhouse;
const { filterQuery, params } = await parseFilters(websiteId, {
...filters,
eventType: EVENT_TYPE.pageView,
@ -52,11 +52,11 @@ async function clickhouseQuery(
if (EVENT_COLUMNS.some(item => Object.keys(filters).includes(item)) || unit === 'minute') {
sql = `
select
${getDateStringSQL('g.t', unit)} as x,
g.t as x,
g.y as y
from (
select
${getDateSQL('created_at', unit, timezone)} as t,
select
date_trunc('${unit}', created_at) as t,
count(distinct session_id) as y
from website_event
where website_id = {websiteId:UUID}
@ -70,11 +70,11 @@ async function clickhouseQuery(
} else {
sql = `
select
${getDateStringSQL('g.t', unit)} as x,
g.t as x,
g.y as y
from (
select
${getDateSQL('created_at', unit, timezone)} as t,
select
date_trunc('${unit}', created_at) as t,
uniq(session_id) as y
from website_event_stats_hourly website_event
where website_id = {websiteId:UUID}

View File

@ -31,6 +31,7 @@ export * from './analytics/sessions/getSessionActivity';
export * from './analytics/sessions/getSessionStats';
export * from './analytics/sessions/saveSessionData';
export * from './analytics/getActiveVisitors';
export * from './analytics/getRealtimeActivity';
export * from './analytics/getRealtimeData';
export * from './analytics/getValues';
export * from './analytics/getWebsiteDateRange';