Kafka Settings
Related Topic: Kafka
Kafka configuration in Teranode is primarily specified through URLs. Each Kafka topic has its own URL with parameters that control its behavior. The URL format supports both production Kafka and in-memory testing.
Kafka URL Format
Production Kafka URL Format
kafka://host1,host2,.../topic?param1=value1¶m2=value2&...
Components of the URL:
- Scheme: Always
kafka:// - Hosts: Comma-separated list of Kafka brokers (e.g.,
localhost:9092,kafka2:9092) - Topic: The Kafka topic name (specified as the path component)
- Parameters: Query parameters that configure specific behavior
Example:
kafka://localhost:9092/blocks?partitions=4&consumer_ratio=2&replication=3
In-Memory Kafka URL Format (Testing)
memory://topic?param1=value1¶m2=value2&...
Components of the URL:
- Scheme: Always
memory:// - Topic: The in-memory topic name (specified as the path component)
- Parameters: Same query parameters as production Kafka
Example:
memory://test_blocks?partitions=2&consumer_ratio=1
Usage: The memory scheme is automatically detected by the Kafka utilities and enables in-memory message passing for unit tests and development environments. This eliminates the need for a running Kafka cluster during testing.
URL Parameters
Consumer Configuration Parameters
| Parameter | Type | Default | Description |
|---|---|---|---|
partitions |
int | 1 | Number of topic partitions |
replay |
int | 1 | Start from beginning (1) or latest (0) |
maxProcessingTime |
int | 100 | Max message processing time (ms) |
sessionTimeout |
int | 10000 | Session timeout (ms) |
heartbeatInterval |
int | 3000 | Heartbeat interval (ms) |
rebalanceTimeout |
int | 60000 | Rebalance timeout (ms) |
channelBufferSize |
int | 256 | Internal buffer size |
consumerTimeout |
int | 90000 | Watchdog timeout (ms) |
offsetReset |
string | "latest" | "latest", "earliest", or "" |
Example Consumer URL:
kafka://localhost:9092/transactions?partitions=4&replay=0&sessionTimeout=15000
Producer Configuration Parameters
| Parameter | Type | Default | Description |
|---|---|---|---|
partitions |
int | 1 | Number of topic partitions |
replication |
int | 1 | Replication factor |
retention |
string | "600000" | Retention period (ms) |
segment_bytes |
string | "1073741824" | Segment size in bytes |
flush_bytes |
int | 1048576 | Flush threshold in bytes |
flush_messages |
int | 50000 | Messages before flush |
flush_frequency |
string | "10s" | Flush frequency |
Example Producer URL:
kafka://localhost:9092/blocks?partitions=2&replication=3&retention=3600000&flush_frequency=5s
Individual Settings
Topic Names
| Setting | Default | Environment Variable | Usage |
|---|---|---|---|
| Blocks | "blocks" | KAFKA_BLOCKS | Block data messages |
| BlocksFinal | "blocks-final" | KAFKA_BLOCKS_FINAL | Finalized block announcements |
| InvalidBlocks | "invalid-blocks" | KAFKA_INVALID_BLOCKS | Invalid block notifications |
| InvalidSubtrees | "invalid-subtrees" | KAFKA_INVALID_SUBTREES | Invalid subtree notifications |
| LegacyInv | "legacy-inv" | KAFKA_LEGACY_INV | Legacy inventory messages |
| RejectedTx | "rejectedtx" | KAFKA_REJECTEDTX | Rejected transaction notifications |
| Subtrees | "subtrees" | KAFKA_SUBTREES | Subtree data messages |
| TxMeta | "txmeta" | KAFKA_TXMETA | Transaction metadata |
| UnitTest | "unittest" | KAFKA_UNITTEST | Unit testing |
Connection Settings
| Setting | Default | Environment Variable | Usage |
|---|---|---|---|
| Hosts | "localhost:9092" | KAFKA_HOSTS | Comma-separated broker addresses |
| Port | 9092 | KAFKA_PORT | Default port when not in hosts |
| Partitions | 1 | KAFKA_PARTITIONS | Default partition count |
| ReplicationFactor | 1 | KAFKA_REPLICATION_FACTOR | Default replication factor |
TLS Settings
| Setting | Default | Environment Variable | Usage |
|---|---|---|---|
| EnableTLS | false | KAFKA_ENABLE_TLS | Enable TLS encryption |
| TLSSkipVerify | false | KAFKA_TLS_SKIP_VERIFY | Skip certificate verification |
| TLSCAFile | "" | KAFKA_TLS_CA_FILE | CA certificate file path |
| TLSCertFile | "" | KAFKA_TLS_CERT_FILE | Client certificate file path |
| TLSKeyFile | "" | KAFKA_TLS_KEY_FILE | Client key file path |
Debug Settings
| Setting | Default | Environment Variable | Usage |
|---|---|---|---|
| EnableDebugLogging | false | kafka_enable_debug_logging | Verbose Sarama logging |
URL-Based Configuration
Config URL Settings
| Setting | Environment Variable | Usage |
|---|---|---|
| ValidatorTxsConfig | kafka_validatortxsConfig | Validator transaction messages |
| TxMetaConfig | kafka_txmetaConfig | Transaction metadata |
| LegacyInvConfig | kafka_legacyInvConfig | Legacy inventory messages |
| BlocksFinalConfig | kafka_blocksFinalConfig | Finalized blocks |
| RejectedTxConfig | kafka_rejectedTxConfig | Rejected transactions |
| InvalidBlocksConfig | kafka_invalidBlocksConfig | Invalid blocks |
| InvalidSubtreesConfig | kafka_invalidSubtreesConfig | Invalid subtrees |
| SubtreesConfig | kafka_subtreesConfig | Subtrees |
| BlocksConfig | kafka_blocksConfig | Blocks |
Configuration Priority
URL-based configuration overrides individual settings when provided:
- URL Config (e.g.,
InvalidBlocksConfig) - highest priority - Individual Settings (e.g.,
InvalidBlocks,Hosts,Port) - fallback
Timeout Validation
Consumer timeout parameters must satisfy: sessionTimeout >= 3 * heartbeatInterval
Service Usage
P2P Service
- Uses
InvalidBlocksConfigor constructs URL fromInvalidBlocks,Hosts,Port - Applies TLS settings from KafkaSettings
- Consumer group:
{topic}-consumer
Legacy Service
- Uses
LegacyInvConfig,BlocksFinalConfig,TxMetaConfig - Applies TLS settings from KafkaSettings
Blockchain Service
- Uses async producer for block notifications
- Applies TLS settings from KafkaSettings
Configuration Examples
Producer Configuration
kafka://localhost:9092/blocks?partitions=4&replication=3&retention=3600000&flush_frequency=5s
Consumer Configuration
kafka://localhost:9092/subtrees?partitions=8&sessionTimeout=15000&heartbeatInterval=5000
Memory Testing
memory://test_blocks?partitions=2&replay=1