mirror of
https://github.com/kremalicious/umami.git
synced 2025-02-14 21:10:34 +01:00
Add connect methods to libraries.
This commit is contained in:
parent
186f484ff1
commit
e442617421
@ -14,6 +14,9 @@ export const CLICKHOUSE_DATE_FORMATS = {
|
|||||||
|
|
||||||
const log = debug('umami:clickhouse');
|
const log = debug('umami:clickhouse');
|
||||||
|
|
||||||
|
let clickhouse;
|
||||||
|
const enabled = Boolean(process.env.CLICKHOUSE_URL);
|
||||||
|
|
||||||
function getClient() {
|
function getClient() {
|
||||||
const {
|
const {
|
||||||
hostname,
|
hostname,
|
||||||
@ -144,6 +147,8 @@ async function rawQuery(query, params = []) {
|
|||||||
log(formattedQuery);
|
log(formattedQuery);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
await connect();
|
||||||
|
|
||||||
return clickhouse.query(formattedQuery).toPromise();
|
return clickhouse.query(formattedQuery).toPromise();
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -159,12 +164,19 @@ async function findFirst(data) {
|
|||||||
return data[0] ?? null;
|
return data[0] ?? null;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Initialization
|
async function connect() {
|
||||||
const clickhouse = process.env.CLICKHOUSE_URL && (global[CLICKHOUSE] || getClient());
|
if (!clickhouse) {
|
||||||
|
clickhouse = process.env.CLICKHOUSE_URL && (global[CLICKHOUSE] || getClient());
|
||||||
|
}
|
||||||
|
|
||||||
|
return clickhouse;
|
||||||
|
}
|
||||||
|
|
||||||
export default {
|
export default {
|
||||||
|
enabled,
|
||||||
client: clickhouse,
|
client: clickhouse,
|
||||||
log,
|
log,
|
||||||
|
connect,
|
||||||
getDateStringQuery,
|
getDateStringQuery,
|
||||||
getDateQuery,
|
getDateQuery,
|
||||||
getDateFormat,
|
getDateFormat,
|
||||||
|
14
lib/kafka.js
14
lib/kafka.js
@ -5,6 +5,10 @@ import { KAFKA, KAFKA_PRODUCER } from 'lib/db';
|
|||||||
|
|
||||||
const log = debug('umami:kafka');
|
const log = debug('umami:kafka');
|
||||||
|
|
||||||
|
let kafka;
|
||||||
|
let producer;
|
||||||
|
const enabled = Boolean(process.env.KAFKA_URL && process.env.KAFKA_BROKER);
|
||||||
|
|
||||||
function getClient() {
|
function getClient() {
|
||||||
const { username, password } = new URL(process.env.KAFKA_URL);
|
const { username, password } = new URL(process.env.KAFKA_URL);
|
||||||
const brokers = process.env.KAFKA_BROKER.split(',');
|
const brokers = process.env.KAFKA_BROKER.split(',');
|
||||||
@ -61,7 +65,7 @@ function getDateFormat(date) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
async function sendMessage(params, topic) {
|
async function sendMessage(params, topic) {
|
||||||
await getKafka();
|
await connect();
|
||||||
|
|
||||||
await producer.send({
|
await producer.send({
|
||||||
topic,
|
topic,
|
||||||
@ -74,7 +78,7 @@ async function sendMessage(params, topic) {
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
async function getKafka() {
|
async function connect() {
|
||||||
if (!kafka) {
|
if (!kafka) {
|
||||||
kafka = process.env.KAFKA_URL && process.env.KAFKA_BROKER && (global[KAFKA] || getClient());
|
kafka = process.env.KAFKA_URL && process.env.KAFKA_BROKER && (global[KAFKA] || getClient());
|
||||||
|
|
||||||
@ -86,14 +90,12 @@ async function getKafka() {
|
|||||||
return kafka;
|
return kafka;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Initialization
|
|
||||||
let kafka;
|
|
||||||
let producer;
|
|
||||||
|
|
||||||
export default {
|
export default {
|
||||||
|
enabled,
|
||||||
client: kafka,
|
client: kafka,
|
||||||
producer,
|
producer,
|
||||||
log,
|
log,
|
||||||
|
connect,
|
||||||
getDateFormat,
|
getDateFormat,
|
||||||
sendMessage,
|
sendMessage,
|
||||||
};
|
};
|
||||||
|
36
lib/redis.js
36
lib/redis.js
@ -8,6 +8,9 @@ const log = debug('umami:redis');
|
|||||||
const INITIALIZED = 'redis:initialized';
|
const INITIALIZED = 'redis:initialized';
|
||||||
export const DELETED = 'deleted';
|
export const DELETED = 'deleted';
|
||||||
|
|
||||||
|
let redis;
|
||||||
|
const enabled = Boolean(process.env.REDIS_URL);
|
||||||
|
|
||||||
function getClient() {
|
function getClient() {
|
||||||
if (!process.env.REDIS_URL) {
|
if (!process.env.REDIS_URL) {
|
||||||
return null;
|
return null;
|
||||||
@ -40,26 +43,41 @@ async function stageData() {
|
|||||||
return { key: `website:${a.website_uuid}`, value: Number(a.website_id) };
|
return { key: `website:${a.website_uuid}`, value: Number(a.website_id) };
|
||||||
});
|
});
|
||||||
|
|
||||||
await addRedis(sessionUuids);
|
await addSet(sessionUuids);
|
||||||
await addRedis(websiteIds);
|
await addSet(websiteIds);
|
||||||
|
|
||||||
await redis.set(INITIALIZED, 1);
|
await redis.set(INITIALIZED, 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
async function addRedis(ids) {
|
async function addSet(ids) {
|
||||||
for (let i = 0; i < ids.length; i++) {
|
for (let i = 0; i < ids.length; i++) {
|
||||||
const { key, value } = ids[i];
|
const { key, value } = ids[i];
|
||||||
await redis.set(key, value);
|
await redis.set(key, value);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Initialization
|
async function get(key) {
|
||||||
const redis = process.env.REDIS_URL && (global[REDIS] || getClient());
|
await connect();
|
||||||
|
|
||||||
(async () => {
|
return redis.get(key);
|
||||||
if (redis && !(await redis.get(INITIALIZED))) {
|
}
|
||||||
|
|
||||||
|
async function set(key, value) {
|
||||||
|
await connect();
|
||||||
|
|
||||||
|
return redis.set(key, value);
|
||||||
|
}
|
||||||
|
|
||||||
|
async function connect() {
|
||||||
|
if (!redis) {
|
||||||
|
process.env.REDIS_URL && (global[REDIS] || getClient());
|
||||||
|
|
||||||
|
if (!(await redis.get(INITIALIZED))) {
|
||||||
await stageData();
|
await stageData();
|
||||||
}
|
}
|
||||||
})();
|
}
|
||||||
|
|
||||||
export default { client: redis, stageData, log };
|
return redis;
|
||||||
|
}
|
||||||
|
|
||||||
|
export default { enabled, client: redis, log, connect, get, set, stageData };
|
||||||
|
@ -1,14 +1,13 @@
|
|||||||
import { parseToken } from 'next-basics';
|
import { parseToken } from 'next-basics';
|
||||||
import { validate } from 'uuid';
|
import { validate } from 'uuid';
|
||||||
import { uuid } from 'lib/crypto';
|
import { secret, uuid } from 'lib/crypto';
|
||||||
import redis, { DELETED } from 'lib/redis';
|
import redis, { DELETED } from 'lib/redis';
|
||||||
|
import clickhouse from 'lib/clickhouse';
|
||||||
import { getClientInfo, getJsonBody } from 'lib/request';
|
import { getClientInfo, getJsonBody } from 'lib/request';
|
||||||
import { createSession, getSessionByUuid, getWebsiteByUuid } from 'queries';
|
import { createSession, getSessionByUuid, getWebsiteByUuid } from 'queries';
|
||||||
|
|
||||||
export async function getSession(req) {
|
export async function getSession(req) {
|
||||||
const { payload } = getJsonBody(req);
|
const { payload } = getJsonBody(req);
|
||||||
const hasRedis = process.env.REDIS_URL;
|
|
||||||
const hasClickhouse = process.env.CLICKHOUSE_URL;
|
|
||||||
|
|
||||||
if (!payload) {
|
if (!payload) {
|
||||||
throw new Error('Invalid request');
|
throw new Error('Invalid request');
|
||||||
@ -17,7 +16,7 @@ export async function getSession(req) {
|
|||||||
const cache = req.headers['x-umami-cache'];
|
const cache = req.headers['x-umami-cache'];
|
||||||
|
|
||||||
if (cache) {
|
if (cache) {
|
||||||
const result = await parseToken(cache);
|
const result = await parseToken(cache, secret());
|
||||||
|
|
||||||
if (result) {
|
if (result) {
|
||||||
return result;
|
return result;
|
||||||
@ -33,8 +32,8 @@ export async function getSession(req) {
|
|||||||
let websiteId = null;
|
let websiteId = null;
|
||||||
|
|
||||||
// Check if website exists
|
// Check if website exists
|
||||||
if (hasRedis) {
|
if (redis.enabled) {
|
||||||
websiteId = Number(await redis.client.get(`website:${website_uuid}`));
|
websiteId = Number(await redis.get(`website:${website_uuid}`));
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check database if does not exists in Redis
|
// Check database if does not exists in Redis
|
||||||
@ -53,10 +52,10 @@ export async function getSession(req) {
|
|||||||
let sessionId = null;
|
let sessionId = null;
|
||||||
let session = null;
|
let session = null;
|
||||||
|
|
||||||
if (!hasClickhouse) {
|
if (!clickhouse.enabled) {
|
||||||
// Check if session exists
|
// Check if session exists
|
||||||
if (hasRedis) {
|
if (redis.enabled) {
|
||||||
sessionId = Number(await redis.client.get(`session:${session_uuid}`));
|
sessionId = Number(await redis.get(`session:${session_uuid}`));
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check database if does not exists in Redis
|
// Check database if does not exists in Redis
|
||||||
|
@ -41,7 +41,7 @@ export async function deleteAccount(user_id) {
|
|||||||
.then(async res => {
|
.then(async res => {
|
||||||
if (redis.client) {
|
if (redis.client) {
|
||||||
for (let i = 0; i < websiteUuids.length; i++) {
|
for (let i = 0; i < websiteUuids.length; i++) {
|
||||||
await redis.client.set(`website:${websiteUuids[i]}`, DELETED);
|
await redis.set(`website:${websiteUuids[i]}`, DELETED);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user