1. Introduction
In this article, you will learn how to write a custom partitioner for Kafka producers in Go using the kafka-go library.
Use case: consider an application that emits logs at different levels, such as error, info, and warn. The goal is a custom partitioner that always sends error logs to one specific partition.
What is a partitioner?
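A partitioner decides which partition each message is written to when a topic has multiple partitions. It runs in the producer client rather than in the Kafka broker, and it can base its decision on the message key or on any other strategy.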
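To make the idea concrete, the following stdlib-only sketch shows one common partitioning strategy: hash the message key and take the remainder modulo the number of partitions, so every message with the same key lands on the same partition. The helper name `hashPartition` and the choice of 32-bit FNV-1a are assumptions for illustration, not kafka-go's API.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// hashPartition is a hypothetical helper that maps a message key to one of
// numPartitions partitions by hashing the key with 32-bit FNV-1a and taking
// the remainder. Equal keys always map to the same partition.
func hashPartition(key []byte, numPartitions int) int {
	h := fnv.New32a()
	h.Write(key) // Write on a hash.Hash never returns an error
	return int(h.Sum32() % uint32(numPartitions))
}

func main() {
	for _, key := range []string{"error", "warn", "info", "error"} {
		fmt.Printf("%-5s -> partition %d\n", key, hashPartition([]byte(key), 3))
	}
}
```

Hashing preserves per-key ordering because one key always maps to one partition; the trade-off is that you cannot choose which partition a given key lands on, which is why this article writes a custom partitioner instead.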
2. Setting Up Kafka with Docker Compose
To run Kafka locally, use Docker Compose. Begin by creating a `docker-compose.yaml` file with the following configuration:
```yaml
services:
  kafka:
    image: bitnami/kafka:latest
    ports:
      - '9092:9092'
      - '9093:9093'
    environment:
      - KAFKA_CFG_NODE_ID=0
      - KAFKA_CFG_PROCESS_ROLES=controller,broker
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093
      - KAFKA_CFG_LISTENERS=PLAINTEXT://kafka:9092,CONTROLLER://kafka:9093
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
```
docker-compose.yaml
Next, launch the Kafka service using Docker Compose:
```bash
docker compose up
```
3. Setting up a Go Project
Start by initializing a new Go project with the following commands:
```bash
mkdir kafka-go-custom-partitioner
cd kafka-go-custom-partitioner
go mod init github.com/gurleensethi/kafka-go-custom-partitioner
```
To interact with Kafka, you will use the popular kafka-go package. Install it in your project:
```bash
go get github.com/segmentio/kafka-go
```
4. Creating a Producer
Create a folder named `producer`, and within it, create a file called `main.go`.
4.1 Topic Creation
Begin by establishing a new connection to the broker, then call its `CreateTopics` method to create a new topic named `logs` in Kafka. Make sure the number of partitions is set to `3`.
```go
package main

import (
	"context"

	"github.com/segmentio/kafka-go"
)

func main() {
	ctx := context.Background()
	topic := "logs"

	// connect to the broker exposed by Docker Compose
	conn, err := kafka.DialContext(ctx, "tcp", "localhost:9092")
	if err != nil {
		panic(err)
	}
	defer conn.Close()

	// create the "logs" topic with 3 partitions
	err = conn.CreateTopics(kafka.TopicConfig{
		Topic:             topic,
		NumPartitions:     3,
		ReplicationFactor: 1,
	})
	if err != nil {
		panic(err)
	}
}
```
producer/main.go
Run the program.
```bash
go run producer/main.go
```
Note
You can run the program multiple times without errors: if the topic already exists, the call has no effect. Keep in mind that this also means it will not change the partition count of an existing `logs` topic.
4.2 Creating a writer
In the same file, after creating the topic, instantiate a new writer using the `kafka.NewWriter` function.
Using the writer, send two messages: one with the key `"error"` and one with the key `"warn"`. The custom partitioner you implement next routes messages based on these keys.
```go
package main

import (
	"context"
	"fmt"

	"github.com/segmentio/kafka-go"
)

func main() {
	// ...

	// create a writer for the "logs" topic
	writer := kafka.NewWriter(kafka.WriterConfig{
		Brokers: []string{"localhost:9092"},
		Topic:   topic,
	})
	defer writer.Close()

	// write two keyed messages in a single call
	err = writer.WriteMessages(ctx,
		kafka.Message{
			Key:   []byte("error"),
			Value: []byte("some error happened in the application"),
		},
		kafka.Message{
			Key:   []byte("warn"),
			Value: []byte("this is a warning"),
		},
	)
	if err != nil {
		panic(err)
	}

	fmt.Println("written messages successfully")
}
```
producer/main.go
4.3 Creating a partitioner
In the kafka-go package, a partitioner is called a `Balancer`.
To create your own, implement the `kafka.Balancer` interface, which has a single method, `Balance`.
Below is the `Balancer` interface as defined in the kafka-go source:
```go
// The Balancer interface provides an abstraction of the message distribution
// logic used by Writer instances to route messages to the partitions available
// on a kafka cluster.
//
// Balancers must be safe to use concurrently from multiple goroutines.
type Balancer interface {
	// Balance receives a message and a set of available partitions and
	// returns the partition number that the message should be routed to.
	//
	// An application should refrain from using a balancer to manage multiple
	// sets of partitions (from different topics for examples), use one balancer
	// instance for each partition set, so the balancer can detect when the
	// partitions change and assume that the kafka topic has been rebalanced.
	Balance(msg Message, partitions ...int) (partition int)
}
```
The package provides several predefined balancers:
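- Round Robin (`kafka.RoundRobin`, the default)
- Least Bytes (`kafka.LeastBytes`)
- Hash (`kafka.Hash`)
- Reference Hash (`kafka.ReferenceHash`)
- CRC32 (`kafka.CRC32Balancer`)
- Murmur2 (`kafka.Murmur2Balancer`)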
👉 Let's adopt a straightforward strategy: all messages with the key `error` are sent to the first partition, i.e., partition `0`. All other messages are distributed evenly across the remaining partitions using round robin.
In the same file, define a new `CustomBalancer` type and implement the `Balance` method.
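```go
type CustomBalancer struct {
	// reuse the built-in round-robin balancer for non-error messages
	baseBalancer kafka.RoundRobin
}

func (b *CustomBalancer) Balance(msg kafka.Message, partitions ...int) (partition int) {
	// only error logs are sent to the first partition
	if string(msg.Key) == "error" {
		return 0
	}

	// everything else is round-robined across the other partitions;
	// filter out partition 0 instead of assuming it is partitions[0]
	others := make([]int, 0, len(partitions))
	for _, p := range partitions {
		if p != 0 {
			others = append(others, p)
		}
	}
	if len(others) == 0 {
		// single-partition topic: there is nowhere else to send the message
		return 0
	}
	return b.baseBalancer.Balance(msg, others...)
}
```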
producer/main.go
Use the custom balancer by passing it through the `Balancer` field of the `NewWriter` call.
```go
package main

import (
	"context"
	"fmt"

	"github.com/segmentio/kafka-go"
)

func main() {
	// ...

	writer := kafka.NewWriter(kafka.WriterConfig{
		Brokers:  []string{"localhost:9092"},
		Topic:    topic,
		Balancer: &CustomBalancer{}, // 👈
	})

	// ...
}
```
producer/main.go
Run the program again. The first message (key `error`) is sent to partition `0`, while the second message (key `warn`) is sent to partition `1`.
5. Creating a Consumer
Write a consumer that reads only from partition `0` of the `logs` topic, so it receives only error logs.
```go
package main

import (
	"context"
	"fmt"

	"github.com/segmentio/kafka-go"
)

func main() {
	ctx := context.Background()
	topic := "logs"

	reader := kafka.NewReader(kafka.ReaderConfig{
		Brokers:   []string{"localhost:9092"},
		Topic:     topic,
		Partition: 0, // read only error logs
	})

	for {
		msg, err := reader.ReadMessage(ctx)
		if err != nil {
			panic(err)
		}
		fmt.Println(string(msg.Key))
		fmt.Println(string(msg.Value))
	}
}
```
consumer/main.go
Run the program.
```bash
go run consumer/main.go
```

Output:

```
error
some error happened in the application
```
Thank you for reading the article, and don't forget to check out the kafka-go-custom-partitioner repository.