This commit is contained in:
Brian Cao 2022-08-25 22:43:22 -07:00
parent 7e9ed704cc
commit f499af5491
3 changed files with 32 additions and 14 deletions

View File

@ -35,27 +35,27 @@ if (process.env.NODE_ENV !== 'production') {
export { clickhouse };
export function getDateStringQuery(data, unit) {
function getDateStringQuery(data, unit) {
return `formatDateTime(${data}, '${CLICKHOUSE_DATE_FORMATS[unit]}')`;
}
export function getDateQuery(field, unit, timezone) {
function getDateQuery(field, unit, timezone) {
if (timezone) {
return `date_trunc('${unit}', ${field}, '${timezone}')`;
}
return `date_trunc('${unit}', ${field})`;
}
export function getDateFormat(date) {
function getDateFormat(date) {
return `'${dateFormat(date, 'UTC:yyyy-mm-dd HH:MM:ss')}'`;
}
export function getBetweenDates(field, start_at, end_at) {
function getBetweenDates(field, start_at, end_at) {
return `${field} between ${getDateFormat(start_at)}
and ${getDateFormat(end_at)}`;
}
export function getFilterQuery(table, column, filters = {}, params = []) {
function getFilterQuery(table, column, filters = {}, params = []) {
const query = Object.keys(filters).reduce((arr, key) => {
const filter = filters[key];
@ -115,7 +115,7 @@ export function getFilterQuery(table, column, filters = {}, params = []) {
return query.join('\n');
}
export function parseFilters(table, column, filters = {}, params = [], sessionKey = 'session_id') {
function parseFilters(table, column, filters = {}, params = [], sessionKey = 'session_id') {
const { domain, url, event_url, referrer, os, browser, device, country, event_name, query } =
filters;
@ -138,7 +138,7 @@ export function parseFilters(table, column, filters = {}, params = [], sessionKe
};
}
export function replaceQuery(string, params = []) {
function replaceQuery(string, params = []) {
let formattedString = string;
params.forEach((a, i) => {
@ -154,7 +154,7 @@ export function replaceQuery(string, params = []) {
return formattedString;
}
export async function rawQuery(query, params = [], debug = false) {
async function rawQuery(query, params = [], debug = false) {
let formattedQuery = replaceQuery(query, params);
if (debug || process.env.LOG_QUERY) {
@ -164,7 +164,7 @@ export async function rawQuery(query, params = [], debug = false) {
return clickhouse.query(formattedQuery).toPromise();
}
export async function findUnique(data) {
async function findUnique(data) {
if (data.length > 1) {
throw `${data.length} records found when expecting 1.`;
}
@ -172,6 +172,19 @@ export async function findUnique(data) {
return data[0] ?? null;
}
export async function findFirst(data) {
async function findFirst(data) {
return data[0] ?? null;
}
export default {
getDateStringQuery,
getDateQuery,
getDateFormat,
getBetweenDates,
getFilterQuery,
parseFilters,
replaceQuery,
rawQuery,
findUnique,
findFirst,
};

View File

@ -44,18 +44,18 @@ let kafkaProducer = null;
export { kafka, kafkaProducer };
export async function getProducer() {
async function getProducer() {
const producer = kafka.producer();
await producer.connect();
return producer;
}
export function getDateFormat(date) {
function getDateFormat(date) {
return dateFormat(date, 'UTC:yyyy-mm-dd HH:MM:ss');
}
export async function sendMessage(params, topic) {
async function sendMessage(params, topic) {
await kafkaProducer.send({
topic,
messages: [
@ -67,3 +67,8 @@ export async function sendMessage(params, topic) {
acks: 0,
});
}
export default {
getDateFormat,
sendMessage,
};

View File

@ -55,7 +55,7 @@ async function clickhouseQuery(
sessionKey = 'session_uuid',
) {
const params = [website_id];
const { pageviewQuery, sessionQuery, joinSession } = clickhouse.parseFilters(
const { pageviewQuery, sessionQuery, joinSession } = parseFilters(
'pageview',
null,
filters,