Kafka publisher for use with Solana's plugin framework.
Streams three types of Solana data to Kafka:
- Account Updates: Balance changes, data modifications, owner changes
- Transactions: Complete transaction details, instructions, fees, success/failure
- Slot Status: Network progress, confirmation status, health metrics
Want to see data flowing immediately? Use this minimal config:
{
"libpath": "target/release/libsolana_accountsdb_plugin_kafka.so",
"kafka": {
"bootstrap.servers": "localhost:9092"
},
"filters": [{
"update_account_topic": "solana.testnet.account_updates",
"slot_status_topic": "solana.testnet.slot_status",
"transaction_topic": "solana.testnet.transactions",
"publish_all_accounts": true
}]
}
This will publish all account updates, transactions, and slot status to Kafka. Perfect for testing and development.
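To see it in action, point a local test validator at the config (this assumes the snippet above is saved as `config.json` and the plugin library has already been built):

```sh
# Start a local test validator with the plugin loaded; config.json is
# the Quick Start config above.
solana-test-validator --geyser-plugin-config config.json
```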
Prebuilt binary releases are available on the project's releases page.
You will need version 3.15 or later of the protobuf compiler `protoc` installed, since it is required for the `--experimental_allow_proto3_optional` option.
Note that as of this writing, Ubuntu 22.04 still ships an obsolete version of `protoc`.
Previously, you could use `protobuf-compiler` from Debian trixie, but trixie's libc6 is no longer compatible with Ubuntu 22.04.
You will need to use Ubuntu 24.04 or later, or Debian 12 or later.
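A quick way to check your toolchain, and one possible workaround on older distros (a sketch assuming an x86_64 Linux host; pick the release archive matching your platform):

```sh
# Verify the installed protoc is 3.15 or newer:
protoc --version

# If it is too old, install a prebuilt protoc from the protobuf GitHub
# releases (version and architecture below are examples):
curl -LO https://github.com/protocolbuffers/protobuf/releases/download/v25.1/protoc-25.1-linux-x86_64.zip
unzip protoc-25.1-linux-x86_64.zip -d "$HOME/.local"
export PATH="$HOME/.local/bin:$PATH"
```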
cargo build --release
- Linux:
./target/release/libsolana_accountsdb_plugin_kafka.so
- macOS:
./target/release/libsolana_accountsdb_plugin_kafka.dylib
Important: Solana's plugin interface requires the build environment of the Solana validator and this plugin to be identical.
This includes the Solana version and the Rust compiler version. Loading a plugin built against mismatched versions can result in memory corruption and crashes.
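As a quick sanity check before loading the plugin, compare the toolchain versions on the build machine against those used for your validator (illustrative commands, not a substitute for pinning versions):

```sh
# Both must match the validator build exactly:
rustc --version
solana --version
```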
Config is specified via the plugin's JSON config file.
The following config is a minimal example that demonstrates the structure, but with `publish_all_accounts: false` and no `program_filters`, you'll only see slot status updates. For testing, consider using the scenarios below.
{
"libpath": "target/release/libsolana_accountsdb_plugin_kafka.so",
"kafka": {
"bootstrap.servers": "localhost:9092",
"request.required.acks": "1",
"message.timeout.ms": "30000",
"compression.type": "lz4",
"partitioner": "murmur2_random",
"statistics.interval.ms": "1000"
},
"shutdown_timeout_ms": 30000,
"filters": [{
"update_account_topic": "solana.testnet.account_updates",
"slot_status_topic": "solana.testnet.slot_status",
"transaction_topic": "solana.testnet.transactions",
"program_ignores": [
"Sysvar1111111111111111111111111111111111111",
"Vote111111111111111111111111111111111111111"
],
"publish_all_accounts": false,
"wrap_messages": false
}]
}
For Testing/Development (Recommended):
{
"libpath": "target/release/libsolana_accountsdb_plugin_kafka.so",
"kafka": {
"bootstrap.servers": "localhost:9092"
},
"filters": [{
"update_account_topic": "solana.testnet.account_updates",
"slot_status_topic": "solana.testnet.slot_status",
"transaction_topic": "solana.testnet.transactions",
"publish_all_accounts": true
}]
}
- `libpath`: Path to the Kafka plugin library.
- `kafka`: `librdkafka` config options. Common options include:
  - `statistics.interval.ms`: Enables Prometheus metrics collection (set to 1000 ms or higher)
  - `queue.buffering.max.messages`: Controls the producer buffer size in messages
  - `queue.buffering.max.kbytes`: Controls the producer buffer size in KB
- `shutdown_timeout_ms`: Time the plugin is given to flush out all messages to Kafka upon exit request.
- `prometheus`: Optional port on which to provide metrics in Prometheus format.
- `filters`: List of filters with the following fields:
  - `update_account_topic`: Topic name for account updates. Omit to disable.
  - `slot_status_topic`: Topic name for slot status updates. Omit to disable.
  - `transaction_topic`: Topic name for transaction updates. Omit to disable.
  - `program_ignores`: Account addresses to ignore (see Filtering below).
  - `program_filters`: Solana program IDs to include.
  - `account_filters`: Solana accounts to include.
  - `publish_all_accounts`: Publish all accounts on startup. Omit to disable.
  - `include_vote_transactions`: Include vote transactions.
  - `include_failed_transactions`: Include failed transactions.
  - `wrap_messages`: Wrap all messages in a unified wrapper object. Omit to disable (see Message Wrapping below).
The message types are keyed as follows:
- Account update: account address (public key)
- Slot status: slot number
- Transaction notification: transaction signature
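To inspect the keys on the wire, a console consumer can print them next to the payloads (topic name taken from the example config; values are protobuf-encoded and will print as raw bytes):

```sh
kafka-console-consumer.sh --bootstrap-server localhost:9092 \
  --topic solana.testnet.account_updates \
  --property print.key=true \
  --from-beginning
```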
The plugin uses a whitelist approach for filtering. By default, most events are filtered out unless you explicitly configure what you want to see.
- Account Updates: Only published if the account's owner program is in `program_filters` OR the account address is in `account_filters`
- Transactions: Only published if they involve accounts from programs in `program_filters` OR specific accounts in `account_filters`
- Slot Status: Always published (not affected by filters)
- Program Ignores: Blacklist of programs/accounts to exclude (applied after whitelist filtering)
Scenario 1: See Everything (Recommended for testing)
{
"publish_all_accounts": true,
"program_filters": [],
"account_filters": []
}
Scenario 2: See Specific Programs Only (here, the System Program and the SPL Token Program; note that the config file is plain JSON, so it cannot contain comments)
{
"publish_all_accounts": false,
"program_filters": [
"11111111111111111111111111111111",
"TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA"
]
}
Scenario 3: See Specific Accounts Only
{
"publish_all_accounts": false,
"account_filters": [
"YourAccountAddressHere111111111111111111111111"
]
}
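Since `filters` is a list, you can also run several filter sets side by side, for example to route different programs to different topics. A sketch (topic names are illustrative):

```json
"filters": [
  {
    "update_account_topic": "solana.testnet.token_accounts",
    "program_filters": ["TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA"]
  },
  {
    "update_account_topic": "solana.testnet.system_accounts",
    "program_filters": ["11111111111111111111111111111111"]
  }
]
```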
If you're not seeing messages in Kafka despite successful slot processing:
- Check your filters: Make sure you have either `publish_all_accounts: true` or specific `program_filters`/`account_filters`
- Verify topics: Ensure your topic names are correct and Kafka is running (see the command after this list)
- Check program ignores: Make sure you're not accidentally filtering out everything with overly restrictive `program_ignores`
- Test with minimal config: Start with `publish_all_accounts: true` to verify the plugin is working
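For the topic check, a one-liner against the broker is usually enough (assumes the Kafka CLI tools are on your PATH):

```sh
# List topics on the broker to confirm the configured names exist:
kafka-topics.sh --bootstrap-server localhost:9092 --list
```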
In some cases it may be desirable to send multiple types of messages to the same topic,
for instance to preserve relative order. In this case it is helpful if all messages conform to a single schema.
Setting `wrap_messages` to true will wrap all three message types in a uniform wrapper object so that they conform to a single schema.
Note that if `wrap_messages` is true, in order to avoid key collisions, the message keys are prefixed with a single byte that depends on the type of the message being wrapped. Account update message keys are prefixed with 65 (`A`), slot status keys with 83 (`S`), and transaction keys with 84 (`T`).
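On the consumer side, that prefix byte is enough to route a wrapped message. A minimal Rust sketch (function and type names here are illustrative, not part of this plugin's API):

```rust
/// Split a wrapped-message key into a type tag and the original key.
/// Returns None for an unrecognized prefix.
fn classify_wrapped_key(key: &[u8]) -> Option<(&'static str, &[u8])> {
    match key.split_first() {
        Some((&b'A', rest)) => Some(("account_update", rest)), // 65
        Some((&b'S', rest)) => Some(("slot_status", rest)),    // 83
        Some((&b'T', rest)) => Some(("transaction", rest)),    // 84
        _ => None,
    }
}

fn main() {
    // A wrapped account-update key is the 32-byte pubkey prefixed with 'A'.
    let key = [b"A".as_slice(), [0u8; 32].as_slice()].concat();
    let (kind, pubkey) = classify_wrapped_key(&key).unwrap();
    assert_eq!(kind, "account_update");
    assert_eq!(pubkey.len(), 32);
}
```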
The Kafka producer is strictly non-blocking so that the Solana validator can keep up without induced lag; incoming events from the validator are buffered and published asynchronously.
When the publishing buffer is exhausted, any additional events are dropped. This can happen when the Kafka brokers are too slow or the connection to Kafka fails, so it is crucial to choose a sufficiently large buffer.
The buffer size can be controlled using `librdkafka` config options, including:
- `queue.buffering.max.messages`: Maximum number of messages allowed on the producer queue.
- `queue.buffering.max.kbytes`: Maximum total message size sum allowed on the producer queue.
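For example, a deliberately generous buffer might look like the following (values are illustrative; size them against your validator's event rate and available memory):

```json
"kafka": {
  "bootstrap.servers": "localhost:9092",
  "queue.buffering.max.messages": "10000000",
  "queue.buffering.max.kbytes": "4194304"
}
```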
The plugin now provides significantly richer analytics data for blockchain monitoring and analysis, enhancing all supported Solana transaction formats.
The plugin supports multiple Solana transaction formats:
- Legacy Transactions: Traditional Solana message format
- V0 Transactions: Versioned transactions with address lookup tables (LUTs)
All transaction types are enhanced with comprehensive analytics metadata.
All transactions provide additional analytics data that can be used for:
- Compute Units: Actual compute units consumed by transactions
- Pricing Information: Compute unit pricing from ComputeBudget instructions
- Cost Analysis: Transaction fees and compute costs
- Error Detection: Reliable error status from transaction metadata
- Success Tracking: Transaction success/failure status
- Error Details: Structured error information without log parsing
- Address Lookup Tables: Support for V0 LUT transactions
- Loaded Address Details: Index and writable status for loaded addresses
- Account Metadata: Enhanced account information and versioning
- Confirmation Status: Smart confirmation counting based on slot status
- Status Descriptions: Human-readable slot status descriptions
- Progress Tracking: Slot progression monitoring
The protobuf schema has been enhanced with analytics fields:

Account updates:
- `data_version`: Account data version for change tracking
- `is_startup`: Whether this is a startup account update
- `account_age`: Approximate account age in slots

Slot status:
- `is_confirmed`: Whether the slot is confirmed by a supermajority
- `confirmation_count`: Confirmation level (0-2, based on status)
- `status_description`: Human-readable status description

Transactions:
- `compute_units_consumed`: Actual compute units used
- `compute_units_price`: Compute unit pricing in micro-lamports
- `total_cost`: Total transaction cost (fee + compute)
- `instruction_count`: Number of instructions in the transaction
- `account_count`: Number of accounts involved
- `execution_time_ns`: Execution time in nanoseconds
- `is_successful`: Transaction success status
- `execution_logs`: Detailed execution logs
- `error_details`: Detailed error information
- `confirmation_count`: Number of confirmations

Loaded addresses (V0 lookup tables):
- `writable_info`: Detailed writable address information
- `readonly_info`: Detailed readonly address information
Analytics features are enabled by default and require no additional configuration. The plugin automatically:
- Detects transaction format (Legacy or V0)
- Extracts available metadata fields
- Provides enhanced analytics where data is available
- Maintains backwards compatibility with existing consumers
Analytics enhancements enable new use cases:
- Performance Monitoring: Track compute unit usage and costs
- Error Analysis: Monitor transaction failures and success rates
- Network Analytics: Analyze slot confirmation patterns
- Address Intelligence: Monitor address lookup table usage
- Cost Optimization: Analyze transaction pricing and efficiency
Symptoms: Solana validator shows slot processing but no messages appear in Kafka topics.
Causes & Solutions:
- Filtering too restrictive: Set `publish_all_accounts: true` or add specific `program_filters`
- Wrong topic names: Verify your topic names match exactly
- Kafka connection issues: Check if Kafka is running and accessible
- Plugin not loaded: Verify the plugin path in `libpath` is correct
Quick Test: Use the Quick Start config above to verify the plugin works.
Cause: This is expected behavior with the default example config! Slot status is always published, but account updates and transactions require explicit filter configuration.
Solution: Add `publish_all_accounts: true` or configure `program_filters`.
Common Causes:
- Version mismatch: Ensure Solana and the plugin are built with identical Rust/Solana versions
- Library path: Check that `libpath` points to the correct `.so` or `.dylib` file
- Permissions: Ensure the plugin file is readable by the Solana process
Cause: Large Kafka producer buffers can consume significant memory.
Solution: Adjust buffer settings:
{
"kafka": {
"queue.buffering.max.messages": "10000",
"queue.buffering.max.kbytes": "1048576"
}
}
- Start Simple: Begin with `publish_all_accounts: true` to verify basic functionality
- Check Topics: Use Kafdrop or `kafka-console-consumer` to verify topics exist
- Monitor Metrics: Enable Prometheus metrics to see message counts and errors (see the snippet after this list)
- Verify Filters: Double-check your filter configuration matches your expectations
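Per the config reference above, enabling metrics takes two settings: `statistics.interval.ms` under `kafka`, plus the optional top-level `prometheus` port. A sketch (the port value is an example; check the plugin's config schema for the exact expected format):

```json
{
  "kafka": {
    "statistics.interval.ms": "1000"
  },
  "prometheus": 9090
}
```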
If you're still having issues:
- Check this troubleshooting section
- Review the filtering documentation above
- Try the Quick Start configuration
- Open an issue with your config and error details