Kafka Settings
Related Topic: Kafka
Kafka configuration in Teranode is primarily specified through URLs. Each Kafka topic has its own URL with parameters that control its behavior. The URL format supports both production Kafka and in-memory testing.
Kafka URL Format
Production Kafka URL Format
kafka://host1,host2,.../topic?param1=value1¶m2=value2&...
Components of the URL:
- Scheme: Always
kafka:// - Hosts: Comma-separated list of Kafka brokers (e.g.,
localhost:9092,kafka2:9092) - Topic: The Kafka topic name (specified as the path component)
- Parameters: Query parameters that configure specific behavior
Example:
kafka://localhost:9092/blocks?partitions=4&consumer_ratio=2&replication=3
In-Memory Kafka URL Format (Testing)
memory://topic?param1=value1¶m2=value2&...
Components of the URL:
- Scheme: Always
memory:// - Topic: The in-memory topic name (specified as the path component)
- Parameters: Same query parameters as production Kafka
Example:
memory://test_blocks?partitions=2&consumer_ratio=1
Usage: Automatically enabled for dev/test contexts (KAFKA_SCHEMA.dev = memory in settings.conf). Eliminates need for running Kafka cluster during development. For Docker-based Kafka, override with KAFKA_SCHEMA.dev = kafka in settings_local.conf.
URL Parameters
Consumer Parameters
| Parameter | Type | Default | Description |
|---|---|---|---|
partitions |
int | 1 | Number of topic partitions |
replay |
int | 1 | Start from beginning (1) or latest (0) for new consumer groups |
offsetReset |
string | "" | Offset reset strategy: "latest", "earliest", or "" (uses replay). Overrides replay setting |
maxProcessingTime |
int | 100 | Max time (ms) to process a message before Sarama stops fetching. Must exceed actual processing time |
sessionTimeout |
int | 10000 | Time (ms) broker waits for heartbeat before considering consumer dead. Must be >= 3 * heartbeatInterval |
heartbeatInterval |
int | 3000 | Frequency (ms) of heartbeats sent to broker |
rebalanceTimeout |
int | 60000 | Max time (ms) for all consumers to join rebalance |
channelBufferSize |
int | 256 | Number of messages buffered in internal channels |
consumerTimeout |
int | 90000 | Watchdog timeout (ms). Triggers recovery if no messages received and Setup() not called |
Producer Parameters (Async)
| Parameter | Type | Default | Description |
|---|---|---|---|
partitions |
int | 1 | Number of topic partitions |
replication |
int | 1 | Replication factor |
retention |
string | "600000" | Message retention period in milliseconds (10 minutes default) |
segment_bytes |
string | "1073741824" | Maximum size of a single log segment file (1GB default) |
flush_bytes |
int | 1048576 | Bytes to accumulate before flushing (1MB default) |
flush_messages |
int | 50000 | Messages to accumulate before flushing |
flush_frequency |
duration | "10s" | Maximum time between flushes |
Producer Parameters (Sync)
| Parameter | Type | Default | Description |
|---|---|---|---|
partitions |
int | 1 | Number of topic partitions |
replication |
int | 1 | Replication factor |
retention |
string | "600000" | Message retention period in milliseconds (10 minutes default) |
segment_bytes |
string | "1073741824" | Maximum size of a single log segment file (1GB default) |
flush_bytes |
int | 1024 | Bytes to accumulate before flushing |
Example Consumer URL:
kafka://localhost:9092/subtrees?partitions=8&sessionTimeout=15000&heartbeatInterval=5000&maxProcessingTime=30000
Example Async Producer URL:
kafka://localhost:9092/blocks?partitions=4&replication=3&retention=3600000&flush_frequency=5s&flush_messages=100000
Individual Settings
Topic Names
| Setting | Default | Environment Variable | Usage |
|---|---|---|---|
| Blocks | "blocks" | KAFKA_BLOCKS | Block data messages |
| BlocksFinal | "blocks-final" | KAFKA_BLOCKS_FINAL | Finalized block announcements |
| InvalidBlocks | "invalid-blocks" | KAFKA_INVALID_BLOCKS | Invalid block notifications |
| InvalidSubtrees | "invalid-subtrees" | KAFKA_INVALID_SUBTREES | Invalid subtree notifications |
| LegacyInv | "legacy-inv" | KAFKA_LEGACY_INV | Legacy inventory messages |
| RejectedTx | "rejectedtx" | KAFKA_REJECTEDTX | Rejected transaction notifications |
| Subtrees | "subtrees" | KAFKA_SUBTREES | Subtree data messages |
| TxMeta | "txmeta" | KAFKA_TXMETA | Transaction metadata |
| UnitTest | "unittest" | KAFKA_UNITTEST | Unit testing |
Connection Settings
| Setting | Default | Environment Variable | Usage |
|---|---|---|---|
| Hosts | "localhost:9092" | KAFKA_HOSTS | Comma-separated broker addresses |
| Port | 9092 | KAFKA_PORT | Default port when not in hosts |
| Partitions | 1 | KAFKA_PARTITIONS | Default partition count |
| ReplicationFactor | 1 | KAFKA_REPLICATION_FACTOR | Default replication factor |
Consumer Group Settings
Consumer Group ID Pattern: {serviceName}.{clientName}
Examples:
- blockvalidation.defaultClientName
- subtreevalidation.mynode
- p2p.node1
Special Case - Block Persister TxMeta: {serviceName}.{clientName}.{random16chars}
The Block Persister appends a random 16-character suffix to its TxMeta consumer group ID, allowing multiple instances to independently process all messages.
Auto-Commit Behavior:
| Topic | Auto-Commit | Reason |
|---|---|---|
| Blocks | Disabled | Critical - must not miss messages |
| RejectedTx | Enabled | Can tolerate message loss |
| Subtrees | Enabled | Can tolerate message loss |
| TxMeta | Enabled | Cache population - can tolerate loss |
| ValidatorTxs | Enabled | Optional feature |
| InvalidBlocks | Enabled | Optional feature |
| InvalidSubtrees | Enabled | Optional feature |
TLS Settings
| Setting | Default | Environment Variable | Usage |
|---|---|---|---|
| EnableTLS | false | KAFKA_ENABLE_TLS | Enable TLS encryption |
| TLSSkipVerify | false | KAFKA_TLS_SKIP_VERIFY | Skip certificate verification (testing only) |
| TLSCAFile | "" | KAFKA_TLS_CA_FILE | CA certificate file path (optional for custom CA) |
| TLSCertFile | "" | KAFKA_TLS_CERT_FILE | Client certificate file path (required with TLSKeyFile for mutual TLS) |
| TLSKeyFile | "" | KAFKA_TLS_KEY_FILE | Client key file path (required with TLSCertFile for mutual TLS) |
TLS Requirements:
TLSCertFileandTLSKeyFilemust both be provided together for mutual TLS authenticationTLSSkipVerify=trueshould only be used in development/testing environments- When
EnableTLS=true, all Kafka connections use TLS encryption
Debug Settings
| Setting | Default | Environment Variable | Usage |
|---|---|---|---|
| EnableDebugLogging | false | kafka_enable_debug_logging | Verbose Sarama logging |
URL-Based Configuration
Config URL Settings
| Setting | Environment Variable | Usage |
|---|---|---|
| ValidatorTxsConfig | kafka_validatortxsConfig | Validator transaction messages |
| TxMetaConfig | kafka_txmetaConfig | Transaction metadata |
| LegacyInvConfig | kafka_legacyInvConfig | Legacy inventory messages |
| BlocksFinalConfig | kafka_blocksFinalConfig | Finalized blocks |
| RejectedTxConfig | kafka_rejectedTxConfig | Rejected transactions |
| InvalidBlocksConfig | kafka_invalidBlocksConfig | Invalid blocks |
| InvalidSubtreesConfig | kafka_invalidSubtreesConfig | Invalid subtrees |
| SubtreesConfig | kafka_subtreesConfig | Subtrees |
| BlocksConfig | kafka_blocksConfig | Blocks |
Configuration Priority
URL-based configuration overrides individual settings when provided:
- URL Config (e.g.,
InvalidBlocksConfig) - highest priority - Individual Settings (e.g.,
InvalidBlocks,Hosts,Port) - fallback
Consumer Timeout Constraints
Critical Validation Rule:
sessionTimeout >= 3 * heartbeatInterval
This constraint is enforced by Sarama. Consumer creation will fail if violated.
Example Valid Configuration:
heartbeatInterval=5000(5s)sessionTimeout=15000(15s) ✓ Valid: 15000 >= 3 * 5000
Example Invalid Configuration:
heartbeatInterval=5000(5s)sessionTimeout=10000(10s) ✗ Invalid: 10000 < 15000
Consumer Watchdog
The consumer watchdog monitors for stuck consumers and automatically recovers by recreating the consumer group.
Behavior:
- Checks every 30 seconds
- Triggers recovery if
Consume()is stuck for longer thanconsumerTimeout(default 90s) - Detects RefreshMetadata hangs and offset-related issues
- Automatically recreates consumer group on recovery
Configuration:
consumerTimeoutURL parameter (default: 90000ms)
Service Usage
Block Assembly Service
- Producer:
BlocksConfig- publishes blocks - Producer:
SubtreesConfig- publishes subtrees
Block Validation Service
- Consumer:
BlocksConfig- consumes blocks for validation - Producer:
InvalidBlocksConfig- publishes invalid blocks (optional)
Blockchain Service
- Producer:
BlocksFinalConfig- publishes finalized blocks
Subtree Validation Service
- Consumer:
SubtreesConfig- consumes subtrees for validation - Producer:
InvalidSubtreesConfig- publishes invalid subtrees (optional)
Validator Service
- Consumer:
ValidatorTxsConfig- consumes transactions for validation (optional) - Producer:
ValidatorTxsConfig- publishes validation results (optional) - Producer:
RejectedTxConfig- publishes rejected transactions
Propagation Service
- Consumer:
RejectedTxConfig- consumes rejected transactions
Block Persister Service
- Consumer:
TxMetaConfig- consumes transaction metadata (with random consumer group suffix)
Legacy Service
- Producer:
LegacyInvConfig- publishes legacy inventory messages - Consumer:
BlocksFinalConfig- consumes finalized blocks - Consumer:
TxMetaConfig- consumes transaction metadata
P2P Service
- Uses
InvalidBlocksConfigor constructs URL fromInvalidBlocks,Hosts,Port - Applies TLS settings from KafkaSettings
- Consumer group:
{topic}-consumer
Legacy Service
- Uses
LegacyInvConfig,BlocksFinalConfig,TxMetaConfig - Applies TLS settings from KafkaSettings
Blockchain Service
- Uses async producer for block notifications
- Applies TLS settings from KafkaSettings
Configuration Examples
High-Throughput Producer
kafka://localhost:9092/blocks?partitions=8&replication=3&retention=3600000&flush_bytes=10485760&flush_messages=100000&flush_frequency=5s
Slow Processing Consumer
kafka://localhost:9092/subtrees?partitions=4&maxProcessingTime=30000&sessionTimeout=60000&heartbeatInterval=20000&consumerTimeout=120000
Low-Latency Producer
kafka://localhost:9092/txmeta?partitions=4&flush_bytes=524288&flush_messages=1000&flush_frequency=1s
Offset Reset Consumer
kafka://localhost:9092/blocks?partitions=2&offsetReset=latest&replay=0
TLS-Enabled Configuration
Environment variables:
KAFKA_ENABLE_TLS=true
KAFKA_TLS_CA_FILE=/path/to/ca.pem
KAFKA_TLS_CERT_FILE=/path/to/client-cert.pem
KAFKA_TLS_KEY_FILE=/path/to/client-key.pem
kafka_blocksConfig=kafka://broker1:9093,broker2:9093/blocks?partitions=4
Note: KAFKA_TLS_CERT_FILE and KAFKA_TLS_KEY_FILE must both be provided for mutual TLS authentication. Omit both for server-only TLS.
Memory Testing
memory://test_blocks?partitions=2&replay=1