How we use Apache Kafka to improve event-driven architecture performance
In our previous article, we discussed the high-level architecture for addressing performance, one of the non-functional requirements (NFRs) in microservices architectures. This article dives into some key design considerations around performance using Apache Kafka as the event backbone.
Performance targets impact many of your architectural, technology, infrastructure, and design decisions. The increased adoption of hybrid and multicloud solutions has made modeling performance more important than ever, and many architectural tradeoffs are made based upon performance modeling.
Performance modeling should cover transaction and event inventory, workload modeling (concurrency, peak volumes, and expected response time for different transactions/events), and infrastructure modeling. Building a performance model helps you create the deployment model (especially scalability), make architectural and design optimizations to reduce latency, and design tests to validate performance and throughput.
Avoid monolithic monsters
A monolithic architecture centralizes processing, so it's impossible to scale different components independently. Even service implementation should be broken down into loosely coupled components using staged event-driven architecture (SEDA) to provide the ability to scale each component separately and to make the service more resilient. Each deployed component should be independently scalable and deployed as a cluster to increase concurrency and resiliency.
Choose the event backbone
Your event backbone impacts performance. These five characteristics are important to consider from a performance perspective:
- Performance of message ingestion and making the messages persistent, such as writing messages to the disk, as performance needs to be consistent all the time
- Performance of message consumption by consumers
- Ability to scale (for example, the addition of new nodes/brokers in the cluster)
- Time needed to rebalance when a node or broker fails
- Time needed to rebalance when a consumer instance leaves or joins a consumer group
We chose Apache Kafka because of the distributed event streaming platform's proven track record in performance, fault-tolerance, scalability, and availability over numerous engagements. It takes care of the first three points above. For rebalance time, Kafka's performance is dependent on the amount of data on the topics. While the Apache Pulsar distributed messaging and streaming platform tries to solve this, we use Kafka, given its good track record, and closely monitor Apache Pulsar's evolution.
[ For more insight on the evolving enterprise architect role, download The automation architect's handbook. ]
Database queries are expensive, and leveraging caching helps you avoid the cost. Caching creates a fast data layer that helps in data lookup. All read-only calls get redirected to the cache instead of fetching them from the database or another remote service. The stream data processing pipelines populate (or update) the cache in near-real time. Event processors then reference the data in the cache instead of querying databases or making service calls to systems of record. Event processors write to the Apache Ignite distributed database, and persistence processors asynchronously write the data to the database. This gives a major boost to performance.
We chose Ignite because it:
- Provides distributed caching (supports data partitioning)
- Is horizontally scalable
- Is ANSI SQL compliant
- Is easy to use
- Supports replication, providing some degree of fault-tolerance
- Is cloud-ready
You can make caching persistent. For example, the cached data gets written to disk, but it impacts performance. The performance penalty is very significant when not using SSD storage. If caches are non-persistent, then the architecture needs to cater for rehydrating the cache from data/event stores. Recovery performance is critical to reduce mean time to recovery. We leverage Kafka topics to rehydrate the cache. They are the event/state stores. A multi-instance recovery component reads data from Kafka topics and rehydrates the cache.
Performance is at its best when you collocate the event producers, consumers, and the event backbone. However, this would mean that if the datacenter goes down, it brings down the entire platform. You can avoid this if you set up replication and mirroring for the event backbone. For example, you can use Kafka MirrorMaker 2 (MM2) to set up replication across datacenters and availability zones.
Choose the message format
Message serialization and deserialization speed have some impact on performance. There are multiple choices for message format, including XML, JSON, Avro, Protobuf, and Thrift. For this application, we chose Avro due to its compactness, deserialization performance, and schema evolution support.
Make concurrency-related decisions
In this architecture, the key deciding parameters in terms of concurrency are:
- Number of partitions on Kafka topics
- Number of instances in each consumer group
- Number of threads in each instance of a consumer or producer
These have a direct impact on the throughput. In Kafka, each partition can get consumed by a single thread of the same consumer group. Having multiple partitions and multiple drives on Kafka brokers helps spread the events without having to worry about sequencing. Having a multithreaded consumer can help to a certain extent until you reach the CPU, memory, and network resource utilization limits. Having multiple instances in the same consumer group splits the load across multiple nodes and servers, providing horizontal scalability.
When using partitions, the higher the number of partitions, the higher the concurrency. However, there are certain considerations when determining the optimal number of partitions. It is important to choose the partition key so that it evenly spreads events across partitions without breaking the ordering requirements. Also, over-partitioning has some implications in terms of open file handles, higher unavailability of Kafka topics in case of broker failures, higher end-to-end latency, and higher memory requirements for the consumer.
Doing performance tests to benchmark performance and then extrapolating them to the desired throughput and performance helps finalize these three concurrency parameters.
[ Don't try to recreate what was normal before the pandemic. Learn from leading CIOs in a new report from Harvard Business Review Analytic Services: Maintaining Momentum on Digital Transformation. ]
I/O contributes significantly to latency. It impacts all components of the architecture. It impacts the event backbone, caching, databases, and application components.
For distributed caching components that have persistence enabled, I/O is one of the key factors that impact performance. For example, for Apache Ignite, I strongly recommend you use SSDs for good performance and also enable direct I/O. Non-persistent caches are the fastest. However, the downside is if the cache cluster goes down, cached data gets lost. You can solve this by having a recovery process, which rehydrates the cache. Note that the longer the recovery time, the larger the lag in the streaming pipeline. Therefore, the recovery process must be extremely fast. You can do this by running multiple instances of the recovery process and avoiding any transformation or business logic in the recovery processors.
Kafka doesn't necessarily require high-performance disks (such as SSD), but Kafka recommends using multiple drives (and multiple log dirs) for good throughput. However, the application and operating system shouldn't share drives. Mount options such as
noatime provide performance gain. Kafka's documentation provides other mount options based on the filesystem type.
From an application point of view, keep logging to a minimum. Instead of logging everything, log outliers. Even though most logging frameworks are capable of asynchronous logging, latency still has an impact.
Our event streaming architecture relies heavily on in-memory processing, especially for Kafka brokers and Apache Ignite server nodes. It is important to allocate adequate memory to processors, consumers, Kafka brokers, and in-memory data-grid nodes.
Tuning the operating system's memory settings can also help boost performance (to an extent). One of the key parameters is
vm.swappiness, which controls the swapping out of process memory. It takes a value between 0 and 100. The higher the number, the more aggressively swapping happens. Keeping this number low helps reduce swapping.
Since Kafka relies heavily on the page cache, you can also tweak the
vm.dirty_ratio options to control flushing data to disks.
Optimize network usage
For efficiency, you can apply compression to very large messages. This reduces network utilization. However, that comes at the cost of higher CPU utilization for the producer, consumer, and broker. Kafka supports four compression types:
zstd. Snappy fits in the middle, giving a good balance of CPU usage, compression ratio, speed, and network utilization.
You can read more about why you might choose Snappy in Message compression in Kafka.
Deserialize relevant messages
In a publish-subscribe mechanism, a consumer can get messages that they are not interested in. Adding a filter eliminates the need for processing non-relevant messages. However, it still requires deserializing the event payloads. You can avoid this by adding event metadata in the event headers. This allows consumers to look at the event headers and decide whether to parse the payload or not. This significantly improves throughput of the consumers and reduces resource utilization.
[ Register for the Understanding Kafka in the enterprise webinar series. ]
Parse what is required (especially XML documents)
In the financial services industry, XML documents are heavily used. For example, input to the event streaming applications may be XML. Parsing XML is CPU- and memory-intensive. Choosing to use an XML parser is a key decision related to performance and resource utilization. If the XML document is very large, consider using Simple API for XML (SAX) parsers. If the event streaming application does not require fully parsing the XML document, having pre-configured XPATHS to look up the required data and constructing the event payload from it may be a faster option.
However, suppose you need the entire XML data. In that case, it is wise to parse the entire document and convert it into the event streaming application's message format once. This provides multiple instances of this processor instead of having each processor in the event streaming pipeline parsing the XML document.
Monitor and identify performance bottlenecks
It should be easy to identify performance bottlenecks. You can use a lightweight instrumentation framework (based on aspect-oriented programming, or AOP). For example, we combine AOP with Spring-Boot Actuator and Micrometer to expose a Prometheus endpoint. For Apache Kafka, we use the JMX exporter for Prometheus to gather Kafka performance metrics. Then we use Grafana to build a rich dashboard displaying performance metrics for the producers, consumers, Kafka, and Ignite (basically all architecture components).
This helps pinpoint bottlenecks quickly rather than relying on log analysis and correlation.
Tune Kafka configuration
Kafka has many configuration parameters for brokers, producers, consumers, and Kafka streams. We use the following parameters to tune Kafka for latency and throughput:
log.dirs(set multiple directories on different drives to speed up I/O)
fetch.min.bytes(must be balanced because if set to the default value of 1, it will improve latency but throughput is not so good)
See Kafka performance tuning for additional information.
Tune garbage collection
Tuning garbage collection (GC) helps avoid long pauses and excessive GC overhead. (As of JDK 8,) consider using Garbage-First Garbage Collector (G1GC) for most applications (multiprocessor machines with large memory). It attempts to meet collection pause time goals and tries to achieve high throughput. Additionally, you don't need to do many configurations to get started.
Ensure that sufficient memory is available to the JVM. In the case of Ignite, use off-heap memory to store data.
-Xmx at the same value with the
-XX:+AlwaysPreTouch to ensure all memory allocation from the operating system happens during startup.
Throughput matters in event-driven architecture, so use the following parameters in G1GC to tune throughput further:
-XX:MaxGCPauseMillis(increase its value to improve throughput)
For more details, refer to the tuning G1GC documentation.
Tune Apache Ignite
You can find good documentation for tuning Apache Ignite on its official site. The recommendations we follow for improving Ignite's performance include:
- Maintain the 80:20 principle while allocating memory to Ignite server node (80% to the Ignite process, 20% reserved for the operating system)
- Use ZooKeeper for cluster discovery
- Use off-heap memory for storing data
- Allocate at least 16GB to the JVM heap
- Divide the off-heap into different data regions depending on the requirements (such as reference data region, input data region, or output data region)
- Hold reference data in "replicated" caches
- Hold I/O (for example, transactional data) in partitioned caches
- Define an affinity key in each cache for queries for collocated processing
- Tune JVM GC using the guidance above
- Use cache groups for logically related data
- Enable "lazy" loading in JDBC drivers
- Size different thread pools correctly to improve performance
- Use the JCache API instead of SQL queries
- Ensure cursors are closed
- Use "NearCache" configuration for reference data, so looking it up doesn't require the client nodes to fetch from remote server nodes
- Increase inline size of indexes
- Use direct-IO
- Enable native persistence in reference data caches—transactional caches don't (transactional caches get backed up on Kafka and rehydrated into Ignite from Kafka using a separate recovery processor in outages)
It is critical to maintain desired performance levels for each architecture component, as issues in any one of them can cause the stream processing to choke. Therefore, you need to tune every architecture component for performance without compromising other NFRs. It is essential to have a deep technical understanding of each component to tune it effectively.
This is excerpted from Designing high-volume systems using event-driven architectures and reused with permission.
Navigate the shifting technology landscape. Read An architect's guide to multicloud infrastructure.