Kafka Message Format Reference Documentation

This document provides comprehensive information about the message formats used in Kafka topics within the Teranode ecosystem. All message formats are defined using Protocol Buffers (protobuf), providing a structured and efficient serialization mechanism.

Index

  • Protobuf Overview
  • Block Notification Message Format
  • Subtree Notification Message Format
  • Transaction Validation Message Format
  • Transaction Metadata Message Format
  • Rejected Transaction Message Format
  • Inventory Message Format
  • Final Block Message Format
  • General Code Examples
  • Other Resources

Protobuf Overview

Protocol Buffers (protobuf) is a language-neutral, platform-neutral, extensible mechanism for serializing structured data. In Teranode, all Kafka messages are defined and serialized using protobuf.

The protobuf definitions for Kafka messages are located in util/kafka/kafka_message/kafka_messages.proto.
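The Go examples in this document use the generated kafkamessage package together with protobuf, Kafka, and hash/transaction helpers. The import block below is a minimal sketch of what the examples assume; apart from the proto package noted in the comment, the module paths are illustrative and should be adjusted to the actual module layout of your checkout.

import (
    "bytes"
    "fmt"
    "log"
    "net/url"

    "google.golang.org/protobuf/proto"

    // Illustrative paths only; the generated kafkamessage package is produced
    // from util/kafka/kafka_message/kafka_messages.proto.
    kafka "github.com/bitcoin-sv/teranode/util/kafka"
    kafkamessage "github.com/bitcoin-sv/teranode/util/kafka/kafka_message"

    // Hash, transaction, and wire helpers referenced by the examples.
    "github.com/bitcoinsv/bsvd/chaincfg/chainhash"
    "github.com/bitcoinsv/bsvd/wire"
    "github.com/bitcoinsv/bsvutil"
)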

Block Notification Message Format

Block Topic

The Kafka topic configured by the kafka_blocksConfig setting is used for broadcasting block notifications. This topic notifies subscribers about new blocks as they are added to the blockchain.

Message Structure

The block notification message is defined in protobuf as KafkaBlockTopicMessage:

message KafkaBlockTopicMessage {
  string hash = 1;  // Block hash (as hex string)
  string URL = 2;  // URL pointing to block data
}

Field Specifications

hash

  • Type: string
  • Description: Hexadecimal string representation of the BSV block hash
  • Required: Yes

URL

  • Type: string
  • Description: URL pointing to the location where the full block data can be retrieved
  • Required: Yes

Example

Here's a JSON representation of the message content (for illustration purposes only; actual messages are protobuf-encoded):

{
  "hash": "000000000019d6689c085ae165831e934ff763ae46a2a6c172b3f1b60a8ce26f",
  "URL": "https://datahub.example.com/blocks/123"
}

Code Examples

Sending Messages

// Create the block notification message
blockHash := block.Hash() // assumes this returns *chainhash.Hash
dataHubUrl := "https://datahub.example.com/blocks/123"

// Create a new protobuf message
message := &kafkamessage.KafkaBlockTopicMessage{
    Hash: blockHash.String(), // convert the hash to a string
    URL:  dataHubUrl,
}

// Serialize to protobuf format
data, err := proto.Marshal(message)
if err != nil {
    return fmt.Errorf("failed to serialize block message: %w", err)
}

// Send to Kafka
producer.Publish(&kafka.Message{
    Value: data,
})

Receiving Messages

// Handle incoming block notification message
func handleBlockMessage(msg *kafka.Message) error {
    if msg == nil {
        return nil
    }

    // Deserialize from protobuf format
    blockMessage := &kafkamessage.KafkaBlockTopicMessage{}
    if err := proto.Unmarshal(msg.Value, blockMessage); err != nil {
        return fmt.Errorf("failed to deserialize block message: %w", err)
    }

    // Convert string hash to chainhash.Hash
    blockHash, err := chainhash.NewHashFromStr(blockMessage.Hash)
    if err != nil {
        return fmt.Errorf("invalid block hash: %w", err)
    }

    // Extract DataHub URL
    dataHubUrl := blockMessage.URL

    // Process the block notification...
    log.Printf("Received block notification for %s, data at: %s", blockHash.String(), dataHubUrl)
    return nil
}

Error Cases

  • Invalid message format: Message cannot be unmarshaled to KafkaBlockTopicMessage
  • Empty or malformed hash: Hash is not a valid hexadecimal string representation of a block hash
  • Invalid URL: DataHub URL is empty or not properly formatted
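
As a minimal sketch, a consumer might check these conditions before processing the message. The helper below is illustrative only (not part of the Teranode API) and assumes the chainhash and net/url packages shown in the import sketch above:

// validateBlockMessage checks the error cases listed above; illustrative only.
func validateBlockMessage(m *kafkamessage.KafkaBlockTopicMessage) error {
    // A 32-byte block hash is 64 hexadecimal characters.
    if len(m.Hash) != 2*chainhash.HashSize {
        return fmt.Errorf("block hash must be %d hex characters, got %d", 2*chainhash.HashSize, len(m.Hash))
    }
    if _, err := chainhash.NewHashFromStr(m.Hash); err != nil {
        return fmt.Errorf("malformed block hash %q: %w", m.Hash, err)
    }
    if m.URL == "" {
        return fmt.Errorf("DataHub URL is empty")
    }
    if _, err := url.ParseRequestURI(m.URL); err != nil {
        return fmt.Errorf("invalid DataHub URL %q: %w", m.URL, err)
    }
    return nil
}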

Subtree Notification Message Format

Subtree Topic

The Kafka topic configured by the kafka_subtreesConfig setting is used for broadcasting subtree notifications. This topic notifies subscribers about new subtrees as they are created.

Message Structure

The subtree notification message is defined in protobuf as KafkaSubtreeTopicMessage:

message KafkaSubtreeTopicMessage {
  string hash = 1;  // Subtree hash (as hex string)
  string URL = 2;  // URL pointing to subtree data
}

Field Specifications

hash

  • Type: string
  • Description: Hexadecimal string representation of the BSV subtree hash
  • Required: Yes

URL

  • Type: string
  • Description: URL pointing to the location where the full subtree data can be retrieved
  • Required: Yes

Example

Here's a JSON representation of the message content (for illustration purposes only; actual messages are protobuf-encoded):

{
  "hash": "45a2b856743012ce25a4dabddd5f5bdf534c27c9347b34862bca5a14176d07",
  "URL": "https://datahub.example.com/subtrees/123"
}

Code Examples

Sending Messages

// Create the subtree notification message
subtreeHash := subtree.Hash() // assumes this returns *chainhash.Hash
dataHubUrl := "https://datahub.example.com/subtrees/123"

// Create a new protobuf message
message := &kafkamessage.KafkaSubtreeTopicMessage{
    Hash: subtreeHash.String(), // convert the hash to a string
    URL:  dataHubUrl,
}

// Serialize to protobuf format
data, err := proto.Marshal(message)
if err != nil {
    return fmt.Errorf("failed to serialize subtree message: %w", err)
}

// Send to Kafka
producer.Publish(&kafka.Message{
    Value: data,
})

Receiving Messages

// Handle incoming subtree notification message
func handleSubtreeMessage(msg *kafka.Message) error {
    if msg == nil {
        return nil
    }

    // Deserialize from protobuf format
    subtreeMessage := &kafkamessage.KafkaSubtreeTopicMessage{}
    if err := proto.Unmarshal(msg.Value, subtreeMessage); err != nil {
        return fmt.Errorf("failed to deserialize subtree message: %w", err)
    }

    // Convert string hash to chainhash.Hash
    subtreeHash, err := chainhash.NewHashFromStr(subtreeMessage.Hash)
    if err != nil {
        return fmt.Errorf("invalid subtree hash: %w", err)
    }

    // Extract DataHub URL
    dataHubUrl := subtreeMessage.URL

    // Process the subtree notification...
    log.Printf("Received subtree notification for %s, data at: %s", subtreeHash.String(), dataHubUrl)
    return nil
}

Error Cases

  • Invalid message format: Message cannot be unmarshaled to KafkaSubtreeTopicMessage
  • Empty or malformed hash: Hash is not a valid hexadecimal string representation of a subtree hash
  • Invalid URL: DataHub URL is empty or not properly formatted

Transaction Validation Message Format

Transaction Validation Topic

The Kafka topic configured by the kafka_validatortxsConfig setting is used for sending transactions from the Propagation service to the Validator for validation.

Message Structure

The transaction validation message is defined in protobuf as KafkaTxValidationTopicMessage:

message KafkaTxValidationTopicMessage {
  bytes tx = 1;                     // Complete BSV transaction
  uint32 height = 2;                // Current blockchain height
  KafkaTxValidationOptions options = 3;  // Optional validation options
}

message KafkaTxValidationOptions {
  bool skipUtxoCreation = 1;        // Skip UTXO creation if true
  bool addTXToBlockAssembly = 2;    // Add transaction to block assembly if true
  bool skipPolicyChecks = 3;        // Skip policy checks if true
  bool createConflicting = 4;       // Allow conflicting transactions if true
}

Field Specifications

tx

  • Type: bytes
  • Description: Raw bytes of the complete BSV transaction
  • Required: Yes

height

  • Type: uint32
  • Description: Current blockchain height, used for validation rules that depend on height
  • Required: Yes

options

  • Type: KafkaTxValidationOptions
  • Description: Special options that modify the validation behavior
  • Required: No (if not provided, default values are used)

KafkaTxValidationOptions

skipUtxoCreation

  • Type: bool
  • Description: When true, the validator will not create UTXO entries for this transaction
  • Default: false

addTXToBlockAssembly

  • Type: bool
  • Description: When true, the validated transaction will be added to block assembly
  • Default: true

skipPolicyChecks

  • Type: bool
  • Description: When true, certain policy validation checks will be skipped
  • Default: false

createConflicting

  • Type: bool
  • Description: When true, the validator may create a transaction that conflicts with existing UTXOs
  • Default: false

Example

Here's a JSON representation of the message content (for illustration purposes only; actual messages are protobuf-encoded):

{
  "tx": "<binary data - variable length>",
  "height": 12345,
  "options": {
    "skipUtxoCreation": false,
    "addTXToBlockAssembly": true,
    "skipPolicyChecks": false,
    "createConflicting": false
  }
}

Code Examples

Sending Messages

// Create the transaction validation message
transactionBytes := tx.Serialize() // serialized transaction bytes
currentHeight := uint32(12345)     // current blockchain height

// Create options (using defaults in this example)
options := &kafkamessage.KafkaTxValidationOptions{
    SkipUtxoCreation:     false,
    AddTXToBlockAssembly: true,
    SkipPolicyChecks:     false,
    CreateConflicting:    false,
}

// Create a new protobuf message
message := &kafkamessage.KafkaTxValidationTopicMessage{
    Tx:      transactionBytes,
    Height:  currentHeight,
    Options: options,
}

// Serialize to protobuf format
data, err := proto.Marshal(message)
if err != nil {
    return fmt.Errorf("failed to serialize transaction validation message: %w", err)
}

// Send to Kafka
producer.Publish(&kafka.Message{
    Value: data,
})

Receiving Messages

// Handle incoming transaction validation message
func handleTxValidationMessage(msg *kafka.Message) error {
    if msg == nil {
        return nil
    }

    // Deserialize from protobuf format
    txValidationMessage := &kafkamessage.KafkaTxValidationTopicMessage{}
    if err := proto.Unmarshal(msg.Value, txValidationMessage); err != nil {
        return fmt.Errorf("failed to deserialize transaction validation message: %w", err)
    }

    // Extract transaction data
    txBytes := txValidationMessage.Tx
    height := txValidationMessage.Height
    options := txValidationMessage.Options

    // Options is optional; apply the documented defaults when it is not set
    if options == nil {
        options = &kafkamessage.KafkaTxValidationOptions{
            AddTXToBlockAssembly: true,
        }
    }

    // Parse the transaction
    tx, err := bsvutil.NewTxFromBytes(txBytes)
    if err != nil {
        return fmt.Errorf("invalid transaction data: %w", err)
    }

    // Process the transaction with the provided options...
    skipUtxoCreation := options.SkipUtxoCreation
    addToBlockAssembly := options.AddTXToBlockAssembly
    skipPolicyChecks := options.SkipPolicyChecks
    createConflicting := options.CreateConflicting

    // Perform validation based on options...
    return validateTransaction(tx, height, skipUtxoCreation, addToBlockAssembly, skipPolicyChecks, createConflicting)
}

Error Cases

  • Invalid message format: Message cannot be unmarshaled to KafkaTxValidationTopicMessage
  • Empty or invalid transaction: Transaction bytes cannot be parsed
  • Invalid height: Height is too high compared to the current blockchain height
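
As a minimal sketch, these conditions can be checked before validation begins. The helper below is illustrative only; currentHeight is assumed to be supplied by the caller from its own view of the blockchain:

// checkTxValidationMessage checks the error cases listed above; illustrative only.
func checkTxValidationMessage(m *kafkamessage.KafkaTxValidationTopicMessage, currentHeight uint32) error {
    if len(m.Tx) == 0 {
        return fmt.Errorf("transaction bytes are empty")
    }
    if _, err := bsvutil.NewTxFromBytes(m.Tx); err != nil {
        return fmt.Errorf("transaction bytes cannot be parsed: %w", err)
    }
    if m.Height > currentHeight {
        return fmt.Errorf("message height %d is ahead of the current blockchain height %d", m.Height, currentHeight)
    }
    return nil
}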

Transaction Metadata Message Format

TxMeta Topic

The Kafka topic configured by the kafka_txmetaConfig setting is used for broadcasting transaction metadata for validated transactions. This topic allows the Validator to either add new transaction metadata or request deletion of previously shared metadata.

Message Structure

The transaction metadata message is defined in protobuf as KafkaTxMetaTopicMessage:

enum KafkaTxMetaActionType {
  ADD = 0;    // Add or update transaction metadata
  DELETE = 1; // Delete transaction metadata
}

message KafkaTxMetaTopicMessage {
  bytes txHash = 1;                 // Transaction hash (32 bytes)
  KafkaTxMetaActionType action = 2; // Action type (add or delete)
  bytes content = 3;                // Serialized transaction metadata (only used for ADD)
}

Field Specifications

txHash

  • Type: bytes
  • Description: Raw bytes representation of the transaction hash (32 bytes)
  • Required: Yes

action

  • Type: KafkaTxMetaActionType (enum)
  • Description: Specifies whether to add/update metadata (ADD) or delete metadata (DELETE)
  • Required: Yes
  • Values:

    • ADD (0): Add or update transaction metadata
    • DELETE (1): Delete transaction metadata

content

  • Type: bytes
  • Description: Serialized transaction metadata
  • Required: Only when action is ADD; should be empty when action is DELETE
  • Content: Serialized transaction metadata that includes transaction details, parent transaction hashes, block IDs, fees, and other relevant information

Transaction Metadata

The content field contains serialized transaction metadata, which typically includes:

  • Complete transaction content
  • Parent transaction hashes (inputs)
  • Block heights where the transaction appears
  • Transaction fee
  • Size in bytes
  • Flags (e.g., whether it's a coinbase transaction)
  • Lock time
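
The exact serialization of this metadata is defined elsewhere in the Teranode codebase. Purely as an illustration of the fields listed above, a consumer-side representation might look like the hypothetical struct below; it is not the actual wire format:

// TxMetadata is a hypothetical illustration of the metadata fields described
// above; the real serialization format is defined by the Teranode codebase.
type TxMetadata struct {
    Tx             []byte           // complete transaction content
    ParentTxHashes []chainhash.Hash // hashes of parent (input) transactions
    BlockHeights   []uint32         // block heights where the transaction appears
    Fee            uint64           // transaction fee
    SizeInBytes    uint64           // size in bytes
    IsCoinbase     bool             // whether this is a coinbase transaction
    LockTime       uint32           // lock time
}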

Example

Here's a JSON representation of an ADD message (for illustration purposes only; actual messages are protobuf-encoded):

{
  "txHash": "<binary data - 32 bytes>",
  "action": 0,  // ADD
  "content": "<binary data - serialized transaction metadata>"
}

Here's a JSON representation of a DELETE message:

{
  "txHash": "<binary data - 32 bytes>",
  "action": 1,  // DELETE
  "content": ""  // Empty for DELETE operations
}

Code Examples

Sending Messages

// Example 1: Send ADD transaction metadata message
txHash := tx.TxID().Bytes() // assumes this returns the 32-byte transaction hash
metadataContent := serializeMetadata(metadata) // serialize transaction metadata

// Create a new protobuf message for adding metadata
addMessage := &kafkamessage.KafkaTxMetaTopicMessage{
    TxHash: txHash,
    Action: kafkamessage.KafkaTxMetaActionType_ADD,
    Content: metadataContent,
}

// Serialize to protobuf format
addData, err := proto.Marshal(addMessage)
if err != nil {
    return fmt.Errorf("failed to serialize ADD metadata message: %w", err)
}

// Send to Kafka
producer.Publish(&kafka.Message{
    Value: addData,
})

// Example 2: Send DELETE transaction metadata message
txHash := tx.TxID().Bytes() // assumes this returns the 32-byte transaction hash

// Create a new protobuf message for deleting metadata
deleteMessage := &kafkamessage.KafkaTxMetaTopicMessage{
    TxHash: txHash,
    Action: kafkamessage.KafkaTxMetaActionType_DELETE,
    // Content is empty for DELETE operations
}

// Serialize to protobuf format
deleteData, err := proto.Marshal(deleteMessage)
if err != nil {
    return fmt.Errorf("failed to serialize DELETE metadata message: %w", err)
}

// Send to Kafka
producer.Publish(&kafka.Message{
    Value: deleteData,
})

Receiving Messages

// Handle incoming transaction metadata message
func handleTxMetaMessage(msg *kafka.Message) error {
    if msg == nil {
        return nil
    }

    // Deserialize from protobuf format
    txMetaMessage := &kafkamessage.KafkaTxMetaTopicMessage{}
    if err := proto.Unmarshal(msg.Value, txMetaMessage); err != nil {
        return fmt.Errorf("failed to deserialize transaction metadata message: %w", err)
    }

    // Extract transaction hash
    var txHash chainhash.Hash
    copy(txHash[:], txMetaMessage.TxHash)

    // Process based on action type
    switch txMetaMessage.Action {
    case kafkamessage.KafkaTxMetaActionType_ADD:
        // Handle ADD operation
        metadata := deserializeMetadata(txMetaMessage.Content)
        return handleAddMetadata(txHash, metadata)

    case kafkamessage.KafkaTxMetaActionType_DELETE:
        // Handle DELETE operation
        return handleDeleteMetadata(txHash)

    default:
        return fmt.Errorf("unknown action type: %d", txMetaMessage.Action)
    }
}

Error Cases

  • Invalid message format: Message cannot be unmarshaled to KafkaTxMetaTopicMessage
  • Empty or invalid transaction hash: Hash does not contain exactly 32 bytes
  • Unknown action type: Action is not a recognized KafkaTxMetaActionType enum value
  • Missing content for ADD: Content field is empty when Action is ADD
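
As a minimal sketch, these conditions can be checked before acting on the message. The helper below is illustrative only, not part of the Teranode API:

// checkTxMetaMessage checks the error cases listed above; illustrative only.
func checkTxMetaMessage(m *kafkamessage.KafkaTxMetaTopicMessage) error {
    if len(m.TxHash) != chainhash.HashSize { // 32 bytes
        return fmt.Errorf("transaction hash must be %d bytes, got %d", chainhash.HashSize, len(m.TxHash))
    }
    switch m.Action {
    case kafkamessage.KafkaTxMetaActionType_ADD:
        if len(m.Content) == 0 {
            return fmt.Errorf("content is required when action is ADD")
        }
    case kafkamessage.KafkaTxMetaActionType_DELETE:
        // Content is ignored for DELETE operations.
    default:
        return fmt.Errorf("unknown action type: %d", m.Action)
    }
    return nil
}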

Rejected Transaction Message Format

Rejected Transaction Topic

The Kafka topic configured by the kafka_rejectedTxConfig setting is used for broadcasting rejected transactions. This topic notifies subscribers about transactions that have been rejected during validation.

Message Structure

The rejected transaction message is defined in protobuf as KafkaRejectedTxTopicMessage:

message KafkaRejectedTxTopicMessage {
  bytes txHash = 1;  // Transaction hash (32 bytes)
  string reason = 2; // Rejection reason
}

Field Specifications

txHash

  • Type: bytes
  • Description: Raw bytes of the transaction hash (32 bytes), computed as double SHA256 hash
  • Computation: sha256(sha256(transaction_bytes))
  • Required: Yes

reason

  • Type: string
  • Description: Human-readable description of why the transaction was rejected
  • Required: Yes

Example

Here's a JSON representation of the message content (for illustration purposes only; actual messages are protobuf-encoded):

{
  "txHash": "<binary data - 32 bytes>",
  "reason": "Insufficient fee for transaction size"
}

Code Examples

Sending Messages

// Create the rejected transaction message
txHash := tx.TxID().Bytes() // assumes this returns the 32-byte transaction hash
reasonStr := "Insufficient fee for transaction size"

// Create a new protobuf message
message := &kafkamessage.KafkaRejectedTxTopicMessage{
    TxHash: txHash,
    Reason: reasonStr,
}

// Serialize to protobuf format
data, err := proto.Marshal(message)
if err != nil {
    return fmt.Errorf("failed to serialize rejected transaction message: %w", err)
}

// Send to Kafka
producer.Publish(&kafka.Message{
    Value: data,
})

Receiving Messages

// Handle incoming rejected transaction message
func handleRejectedTxMessage(msg *kafka.Message) error {
    if msg == nil {
        return nil
    }

    // Deserialize from protobuf format
    rejectedTxMessage := &kafkamessage.KafkaRejectedTxTopicMessage{}
    if err := proto.Unmarshal(msg.Value, rejectedTxMessage); err != nil {
        return fmt.Errorf("failed to deserialize rejected transaction message: %w", err)
    }

    // Extract transaction hash
    var txHash chainhash.Hash
    copy(txHash[:], rejectedTxMessage.TxHash)

    // Extract rejection reason
    reason := rejectedTxMessage.Reason

    // Process the rejected transaction notification...
    log.Printf("Transaction %s was rejected: %s", txHash.String(), reason)
    return nil
}

Error Cases

  • Invalid message format: Message cannot be unmarshaled to KafkaRejectedTxTopicMessage
  • Empty or invalid transaction hash: Hash does not contain exactly 32 bytes
  • Missing reason: Reason field is empty

Inventory Message Format

Inventory Topic

The inventory message topic is used for broadcasting inventory vectors between components. This allows components to notify each other about available blocks, transactions, and other data.

Message Structure

The inventory message is defined in protobuf as KafkaInvTopicMessage:

message KafkaInvTopicMessage {
  string peerAddress = 1;  // Address of the peer
  repeated Inv inv = 2;    // List of inventory items
}

message Inv {
  InvType type = 1;  // Type of inventory item
  bytes hash = 2;    // Hash of the inventory item
}

enum InvType {
  Error         = 0;
  Tx            = 1;
  Block         = 2;
  FilteredBlock = 3;
}

Field Specifications

peerAddress

  • Type: string
  • Description: Network address of the peer that has the inventory item
  • Required: Yes

inv

  • Type: repeated Inv
  • Description: List of inventory items
  • Required: Yes

Inv

type

  • Type: InvType (enum)
  • Description: Type of inventory item
  • Required: Yes
  • Values:

    • Error (0): Error or unknown type
    • Tx (1): Transaction
    • Block (2): Block
    • FilteredBlock (3): Filtered block

hash

  • Type: bytes
  • Description: Hash of the inventory item (32 bytes)
  • Required: Yes

Example

Here's a JSON representation of the message content (for illustration purposes only; actual messages are protobuf-encoded):

{
  "peerAddress": "192.168.1.10:8333",
  "inv": [
    {
      "type": 1,  // Tx
      "hash": "<binary data - 32 bytes>"
    },
    {
      "type": 2,  // Block
      "hash": "<binary data - 32 bytes>"
    }
  ]
}

Code Examples

Sending Messages

// Create the inventory message
peerAddress := "192.168.1.10:8333"

// Create inventory items
invItems := []*kafkamessage.Inv{
    {
        Type: kafkamessage.InvType_Tx,
        Hash: txHash[:], // 32-byte transaction hash
    },
    {
        Type: kafkamessage.InvType_Block,
        Hash: blockHash[:], // 32-byte block hash
    },
}

// Create a new protobuf message
message := &kafkamessage.KafkaInvTopicMessage{
    PeerAddress: peerAddress,
    Inv:         invItems,
}

// Serialize to protobuf format
data, err := proto.Marshal(message)
if err != nil {
    return fmt.Errorf("failed to serialize inventory message: %w", err)
}

// Send to Kafka
producer.Publish(&kafka.Message{
    Value: data,
})

Receiving Messages

// Handle incoming inventory message
func handleInvMessage(msg *kafka.Message) error {
    if msg == nil {
        return nil
    }

    // Deserialize from protobuf format
    invMessage := &kafkamessage.KafkaInvTopicMessage{}
    if err := proto.Unmarshal(msg.Value, invMessage); err != nil {
        return fmt.Errorf("failed to deserialize inventory message: %w", err)
    }

    // Extract peer address
    peerAddr := invMessage.PeerAddress

    // Process inventory items
    for _, inv := range invMessage.Inv {
        var hash chainhash.Hash
        copy(hash[:], inv.Hash)

        switch inv.Type {
        case kafkamessage.InvType_Tx:
            log.Printf("Received transaction inventory from %s: %s", peerAddr, hash.String())
            // Process transaction inventory...

        case kafkamessage.InvType_Block:
            log.Printf("Received block inventory from %s: %s", peerAddr, hash.String())
            // Process block inventory...

        case kafkamessage.InvType_FilteredBlock:
            log.Printf("Received filtered block inventory from %s: %s", peerAddr, hash.String())
            // Process filtered block inventory...

        default:
            log.Printf("Received unknown inventory type %d from %s: %s", inv.Type, peerAddr, hash.String())
        }
    }

    return nil
}

Error Cases

  • Invalid message format: Message cannot be unmarshaled to KafkaInvTopicMessage
  • Empty peer address: PeerAddress field is empty
  • Invalid inventory item: Hash does not contain exactly 32 bytes or Type is unrecognized
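
As a minimal sketch, these conditions can be checked before iterating over the inventory items. The helper below is illustrative only, not part of the Teranode API:

// checkInvMessage checks the error cases listed above; illustrative only.
func checkInvMessage(m *kafkamessage.KafkaInvTopicMessage) error {
    if m.PeerAddress == "" {
        return fmt.Errorf("peer address is empty")
    }
    for i, inv := range m.Inv {
        if len(inv.Hash) != chainhash.HashSize { // 32 bytes
            return fmt.Errorf("inventory item %d: hash must be %d bytes, got %d", i, chainhash.HashSize, len(inv.Hash))
        }
        switch inv.Type {
        case kafkamessage.InvType_Tx, kafkamessage.InvType_Block, kafkamessage.InvType_FilteredBlock:
            // Recognized inventory types.
        default:
            return fmt.Errorf("inventory item %d: unrecognized type %d", i, inv.Type)
        }
    }
    return nil
}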

Final Block Message Format

Final Block Topic

The Kafka topic configured by the kafka_blocksFinalConfig setting is used for broadcasting finalized blocks. This topic notifies subscribers about blocks that have been fully validated and accepted into the blockchain.

Message Structure

The final block message is defined in protobuf as KafkaBlocksFinalTopicMessage:

message KafkaBlocksFinalTopicMessage {
    bytes header = 1;                    // Block header bytes
    uint64 transaction_count = 2;        // Number of transactions in block
    uint64 size_in_bytes = 3;            // Size of block in bytes
    repeated bytes subtree_hashes = 4;   // Merkle tree subtree hashes
    bytes coinbase_tx = 5;               // Coinbase transaction bytes
    uint32 height = 6;                   // Block height
}

Field Specifications

header

  • Type: bytes
  • Description: Serialized block header
  • Required: Yes

transaction_count

  • Type: uint64
  • Description: Total number of transactions in the block
  • Required: Yes

size_in_bytes

  • Type: uint64
  • Description: Total size of the block in bytes
  • Required: Yes

subtree_hashes

  • Type: repeated bytes
  • Description: List of Merkle tree subtree hashes that compose the block
  • Required: Yes

coinbase_tx

  • Type: bytes
  • Description: Serialized coinbase transaction
  • Required: Yes

height

  • Type: uint32
  • Description: Block height in the blockchain
  • Required: Yes

Example

Here's a JSON representation of the message content (for illustration purposes only; actual messages are protobuf-encoded):

{
  "header": "<binary data - 80 bytes>",
  "transaction_count": 2500,
  "size_in_bytes": 1048576,
  "subtree_hashes": [
    "<binary data - 32 bytes>",
    "<binary data - 32 bytes>",
    "<binary data - 32 bytes>"
  ],
  "coinbase_tx": "<binary data - variable length>",
  "height": 12345
}

Code Examples

Sending Messages

// Create the final block message
blockHeader := block.Header.Serialize() // assumes this returns the serialized 80-byte header bytes
txCount := uint64(len(block.Transactions)) // assumes Transactions is a slice of transactions
blockSize := uint64(block.SerializedSize()) // assumes this returns the block size in bytes

// Get subtree hashes
subtreeHashes := make([][]byte, len(merkleTree.SubTrees))
for i, subtree := range merkleTree.SubTrees {
    subtreeHashes[i] = subtree.Hash()[:]
}

// Get coinbase transaction
coinbaseTx := block.Transactions[0].Serialize()
blockHeight := uint32(block.Height)

// Create a new protobuf message
message := &kafkamessage.KafkaBlocksFinalTopicMessage{
    Header:          blockHeader,
    TransactionCount: txCount,
    SizeInBytes:     blockSize,
    SubtreeHashes:   subtreeHashes,
    CoinbaseTx:      coinbaseTx,
    Height:          blockHeight,
}

// Serialize to protobuf format
data, err := proto.Marshal(message)
if err != nil {
    return fmt.Errorf("failed to serialize final block message: %w", err)
}

// Send to Kafka
producer.Publish(&kafka.Message{
    Value: data,
})

Receiving Messages

// Handle incoming final block message
func handleFinalBlockMessage(msg *kafka.Message) error {
    if msg == nil {
        return nil
    }

    // Deserialize from protobuf format
    finalBlockMessage := &kafkamessage.KafkaBlocksFinalTopicMessage{}
    if err := proto.Unmarshal(msg.Value, finalBlockMessage); err != nil {
        return fmt.Errorf("failed to deserialize final block message: %w", err)
    }

    // Parse block header
    header := wire.BlockHeader{}
    if err := header.Deserialize(bytes.NewReader(finalBlockMessage.Header)); err != nil {
        return fmt.Errorf("invalid block header: %w", err)
    }

    // Extract other fields
    txCount := finalBlockMessage.TransactionCount
    blockSize := finalBlockMessage.SizeInBytes
    subtreeHashes := finalBlockMessage.SubtreeHashes
    coinbaseTxBytes := finalBlockMessage.CoinbaseTx
    height := finalBlockMessage.Height

    // Parse coinbase transaction
    coinbaseTx, err := bsvutil.NewTxFromBytes(coinbaseTxBytes)
    if err != nil {
        return fmt.Errorf("invalid coinbase transaction: %w", err)
    }

    // Process the final block...
    log.Printf("Received final block at height %d with %d transactions (size: %d bytes), coinbase %s",
              height, txCount, blockSize, coinbaseTx.Hash().String())

    // Process subtree hashes...
    for i, hashBytes := range subtreeHashes {
        var hash chainhash.Hash
        copy(hash[:], hashBytes)
        log.Printf("  Subtree hash %d: %s", i, hash.String())
    }

    return nil
}

Error Cases

  • Invalid message format: Message cannot be unmarshaled to KafkaBlocksFinalTopicMessage
  • Invalid block header: Header bytes cannot be deserialized to a valid block header
  • Invalid coinbase transaction: Coinbase transaction bytes cannot be parsed
  • Missing subtree hashes: No subtree hashes provided
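
As a minimal sketch, these conditions can be checked before the block is processed. The helper below is illustrative only, not part of the Teranode API; the 80-byte header and 32-byte subtree hash sizes follow the example above:

// checkFinalBlockMessage checks the error cases listed above; illustrative only.
func checkFinalBlockMessage(m *kafkamessage.KafkaBlocksFinalTopicMessage) error {
    if len(m.Header) != 80 { // a serialized block header is 80 bytes
        return fmt.Errorf("header must be 80 bytes, got %d", len(m.Header))
    }
    if len(m.SubtreeHashes) == 0 {
        return fmt.Errorf("no subtree hashes provided")
    }
    for i, h := range m.SubtreeHashes {
        if len(h) != chainhash.HashSize { // 32 bytes
            return fmt.Errorf("subtree hash %d: expected %d bytes, got %d", i, chainhash.HashSize, len(h))
        }
    }
    if _, err := bsvutil.NewTxFromBytes(m.CoinbaseTx); err != nil {
        return fmt.Errorf("coinbase transaction bytes cannot be parsed: %w", err)
    }
    return nil
}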

General Code Examples

Serializing Messages

Here's a general example of how to serialize a protobuf message for Kafka:

// Create a new message
message := &kafkamessage.KafkaBlockTopicMessage{
    Hash: blockHash.String(), // Hash is a string field; convert from chainhash.Hash
    URL:  datahubUrl,
}

// Serialize the message to protobuf format
data, err := proto.Marshal(message)
if err != nil {
    return fmt.Errorf("failed to serialize message: %w", err)
}

// Send to Kafka
producer.Publish(&kafka.Message{
    Value: data,
})

Deserializing Messages

Here's a general example of how to deserialize a protobuf message from Kafka:

func handleBlockMessage(msg *kafka.Message) error {
    if msg == nil {
        return nil
    }

    // Create a new message container
    blockMessage := &kafkamessage.KafkaBlockTopicMessage{}

    // Deserialize the message from protobuf format
    if err := proto.Unmarshal(msg.Value, blockMessage); err != nil {
        return fmt.Errorf("failed to deserialize message: %w", err)
    }

    // Parse the block hash (the Hash field is a hex-encoded string)
    blockHash, err := chainhash.NewHashFromStr(blockMessage.Hash)
    if err != nil {
        return fmt.Errorf("invalid block hash: %w", err)
    }

    // Extract DataHub URL
    dataHubUrl := blockMessage.URL

    // Process the message...
    log.Printf("Received block notification for %s, data at: %s", blockHash.String(), dataHubUrl)
    return nil
}

Other Resources