Kafka with Go - Custom Partitioner


1. Introduction

In this article you are going to learn how to write a custom partitioner for Kafka producers in Go.

You are going to use the kafka-go library.

Use Case Assumption: Let's consider an application that generates various logs with different log levels such as error, info, warn, etc. We aim to develop a custom partitioner that ensures error logs are sent to a specific partition.

[Figure: use case]

What is a partitioner?

A partitioner is used by Kafka to determine which partition a message should be sent to when a topic has multiple partitions.
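To make the idea concrete, here is a minimal sketch of key-based partitioning: hash the message key and take the remainder modulo the number of partitions. The function name partitionFor and the choice of FNV-1a are assumptions made for illustration only. Real Kafka clients ship their own hash functions and partitioning rules, so do not expect these partition numbers to match what a real producer picks.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// partitionFor maps a message key to a partition in [0, numPartitions).
// It is a hypothetical helper for illustration; numPartitions must be > 0.
func partitionFor(key []byte, numPartitions int) int {
	h := fnv.New32a()
	_, _ = h.Write(key) // Write on a hash.Hash never returns an error
	return int(h.Sum32() % uint32(numPartitions))
}

func main() {
	// The same key always lands on the same partition.
	for _, key := range []string{"error", "warn", "info"} {
		fmt.Printf("key %q -> partition %d\n", key, partitionFor([]byte(key), 3))
	}
}
```

A hash-based scheme like this keeps messages with the same key together, but it gives you no control over which partition a particular key ends up on. That is the gap a custom partitioner fills.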

2. Setting Up Kafka with Docker Compose

To run Kafka locally, use Docker Compose. Begin by creating a docker-compose.yaml file with the following configuration:

 
yaml
services:
  kafka:
    image: bitnami/kafka:latest
    ports:
      - '9092:9092'
      - '9093:9093'
    environment:
      - KAFKA_CFG_NODE_ID=0
      - KAFKA_CFG_PROCESS_ROLES=controller,broker
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093
      - KAFKA_CFG_LISTENERS=PLAINTEXT://kafka:9092,CONTROLLER://kafka:9093
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
docker-compose.yaml

Next, launch the Kafka service using Docker Compose:

 
bash
docker compose up

3. Setting up a Go Project

Start by initializing a new Go project with the following commands:

 
bash
mkdir kafka-go-custom-partitioner
cd kafka-go-custom-partitioner
go mod init github.com/gurleensethi/kafka-go-custom-partitioner

To interact with Kafka, you will use the popular kafka-go package. Install it in your project:

 
bash
go get github.com/segmentio/kafka-go

4. Creating a Producer

Create a folder named producer and within it, create a file called main.go.

4.1 Topic Creation

Begin by establishing a new connection, then proceed to call the CreateTopics function to create a new topic in Kafka.

Ensure that the number of partitions is set to 3.

 
go
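package main

import (
	"context"
	"fmt"

	"github.com/segmentio/kafka-go"
)

func main() {
	ctx := context.Background()
	topic := "logs"

	// Connect to the broker started by Docker Compose.
	conn, err := kafka.DialContext(ctx, "tcp", "localhost:9092")
	if err != nil {
		panic(err)
	}
	defer conn.Close()

	// Create the topic with 3 partitions.
	err = conn.CreateTopics(kafka.TopicConfig{
		Topic:             topic,
		NumPartitions:     3,
		ReplicationFactor: 1,
	})
	if err != nil {
		panic(err)
	}

	// Using fmt here also keeps the import from being flagged as unused.
	fmt.Println("topic is ready:", topic)
}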
producer/main.go

Run the program from the project root.

 
bash
go run producer/main.go

Note

You can execute the program multiple times without encountering errors. If the topic already exists, the function call will simply be ignored and no action will be taken.

4.2 Creating a writer

In the same file, after creating the topic, create a new writer with the kafka.NewWriter function.

Use the writer to send two messages, one with the key "error" and the other with the key "warn". The custom partitioner you implement next will route messages based on these keys.

 
go
package main

import (
	"context"
	"fmt"

	"github.com/segmentio/kafka-go"
)

func main() {
	// ...

	writer := kafka.NewWriter(kafka.WriterConfig{
		Brokers: []string{"localhost:9092"},
		Topic:   topic,
	})
	// Close flushes pending messages and releases the connection.
	defer writer.Close()

	err = writer.WriteMessages(ctx,
		kafka.Message{
			Key:   []byte("error"),
			Value: []byte("some error happened in the application"),
		},
		kafka.Message{
			Key:   []byte("warn"),
			Value: []byte("this is a warning"),
		},
	)
	if err != nil {
		panic(err)
	}

	fmt.Println("written messages successfully")
}
producer/main.go

4.3 Creating a partitioner

In the kafka-go package, a partitioner is referred to as a Balancer.

To create a new Balancer, you need to implement the kafka.Balancer interface, which contains a single method.

Below is the source code of the Balancer interface from the kafka-go package.

 
go
// The Balancer interface provides an abstraction of the message distribution
// logic used by Writer instances to route messages to the partitions available
// on a kafka cluster.
//
// Balancers must be safe to use concurrently from multiple goroutines.
type Balancer interface {
	// Balance receives a message and a set of available partitions and
	// returns the partition number that the message should be routed to.
	//
	// An application should refrain from using a balancer to manage multiple
	// sets of partitions (from different topics for examples), use one balancer
	// instance for each partition set, so the balancer can detect when the
	// partitions change and assume that the kafka topic has been rebalanced.
	Balance(msg Message, partitions ...int) (partition int)
}
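Because the interface has a single method, a plain function can satisfy it through a small adapter type. The sketch below shows that pattern using local stand-ins: Message, Balancer, BalancerFunc, and firstPartition are all declared here so the example runs without a broker, and they only mirror the shape of the kafka-go types. kafka-go exposes a comparable BalancerFunc adapter, but check the package documentation for its exact definition before relying on it.

```go
package main

import "fmt"

// Message and Balancer are local stand-ins that mirror the shape of
// kafka.Message and kafka.Balancer; they are not the library types.
type Message struct {
	Key   []byte
	Value []byte
}

type Balancer interface {
	Balance(msg Message, partitions ...int) int
}

// BalancerFunc adapts an ordinary function to the Balancer interface.
type BalancerFunc func(msg Message, partitions ...int) int

// Balance calls the underlying function.
func (f BalancerFunc) Balance(msg Message, partitions ...int) int {
	return f(msg, partitions...)
}

// firstPartition always picks the first available partition. It keeps no
// state, so it is trivially safe to call from multiple goroutines.
func firstPartition(_ Message, partitions ...int) int {
	return partitions[0]
}

func main() {
	var b Balancer = BalancerFunc(firstPartition)
	fmt.Println(b.Balance(Message{Key: []byte("error")}, 0, 1, 2)) // prints 0
}
```

A stateless function like this needs no locking. As soon as a balancer keeps state, such as a round-robin counter, that state has to be protected with atomics or a mutex to meet the concurrency requirement in the interface comment.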

The package provides several predefined balancers, including RoundRobin, LeastBytes, Hash, CRC32Balancer, and Murmur2Balancer.

👉 Let's adopt a straightforward strategy: all messages with a key of error will be directed to the first partition, i.e., partition 0. Everything else will be evenly distributed among the remaining partitions using round-robin.

[Figure: custom partitioning strategy]

Within the same file, define a new CustomBalancer type and implement the Balance method.

 
go
type CustomBalancer struct {
	// reuse the built-in round-robin balancer for non-error messages
	baseBalancer kafka.RoundRobin
}

func (b *CustomBalancer) Balance(msg kafka.Message, partitions ...int) (partition int) {
	// error logs always go to partition 0; with fewer than two partitions
	// there is nothing to round-robin over, so everything goes there too
	if string(msg.Key) == "error" || len(partitions) < 2 {
		return 0
	}

	// everything else is round-robined across the remaining partitions
	return b.baseBalancer.Balance(msg, partitions[1:]...)
}
producer/main.go

Use the custom balancer by setting the Balancer field in the config passed to NewWriter.

 
go
package main

import (
	"context"
	"fmt"

	"github.com/segmentio/kafka-go"
)

func main() {
	// ...

	writer := kafka.NewWriter(kafka.WriterConfig{
		Brokers:  []string{"localhost:9092"},
		Topic:    topic,
		Balancer: &CustomBalancer{}, // 👈
	})

	// ...
}
producer/main.go

Run the program. The first message is sent to partition 0 and the second message is sent to partition 1.
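If you want to check the routing without a running broker, the strategy can be simulated with the standard library alone. The sketch below is an approximation under stated assumptions: roundRobin and router are hypothetical stand-ins for kafka.RoundRobin and CustomBalancer, the counter logic is a simplification rather than the library's implementation, and the guard for fewer than two partitions is an addition of this sketch.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// roundRobin is a simplified, hypothetical stand-in for kafka.RoundRobin.
type roundRobin struct{ counter uint32 }

// next returns the partitions in order, wrapping around at the end.
// The atomic counter keeps it safe for concurrent callers.
func (r *roundRobin) next(partitions []int) int {
	n := atomic.AddUint32(&r.counter, 1) - 1
	return partitions[int(n%uint32(len(partitions)))]
}

// router mirrors the CustomBalancer strategy from this section.
type router struct{ rr roundRobin }

func (r *router) route(key string, partitions []int) int {
	// error logs always go to partition 0; so does everything else when
	// there are no other partitions to rotate over
	if key == "error" || len(partitions) < 2 {
		return 0
	}
	// all other keys rotate over the remaining partitions
	return r.rr.next(partitions[1:])
}

func main() {
	r := &router{}
	partitions := []int{0, 1, 2}
	for _, key := range []string{"error", "warn", "info", "error", "debug"} {
		fmt.Printf("%-5s -> partition %d\n", key, r.route(key, partitions))
	}
}
```

With three partitions, the five keys above are routed to partitions 0, 1, 2, 0, and 1 in that order: error keys never advance the round-robin counter, and partition 0 never receives anything else.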

5. Creating a Consumer

Create a folder named consumer with a main.go file, and write a consumer that reads messages only from partition 0 of the logs topic.

 
go
package main

import (
	"context"
	"fmt"

	"github.com/segmentio/kafka-go"
)

func main() {
	ctx := context.Background()
	topic := "logs"

	reader := kafka.NewReader(kafka.ReaderConfig{
		Brokers:   []string{"localhost:9092"},
		Topic:     topic,
		Partition: 0, // read only error logs
	})
	defer reader.Close()

	// Block waiting for messages and print each key and value.
	for {
		msg, err := reader.ReadMessage(ctx)
		if err != nil {
			panic(err)
		}

		fmt.Println(string(msg.Key))
		fmt.Println(string(msg.Value))
	}
}
consumer/main.go

Run the program.

go run consumer/main.go

error
some error happened in the application

Thank you for reading the article, and don't forget to check out the kafka-go-custom-partitioner repository.


    TheDeveloperCafe © 2022-2024