
Why is my Kafka consumer not consuming?

Kafka is a great piece of technology for integrating disparate applications in a decoupled manner. It does, however, have some nuances that are important to understand. I’ve been tripped up a few times while doing local development, so I thought I’d write it all down in the hope of saving someone else some frustration.

The particular scenario that tripped me up was when my Kafka consumer application was not receiving any messages. But first, we’ll need to cover some Kafka basics.

Kafka Basics

Kafka is a message broker. Messages are created and published by “producers” and are received and processed by “consumers”. The producers and consumers are your own applications which interact with Kafka in those roles.

Messages are produced to topics in Kafka, and each topic is divided into one or more partitions. A single message is produced to a single partition only. When a consumer subscribes to a topic, it will be assigned a number of partitions and will consume messages only from those partitions. If more consumers share the work on a topic (the same consumer group, covered below) than the topic has partitions, some consumers will not be assigned any partitions and will remain idle.

Figure 1: Kafka partition assignment with a single consumer.
Figure 2: Kafka partition assignment with many consumers.

Kafka allows scaling by adding more partitions to a topic. This provides more throughput on the topic and allows more consumers to process the messages on the topic.
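To make the idle-consumer case concrete, here’s a minimal sketch that models partition assignment as a simple round-robin. This is a simplification I’m assuming for illustration only: the `Assign` function and the consumer names are hypothetical, and Kafka’s real assignors are more involved.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Simplified round-robin model of partition assignment within ONE consumer group.
// Kafka's real assignors differ in detail; this only illustrates the idle-consumer effect.
static Dictionary<string, List<int>> Assign(int partitionCount, IReadOnlyList<string> consumers)
{
    var assignment = consumers.ToDictionary(c => c, _ => new List<int>());
    for (var partition = 0; partition < partitionCount; partition++)
    {
        // Hand out partitions one at a time, cycling through the consumers.
        assignment[consumers[partition % consumers.Count]].Add(partition);
    }
    return assignment;
}

var group = new[] { "c1", "c2", "c3", "c4" };   // hypothetical consumer names
var plan = Assign(3, group);

foreach (var member in group)
{
    var owned = plan[member];
    var summary = owned.Count == 0 ? "idle" : string.Join(", ", owned);
    Console.WriteLine($"{member}: {summary}");
}
```

With 3 partitions and 4 consumers, the fourth consumer is reported as idle. Raising the partition count to 4 or more in this model gives every consumer something to do, which is the scaling effect described above.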

Example Code

Producing messages with the Confluent Kafka C# client is easy. Just connect your producer to your Kafka broker and produce the messages into the necessary topic:

Producer.cs
using Confluent.Kafka;

const string TopicName = "test";

var config = new ProducerConfig
{
    BootstrapServers = "localhost:9092",
    ClientId = "producer1"
};

// Null key type: these messages carry no key.
var builder = new ProducerBuilder<Null, string>(config);
using var producer = builder.Build();

var message = new Message<Null, string>
{
    Value = "hello"
};

// Completes once the broker has acknowledged the message.
await producer.ProduceAsync(TopicName, message);

Kafka messages have a key which is used for routing the message into topic partitions. With defaults, messages with the same key will be routed to the same partition and ordering will be respected for the messages inside that partition. In the above code example I don’t care about how the messages are routed or their ordering, so I’ve used the Confluent.Kafka.Null type as the type of the key. This is the first generic type parameter on the ProducerBuilder class and the Message class. The second generic type parameter is the type of the message value.
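If you do care about routing and ordering, a keyed producer might look like the sketch below. The key `order-42` and the status values are made up for illustration, and the code assumes the same local broker and `test` topic as the example above.

```csharp
using System;
using Confluent.Kafka;

const string TopicName = "test";

var config = new ProducerConfig { BootstrapServers = "localhost:9092" };

// string key type instead of Null, so every message can carry a key.
using var producer = new ProducerBuilder<string, string>(config).Build();

foreach (var status in new[] { "created", "paid", "shipped" })
{
    var delivery = await producer.ProduceAsync(TopicName, new Message<string, string>
    {
        Key = "order-42",   // hypothetical key; equal keys share a partition with defaults
        Value = status
    });

    // The delivery result reports where the broker stored the message.
    Console.WriteLine($"{status}: partition {delivery.Partition.Value}, offset {delivery.Offset.Value}");
}
```

With the default partitioner I’d expect all three messages to report the same partition, with increasing offsets.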

When consuming messages we need to consider whether our consumer application instances will be competing or observing. In Kafka terms, competing consumers are consumers which share the same consumer group ID. Each consumer application instance within the group will only see a subset of the messages from the topic, but between them the instances in the group will consume every message from the topic. Observing consumers each use their own group ID, so each one sees every message on the topic. When debugging why a consumer doesn’t get messages, consider whether your consumer is observing (should see all messages) or competing (should see a subset of messages).
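In configuration terms, the difference between the two comes down to the GroupId. Here’s a small sketch; the `observer-` naming scheme is only an assumption for illustration.

```csharp
using System;
using Confluent.Kafka;

// Competing: every instance uses the same group ID, so the instances
// split the topic's partitions between them.
var competing = new ConsumerConfig
{
    BootstrapServers = "localhost:9092",
    GroupId = "consumer"
};

// Observing: every instance uses its own group ID, so each instance
// is assigned all partitions and sees every message.
var observing = new ConsumerConfig
{
    BootstrapServers = "localhost:9092",
    GroupId = $"observer-{Guid.NewGuid()}"   // hypothetical naming scheme
};
```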

Consumer.cs
using Confluent.Kafka;

const string TopicName = "test";

var config = new ConsumerConfig
{
    BootstrapServers = "localhost:9092",
    ClientId = "consumer1",
    GroupId = "consumer"
};

var builder = new ConsumerBuilder<Null, string>(config);
using var consumer = builder.Build();
consumer.Subscribe(TopicName);

// Blocks until a message has been received.
var result = consumer.Consume();
Console.WriteLine(result.Message.Value);

// Leave the consumer group cleanly.
consumer.Close();

The Problem

To explore this issue, I’ll use a topic with 3 partitions.
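If you don’t already have such a topic, one way to create it is with the admin client from the same Confluent package. This is a sketch that assumes a single local broker (hence a replication factor of 1) and the topic name `test` used throughout this post.

```csharp
using System;
using Confluent.Kafka;
using Confluent.Kafka.Admin;

var config = new AdminClientConfig { BootstrapServers = "localhost:9092" };
using var admin = new AdminClientBuilder(config).Build();

try
{
    await admin.CreateTopicsAsync(new[]
    {
        // Replication factor 1 assumes a single-broker development setup.
        new TopicSpecification { Name = "test", NumPartitions = 3, ReplicationFactor = 1 }
    });
    Console.WriteLine("Topic created");
}
catch (CreateTopicsException e)
{
    // Thrown, for example, when the topic already exists.
    Console.WriteLine($"Topic creation failed: {e.Results[0].Error.Reason}");
}
```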

The sample code above is enough to produce some messages and consume them, and I can reproduce the issue where my consumer application was never seeing any messages. Here’s how to reproduce the issue:

  1. Run the producer application which will produce 1 message.
  2. Run the consumer application and wait for 30 seconds. It will not see the message.

We need to wait for 30 seconds because topic subscription and partition assignment take some time. During that time the consumer will not see any messages. In the example code above I’m using an overload of consumer.Consume() which blocks until a message has been received by the consumer.
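If you’d rather not have the application block forever while reproducing the issue, there is also an overload of Consume which takes a timeout and returns null when nothing arrives in time. A sketch, assuming the same broker, topic and group as above:

```csharp
using System;
using Confluent.Kafka;

var config = new ConsumerConfig
{
    BootstrapServers = "localhost:9092",
    GroupId = "consumer"
};

using var consumer = new ConsumerBuilder<Null, string>(config).Build();
consumer.Subscribe("test");

// Returns null if no message arrives within the timeout.
var result = consumer.Consume(TimeSpan.FromSeconds(30));
Console.WriteLine(result is null ? "No message within 30 seconds" : result.Message.Value);

consumer.Close();
```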

Analysis

Now that we can reproduce the issue, we can start analyzing the cause.

Firstly, I want to ensure my consumer application is getting at least 1 topic partition assigned to it. If the consumer has no partitions assigned, it will never see any messages. I’ll start by using the kafka-consumer-groups.sh utility, which is shipped with Kafka, to see if any applications are subscribed to any topics. I’ll execute these commands while the consumer application is running, so that any assignments it has will be listed. The Kafka utilities can be found in the bin folder of the Kafka release folder.

$ ./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list

consumer

I can see 1 consumer group present and it’s named consumer, which matches the GroupId in the example code above. Let’s get more detail on that group by using kafka-consumer-groups.sh to describe the group.

$ ./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group consumer

GROUP     TOPIC  PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID                                     HOST        CLIENT-ID
consumer  test   0          -               0               -    consumer1-a54ab70b-ac96-4be6-bb8c-01a1b46d9f51  /127.0.0.1  consumer1
consumer  test   1          -               1               -    consumer1-a54ab70b-ac96-4be6-bb8c-01a1b46d9f51  /127.0.0.1  consumer1
consumer  test   2          -               0               -    consumer1-a54ab70b-ac96-4be6-bb8c-01a1b46d9f51  /127.0.0.1  consumer1

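The output shows the consumer application has been assigned all partitions of topic test. Note that the consumer ID is the same for each partition, indicating it’s the same application instance. Also note that partition 1 has a LOG-END-OFFSET of 1, so the produced message is sitting in the topic, while CURRENT-OFFSET shows - for every partition because the group has not committed any offsets yet.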

We can also validate the consumer application has assignments from the application itself, by using a “Partitions Assigned Handler”. I’ll set one of those when creating the consumer client so we can log when we have partitions assigned.

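Consumer.cs
var builder = new ConsumerBuilder<Null, string>(config);

// Log the partition numbers whenever the group assigns partitions to this instance.
// Select() needs System.Linq, which is imported implicitly in recent .NET project templates.
builder.SetPartitionsAssignedHandler((c, assignments) => {
    Console.WriteLine($"Assigned partitions {string.Join(", ", assignments.Select(x => x.Partition.Value))}");
});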

Now the consumer application will report when it has partitions assigned to it.

So far we haven’t found the cause of the issue. All the reporting above shows the consumer application is subscribed properly and has an assignment. So why is it not seeing any messages?

The consumer application would not have any assignments if there were no more partitions left to assign. Remember from the “Kafka Basics” section above, if there are more consumer applications than there are partitions available then some instances would get assignments while others would not. If that were the case, then I would expect to see the other application instances listed in the consumer group with different consumer IDs.

To show how that would look, I’ll run several instances of the consumer application.

$ ./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group consumer

GROUP     TOPIC  PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID                                     HOST        CLIENT-ID
consumer  test   2          -               0               -    consumer1-a54ab70b-ac96-4be6-bb8c-01a1b46d9f51  /127.0.0.1  consumer1
consumer  test   1          -               1               -    consumer1-7c719519-5dcb-4a46-bed8-6daeefab981a  /127.0.0.1  consumer1
consumer  test   0          -               0               -    consumer1-5afb5e3a-ebee-40ce-b14e-e823abaa8a8a  /127.0.0.1  consumer1

In the above output, all consumer IDs are different, so I can see there are other application instances subscribing to the topic.

However, that’s not the case for the issue we’re investigating. Two things combine to cause the observed behavior:

  1. I’m using the default value for AutoOffsetReset on the consumer configuration.
  2. I’m running the producer and consumer applications one at a time.

The AutoOffsetReset consumer configuration tells the consumer where to start reading when its consumer group has no committed offset for a partition, which is the situation for any brand new group. In that case we need to decide whether the group will receive all the previous messages on the topic, or only new messages as they come in. The default value of AutoOffsetReset is AutoOffsetReset.Latest, so new groups will only see new messages. Whether that’s appropriate or not depends on your application and what it’s doing. If you want your consumer to receive all previous messages the first time its group connects, you can instead use the AutoOffsetReset.Earliest value.
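Here’s a sketch of the consumer with that setting changed. It assumes the same local broker, topic and group ID as the earlier consumer example.

```csharp
using System;
using Confluent.Kafka;

var config = new ConsumerConfig
{
    BootstrapServers = "localhost:9092",
    ClientId = "consumer1",
    GroupId = "consumer",
    // Only consulted when the group has no committed offset for a partition.
    AutoOffsetReset = AutoOffsetReset.Earliest
};

using var consumer = new ConsumerBuilder<Null, string>(config).Build();
consumer.Subscribe("test");

// With Earliest, a brand new group starts from the oldest retained message.
var result = consumer.Consume();
Console.WriteLine(result.Message.Value);
consumer.Close();
```

Keep in mind that this only changes the behavior for a group without committed offsets. If the group named consumer has already committed offsets, you’d need a new group ID to see the older messages again.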

Once the consumer sees a single message and commits its offset (committing is automatic in the example code above), the group will always resume from after that message in the future, even if it’s been offline for hours or even days (depending on the retention configuration of the topic and how long the broker retains committed offsets). AutoOffsetReset is all about consumer groups which have never committed any message offsets on the broker.
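The decision can be modelled in a few lines. This is a simplified sketch of the rule rather than Kafka’s actual implementation: the `StartOffset` function is hypothetical, and the real client also applies the reset policy when a committed offset is out of range.

```csharp
using System;

// Simplified model: at which offset does a group start reading a partition?
static long StartOffset(long? committed, long logStart, long logEnd, bool earliest)
{
    // A committed offset always wins; the reset policy is not consulted.
    if (committed.HasValue) return committed.Value;

    // No committed offset: Earliest starts at the oldest message, Latest at the end of the log.
    return earliest ? logStart : logEnd;
}

// Partition 1 from the output above: one message at offset 0, log end offset 1, nothing committed.
Console.WriteLine(StartOffset(null, 0, 1, earliest: false)); // 1 (the message at offset 0 is skipped)
Console.WriteLine(StartOffset(null, 0, 1, earliest: true));  // 0 (the message is delivered)

// Once an offset has been committed, the group resumes from it under either policy.
Console.WriteLine(StartOffset(1, 0, 5, earliest: false));    // 1
```

The first line is exactly the situation in this post: with Latest the new group starts at offset 1, past the only message in the partition.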

So the key to solving this issue is to simply have the consumer application running before the producer application is run. Let’s adjust the reproduction scenario accordingly:

  1. Run the consumer application and wait for 30 seconds.
  2. Run the producer application which will produce 1 message.
  3. The consumer will now see the message.

What’s more, we can now stop the consumer application and produce many messages, and when the consumer starts again it will resume from after the last message it had processed.

Conclusion

The key to understanding this problem was in the AutoOffsetReset consumer configuration and the impact it has when new consumer groups connect to the Kafka broker for the first time. Just making sure the consumer is running before the producer starts producing messages is all that’s required to solve the problem. Luckily, this is not something you’d normally observe in production, because in production all applications would be running at once. This was an artifact of running applications one at a time locally during development.
