update CH kafka engine tables for error handling

This commit is contained in:
Francis Cao 2023-05-07 23:13:40 -07:00
parent e92d958b24
commit d827bf1417

View File

@ -58,7 +58,10 @@ CREATE TABLE umami.website_event_queue (
--event
event_type UInt32,
event_name String,
created_at DateTime('UTC')
created_at DateTime('UTC'),
--virtual columns
_error String,
_raw_message String
)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'domain:9092,domain:9093,domain:9094', -- input broker list
@ -66,7 +69,7 @@ SETTINGS kafka_broker_list = 'domain:9092,domain:9093,domain:9094', -- input bro
kafka_group_name = 'event_consumer_group',
kafka_format = 'JSONEachRow',
kafka_max_block_size = 1048576,
kafka_skip_broken_messages = 100;
kafka_handle_error_mode = 'stream'
CREATE MATERIALIZED VIEW umami.website_event_queue_mv TO umami.website_event AS
SELECT website_id,
@ -93,6 +96,19 @@ SELECT website_id,
created_at
FROM umami.website_event_queue;
CREATE MATERIALIZED VIEW umami.website_event_errors_mv
(
error String,
raw String
)
ENGINE = MergeTree
ORDER BY (error, raw)
SETTINGS index_granularity = 8192 AS
SELECT _error AS error,
_raw_message AS raw
FROM umami.website_event_queue
WHERE length(_error) > 0
CREATE TABLE umami.event_data
(
website_id UUID,
@ -122,7 +138,10 @@ CREATE TABLE umami.event_data_queue (
event_numeric_value Nullable(Decimal64(4)), --922337203685477.5625
event_date_value Nullable(DateTime('UTC')),
event_data_type UInt32,
created_at DateTime('UTC')
created_at DateTime('UTC'),
--virtual columns
_error String,
_raw_message String
)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'domain:9092,domain:9093,domain:9094', -- input broker list
@ -130,7 +149,7 @@ SETTINGS kafka_broker_list = 'domain:9092,domain:9093,domain:9094', -- input bro
kafka_group_name = 'event_data_consumer_group',
kafka_format = 'JSONEachRow',
kafka_max_block_size = 1048576,
kafka_skip_broken_messages = 100;
kafka_handle_error_mode = 'stream'
CREATE MATERIALIZED VIEW umami.event_data_queue_mv TO umami.event_data AS
SELECT website_id,
@ -144,4 +163,17 @@ SELECT website_id,
event_date_value,
event_data_type,
created_at
FROM umami.event_data_queue;
FROM umami.event_data_queue;
CREATE MATERIALIZED VIEW umami.event_data_errors_mv
(
error String,
raw String
)
ENGINE = MergeTree
ORDER BY (error, raw)
SETTINGS index_granularity = 8192 AS
SELECT _error AS error,
_raw_message AS raw
FROM umami.event_data_queue
WHERE length(_error) > 0