Some environment setup notes for myself. We start on Debian 11.
comfort tooling:
apt -y install fish ripgrep rsync
docker to run binary-builder:
apt -y install apparmor-{utils,profiles}
docker pull
# long wait
code tooling & build CH:
mkdir -p /root/ch-binary/
apt -y install build-essential ccache git ca-certificates
git clone --recursive
# long wait
cd ClickHouse
git checkout v22.3.11.12-lts --recurse-submodules
# long wait, with manual cleaning
git status
# very long wait
docker/packager/packager --cache=ccache --output-dir=/root/ch-binary/ --package-type=binary --build-type=debug --compiler=clang-13 --docker-image-version=39450-amd64
kafka barebone, from
apt -y install default-jre-headless curl
curl -LO
tar xf kafka_2.13-3.2.1.tgz
cd kafka_2.13-3.2.1/
bin/ config/ &
bin/ config/ &
cd kafka_2.13-3.2.1/
bin/ --create --topic 2-partitions --partitions 2 --bootstrap-server localhost:9092
bin/ --create --topic 1-partition --bootstrap-server localhost:9092
bin/ --bootstrap-server 127.1:9092 --describe
Topic: 1-partition TopicId: M70Y5dsDQeu7oE-nIZeDbg PartitionCount: 1 ReplicationFactor: 1 Configs: segment.bytes=1073741824
Topic: 1-partition Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Topic: 2-partitions TopicId: 8nzR9D6hSJWiyolwy_DN_Q PartitionCount: 2 ReplicationFactor: 1 Configs: segment.bytes=1073741824
Topic: 2-partitions Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Topic: 2-partitions Partition: 1 Leader: 0 Replicas: 0 Isr: 0
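A quick way to sanity-check the describe output above is to count the partition rows per topic. This is only a sketch: `count_partitions` is a hypothetical helper name, and it assumes the output keeps the `Topic: <name> Partition: <n> ...` row layout shown above.

```shell
# Hypothetical helper: read `--describe` output on stdin and print
# "<topic> <number of partition rows>" for each topic, sorted by topic name.
count_partitions() {
    # Partition rows have "Partition:" as their third field;
    # the per-topic summary rows have "TopicId:" there and are skipped.
    awk '$1 == "Topic:" && $3 == "Partition:" { n[$2]++ }
         END { for (t in n) print t, n[t] }' | sort
}
```

Piping the describe command's output into `count_partitions` should then list `1-partition` with one row and `2-partitions` with two.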
optionally otel/zipkin export
docker run --network host otel/opentelemetry-collector:0.54.0
CREATE MATERIALIZED VIEW IF NOT EXISTS system.zipkin_spans
ENGINE = URL('', 'JSONEachRow')
SETTINGS output_format_json_named_tuples_as_objects = 1,
    output_format_json_quote_64bit_integers = 0,
    output_format_json_array_of_rows = 1
AS SELECT
    lower(hex(reinterpretAsFixedString(trace_id))) AS traceId,
    lower(hex(parent_span_id)) AS parentId,
    lower(hex(span_id)) AS id,
    operation_name AS name,
    start_time_us AS timestamp,
    finish_time_us - start_time_us AS duration,
    cast(tuple('clickhouse'), 'Tuple(serviceName text)') AS localEndpoint,
    cast(tuple(attribute.values[indexOf(attribute.names, 'db.statement')]), 'Tuple("db.statement" text)') AS tags
FROM system.opentelemetry_span_log;
CREATE MATERIALIZED VIEW IF NOT EXISTS system.zipkin_spans
(
    `traceId` String,
    `parentId` String,
    `id` String,
    `name` String,
    `timestamp` UInt64,
    `duration` Int64,
    `localEndpoint` Tuple(serviceName String),
    `tags` Tuple(`db.statement` String, `` String, `ch.thread_id` String, `ch.query_id` String, `ch.query_status` String)
)
ENGINE = URL('', 'JSONEachRow')
SETTINGS output_format_json_named_tuples_as_objects = 1,
    output_format_json_quote_64bit_integers = 0,
    output_format_json_array_of_rows = 1
AS SELECT
    lower(hex(reinterpretAsFixedString(trace_id))) AS traceId,
    lower(hex(parent_span_id)) AS parentId,
    lower(hex(span_id)) AS id,
    operation_name AS name,
    start_time_us AS timestamp,
    finish_time_us - start_time_us AS duration,
    CAST(tuple('vm415v'), 'Tuple(serviceName text)') AS localEndpoint,
    CAST(
        (
            attribute.values[indexOf(attribute.names, 'db.statement')],
            FQDN(),
            attribute.values[indexOf(attribute.names, 'clickhouse.thread_id')],
            attribute.values[indexOf(attribute.names, 'clickhouse.query_id')],
            attribute.values[indexOf(attribute.names, 'clickhouse.query_status')]
        ),
        'Tuple("db.statement" text,"" text,"ch.thread_id" text,"ch.query_id" text, "ch.query_status" text)'
    ) AS tags
FROM system.opentelemetry_span_log;
CREATE TABLE IF NOT EXISTS etl.trashbin(s String) ENGINE = Null;
rr, bytehound (nee memory-profiler) tooling:
apt -y install yarnpkg gdb libc6-dbg heaptrack lsof
curl -LO$(uname -m).deb
apt install ./rr*deb
# git version
git clone
cd bytehound
cargo b --release -p bytehound-preload
cargo b --release -p bytehound-cli
# 0.9.0
mkdir 0.9.0/
curl -L | tar -C 0.9.0/ -zxf -
bytehound environment:
silent-ish CH config, though the opentelemetry table doesn't show up:
engine MergeTree
partition by toYYYYMM(finish_date)
order by (finish_date, finish_time_us, trace_id)
<query_thread_log remove="1" />
<query_log remove="1" />
<query_views_log remove="1" />
<part_log remove="1"/>
<session_log remove="1"/>
<text_log remove="1" />
<trace_log remove="1"/>
<crash_log remove="1"/>
<zookeeper_log remove="1"/>
internal graphite metrics logging:
Now the leak:
CREATE TABLE IF NOT EXISTS etl.trashbin(s String) ENGINE = Null;
CREATE TABLE IF NOT EXISTS etl.consume_kafka1 (s String) ENGINE = Kafka()
SETTINGS kafka_format = 'CSV', kafka_group_name = 'CHgroup'
, kafka_broker_list = '', kafka_topic_list = '1-partition'
, kafka_num_consumers=1;
CREATE MATERIALIZED VIEW IF NOT EXISTS etl.pipeline1 TO etl.trashbin AS SELECT s FROM etl.consume_kafka1;
CREATE TABLE IF NOT EXISTS etl.consume_kafka2 (s String) ENGINE = Kafka()
SETTINGS kafka_format = 'CSV', kafka_group_name = 'CHgroup'
, kafka_broker_list = '', kafka_topic_list = '1-partition'
, kafka_num_consumers=1;
CREATE MATERIALIZED VIEW IF NOT EXISTS etl.pipeline2 TO etl.trashbin AS SELECT s FROM etl.consume_kafka2;
When one pipeline is idle, something keeps leaking memory. With more `kafka_num_consumers` per pipeline, it leaks faster. Dropping either MV stops the polling, and thus stops the leak. I hoped the two-pipeline setup would make the threads easier to inspect.
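To watch the leak rate, for example with different `kafka_num_consumers` values, sampling the server's resident set size over time is probably enough. A minimal sketch, assuming Linux `/proc`; `rss_kb` and `sample_rss` are hypothetical helper names, and the PID of the ClickHouse server has to be looked up separately.

```shell
# Print the resident set size (VmRSS, in kB) of the process with the given PID.
# Reads /proc/<pid>/status, so this is Linux only.
rss_kb() {
    awk '$1 == "VmRSS:" { print $2 }' "/proc/$1/status"
}

# Print one "<epoch seconds> <rss in kB>" line per sample.
# Arguments: pid, number of samples, seconds to sleep between samples.
sample_rss() {
    pid=$1
    count=$2
    interval=$3
    i=0
    while [ "$i" -lt "$count" ]; do
        printf '%s %s\n' "$(date +%s)" "$(rss_kb "$pid")"
        i=$((i + 1))
        # No need to sleep after the last sample.
        if [ "$i" -lt "$count" ]; then
            sleep "$interval"
        fi
    done
}
```

For instance, `sample_rss <pid> 60 10` would record ten minutes of RSS at ten-second intervals. A second column that keeps growing while the topic is idle matches the behaviour described above.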
How about an error-streaming table?
CREATE TABLE IF NOT EXISTS etl.consume_kafka1 (s String) ENGINE = Kafka()
SETTINGS kafka_format = 'CSV', kafka_group_name = 'CHgroup'
, kafka_broker_list = '', kafka_topic_list = '1-partition'
, kafka_handle_error_mode = 'stream'
, kafka_num_consumers=1;
CREATE MATERIALIZED VIEW IF NOT EXISTS etl.pipeline1 TO etl.trashbin AS SELECT s FROM etl.consume_kafka1
WHERE length(_error) > 0;
CREATE TABLE IF NOT EXISTS etl.consume_kafka2 (s String) ENGINE = Kafka()
SETTINGS kafka_format = 'CSV', kafka_group_name = 'CHgroup'
, kafka_broker_list = '', kafka_topic_list = '1-partition'
, kafka_handle_error_mode = 'stream'
, kafka_num_consumers=1;
CREATE MATERIALIZED VIEW IF NOT EXISTS etl.pipeline2 TO etl.trashbin AS SELECT s FROM etl.consume_kafka2
WHERE length(_error) > 0;