This post will show you how to deploy Red Hat AMQ Streams on Red Hat OpenShift 4 backed by Red Hat OpenShift Container Storage 4. Red Hat AMQ Streams is based on Apache Kafka, a popular platform for streaming data delivery and processing.
This post covers a number of concepts:
- Apache Kafka, Red Hat AMQ Streams, and OCS: introduction, architecture, and use cases
- Installing the Red Hat AMQ Streams Operator from OpenShift OperatorHub
- Deploying ZooKeeper and Kafka with persistent storage from OpenShift Container Storage
- Creating a Kafka topic and a Kafka user, using their respective operators
- Creating Kafka producer and consumer applications
- Testing Kafka high availability
Introduction
Note: This scenario assumes you already have an OpenShift cluster or have followed the instructions in the Deploying your storage backend using OpenShift Container Storage 4 blog to set up OpenShift Container Storage 4.
Apache Kafka
Apache Kafka is an open source, distributed, partitioned, and replicated commit-log-based publish/subscribe messaging system. It is one of the most popular messaging systems available, used for a variety of use cases ranging from web activity tracking, eventing, metrics, and logging to streaming data into a data lake, among others. Kafka can handle real-time data streams for big data collection, real-time analysis, or both.
Red Hat AMQ Streams
The Red Hat AMQ Streams component is a massively scalable, distributed, and high-performance data streaming enterprise distribution based on the Apache Kafka project. Red Hat AMQ Streams makes it easy to run and operate Apache Kafka on OpenShift. AMQ Streams provides three operators.
Cluster Operator
Responsible for deploying and managing Apache Kafka clusters within an OpenShift cluster.
Topic Operator
Responsible for managing Kafka topics within a Kafka cluster running within an OpenShift cluster.
User Operator
Responsible for managing Kafka users within a Kafka cluster running within an OpenShift cluster.
OpenShift Container Storage for AMQ Streams (Kafka)
Kafka has built-in data resiliency capabilities. However, data persistence plays a vital role when Kafka is deployed in a cloud-native environment like Kubernetes. Red Hat OpenShift Container Storage is tightly integrated with Red Hat OpenShift Container Platform, which is the core platform for Red Hat AMQ Streams.
The data persistence needed by the AMQ Streams components, namely Apache ZooKeeper and Apache Kafka, is fulfilled by OpenShift Container Storage block storage persistent volume claims. While typical public cloud block storage offerings cannot span availability zones (by design), with Container Storage your applications enjoy the highest levels of resiliency, with volumes spanning availability zones.
This will be described in depth in the next section. Another interesting use case for Container Storage with AMQ Streams is moving Kafka messages to scalable, distributed, S3-compatible object storage for data analytics and long-term retention. OpenShift Container Storage is a unified storage solution providing block, object, and file interfaces, which makes it a good persistence solution for Apache Kafka.
Installing Red Hat AMQ Streams
Create a new project called amq-streams from the OpenShift console by navigating to Projects.
Note: You can also create the project from the command line: oc new-project amq-streams
Make sure the amq-streams project is selected, then search for the AMQ Streams Operator in OperatorHub.
Install the AMQ Streams Operator.
Under Installation Mode, select amq-streams as the namespace.
Selecting a specific namespace makes it easy to purge the operator and its resources later. However, if you want to install the operator across the entire cluster, select the "All Namespaces" option.
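For reference, the UI flow above is equivalent to creating an OLM Subscription with the oc client (plus an OperatorGroup targeting the amq-streams namespace for a single-namespace install). A minimal sketch; the channel and catalog source names here are assumptions and vary by cluster and release:

apiVersion: operators.coreos.com/v1alpha1
kind: Subscription
metadata:
  name: amq-streams
  namespace: amq-streams                  # install into the project created earlier
spec:
  name: amq-streams                       # package name in the catalog
  channel: stable                         # assumption: channel name varies by release
  source: redhat-operators                # default Red Hat catalog source
  sourceNamespace: openshift-marketplace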
Verify that the Operator has been successfully installed.
At this stage, you should have the Red Hat AMQ Streams Operator installed on your OpenShift 4 environment.
Creating ZooKeeper and Kafka Clusters
Red Hat AMQ Streams provides the necessary operators which are responsible for deploying and managing Apache Kafka clusters on OpenShift.
To make Red Hat OpenShift Container Storage your default storage provider, set the default storage class to ocs-storagecluster-ceph-rbd. Refer to Change the default storage class to Ceph RBD for step-by-step instructions.
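Under the hood, the default class is selected through the standard Kubernetes default-class annotation. A minimal sketch of the relevant StorageClass fields (the provisioner value matches the output below):

apiVersion: storage.k8s.io/v1
kind: StorageClass
metadata:
  name: ocs-storagecluster-ceph-rbd
  annotations:
    storageclass.kubernetes.io/is-default-class: "true"   # marks this class as the default
provisioner: openshift-storage.rbd.csi.ceph.com

Verify the storage classes: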
oc get sc
Example output:
$ oc get sc
NAME                                    PROVISIONER                             AGE
gp2                                     kubernetes.io/aws-ebs                   11d
ocs-storagecluster-ceph-rbd (default)   openshift-storage.rbd.csi.ceph.com      10d
ocs-storagecluster-cephfs               openshift-storage.cephfs.csi.ceph.com   10d
$
Create a Kafka cluster, which will also deploy the ZooKeeper cluster required by Kafka.
oc project amq-streams
oc apply -f https://raw.githubusercontent.com/red-hat-storage/ocs-training/master/ocs4kafka/01-kafka-zookeeper.yaml
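For reference, the applied manifest defines a Kafka custom resource. The following is a minimal sketch reconstructed from the cluster output shown below; the file in the repository is authoritative:

apiVersion: kafka.strimzi.io/v1beta1
kind: Kafka
metadata:
  name: kafka-cluster
spec:
  kafka:
    replicas: 3
    listeners:
      plain: {}                  # internal listener on 9092
      tls: {}                    # internal TLS listener on 9093
      external:
        type: route              # exposes brokers via OpenShift routes on 9094
    storage:
      type: persistent-claim     # PVCs from the default storage class (Ceph RBD)
      size: 100Gi
      deleteClaim: false
  zookeeper:
    replicas: 3
    storage:
      type: persistent-claim
      size: 10Gi
      deleteClaim: false
  entityOperator:                # runs the Topic and User Operators
    topicOperator: {}
    userOperator: {}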
Monitor the cluster's progress with the watch command:
watch oc get all
Finally, once the Kafka cluster is up and running, you should see output similar to this:
$ oc get all
NAME                                                      READY   STATUS    RESTARTS   AGE
pod/amq-streams-cluster-operator-v1.3.0-b94f877f5-tzhhg   1/1     Running   0          6m24s
pod/kafka-cluster-entity-operator-67668f5954-zwxzc        3/3     Running   0          31s
pod/kafka-cluster-kafka-0                                 2/2     Running   0          2m58s
pod/kafka-cluster-kafka-1                                 2/2     Running   0          2m58s
pod/kafka-cluster-kafka-2                                 2/2     Running   0          2m58s
pod/kafka-cluster-zookeeper-0                             2/2     Running   0          4m2s
pod/kafka-cluster-zookeeper-1                             2/2     Running   0          4m2s
pod/kafka-cluster-zookeeper-2                             2/2     Running   0          4m2s

NAME                                             TYPE        CLUSTER-IP       EXTERNAL-IP   PORT(S)                      AGE
service/kafka-cluster-kafka-0                    ClusterIP   172.30.114.98    <none>        9094/TCP                     3m
service/kafka-cluster-kafka-1                    ClusterIP   172.30.123.161   <none>        9094/TCP                     3m
service/kafka-cluster-kafka-2                    ClusterIP   172.30.99.23     <none>        9094/TCP                     3m
service/kafka-cluster-kafka-bootstrap            ClusterIP   172.30.228.1     <none>        9091/TCP,9092/TCP,9093/TCP   3m
service/kafka-cluster-kafka-brokers              ClusterIP   None             <none>        9091/TCP,9092/TCP,9093/TCP   3m
service/kafka-cluster-kafka-external-bootstrap   ClusterIP   172.30.121.94    <none>        9094/TCP                     3m
service/kafka-cluster-zookeeper-client           ClusterIP   172.30.44.252    <none>        2181/TCP                     4m3s
service/kafka-cluster-zookeeper-nodes            ClusterIP   None             <none>        2181/TCP,2888/TCP,3888/TCP   4m3s

NAME                                                  READY   UP-TO-DATE   AVAILABLE   AGE
deployment.apps/amq-streams-cluster-operator-v1.3.0   1/1     1            1           6m25s
deployment.apps/kafka-cluster-entity-operator         1/1     1            1           32s

NAME                                                            DESIRED   CURRENT   READY   AGE
replicaset.apps/amq-streams-cluster-operator-v1.3.0-b94f877f5   1         1         1       6m25s
replicaset.apps/kafka-cluster-entity-operator-67668f5954        1         1         1       32s

NAME                                       READY   AGE
statefulset.apps/kafka-cluster-kafka       3/3     2m59s
statefulset.apps/kafka-cluster-zookeeper   3/3     4m3s

NAME                                                     HOST/PORT                                                                                           PATH   SERVICES                                 PORT   TERMINATION   WILDCARD
route.route.openshift.io/kafka-cluster-kafka-0           kafka-cluster-kafka-0-amq-streams.apps.cluster-espoo-ebad.espoo-ebad.example.opentlc.com                  kafka-cluster-kafka-0                    9094   passthrough   None
route.route.openshift.io/kafka-cluster-kafka-1           kafka-cluster-kafka-1-amq-streams.apps.cluster-espoo-ebad.espoo-ebad.example.opentlc.com                  kafka-cluster-kafka-1                    9094   passthrough   None
route.route.openshift.io/kafka-cluster-kafka-2           kafka-cluster-kafka-2-amq-streams.apps.cluster-espoo-ebad.espoo-ebad.example.opentlc.com                  kafka-cluster-kafka-2                    9094   passthrough   None
route.route.openshift.io/kafka-cluster-kafka-bootstrap   kafka-cluster-kafka-bootstrap-amq-streams.apps.cluster-espoo-ebad.espoo-ebad.example.opentlc.com          kafka-cluster-kafka-external-bootstrap   9094   passthrough   None
$
As part of the Kafka/ZooKeeper cluster creation, PVs and PVCs were created. Verify that the status of each PVC is Bound:
oc get pvc -n amq-streams
oc get pv -o json | jq -r '.items | sort_by(.spec.capacity.storage)[]| select(.spec.claimRef.namespace=="amq-streams") | [.spec.claimRef.name,.spec.capacity.storage] | @tsv'
Example output:
$ oc get pvc -n amq-streams
NAME                             STATUS   VOLUME                                     CAPACITY   ACCESS MODES   STORAGECLASS                  AGE
data-kafka-cluster-kafka-0       Bound    pvc-91601dfe-f1b4-11e9-b1e6-0a6f9f40dc3e   100Gi      RWO            ocs-storagecluster-ceph-rbd   18h
data-kafka-cluster-kafka-1       Bound    pvc-9160e85a-f1b4-11e9-843c-12e73ceaa62c   100Gi      RWO            ocs-storagecluster-ceph-rbd   18h
data-kafka-cluster-kafka-2       Bound    pvc-91613a33-f1b4-11e9-843c-12e73ceaa62c   100Gi      RWO            ocs-storagecluster-ceph-rbd   18h
data-kafka-cluster-zookeeper-0   Bound    pvc-73630d23-f1b4-11e9-843c-12e73ceaa62c   10Gi       RWO            ocs-storagecluster-ceph-rbd   18h
data-kafka-cluster-zookeeper-1   Bound    pvc-7374c25c-f1b4-11e9-843c-12e73ceaa62c   10Gi       RWO            ocs-storagecluster-ceph-rbd   18h
data-kafka-cluster-zookeeper-2   Bound    pvc-73736821-f1b4-11e9-b1e6-0a6f9f40dc3e   10Gi       RWO            ocs-storagecluster-ceph-rbd   18h
$
$ oc get pv -o json | jq -r '.items | sort_by(.spec.capacity.storage)[]| select(.spec.claimRef.namespace=="amq-streams") | [.spec.claimRef.name,.spec.capacity.storage] | @tsv'
data-kafka-cluster-kafka-0      100Gi
data-kafka-cluster-kafka-1      100Gi
data-kafka-cluster-kafka-2      100Gi
data-kafka-cluster-zookeeper-0  10Gi
data-kafka-cluster-zookeeper-2  10Gi
data-kafka-cluster-zookeeper-1  10Gi
$
At this point you have a running Kafka and ZooKeeper cluster on Red Hat OpenShift 4, deployed through the Red Hat AMQ Streams Operator, consuming persistent block storage from Red Hat OpenShift Container Storage 4.
Create Kafka Topic and Kafka User
To start producing and consuming messages, we first need to create a Kafka topic. AMQ Streams provides dedicated operators to manage Kafka topics and Kafka users.
oc apply -f https://raw.githubusercontent.com/red-hat-storage/ocs-training/master/ocs4kafka/02-kafka-topic.yaml
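The manifest defines a KafkaTopic custom resource; a minimal sketch consistent with the output below (the repository file is authoritative):

apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaTopic
metadata:
  name: my-topic
  labels:
    strimzi.io/cluster: kafka-cluster   # binds the topic to our Kafka cluster
spec:
  partitions: 3
  replicas: 3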
List Kafka Topics (kt):
oc get kt
Example output:
$ oc get kt
NAME       PARTITIONS   REPLICATION FACTOR
my-topic   3            3
$
Create a Kafka user.
oc apply -f https://raw.githubusercontent.com/red-hat-storage/ocs-training/master/ocs4kafka/03-kafka-user.yaml
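Likewise, the manifest defines a KafkaUser custom resource; a minimal sketch consistent with the output below. The ACL entry is illustrative only (the repository file is authoritative):

apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaUser
metadata:
  name: kafka-user1
  labels:
    strimzi.io/cluster: kafka-cluster
spec:
  authentication:
    type: tls                    # client authenticates with a TLS client certificate
  authorization:
    type: simple                 # ACL-based authorization
    acls:
      - resource:
          type: topic
          name: my-topic
        operation: Read          # illustrative ACL; adjust to your needs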
List Kafka users.
oc get kafkauser
Example output:
$ oc get kafkauser
NAME          AUTHENTICATION   AUTHORIZATION
kafka-user1   tls              simple
$
Now we have a Kafka topic and a Kafka user created on our Kafka cluster using the Red Hat AMQ Streams Operator.
Create a sample Kafka Producer and Consumer Application
To demonstrate Kafka usage, let’s deploy a sample hello-world-producer application.
oc apply -f https://raw.githubusercontent.com/red-hat-storage/ocs-training/master/ocs4kafka/04-hello-world-producer.yaml
This sample application will produce one million messages iteratively.
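The manifest is a plain Kubernetes Deployment wrapping a demo producer. Here is a rough sketch of its shape, modeled on the Strimzi hello-world client examples; the image and environment variable names are assumptions, and the repository file is authoritative:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: hello-world-producer
  labels:
    app: hello-world-producer
spec:
  replicas: 1
  selector:
    matchLabels:
      app: hello-world-producer
  template:
    metadata:
      labels:
        app: hello-world-producer
    spec:
      containers:
        - name: hello-world-producer
          image: strimzi/hello-world-producer:latest    # assumed demo image
          env:
            - name: BOOTSTRAP_SERVERS                   # in-cluster bootstrap service
              value: kafka-cluster-kafka-bootstrap:9092
            - name: TOPIC
              value: my-topic
            - name: DELAY_MS                            # pause between messages
              value: "1000"
            - name: MESSAGE_COUNT                       # one million messages, per the text above
              value: "1000000"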
To review the Kafka producer messages, let's tail the logs of the `hello-world-producer` app.
oc logs -n amq-streams -f $(oc get pods -l app=hello-world-producer -o name)
Note: You can leave the hello-world-producer shell tab open to watch messages being produced live; press Ctrl+C at any time to stop following the logs.
Instead of the CLI, you could also view logs from the OpenShift Console.
We now have a Kafka producer app generating messages and pushing them to the Kafka topic. Next, we deploy a sample hello-world-consumer app, which consumes messages from the Kafka topic.
Deploy the Kafka consumer application:
oc apply -f https://raw.githubusercontent.com/red-hat-storage/ocs-training/master/ocs4kafka/05-hello-world-consumer.yaml
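As with the producer, the consumer manifest is roughly shaped like this (again a sketch modeled on the Strimzi client examples; names and values are assumptions):

apiVersion: apps/v1
kind: Deployment
metadata:
  name: hello-world-consumer
  labels:
    app: hello-world-consumer
spec:
  replicas: 1
  selector:
    matchLabels:
      app: hello-world-consumer
  template:
    metadata:
      labels:
        app: hello-world-consumer
    spec:
      containers:
        - name: hello-world-consumer
          image: strimzi/hello-world-consumer:latest    # assumed demo image
          env:
            - name: BOOTSTRAP_SERVERS
              value: kafka-cluster-kafka-bootstrap:9092
            - name: TOPIC
              value: my-topic
            - name: GROUP_ID                            # consumer group for offset tracking
              value: hello-world-consumer-group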
Monitor the logs of the Kafka consumer app in real time, using the CLI or the OpenShift Console (shown above):
oc logs -n amq-streams -f $(oc get pods -l app=hello-world-consumer -o name)
Press Ctrl+C to cancel.
Making the Kafka cluster more resilient
Kafka provides high availability and fault tolerance out of the box. However, the storage backend underneath Kafka plays a vital role in the overall resiliency of the system.
In this section, we demonstrate how OpenShift Container Storage increases the resiliency of the Kafka cluster by promptly re-attaching the existing PV to a replacement pod when a failure is detected.
- Open four SSH sessions and log in to the OpenShift cluster using the oc client.
- In session 1, tail the real-time logs of the Kafka producer application created in the last section:
oc logs -n amq-streams -f $(oc get pods -l app=hello-world-producer -o name)
- In session 2, tail the real-time logs of the Kafka consumer application created in the last section:
oc logs -n amq-streams -f $(oc get pods -l app=hello-world-consumer -o name)
- In session 3, watch the status of the Kafka pods, PVs, and PVCs:
oc project amq-streams
watch oc get pods,pv,pvc
- Next, we will intentionally induce a Kafka pod failure. Keep a close eye on all three sessions; OCS makes Kafka pod recovery nearly instant, resulting in no outage in your messaging infrastructure.
- In session 4, run the following command to induce a failure of a Kafka pod:
oc delete pods kafka-cluster-kafka-0 --grace-period=0
Summary
You will notice that the Kafka producer and consumer applications do not experience any outage when one of the Kafka pods fails; they continue to function as if nothing happened. In the background, when Kubernetes detects the Kafka pod failure, it launches a new Kafka pod and attaches the persistent volume that was attached to the previous pod.
Upon instantiation, the new Kafka pod starts serving requests without rebuilding its dataset, because the data is already persisted on the OCS persistent volume. Hence, compared to ephemeral storage, an OCS persistent volume preserves the data and complements the Kafka cluster with faster recovery and resiliency in case of failures.
Resources and Feedback
To find out more about OpenShift Container Storage or to take a test drive, visit our Red Hat OpenShift Container Storage 4 page.
If you would like to learn more about what the OpenShift Container Storage team is up to or provide feedback on any of the new 4.2 features, take this brief 3-minute survey.