Published 2 years ago

Kafka-Streams: a road to autoscaling via Kubernetes

Kafka-Streams is the stream processing library included in Apache Kafka, a streaming data platform. Because Kafka-Streams is a simple library, and not a framework, it’s used by applications that can be deployed and run in many ways. This article aims to present a few advantages that come with specific practices like containerization and orchestration, and especially autoscaling. Kafka-Streams is meant to be highly scalable. Let’s explore a way to automatically benefit from this elasticity with Kubernetes.

tl;dr

This article is based on the talk Scale in / Scale out with Kafka-Streams and Kubernetes from Xebicon’18. The repository xke-kingof-scaling contains all the YAML examples used in the experiment. A French version is available on blog.xebia.fr.

Consumer lag, a reason to scale

Why would we want to scale our streaming apps automatically? In this part we focus on Kafka-Streams applications and their properties to see how autoscaling can help.

Message consumption

Kafka consumers subscribe to topics and then regularly poll messages from them. The amount of data returned per poll can be bounded with the fetch.max.bytes (bytes per fetch) or max.poll.records (records per poll) configuration. The consumer also commits the offsets of the messages to declare them as consumed and processed. When it comes to Kafka-Streams, polling intervals are generally short enough to give a real-time processing effect.

So far so good! How could this cause any problems?

A Kafka-Streams application can fall behind in its message consumption. There is then a gap between the latest offset in the topic and the current offset processed by the application. This gap is referred to as the record-lag, and it can become significant for several reasons:

  • An application restart after a long down-time
  • An intensive stateful transformation
  • A peak period of usage for a given service
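The definition above can be made concrete with a minimal sketch. It assumes we already know, for each partition, the latest offset in the topic and the current offset reached by the application; the offsets below are hypothetical values chosen for illustration, not measurements from the experiment.

```python
from typing import Dict


def record_lag(latest_offsets: Dict[int, int], current_offsets: Dict[int, int]) -> Dict[int, int]:
    """Per-partition lag: latest offset in the topic minus the offset reached by the app."""
    return {
        partition: max(0, latest - current_offsets.get(partition, 0))
        for partition, latest in latest_offsets.items()
    }


# Hypothetical offsets for a topic with 4 partitions.
latest = {0: 1500, 1: 1200, 2: 900, 3: 2000}
current = {0: 1500, 1: 700, 2: 900, 3: 500}

lags = record_lag(latest, current)
total_lag = sum(lags.values())

print(lags)       # lag of each partition
print(total_lag)  # lag of the whole application
```

Partitions 0 and 2 are fully caught up here, while partitions 1 and 3 carry all of the lag. This is the kind of imbalance that additional instances can help absorb.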

Like most other distributed systems, Kafka makes good use of the maxim “divide and conquer”. Let’s see how it works in practice.

The consumer protocol

When a single instance of a streaming application is running, all the input topic partitions get assigned to it.

Reminder: by default, messages are distributed based on their key. Messages with the same key are stored in the same partition.

By starting a second instance of our application we trigger a partition rebalance. All partitions get redistributed across the two instances, which splits the workload in two. The maximum level of parallelism therefore corresponds to the number of partitions: given an input topic with N partitions, any instance beyond the Nth would remain on standby, waiting for another instance to fail. Each instance in the same consumer-group consumes a distinct subset of the messages. The record-lag of the group corresponds to the sum of the lags of its instances.

Fig 1 : Kafka-Streams and the consumer-group
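The effect of a rebalance can be illustrated with a small simulation. This is only a sketch: it spreads partitions round-robin, which simplifies what the real Kafka assignors do, and the instance names (app-0, app-1, ...) are hypothetical.

```python
from typing import Dict, List


def assign_partitions(num_partitions: int, instances: List[str]) -> Dict[str, List[int]]:
    """Spread partitions over instances round-robin (a simplification of real assignors)."""
    assignment: Dict[str, List[int]] = {name: [] for name in instances}
    for partition in range(num_partitions):
        owner = instances[partition % len(instances)]
        assignment[owner].append(partition)
    return assignment


# One instance owns everything, two instances split the work in two.
one = assign_partitions(4, ["app-0"])
two = assign_partitions(4, ["app-0", "app-1"])

# With 5 instances and only 4 partitions, one instance has nothing to consume.
five = assign_partitions(4, [f"app-{i}" for i in range(5)])
idle = [name for name, partitions in five.items() if not partitions]

print(one)
print(two)
print(idle)
```

The idle instance in the last case is the reason why scaling beyond the number of partitions brings no extra throughput.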

Kubernetes and the custom metrics support

We have covered a lot so far, so let’s recap before moving on. We have several instances of the same application, and we’d like to add or remove instances based on the evolution of a shared metric exposed by the application: the consumer record-lag.

This looks like a perfect job for Kubernetes!

Kubernetes maintains the number of replicas requested for a given application. It can also natively scale (in or out) based on CPU usage or memory consumption. Fortunately, since version 1.6, it can also scale applications on custom metrics. This feature requires extending the original Kubernetes APIs with additional adapters. Among all the available adapter implementations, we chose the one based on Stackdriver to create a bridge between Kubernetes custom metrics and our Kafka-Streams JMX metrics:

Fig 2 : Exporting metrics from Kafka-Streams

We first expose the JMX metrics of the streaming application in Prometheus format. Each application instance has a prometheus-to-sd sidecar that scrapes the metrics and sends them to Stackdriver. Lags can now be plotted, and also queried by a metric server. At this point, the metric server custom-metrics-stackdriver-adapter feeds the Kubernetes master with the new custom metric values.

Now let’s put all the pieces together.

Expose the JMX metrics in a Prometheus format

Prometheus is an open-source monitoring and alerting toolkit and one of the first projects to join the CNCF. It defines a text exposition format for metrics. This format, which is becoming a reference, is used in the rest of the experiment. We use the jmx-exporter project to expose the metrics from our application in this format.

We add a few JVM parameters to the streaming app:

java -cp ... 
-Djava.rmi.server.hostname=127.0.0.1
-Djava.rmi.server.port=7071
-javaagent:/<>/jmx_prometheus_<version>.jar=9001:/<>/config.yaml

This way, JMX metrics are exposed on port 7071, and a Prometheus-formatted version of them is available over HTTP on port 9001. The config.yaml file describes which metrics are exposed.

global:
rules:
- pattern: "kafka.consumer<type=consumer-fetch-manager-metrics,client-id=(.*),topic=GAME-FRAME-RS, partition=(.*)><>records-lag:(.*)"
  labels: { client: "$1", partition: "$2", topic: GAME-FRAME-RS, metric: records-lag }
  name: "consumer_lag_game_frame_rs"
  type: GAUGE

With this block of configuration we export every kafka.consumer metric of type consumer-fetch-manager-metrics that carries the records-lag attribute for the input topic GAME-FRAME-RS. The two capture groups fill the client and partition labels, so a single rule covers all the partitions of the topic. We duplicate this block for each input topic. Note that assigning the type GAUGE in this configuration is required. (complete file)
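To see how the capture groups of such a rule end up as labels, here is a minimal sketch that applies the same regular expression with Python's re module. The bean string is hypothetical and written to fit the pattern exactly; the string that jmx-exporter actually builds may differ in spacing, and the client id kstreams-1 is invented for the example.

```python
import re

# Same expression as the rule in config.yaml, split over two lines for readability.
PATTERN = re.compile(
    r"kafka.consumer<type=consumer-fetch-manager-metrics,client-id=(.*),"
    r"topic=GAME-FRAME-RS, partition=(.*)><>records-lag:(.*)"
)

# Hypothetical flattened bean description, built to match the pattern.
sample = (
    "kafka.consumer<type=consumer-fetch-manager-metrics,client-id=kstreams-1,"
    "topic=GAME-FRAME-RS, partition=2><>records-lag:1500.0"
)

match = PATTERN.search(sample)
assert match is not None

# $1 and $2 in the rule correspond to the first two capture groups.
labels = {
    "client": match.group(1),
    "partition": match.group(2),
    "topic": "GAME-FRAME-RS",
    "metric": "records-lag",
}
value = float(match.group(3))

print(labels)
print(value)
```

Because the partition is captured rather than hard-coded, the same rule yields one time series per partition, distinguished only by its labels.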

Building the Docker image of the streaming app

Metrics can now be queried over HTTP on a single machine in development mode.

Kafka-Consumer metrics via Jmx-exporter – Recorded by loicmdivad
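As an illustration of what a client can do with that HTTP response, the sketch below parses a few lines in the Prometheus text format and sums the lag over partitions. The response body is a hypothetical sample (client id and values are invented), and the parser is deliberately minimal: it only handles samples that carry labels.

```python
import re
from typing import Dict, List, Tuple

# Hypothetical response body; label values and lags are invented for the example.
SAMPLE = """\
# HELP consumer_lag_game_frame_rs records-lag
# TYPE consumer_lag_game_frame_rs gauge
consumer_lag_game_frame_rs{client="kstreams-1",partition="0",topic="GAME-FRAME-RS"} 120.0
consumer_lag_game_frame_rs{client="kstreams-1",partition="1",topic="GAME-FRAME-RS"} 0.0
consumer_lag_game_frame_rs{client="kstreams-1",partition="2",topic="GAME-FRAME-RS"} 4300.0
consumer_lag_game_frame_rs{client="kstreams-1",partition="3",topic="GAME-FRAME-RS"} 80.0
"""

SAMPLE_LINE = re.compile(r'^(\w+)\{(.*)\}\s+(\S+)$')
LABEL = re.compile(r'(\w+)="([^"]*)"')


def parse_samples(text: str) -> List[Tuple[str, Dict[str, str], float]]:
    """Return (metric name, labels, value) for every labelled sample line."""
    samples = []
    for line in text.splitlines():
        if not line or line.startswith("#"):
            continue  # skip blank lines and HELP/TYPE comments
        match = SAMPLE_LINE.match(line)
        if match is None:
            continue  # this sketch ignores samples without labels
        name, raw_labels, value = match.groups()
        samples.append((name, dict(LABEL.findall(raw_labels)), float(value)))
    return samples


samples = parse_samples(SAMPLE)
lag_by_partition = {labels["partition"]: value for _, labels, value in samples}
total_lag = sum(lag_by_partition.values())

print(lag_by_partition)
print(total_lag)
```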

Finally, we need to package everything inside a Docker image to take advantage of this new feature inside a Kubernetes pod. The build tool used here is Gradle, and by adding the Docker plugin we can configure the project as follows:

plugins { id 'com.palantir.docker' version '0.20.1' }
apply plugin: 'com.palantir.docker'
mainClassName = 'fr.xebia.ldi.stream.Main'
docker {
   tags version
   dockerfile file('docker/Dockerfile')
   name 'gcr.io/cloud-fighter-101/kos-streaming-app'
}

In a few lines we declare:

  • The Dockerfile to build
  • The entry point of the streaming app (Main class)
  • Name, version and repository where to upload the image

At the root of the container we add the following files:

$ tree -L 3
#/
#└── opt
#    └── kos-stream
#        ├── config.yaml
#        └── jmx_prometheus_javaagent-0.3.1.jar

These two files are referenced from the JVM parameters (see the previous section).

Metrics aggregation to Stackdriver

Now that metrics are exposed on the address and port of a Kubernetes Pod, the next step is to use prometheus-to-sd from the k8s-stackdriver project. To do so, we add its container to the streaming application pod as a sidecar. Its goal is to scrape the metrics and send them to Stackdriver. This way, metrics are both persisted and displayed in a dashboard.

# ...
- name: prometheus-to-sd
 image: gcr.io/google-containers/prometheus-to-sd:v0.2.6
 command:
 - /monitor
 - --source=:http://localhost:9001
 - --stackdriver-prefix=custom.googleapis.com
 - --pod-id=$(POD_ID)
 - --namespace-id=$(POD_NAMESPACE)
 # ...

Note that the --source flag uses port 9001 in accordance with the configuration of jmx-prometheus.

MetricServer Setup

At this point, metrics from our application feed nice dashboards, but our Kubernetes master cannot read them. What it can do instead is query the /apis/metrics endpoint, so our next step is to enrich this API.

$ kubectl get --raw "/apis/custom.metrics.k8s.io/v1beta1" | jq
# Error from server (NotFound): the server could not find the requested resource

Here the result does not contain any reference to the custom metrics.

The application custom-metrics-stackdriver-adapter from the k8s-stackdriver project is meant to enrich the Kubernetes API by exposing custom metrics stored in Stackdriver. The main goal is to enable autoscaling on custom metrics. To deploy it, all you need to do is run the following command:

$ kubectl create -f \
    https://raw.githubusercontent.com/GoogleCloudPlatform/k8s-stackdriver/master/custom-metrics-stackdriver-adapter/deploy/production/adapter.yaml
$ kubectl get --raw "/apis/custom.metrics.k8s.io/v1beta1" | jq
#{
#  "name": "*/custom.googleapis.com|consumer_lag_game_frame_rs",
#  ...
#  "name": "*/custom.googleapis.com|consumer_lead_game_frame_rq",
#  ...
#}

Now the Kubernetes API has a lot more endpoints exposed. Most of them are related to custom or external metrics. We are now really close to enabling autoscaling.

Horizontal Pod Autoscaler configuration

Let’s put all the things we’ve done so far into one picture:

Fig 6 : Exporting metrics from Kafka-Streams

In the blue pod, we have the streaming application and its sidecar. The sidecar scrapes an internal endpoint of the pod and sends all the custom metrics to Stackdriver. Stackdriver then acts as a backend for the metric server (deployed in a pink Pod). Now the Kubernetes master has all the custom metrics it needs. The last thing to do is to set up a scaling behavior for a given metric and threshold. It works the same way as for a standard metric, and we configure an HPA (Horizontal Pod Autoscaler) as follows:

apiVersion: autoscaling/v2beta1
kind: HorizontalPodAutoscaler
# metadata (name, namespace) omitted here
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: kstreams
  minReplicas: 1
  maxReplicas: 4
  metrics:
  - type: Pods
    pods:
      metricName: custom.googleapis.com|consumer_lag_game_frame_rq
      targetAverageValue: 100000
  # - other metrics

Note that we use version v2beta1 of the autoscaling API. The maximum number of replicas is set to 4 since each input topic has 4 partitions, so a fifth instance would have nothing to consume. The name of the targeted metric is the custom.googleapis.com prefix, a | separator, and the name given in the jmx-exporter config file.
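To get a feeling for how such a threshold translates into a number of pods, here is a minimal sketch of the scaling rule described in the Kubernetes documentation: desiredReplicas = ceil(currentReplicas × currentMetricValue / desiredMetricValue), bounded by minReplicas and maxReplicas. It assumes the default tolerance of 0.1 and ignores everything else the real controller takes into account (pod readiness, missing metrics, stabilization delays). The lag values are hypothetical.

```python
import math


def desired_replicas(
    current_replicas: int,
    current_avg: float,
    target_avg: float,
    min_replicas: int,
    max_replicas: int,
    tolerance: float = 0.1,
) -> int:
    """Simplified HPA rule: scale proportionally to current/target, then clamp."""
    ratio = current_avg / target_avg
    if abs(ratio - 1.0) <= tolerance:
        return current_replicas  # close enough to the target: no scaling
    desired = math.ceil(current_replicas * ratio)
    return max(min_replicas, min(max_replicas, desired))


TARGET = 100000  # targetAverageValue from the HPA above

scale_out = desired_replicas(1, 250000, TARGET, 1, 4)  # average lag well above target
capped = desired_replicas(3, 400000, TARGET, 1, 4)     # would exceed maxReplicas
scale_in = desired_replicas(4, 20000, TARGET, 1, 4)    # lag has been absorbed
steady = desired_replicas(2, 105000, TARGET, 1, 4)     # within tolerance

print(scale_out, capped, scale_in, steady)
```

Since the metric is of type Pods, the value compared to the target is the average over the pods, which is why adding instances mechanically lowers it once the lag is spread over more consumers.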

Conclusion

By generating enough messages in a short amount of time, we eventually reach the threshold specified in the HPA configuration, and Kubernetes adds pods. Each of them contains an instance of the streaming application and its prometheus-to-sd sidecar. We can see a first drop in the record-lag, since it is now split over several pods. We can also see new lines appearing, which correspond to the new instances spawned by Kubernetes. Finally, each application instance works to reduce its own share of the record-lag.

We have seen how simple it is to enrich the capabilities of our streaming application with existing tools like Stackdriver and Kubernetes. We showed that applying autoscaling is possible, but we did not say whether you should do it systematically. Autoscaling has obvious advantages, such as those listed at the beginning of this post, but depending on the use case it is not always efficient. Scaling comes with a cost, and for some application usages or workloads this cost may be non-negligible. Stateful operations and state migrations are some of the best-known issues. To address them we could consider using StatefulSets, an alternative to a simple Kubernetes Deployment that comes with persistent storage, but we would still have to quantify that cost for different operations.

Published by Loic Divad

Loïc is a Data Engineer at Xebia. He works on Big Data topics such as data acquisition, processing, and storage. He works with tools like Scala, Spark, and Kafka.
