Some environment setup notes for myself. We start on Debian 11.
comfort tooling:
```sh
apt -y install fish ripgrep rsync
```
docker to run binary-builder:
```sh
apt -y install docker.io apparmor-{utils,profiles}
docker pull docker.io/clickhouse/binary-builder:39450-amd64
# long wait
```
code tooling & build CH:
```sh
mkdir -p /root/ch-binary/
apt -y install build-essential ccache git ca-certificates
git clone https://github.com/ClickHouse/ClickHouse --recursive
# long wait
cd ClickHouse
git checkout v22.3.11.12-lts --recurse-submodules
# long wait, with manual cleaning
git status
# very long wait
docker/packager/packager --cache=ccache --output-dir=/root/ch-binary/ \
    --package-type=binary --build-type=debug --compiler=clang-13 \
    --docker-image-version=39450-amd64
```
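Quick sanity check on the result; the artifact name under the output dir is my assumption:
```sh
ls -lh /root/ch-binary/
/root/ch-binary/clickhouse --version
```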
barebones Kafka, from https://kafka.apache.org/quickstart:
```sh
apt -y install default-jre-headless curl
curl -LO https://dlcdn.apache.org/kafka/3.2.1/kafka_2.13-3.2.1.tgz
tar xf kafka_2.13-3.2.1.tgz
cd kafka_2.13-3.2.1/
bin/zookeeper-server-start.sh config/zookeeper.properties &
bin/kafka-server-start.sh config/server.properties &
```
```sh
cd kafka_2.13-3.2.1/
bin/kafka-topics.sh --create --topic 2-partitions --partitions 2 --bootstrap-server localhost:9092
bin/kafka-topics.sh --create --topic 1-partition --bootstrap-server localhost:9092
bin/kafka-topics.sh --bootstrap-server 127.1:9092 --describe
```
```text
Topic: 1-partition TopicId: M70Y5dsDQeu7oE-nIZeDbg PartitionCount: 1 ReplicationFactor: 1 Configs: segment.bytes=1073741824
Topic: 1-partition Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Topic: 2-partitions TopicId: 8nzR9D6hSJWiyolwy_DN_Q PartitionCount: 2 ReplicationFactor: 1 Configs: segment.bytes=1073741824
Topic: 2-partitions Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Topic: 2-partitions Partition: 1 Leader: 0 Replicas: 0 Isr: 0
```
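To push a few test rows later, the stock console producer is enough (one CSV field per line so the single-column schema below parses it):
```sh
cd kafka_2.13-3.2.1/
echo 'hello world' | bin/kafka-console-producer.sh --topic 1-partition --bootstrap-server localhost:9092
```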
optionally, OTel/Zipkin export:
```sh
docker run --network host otel/opentelemetry-collector:0.54.0
```
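As far as I remember the image's default config has no Zipkin receiver, so a minimal override is needed; a sketch, with assumed receiver/exporter names for the 0.54 core collector:
```sh
cat > /root/otel-config.yaml <<'EOF'
receivers:
  zipkin:            # defaults to listening on 0.0.0.0:9411, where the MV below posts spans
exporters:
  logging:           # just dump received spans to the collector's stdout
service:
  pipelines:
    traces:
      receivers: [zipkin]
      exporters: [logging]
EOF
docker run --network host -v /root/otel-config.yaml:/etc/otel/config.yaml \
    otel/opentelemetry-collector:0.54.0 --config /etc/otel/config.yaml
```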
```sql
-- minimal variant
CREATE MATERIALIZED VIEW IF NOT EXISTS system.zipkin_spans
ENGINE = URL('http://127.4.1.1:9411/api/v2/spans', 'JSONEachRow')
SETTINGS output_format_json_named_tuples_as_objects = 1,
         output_format_json_quote_64bit_integers = 0,
         output_format_json_array_of_rows = 1
AS SELECT
    lower(hex(reinterpretAsFixedString(trace_id))) AS traceId,
    lower(hex(parent_span_id)) AS parentId,
    lower(hex(span_id)) AS id,
    operation_name AS name,
    start_time_us AS timestamp,
    finish_time_us - start_time_us AS duration,
    cast(tuple('clickhouse'), 'Tuple(serviceName text)') AS localEndpoint,
    cast(tuple(attribute.values[indexOf(attribute.names, 'db.statement')]), 'Tuple("db.statement" text)') AS tags
FROM system.opentelemetry_span_log;

-- fuller variant: explicit columns plus extra tags (host, thread id, query id, query status)
CREATE MATERIALIZED VIEW IF NOT EXISTS system.zipkin_spans
(
    `traceId` String,
    `parentId` String,
    `id` String,
    `name` String,
    `timestamp` UInt64,
    `duration` Int64,
    `localEndpoint` Tuple(serviceName String),
    `tags` Tuple(`db.statement` String, `db.host` String, `ch.thread_id` String, `ch.query_id` String, `ch.query_status` String)
)
ENGINE = URL('http://127.4.1.1:9411/api/v2/spans', 'JSONEachRow')
SETTINGS output_format_json_named_tuples_as_objects = 1,
         output_format_json_quote_64bit_integers = 0,
         output_format_json_array_of_rows = 1
AS SELECT
    lower(hex(reinterpretAsFixedString(trace_id))) AS traceId,
    lower(hex(parent_span_id)) AS parentId,
    lower(hex(span_id)) AS id,
    operation_name AS name,
    start_time_us AS timestamp,
    finish_time_us - start_time_us AS duration,
    CAST(tuple('vm415v'), 'Tuple(serviceName text)') AS localEndpoint,
    CAST((attribute.values[indexOf(attribute.names, 'db.statement')],
          FQDN(),
          attribute.values[indexOf(attribute.names, 'clickhouse.thread_id')],
          attribute.values[indexOf(attribute.names, 'clickhouse.query_id')],
          attribute.values[indexOf(attribute.names, 'clickhouse.query_status')]),
         'Tuple("db.statement" text, "db.host" text, "ch.thread_id" text, "ch.query_id" text, "ch.query_status" text)') AS tags
FROM system.opentelemetry_span_log;
```
rr and bytehound (née memory-profiler) tooling:
```sh
apt -y install yarnpkg gdb libc6-dbg heaptrack lsof
curl -LO https://github.com/rr-debugger/rr/releases/download/5.6.0/rr-5.6.0-Linux-$(uname -m).deb
apt install ./rr*deb
# git version (building it needs a Rust toolchain / cargo, not installed above)
git clone https://github.com/koute/bytehound
cd bytehound
cargo b --release -p bytehound-preload
cargo b --release -p bytehound-cli
# 0.9.0
mkdir 0.9.0/
curl -L https://github.com/koute/bytehound/releases/download/0.9.0/bytehound-x86_64-unknown-linux-gnu.tgz | tar -C 0.9.0/ -xzf -
```
bytehound environment:
```fish
set -x MEMORY_PROFILER_LOG debug
```
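Rough usage sketch for both tools; the binary and library paths are my assumption, and the dump file name is whatever bytehound picks:
```sh
# record a leaking run under rr, then replay it deterministically in gdb
rr record /root/ch-binary/clickhouse server
rr replay

# profile allocations with bytehound: preload the shim, then browse the dump in the web UI
LD_PRELOAD=/root/bytehound/target/release/libbytehound.so \
    /root/ch-binary/clickhouse server
/root/bytehound/target/release/bytehound server memory-profiling_*.dat
```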
silent-ish CH config, though the opentelemetry_span_log table doesn't show up:
```xml
<yandex>
    <shutdown_wait_unfinished>60</shutdown_wait_unfinished>
    <opentelemetry_span_log>
        <engine>
            engine MergeTree
            partition by toYYYYMM(finish_date)
            order by (finish_date, finish_time_us, trace_id)
        </engine>
        <database>system</database>
        <table>opentelemetry_span_log</table>
        <flush_interval_milliseconds>7500</flush_interval_milliseconds>
    </opentelemetry_span_log>
    <logger><level>trace</level></logger>
    <query_thread_log remove="1"/>
    <query_log remove="1"/>
    <query_views_log remove="1"/>
    <part_log remove="1"/>
    <session_log remove="1"/>
    <text_log remove="1"/>
    <trace_log remove="1"/>
    <crash_log remove="1"/>
    <zookeeper_log remove="1"/>
    <disable_internal_dns_cache>1</disable_internal_dns_cache>
</yandex>
```
internal graphite metrics logging:
```xml
<yandex>
    <graphite>
        <host>in-carbon</host>
        <port>2023</port>
        <timeout>5</timeout>
        <interval>60</interval>
        <root_path>direct.vm415</root_path>
        <metrics>true</metrics>
        <events>true</events>
        <asynchronous_metrics>true</asynchronous_metrics>
    </graphite>
</yandex>
```
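To apply the snippets I keep them as override files next to a main config and point the fresh binary at it; the layout and file names below are just my own convention:
```sh
mkdir -p /root/ch-conf/config.d
# stock configs from the checked-out source tree, plus the two <yandex> snippets above as overrides
cp ClickHouse/programs/server/{config,users}.xml /root/ch-conf/
cp silent.xml graphite.xml /root/ch-conf/config.d/
/root/ch-binary/clickhouse server --config-file=/root/ch-conf/config.xml
```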
Now the leak:
```sql
CREATE DATABASE IF NOT EXISTS etl;
CREATE TABLE IF NOT EXISTS etl.trashbin(s String) ENGINE = Null;
```
```sql
CREATE TABLE IF NOT EXISTS etl.consume_kafka1 (s String) ENGINE = Kafka()
SETTINGS kafka_format = 'CSV', kafka_group_name = 'CHgroup'
, kafka_broker_list = '127.1.1.1:9092', kafka_topic_list = '1-partition'
, kafka_num_consumers=1;
CREATE MATERIALIZED VIEW IF NOT EXISTS etl.pipeline1 TO etl.trashbin AS SELECT s FROM etl.consume_kafka1;
CREATE TABLE IF NOT EXISTS etl.consume_kafka2 (s String) ENGINE = Kafka()
SETTINGS kafka_format = 'CSV', kafka_group_name = 'CHgroup'
, kafka_broker_list = '127.2.2.2:9092', kafka_topic_list = '1-partition'
, kafka_num_consumers=1;
CREATE MATERIALIZED VIEW IF NOT EXISTS etl.pipeline2 TO etl.trashbin AS SELECT s FROM etl.consume_kafka2;
```
When a pipeline is idle, something keeps leaking memory. With more `kafka_num_consumers` per pipeline, it also leaks faster. Dropping either MV stops the polling, and thus the leaking. I hoped a two-pipeline setup would make the threads easier to inspect.
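A crude way to watch it; a sketch assuming the server listens locally and the built binary is under /root/ch-binary/:
```sh
# once a minute: RSS as the OS sees it, plus ClickHouse's own memory tracker
while sleep 60; do
    ps -o rss= -C clickhouse
    /root/ch-binary/clickhouse client --query \
        "SELECT metric, value FROM system.metrics WHERE metric = 'MemoryTracking'"
done
```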
how about an error-streaming table?
```sql
CREATE TABLE IF NOT EXISTS etl.consume_kafka1 (s String) ENGINE = Kafka()
SETTINGS kafka_format = 'CSV', kafka_group_name = 'CHgroup'
, kafka_broker_list = '127.1.1.1:9092', kafka_topic_list = '1-partition'
, kafka_handle_error_mode = 'stream'
, kafka_num_consumers=1;
CREATE MATERIALIZED VIEW IF NOT EXISTS etl.pipeline1 TO etl.trashbin AS SELECT s FROM etl.consume_kafka1
WHERE length(_error) >0
;
CREATE TABLE IF NOT EXISTS etl.consume_kafka2 (s String) ENGINE = Kafka()
SETTINGS kafka_format = 'CSV', kafka_group_name = 'CHgroup'
, kafka_broker_list = '127.2.2.2:9092', kafka_topic_list = '1-partition'
, kafka_handle_error_mode = 'stream'
, kafka_num_consumers=1;
CREATE MATERIALIZED VIEW IF NOT EXISTS etl.pipeline2 TO etl.trashbin AS SELECT s FROM etl.consume_kafka2
WHERE length(_error) >0
;
```
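When comparing variants I tear one pipeline down first; as noted above, dropping the MV alone already stops its polling (binary path as before):
```sh
/root/ch-binary/clickhouse client --multiquery --query "
    DROP TABLE IF EXISTS etl.pipeline2;
    DROP TABLE IF EXISTS etl.consume_kafka2;"
```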