This post will show you how to deploy Red Hat AMQ Streams on Red Hat OpenShift 4 backed by Red Hat OpenShift Container Storage 4. Red Hat AMQ Streams is based on Apache Kafka, a popular platform for streaming data delivery and processing.

This post covers a number of concepts:

  • An introduction to Apache Kafka, Red Hat AMQ Streams, and OpenShift Container Storage: architecture and use cases

  • Installing the Red Hat AMQ Streams Operator from OpenShift OperatorHub

  • Deploying ZooKeeper and Kafka with persistent storage from OpenShift Container Storage

  • Creating a Kafka topic and a Kafka user with their respective operators

  • Creating a Kafka Producer and Consumer application

  • Testing Kafka High Availability 

Introduction 

Note: This scenario assumes you already have an OpenShift cluster or have followed the instructions in the Deploying your storage backend using OpenShift Container Storage 4 blog to set up OpenShift Container Storage 4.

Apache Kafka

Apache Kafka is an open source, distributed, partitioned, and replicated commit-log-based publish/subscribe messaging system. It is one of the most popular messaging systems available, used for a variety of use cases ranging from web activity tracking, eventing, metrics, and logging to streaming data into a data lake. Kafka can handle real-time streams of data, whether to collect big data, to perform real-time analysis, or both.

 

Figure 1: Apache Kafka use cases

Red Hat AMQ Streams

The Red Hat AMQ Streams component is a massively scalable, distributed, and high-performance data-streaming enterprise distribution based on the Apache Kafka project. Red Hat AMQ Streams makes it easy to run and operate Apache Kafka on OpenShift. AMQ Streams provides three operators:

Cluster Operator

Responsible for deploying and managing Apache Kafka clusters within an OpenShift cluster.

Topic Operator

Responsible for managing Kafka topics within a Kafka cluster running within an OpenShift cluster.

User Operator

Responsible for managing Kafka users within a Kafka cluster running within an OpenShift cluster.
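
Each of these operators acts on its own custom resource type: the Cluster Operator on Kafka resources, the Topic Operator on KafkaTopic resources, and the User Operator on KafkaUser resources. Once the operator is installed (covered below), you can list the custom resource definitions it registers; a quick check, assuming the Strimzi-derived CRD names that AMQ Streams uses:

oc get crd | grep strimzi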

 

Figure 2: AMQ Streams diagram

OpenShift Container Storage for AMQ Streams (Kafka)

Kafka has built-in data resiliency capabilities. However, data persistence plays a vital role when Kafka is deployed in a cloud-native environment like Kubernetes. Red Hat OpenShift Container Storage is tightly integrated with Red Hat OpenShift Container Platform, which is the core platform for Red Hat AMQ Streams.

The data persistence needed by the AMQ Streams components, namely Apache ZooKeeper and Apache Kafka, is fulfilled by OpenShift Container Storage block storage persistent volume claims. While typical public cloud block storage offerings cannot span availability zones (by design), with Container Storage your applications enjoy a higher level of resiliency, because the underlying storage is replicated across availability zones.

This is described in depth in the next section. Another interesting use case for Container Storage with AMQ Streams is moving Kafka messages to scalable, distributed, S3-compatible object storage for data analytics and long-term retention. OpenShift Container Storage is a unified storage solution providing block, object, and file interfaces, which makes it a good persistence solution for Apache Kafka.

 

Figure 3: Storage use case for Apache Kafka

Installing Red Hat AMQ Streams

Create a new project called amq-streams from the OpenShift console by navigating to the Projects page, as shown in the next figure:

 

Figure 4: Creating a new project called amq-streams

Note: You can also create the project from the command line: oc new-project amq-streams.

Make sure the amq-streams project is selected, then search for the AMQ Streams Operator in OperatorHub.

 

Figure 5: Select amq-streams project

Install the AMQ Streams Operator.

 

Figure 6: Install the AMQ Streams Operator

Under Installation Mode, select amq-streams as the namespace.

Selecting a specific namespace makes it easy to purge the operator and its resources later. However, if you want to install the operator across the cluster, select the “All Namespaces” option.

 

Figure 7: Under Installation Mode, select `amq-streams` as the namespace

Verify that the Operator has been successfully installed.

 

Figure 8: To install the operator across the cluster, select the “All Namespaces” option

 

Figure 9: Verify that the Operator has been installed

At this stage, you should have the Red Hat AMQ Streams Operator installed on your OpenShift 4 environment.
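
You can also confirm the installation from the CLI by checking the operator's ClusterServiceVersion; for example, assuming the operator was installed into the amq-streams namespace:

oc get csv -n amq-streams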

Creating ZooKeeper and Kafka Clusters

Red Hat AMQ Streams provides the necessary operators which are responsible for deploying and managing Apache Kafka clusters on OpenShift.

To make Red Hat OpenShift Container Storage your default storage provider, set the default storage class to ocs-storagecluster-ceph-rbd. Refer to Change the default storage class to Ceph RBD for step-by-step instructions.
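
One way to do this from the CLI is with the standard Kubernetes default-class annotation (assuming gp2 is your current default, as in the example output below):

oc patch storageclass gp2 -p '{"metadata": {"annotations": {"storageclass.kubernetes.io/is-default-class": "false"}}}'
oc patch storageclass ocs-storagecluster-ceph-rbd -p '{"metadata": {"annotations": {"storageclass.kubernetes.io/is-default-class": "true"}}}'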

oc get sc

Example output:

$ oc get sc
NAME                                    PROVISIONER                             AGE
gp2                                     kubernetes.io/aws-ebs                   11d
ocs-storagecluster-ceph-rbd (default)   openshift-storage.rbd.csi.ceph.com      10d
ocs-storagecluster-cephfs               openshift-storage.cephfs.csi.ceph.com   10d
$

Create a Kafka cluster, which will also deploy the ZooKeeper cluster required by Kafka.

oc project amq-streams
oc apply -f https://raw.githubusercontent.com/red-hat-storage/ocs-training/master/ocs4kafka/01-kafka-zookeeper.yaml
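
This manifest defines a Kafka custom resource that the Cluster Operator reconciles. As a rough sketch of what it contains (an illustrative outline, not a verbatim copy of the file), it requests three Kafka brokers and three ZooKeeper nodes, each backed by a persistent-claim volume from the ocs-storagecluster-ceph-rbd storage class:

apiVersion: kafka.strimzi.io/v1beta1
kind: Kafka
metadata:
  name: kafka-cluster
spec:
  kafka:
    replicas: 3
    listeners:
      plain: {}
      tls: {}
      external:
        type: route                      # expose brokers through OpenShift routes
    storage:
      type: persistent-claim             # PVCs provisioned by OCS
      size: 100Gi
      class: ocs-storagecluster-ceph-rbd
      deleteClaim: false
  zookeeper:
    replicas: 3
    storage:
      type: persistent-claim
      size: 10Gi
      class: ocs-storagecluster-ceph-rbd
      deleteClaim: false
  entityOperator:                        # deploys the Topic and User Operators
    topicOperator: {}
    userOperator: {}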

Monitor the cluster's progress using watch:

watch oc get all

Once the Kafka cluster is up and running, you should see output similar to this:

$ oc get all
NAME                                                      READY   STATUS    RESTARTS   AGE
pod/amq-streams-cluster-operator-v1.3.0-b94f877f5-tzhhg   1/1     Running   0          6m24s
pod/kafka-cluster-entity-operator-67668f5954-zwxzc        3/3     Running   0          31s
pod/kafka-cluster-kafka-0                                 2/2     Running   0          2m58s
pod/kafka-cluster-kafka-1                                 2/2     Running   0          2m58s
pod/kafka-cluster-kafka-2                                 2/2     Running   0          2m58s
pod/kafka-cluster-zookeeper-0                             2/2     Running   0          4m2s
pod/kafka-cluster-zookeeper-1                             2/2     Running   0          4m2s
pod/kafka-cluster-zookeeper-2                             2/2     Running   0          4m2s

NAME                                             TYPE        CLUSTER-IP       EXTERNAL-IP   PORT(S)                      AGE
service/kafka-cluster-kafka-0                    ClusterIP   172.30.114.98    <none>        9094/TCP                     3m
service/kafka-cluster-kafka-1                    ClusterIP   172.30.123.161   <none>        9094/TCP                     3m
service/kafka-cluster-kafka-2                    ClusterIP   172.30.99.23     <none>        9094/TCP                     3m
service/kafka-cluster-kafka-bootstrap            ClusterIP   172.30.228.1     <none>        9091/TCP,9092/TCP,9093/TCP   3m
service/kafka-cluster-kafka-brokers              ClusterIP   None             <none>        9091/TCP,9092/TCP,9093/TCP   3m
service/kafka-cluster-kafka-external-bootstrap   ClusterIP   172.30.121.94    <none>        9094/TCP                     3m
service/kafka-cluster-zookeeper-client           ClusterIP   172.30.44.252    <none>        2181/TCP                     4m3s
service/kafka-cluster-zookeeper-nodes            ClusterIP   None             <none>        2181/TCP,2888/TCP,3888/TCP   4m3s

NAME                                                  READY   UP-TO-DATE   AVAILABLE   AGE
deployment.apps/amq-streams-cluster-operator-v1.3.0   1/1     1            1           6m25s
deployment.apps/kafka-cluster-entity-operator         1/1     1            1           32s

NAME                                                            DESIRED   CURRENT   READY   AGE
replicaset.apps/amq-streams-cluster-operator-v1.3.0-b94f877f5   1         1         1       6m25s
replicaset.apps/kafka-cluster-entity-operator-67668f5954        1         1         1       32s

NAME                                       READY   AGE
statefulset.apps/kafka-cluster-kafka       3/3     2m59s
statefulset.apps/kafka-cluster-zookeeper   3/3     4m3s

NAME                                                     HOST/PORT                                                                                          PATH   SERVICES                                 PORT   TERMINATION   WILDCARD
route.route.openshift.io/kafka-cluster-kafka-0           kafka-cluster-kafka-0-amq-streams.apps.cluster-espoo-ebad.espoo-ebad.example.opentlc.com                  kafka-cluster-kafka-0                    9094   passthrough   None
route.route.openshift.io/kafka-cluster-kafka-1           kafka-cluster-kafka-1-amq-streams.apps.cluster-espoo-ebad.espoo-ebad.example.opentlc.com                  kafka-cluster-kafka-1                    9094   passthrough   None
route.route.openshift.io/kafka-cluster-kafka-2           kafka-cluster-kafka-2-amq-streams.apps.cluster-espoo-ebad.espoo-ebad.example.opentlc.com                  kafka-cluster-kafka-2                    9094   passthrough   None
route.route.openshift.io/kafka-cluster-kafka-bootstrap   kafka-cluster-kafka-bootstrap-amq-streams.apps.cluster-espoo-ebad.espoo-ebad.example.opentlc.com          kafka-cluster-kafka-external-bootstrap   9094   passthrough   None
$

As part of the Kafka/ZooKeeper cluster creation, PVs and PVCs were created. Verify that the status of each PVC is Bound:

oc get pvc -n amq-streams

oc get pv -o json | jq -r '.items | sort_by(.spec.capacity.storage)[]| select(.spec.claimRef.namespace=="amq-streams") | [.spec.claimRef.name,.spec.capacity.storage] | @tsv'

Example output:

$ oc get pvc -n amq-streams
NAME                             STATUS   VOLUME                                     CAPACITY   ACCESS MODES   STORAGECLASS                  AGE
data-kafka-cluster-kafka-0       Bound    pvc-91601dfe-f1b4-11e9-b1e6-0a6f9f40dc3e   100Gi      RWO            ocs-storagecluster-ceph-rbd   18h
data-kafka-cluster-kafka-1       Bound    pvc-9160e85a-f1b4-11e9-843c-12e73ceaa62c   100Gi      RWO            ocs-storagecluster-ceph-rbd   18h
data-kafka-cluster-kafka-2       Bound    pvc-91613a33-f1b4-11e9-843c-12e73ceaa62c   100Gi      RWO            ocs-storagecluster-ceph-rbd   18h
data-kafka-cluster-zookeeper-0   Bound    pvc-73630d23-f1b4-11e9-843c-12e73ceaa62c   10Gi       RWO            ocs-storagecluster-ceph-rbd   18h
data-kafka-cluster-zookeeper-1   Bound    pvc-7374c25c-f1b4-11e9-843c-12e73ceaa62c   10Gi       RWO            ocs-storagecluster-ceph-rbd   18h
data-kafka-cluster-zookeeper-2   Bound    pvc-73736821-f1b4-11e9-b1e6-0a6f9f40dc3e   10Gi       RWO            ocs-storagecluster-ceph-rbd   18h
$

$ oc get pv -o json | jq -r '.items | sort_by(.spec.capacity.storage)[]| select(.spec.claimRef.namespace=="amq-streams") | [.spec.claimRef.name,.spec.capacity.storage] | @tsv'
data-kafka-cluster-kafka-0 100Gi
data-kafka-cluster-kafka-1 100Gi
data-kafka-cluster-kafka-2 100Gi
data-kafka-cluster-zookeeper-0 10Gi
data-kafka-cluster-zookeeper-2 10Gi
data-kafka-cluster-zookeeper-1 10Gi
$

At this point you have a running Kafka and ZooKeeper cluster on Red Hat OpenShift 4, deployed through Red Hat AMQ Streams Operator, consuming persistent block storage from Red Hat OpenShift Container Storage 4.

Create Kafka Topic and Kafka User

To start using Kafka, we first need to create a Kafka topic. AMQ Streams provides operators to manage Kafka topics and Kafka users (the Topic and User Operators described earlier).

oc apply -f https://raw.githubusercontent.com/red-hat-storage/ocs-training/master/ocs4kafka/02-kafka-topic.yaml
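
This manifest defines a KafkaTopic custom resource that the Topic Operator reconciles into an actual topic on the cluster. A minimal sketch of such a resource (illustrative, matching the topic listed below):

apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaTopic
metadata:
  name: my-topic
  labels:
    strimzi.io/cluster: kafka-cluster   # ties the topic to our Kafka cluster
spec:
  partitions: 3
  replicas: 3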

List Kafka Topics (kt):

oc get kt

Example output:

$ oc get kt
NAME       PARTITIONS   REPLICATION FACTOR
my-topic   3            3
$

Create a Kafka user.

oc apply -f https://raw.githubusercontent.com/red-hat-storage/ocs-training/master/ocs4kafka/03-kafka-user.yaml
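
Similarly, this manifest defines a KafkaUser custom resource handled by the User Operator. A minimal sketch (illustrative, matching the user listed below; the ACL section is an assumption about what such a file typically contains):

apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaUser
metadata:
  name: kafka-user1
  labels:
    strimzi.io/cluster: kafka-cluster
spec:
  authentication:
    type: tls                 # the operator generates a client certificate in a Secret
  authorization:
    type: simple              # ACL-based authorization
    acls:                     # illustrative ACLs; the actual file may differ
      - resource:
          type: topic
          name: my-topic
        operation: All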

List Kafka users.

oc get kafkauser

Example output:

$ oc get kafkauser
NAME          AUTHENTICATION   AUTHORIZATION
kafka-user1   tls              simple
$

We now have a Kafka topic and a Kafka user created on our Kafka cluster using the Red Hat AMQ Streams Operator.

Create a sample Kafka Producer and Consumer Application

To demonstrate Kafka usage, let’s deploy a sample hello-world-producer application.

oc apply -f https://raw.githubusercontent.com/red-hat-storage/ocs-training/master/ocs4kafka/04-hello-world-producer.yaml

This sample application produces one million messages iteratively.
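
The manifest at that URL defines the producer Deployment. As a rough illustration of what such a manifest contains (the image and environment variable names below follow the Strimzi hello-world client examples and are assumptions, not a verbatim copy of the file):

apiVersion: apps/v1
kind: Deployment
metadata:
  name: hello-world-producer
spec:
  replicas: 1
  selector:
    matchLabels:
      app: hello-world-producer
  template:
    metadata:
      labels:
        app: hello-world-producer
    spec:
      containers:
        - name: hello-world-producer
          image: strimzi/hello-world-producer:latest   # assumed image name
          env:
            - name: BOOTSTRAP_SERVERS                  # in-cluster bootstrap service
              value: kafka-cluster-kafka-bootstrap:9092
            - name: TOPIC
              value: my-topic
            - name: MESSAGE_COUNT                      # the one million messages
              value: "1000000"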

To review the Kafka producer messages, let's tail the logs of the `hello-world-producer` app.

oc logs -n amq-streams -f $(oc get pods -l app=hello-world-producer -o name)

Note: You can leave the hello-world-producer shell tab open to watch messages being produced live; press Ctrl+C at any time to stop following the logs.

Instead of the CLI, you could also view logs from the OpenShift Console.

 

Figure 10: View logs from the OpenShift Console

 

Figure 11: View logs from the OpenShift Console

We now have a Kafka producer app generating messages and pushing them to the Kafka topic. Next, we deploy a sample hello-world-consumer app, which will consume messages from the Kafka topic.

Deploy the Kafka consumer application:

oc apply -f https://raw.githubusercontent.com/red-hat-storage/ocs-training/master/ocs4kafka/05-hello-world-consumer.yaml

Monitor the logs of the Kafka consumer app in real time, using the CLI or the OpenShift Console (shown above):

oc logs -n amq-streams -f $(oc get pods -l app=hello-world-consumer -o name)

Press Ctrl+C to cancel.

Making the Kafka cluster more resilient

Out of the box, Kafka provides high availability and fault tolerance. However, the storage backend used underneath Kafka plays a vital role in the overall resiliency of the system.

In this section, we demonstrate how OpenShift Container Storage increases the resiliency of the Kafka cluster by reattaching the existing PV to a replacement pod as soon as a failure is detected.

  • Open four SSH sessions and log in to the OpenShift cluster using the oc client

  • In session 1: tail the real-time logs of the Kafka producer application created in the last section

oc logs -n amq-streams -f $(oc get pods -l app=hello-world-producer -o name)
  • In session 2: tail the real-time logs of the Kafka consumer application created in the last section

oc logs -n amq-streams -f $(oc get pods -l app=hello-world-consumer -o name)
  • In session 3: watch the status of the Kafka pods, PVs, and PVCs

oc project amq-streams
watch oc get pods,pv,pvc
  • Next, we will intentionally induce a Kafka pod failure. Keep a close eye on all three sessions: OCS makes Kafka pod recovery fast, resulting in no outage in your messaging infrastructure.

  • In session 4: run the following command to induce a failure of a Kafka pod

oc delete pods kafka-cluster-kafka-0 --grace-period=0
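
Once the replacement pod is running, you can confirm that it mounted the same PVC (and therefore the same data) as the deleted pod; for example:

oc get pod kafka-cluster-kafka-0 -o jsonpath='{.spec.volumes[*].persistentVolumeClaim.claimName}'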

Summary

You will notice that the Kafka producer and consumer applications do not experience any outage from the Kafka pod failure; they continue to function as if nothing happened. In the background, when Kubernetes detects the Kafka pod failure, it launches a new Kafka pod and attaches the persistent volume that was attached to the previous Kafka pod.

The new Kafka pod, upon instantiation, starts serving requests without rebuilding its dataset, because the data is already persisted on the OCS persistent volume. Hence, compared to ephemeral storage, an OCS persistent volume preserves the data and complements the Kafka cluster with faster recovery and resiliency in case of failures.

Resources and Feedback

To find out more about OpenShift Container Storage or to take a test drive, visit our Red Hat OpenShift Container Storage 4 page.

If you would like to learn more about what the OpenShift Container Storage team is up to or provide feedback on any of the new 4.2 features, take this brief 3-minute survey.