From d827bf1417dd989c314f0253b1bafe55c0ccf6d6 Mon Sep 17 00:00:00 2001 From: Francis Cao Date: Sun, 7 May 2023 23:13:40 -0700 Subject: [PATCH] update CH kafka engine tables for error handling --- db/clickhouse/schema.sql | 42 +++++++++++++++++++++++++++++++++++----- 1 file changed, 37 insertions(+), 5 deletions(-) diff --git a/db/clickhouse/schema.sql b/db/clickhouse/schema.sql index 77176413..8f48b434 100644 --- a/db/clickhouse/schema.sql +++ b/db/clickhouse/schema.sql @@ -58,7 +58,10 @@ CREATE TABLE umami.website_event_queue ( --event event_type UInt32, event_name String, - created_at DateTime('UTC') + created_at DateTime('UTC'), + --virtual columns + _error String, + _raw_message String ) ENGINE = Kafka SETTINGS kafka_broker_list = 'domain:9092,domain:9093,domain:9094', -- input broker list @@ -66,7 +69,7 @@ SETTINGS kafka_broker_list = 'domain:9092,domain:9093,domain:9094', -- input bro kafka_group_name = 'event_consumer_group', kafka_format = 'JSONEachRow', kafka_max_block_size = 1048576, - kafka_skip_broken_messages = 100; + kafka_handle_error_mode = 'stream' CREATE MATERIALIZED VIEW umami.website_event_queue_mv TO umami.website_event AS SELECT website_id, @@ -93,6 +96,19 @@ SELECT website_id, created_at FROM umami.website_event_queue; +CREATE MATERIALIZED VIEW umami.website_event_errors_mv +( + error String, + raw String +) +ENGINE = MergeTree +ORDER BY (error, raw) +SETTINGS index_granularity = 8192 AS +SELECT _error AS error, + _raw_message AS raw +FROM umami.website_event_queue +WHERE length(_error) > 0 + CREATE TABLE umami.event_data ( website_id UUID, @@ -122,7 +138,10 @@ CREATE TABLE umami.event_data_queue ( event_numeric_value Nullable(Decimal64(4)), --922337203685477.5625 event_date_value Nullable(DateTime('UTC')), event_data_type UInt32, - created_at DateTime('UTC') + created_at DateTime('UTC'), + --virtual columns + _error String, + _raw_message String ) ENGINE = Kafka SETTINGS kafka_broker_list = 'domain:9092,domain:9093,domain:9094', -- input broker list @@ -130,7 +149,7 @@ SETTINGS kafka_broker_list = 'domain:9092,domain:9093,domain:9094', -- input bro kafka_group_name = 'event_data_consumer_group', kafka_format = 'JSONEachRow', kafka_max_block_size = 1048576, - kafka_skip_broken_messages = 100; + kafka_handle_error_mode = 'stream' CREATE MATERIALIZED VIEW umami.event_data_queue_mv TO umami.event_data AS SELECT website_id, @@ -144,4 +163,17 @@ SELECT website_id, event_date_value, event_data_type, created_at -FROM umami.event_data_queue; \ No newline at end of file +FROM umami.event_data_queue; + +CREATE MATERIALIZED VIEW umami.event_data_errors_mv +( + error String, + raw String +) +ENGINE = MergeTree +ORDER BY (error, raw) +SETTINGS index_granularity = 8192 AS +SELECT _error AS error, + _raw_message AS raw +FROM umami.event_data_queue +WHERE length(_error) > 0 \ No newline at end of file