This post will show you how to deploy Red Hat AMQ Streams on Red Hat OpenShift 4 backed by Red Hat OpenShift Container Storage 4. Red Hat AMQ Streams is based on Apache Kafka, a popular platform for streaming data delivery and processing.
This post covers a number of concepts:
- Apache Kafka, Red Hat AMQ Streams and OCS introduction, architecture and use-case
- Installing the Red Hat AMQ Streams Operator from OpenShift OperatorHub
- Deploying ZooKeeper and Kafka with persistent storage from OpenShift Container Storage
- Creating the Kafka topic and Kafka user, using their respective operators
- Creating a Kafka Producer and Consumer application
- Testing Kafka High Availability
Introduction
Note: This scenario assumes you already have an OpenShift cluster or have followed the instructions in the Deploying your storage backend using OpenShift Container Storage 4 blog to set up OpenShift Container Storage 4.
Apache Kafka
Apache Kafka is an open source, distributed, partitioned, and replicated commit-log-based publish-subscribe messaging system. It is one of the most popular messaging systems and serves a variety of use cases, including web activity tracking, eventing, metrics, logging, and streaming data into a data lake. Kafka can handle real-time streams of data, whether to collect big data, to do real-time analysis, or both.
Red Hat AMQ Streams
Red Hat AMQ Streams is a massively scalable, distributed, high-performance data streaming platform, delivered as an enterprise distribution based on the Apache Kafka project. It makes it easy to run and operate Apache Kafka on OpenShift. AMQ Streams provides three operators:
Cluster Operator
Responsible for deploying and managing Apache Kafka clusters within an OpenShift cluster.
Topic Operator
Responsible for managing Kafka topics within a Kafka cluster running within an OpenShift cluster.
User Operator
Responsible for managing Kafka users within a Kafka cluster running within an OpenShift cluster.
OpenShift Container Storage for AMQ Streams (Kafka)
Kafka has built-in data resiliency capabilities. However, data persistence plays a vital role when Kafka is deployed in a cloud-native environment like Kubernetes. Red Hat OpenShift Container Storage is tightly integrated with Red Hat OpenShift Container Platform, which is the core platform for Red Hat AMQ Streams.
The data persistence needed by the AMQ Streams components, namely Apache ZooKeeper and Apache Kafka, is provided by OpenShift Container Storage block storage persistent volume claims. Typical public cloud block storage offerings cannot span availability zones (by design); with Container Storage, your applications get storage resiliency that spans availability zones.
This will be described in depth in the next section. Another interesting use case for Container Storage with AMQ Streams is moving Kafka messages to scalable, distributed, S3-compatible object storage for data analytics and long-term retention. OpenShift Container Storage is a unified storage solution providing block, object, and file interfaces, which makes it a good persistence solution for Apache Kafka.
Installing Red Hat AMQ Streams
Create a new project called amq-streams from the OpenShift console by navigating to the projects as shown in the next figure:
Note: You can also create the project from the command line: oc new-project amq-streams
Make sure to select the amq-streams project, then search for the AMQ Streams Operator in OperatorHub.
Install the AMQ Streams Operator.
Under Installation Mode, select amq-streams as the namespace.
Selecting a specific namespace makes it easier to purge the operator and its resources later. However, if you want to install the operator across the cluster, select the “All Namespaces” option.
Verify that the Operator has been successfully installed.
At this stage, you should have the Red Hat AMQ Streams Operator installed on your OpenShift 4 environment.
Creating ZooKeeper and Kafka Clusters
Red Hat AMQ Streams provides the necessary operators which are responsible for deploying and managing Apache Kafka clusters on OpenShift.
To make Red Hat OpenShift Container Storage your default storage provider, set the default storage class to ocs-storagecluster-ceph-rbd. Refer to “Change the default storage class to Ceph RBD” for step-by-step instructions.
oc get sc
Example output:
$ oc get sc
NAME                                    PROVISIONER                             AGE
gp2                                     kubernetes.io/aws-ebs                   11d
ocs-storagecluster-ceph-rbd (default)   openshift-storage.rbd.csi.ceph.com      10d
ocs-storagecluster-cephfs               openshift-storage.cephfs.csi.ceph.com   10d
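The default class is controlled by the `storageclass.kubernetes.io/is-default-class` annotation on each storage class. The following is a minimal sketch of switching it, assuming `gp2` is the class that currently holds the default marker (as on a typical AWS installation); the function name `switch_default_sc` is hypothetical and only wraps two `oc patch` calls.

```shell
# Hypothetical helper: move the "default" marker from one storage class to another.
# Usage: switch_default_sc <current-default> <new-default>
switch_default_sc() {
  # Unset the default annotation on the class that currently holds it.
  oc patch storageclass "$1" \
    -p '{"metadata":{"annotations":{"storageclass.kubernetes.io/is-default-class":"false"}}}'
  # Set the default annotation on the class that should become the default.
  oc patch storageclass "$2" \
    -p '{"metadata":{"annotations":{"storageclass.kubernetes.io/is-default-class":"true"}}}'
}
```

For example, run `switch_default_sc gp2 ocs-storagecluster-ceph-rbd`, then `oc get sc` again to confirm that only one class shows `(default)`.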
Create a Kafka cluster, which will also deploy the ZooKeeper cluster required by Kafka.
oc project amq-streams
oc apply -f https://raw.githubusercontent.com/red-hat-storage/ocs-training/master/ocs4kafka/01-kafka-zookeeper.yaml
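The manifest applied here is not reproduced in this post. As a rough sketch of what a Kafka resource with persistent storage can look like, the function below prints one. The `kafka.strimzi.io/v1beta1` API version, the listener layout, and the function name `kafka_cluster_manifest` are assumptions; the cluster name, replica counts, volume sizes, and storage class follow the example output shown in this section.

```shell
# Hypothetical helper: print a Kafka custom resource that requests persistent
# volumes from the given storage class (defaults to the OCS Ceph RBD class).
# Usage: kafka_cluster_manifest [storage-class]
kafka_cluster_manifest() {
  sc="${1:-ocs-storagecluster-ceph-rbd}"
  cat <<EOF
apiVersion: kafka.strimzi.io/v1beta1   # assumed API version
kind: Kafka
metadata:
  name: kafka-cluster
spec:
  kafka:
    replicas: 3
    listeners:
      plain: {}
      tls: {}
    storage:
      type: persistent-claim      # one PVC per broker
      size: 100Gi
      class: ${sc}
  zookeeper:
    replicas: 3
    storage:
      type: persistent-claim      # one PVC per ZooKeeper node
      size: 10Gi
      class: ${sc}
  entityOperator:                 # runs the Topic and User Operators
    topicOperator: {}
    userOperator: {}
EOF
}
```

Printing the manifest with `kafka_cluster_manifest` is enough to see where the storage class is referenced. On a fresh project it could be piped to `oc apply -f -`, but it is an illustration and not a replacement for the file applied in this step.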
Monitor the cluster progress with the watch oc command:
watch oc get all
Once the Kafka cluster is up and running, you should see output similar to this:
$ oc get all
NAME                                                      READY   STATUS    RESTARTS   AGE
pod/amq-streams-cluster-operator-v1.3.0-b94f877f5-tzhhg   1/1     Running   0          6m24s
pod/kafka-cluster-entity-operator-67668f5954-zwxzc        3/3     Running   0          31s
pod/kafka-cluster-kafka-0                                 2/2     Running   0          2m58s
pod/kafka-cluster-kafka-1                                 2/2     Running   0          2m58s
pod/kafka-cluster-kafka-2                                 2/2     Running   0          2m58s
pod/kafka-cluster-zookeeper-0                             2/2     Running   0          4m2s
pod/kafka-cluster-zookeeper-1                             2/2     Running   0          4m2s
pod/kafka-cluster-zookeeper-2                             2/2     Running   0          4m2s

NAME                                             TYPE        CLUSTER-IP       EXTERNAL-IP   PORT(S)                      AGE
service/kafka-cluster-kafka-0                    ClusterIP   172.30.114.98    <none>        9094/TCP                     3m
service/kafka-cluster-kafka-1                    ClusterIP   172.30.123.161   <none>        9094/TCP                     3m
service/kafka-cluster-kafka-2                    ClusterIP   172.30.99.23     <none>        9094/TCP                     3m
service/kafka-cluster-kafka-bootstrap            ClusterIP   172.30.228.1     <none>        9091/TCP,9092/TCP,9093/TCP   3m
service/kafka-cluster-kafka-brokers              ClusterIP   None             <none>        9091/TCP,9092/TCP,9093/TCP   3m
service/kafka-cluster-kafka-external-bootstrap   ClusterIP   172.30.121.94    <none>        9094/TCP                     3m
service/kafka-cluster-zookeeper-client           ClusterIP   172.30.44.252    <none>        2181/TCP                     4m3s
service/kafka-cluster-zookeeper-nodes            ClusterIP   None             <none>        2181/TCP,2888/TCP,3888/TCP   4m3s

NAME                                                  READY   UP-TO-DATE   AVAILABLE   AGE
deployment.apps/amq-streams-cluster-operator-v1.3.0   1/1     1            1           6m25s
deployment.apps/kafka-cluster-entity-operator         1/1     1            1           32s

NAME                                                            DESIRED   CURRENT   READY   AGE
replicaset.apps/amq-streams-cluster-operator-v1.3.0-b94f877f5   1         1         1       6m25s
replicaset.apps/kafka-cluster-entity-operator-67668f5954        1         1         1       32s

NAME                                       READY   AGE
statefulset.apps/kafka-cluster-kafka       3/3     2m59s
statefulset.apps/kafka-cluster-zookeeper   3/3     4m3s

NAME                                                     HOST/PORT                                                                                          PATH   SERVICES                                 PORT   TERMINATION   WILDCARD
route.route.openshift.io/kafka-cluster-kafka-0           kafka-cluster-kafka-0-amq-streams.apps.cluster-espoo-ebad.espoo-ebad.example.opentlc.com                  kafka-cluster-kafka-0                    9094   passthrough   None
route.route.openshift.io/kafka-cluster-kafka-1           kafka-cluster-kafka-1-amq-streams.apps.cluster-espoo-ebad.espoo-ebad.example.opentlc.com                  kafka-cluster-kafka-1                    9094   passthrough   None
route.route.openshift.io/kafka-cluster-kafka-2           kafka-cluster-kafka-2-amq-streams.apps.cluster-espoo-ebad.espoo-ebad.example.opentlc.com                  kafka-cluster-kafka-2                    9094   passthrough   None
route.route.openshift.io/kafka-cluster-kafka-bootstrap   kafka-cluster-kafka-bootstrap-amq-streams.apps.cluster-espoo-ebad.espoo-ebad.example.opentlc.com          kafka-cluster-kafka-external-bootstrap   9094   passthrough   None
As part of the Kafka/ZooKeeper cluster creation, PVs and PVCs were created. Verify that the status of each PVC is Bound:
oc get pvc -n amq-streams
oc get pv -o json | jq -r '.items | sort_by(.spec.capacity.storage)[]| select(.spec.claimRef.namespace=="amq-streams") | [.spec.claimRef.name,.spec.capacity.storage] | @tsv'
Example output:
$ oc get pvc -n amq-streams
NAME                             STATUS   VOLUME                                     CAPACITY   ACCESS MODES   STORAGECLASS                  AGE
data-kafka-cluster-kafka-0       Bound    pvc-91601dfe-f1b4-11e9-b1e6-0a6f9f40dc3e   100Gi      RWO            ocs-storagecluster-ceph-rbd   18h
data-kafka-cluster-kafka-1       Bound    pvc-9160e85a-f1b4-11e9-843c-12e73ceaa62c   100Gi      RWO            ocs-storagecluster-ceph-rbd   18h
data-kafka-cluster-kafka-2       Bound    pvc-91613a33-f1b4-11e9-843c-12e73ceaa62c   100Gi      RWO            ocs-storagecluster-ceph-rbd   18h
data-kafka-cluster-zookeeper-0   Bound    pvc-73630d23-f1b4-11e9-843c-12e73ceaa62c   10Gi       RWO            ocs-storagecluster-ceph-rbd   18h
data-kafka-cluster-zookeeper-1   Bound    pvc-7374c25c-f1b4-11e9-843c-12e73ceaa62c   10Gi       RWO            ocs-storagecluster-ceph-rbd   18h
data-kafka-cluster-zookeeper-2   Bound    pvc-73736821-f1b4-11e9-b1e6-0a6f9f40dc3e   10Gi       RWO            ocs-storagecluster-ceph-rbd   18h

$ oc get pv -o json | jq -r '.items | sort_by(.spec.capacity.storage)[]| select(.spec.claimRef.namespace=="amq-streams") | [.spec.claimRef.name,.spec.capacity.storage] | @tsv'
data-kafka-cluster-kafka-0       100Gi
data-kafka-cluster-kafka-1       100Gi
data-kafka-cluster-kafka-2       100Gi
data-kafka-cluster-zookeeper-0   10Gi
data-kafka-cluster-zookeeper-2   10Gi
data-kafka-cluster-zookeeper-1   10Gi
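Checking the STATUS column by eye works for six claims, but the same check can be scripted. This is a small sketch, not part of the original walkthrough: the function name `unbound_pvcs` is hypothetical, and it assumes the column layout shown in the example output, where STATUS is the second field.

```shell
# Hypothetical helper: read `oc get pvc --no-headers` output on stdin, print the
# name of every claim whose STATUS column (field 2) is not "Bound", and return
# non-zero if at least one such claim was found.
unbound_pvcs() {
  awk '$2 != "Bound" { print $1; bad = 1 } END { exit (bad ? 1 : 0) }'
}
```

A possible invocation is `oc get pvc -n amq-streams --no-headers | unbound_pvcs && echo "all claims are Bound"`. Note that empty input also counts as success, so it is worth confirming separately that the claims exist.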
At this point you have a running Kafka and ZooKeeper cluster on Red Hat OpenShift 4, deployed through Red Hat AMQ Streams Operator, consuming persistent block storage from Red Hat OpenShift Container Storage 4.
Create Kafka Topic and Kafka User
To start using Kafka, we first need to create a Kafka topic. AMQ Streams provides operators to manage Kafka topics and Kafka users.
oc apply -f https://raw.githubusercontent.com/red-hat-storage/ocs-training/master/ocs4kafka/02-kafka-topic.yaml
List Kafka Topics (kt):
oc get kt
Example output:
$ oc get kt
NAME       PARTITIONS   REPLICATION FACTOR
my-topic   3            3
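The Topic Operator works from KafkaTopic resources. As a sketch of the kind of manifest that would produce the listing above, the function below prints one; the `kafka.strimzi.io/v1beta1` API version, the `strimzi.io/cluster` label value, and the function name `kafka_topic_manifest` are assumptions, and the file applied in this step may differ.

```shell
# Hypothetical helper: print a KafkaTopic manifest.
# Usage: kafka_topic_manifest <topic> [partitions] [replicas]
kafka_topic_manifest() {
  cat <<EOF
apiVersion: kafka.strimzi.io/v1beta1   # assumed API version
kind: KafkaTopic
metadata:
  name: $1
  labels:
    strimzi.io/cluster: kafka-cluster   # assumed: ties the topic to the Kafka cluster
spec:
  partitions: ${2:-3}
  replicas: ${3:-3}
EOF
}
```

With the defaults, `kafka_topic_manifest my-topic` describes a topic with three partitions and a replication factor of three, matching the example output.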
Create a Kafka user.
oc apply -f https://raw.githubusercontent.com/red-hat-storage/ocs-training/master/ocs4kafka/03-kafka-user.yaml
List Kafka users.
oc get kafkauser
Example output:
$ oc get kafkauser
NAME          AUTHENTICATION   AUTHORIZATION
kafka-user1   tls              simple
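The User Operator works from KafkaUser resources in the same way. The sketch below prints a manifest with TLS authentication and simple authorization, as in the listing above. The API version, the cluster label, the ACL entries, and the function name `kafka_user_manifest` are assumptions for illustration; the ACLs in the file applied in this step are not shown in this post.

```shell
# Hypothetical helper: print a KafkaUser manifest with TLS authentication and
# simple (ACL-based) authorization on a single topic.
# Usage: kafka_user_manifest <user> <topic>
kafka_user_manifest() {
  cat <<EOF
apiVersion: kafka.strimzi.io/v1beta1   # assumed API version
kind: KafkaUser
metadata:
  name: $1
  labels:
    strimzi.io/cluster: kafka-cluster   # assumed cluster name
spec:
  authentication:
    type: tls
  authorization:
    type: simple
    acls:                               # illustrative ACLs, not from the source
      - resource:
          type: topic
          name: $2
          patternType: literal
        operation: Read
      - resource:
          type: topic
          name: $2
          patternType: literal
        operation: Write
EOF
}
```

For example, `kafka_user_manifest kafka-user1 my-topic` prints a user that may read from and write to my-topic.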
We now have a Kafka topic and a Kafka user created on our Kafka cluster using the Red Hat AMQ Streams Operator.
Create a sample Kafka Producer and Consumer Application
To demonstrate Kafka usage, let’s deploy a sample hello-world-producer application.
oc apply -f https://raw.githubusercontent.com/red-hat-storage/ocs-training/master/ocs4kafka/04-hello-world-producer.yaml
This sample application will produce one million messages in an iterative manner.
To review the Kafka producer messages, let’s tail the logs of the `hello-world-producer` app.
oc logs -n amq-streams -f $(oc get pods -l app=hello-world-producer -o name)
Note: You can leave the hello-world-producer shell tab open to watch messages being produced live, or press Ctrl+C at any time to stop following the logs.
Instead of the CLI, you could also view logs from the OpenShift Console.
We now have a Kafka producer app generating messages and pushing the messages to Kafka Topic. We now deploy a sample hello world consumer app, which will consume messages from the Kafka Topic.
Deploy the Kafka consumer application:
oc apply -f https://raw.githubusercontent.com/red-hat-storage/ocs-training/master/ocs4kafka/05-hello-world-consumer.yaml
Monitor the logs of the Kafka consumer app in real time, using the CLI or the OpenShift Console (shown above):
oc logs -n amq-streams -f $(oc get pods -l app=hello-world-consumer -o name)
Press Ctrl+C to cancel.
Making the Kafka cluster more resilient
Kafka out-of-the-box provides high availability and fault tolerance. However, the storage backend used underneath Kafka plays a vital role in the overall resiliency of the system.
In this section, we will demonstrate how OpenShift Container Storage increases the resiliency of the Kafka cluster by reattaching the existing PV to the replacement pod as soon as a failure is detected.
- Open four SSH sessions, and log in to the OpenShift cluster using the oc client.
- In session-1: tail the real-time logs of the Kafka producer application created in the last section:
oc logs -n amq-streams -f $(oc get pods -l app=hello-world-producer -o name)
- In session-2: tail the real-time logs of the Kafka consumer application created in the last section:
oc logs -n amq-streams -f $(oc get pods -l app=hello-world-consumer -o name)
- In session-3: watch the Kafka pods, PV, and PVC status:
oc project amq-streams
watch oc get pods,pv,pvc
- Next, we will intentionally induce a Kafka pod failure. Keep a close eye on all three sessions. OCS lets the replacement pod reattach the same volume, so Kafka pod recovery is fast and should cause no outage in your messaging infrastructure.
- In session-4: run the following command to induce failure of the Kafka pod:
oc delete pods kafka-cluster-kafka-0 --grace-period=0
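To put a number on the recovery instead of only watching the sessions, the delete can be wrapped in a small timer. This is a sketch under assumptions: the function name `time_pod_recovery` is hypothetical, it relies on `oc wait --for=condition=Ready` being available in your oc client, and the retry loop is there because the replacement pod may not exist yet at the moment the first wait runs.

```shell
# Hypothetical helper: delete a pod, then report how many seconds pass until a
# pod with the same name is Ready again.
# Usage: time_pod_recovery <pod-name> [namespace]
time_pod_recovery() {
  pod="$1"
  ns="${2:-amq-streams}"
  start=$(date +%s)
  oc delete pod "$pod" -n "$ns" --grace-period=0
  tries=0
  # Retry the wait a bounded number of times in case the pod is not found yet.
  until oc wait --for=condition=Ready "pod/$pod" -n "$ns" --timeout=60s; do
    tries=$((tries + 1))
    if [ "$tries" -ge 10 ]; then
      echo "$pod did not become Ready" >&2
      return 1
    fi
    sleep 2
  done
  echo "$pod Ready again after $(( $(date +%s) - start ))s"
}
```

Running `time_pod_recovery kafka-cluster-kafka-0` in session-4 replaces the plain delete command and prints the elapsed time once the broker pod is Ready.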
Summary
You will see that the Kafka producer and consumer applications do not experience any outage when one of the Kafka pods fails; they continue to function as if nothing happened. In the background, when Kubernetes detects the Kafka pod failure, it launches a new Kafka pod and attaches the persistent volume that was attached to the previous Kafka pod.
Upon instantiation, the new Kafka pod starts serving requests without rebuilding the dataset, because the data is already persisted on the OCS persistent volume. Compared to ephemeral storage, an OCS persistent volume preserves the data and gives the Kafka cluster faster recovery and better resiliency in case of failures.
Resources and Feedback
To find out more about OpenShift Container Storage or to take a test drive, visit our Red Hat OpenShift Container Storage 4 page.
If you would like to learn more about what the OpenShift Container Storage team is up to or provide feedback on any of the new 4.2 features, take this brief 3-minute survey.