Hello World in Kafka with Go

The code for this article is available on GitHub.

1. Introduction

This guide will walk you through:

  1. Setting up Kafka using Docker Compose.
  2. Writing messages to and reading them from a Kafka topic in Go, with one program acting as the producer and another as the consumer.


2. Setting Up Kafka with Docker Compose

To run Kafka locally, use Docker Compose. Begin by creating a docker-compose.yaml file with the following configuration:

 
yaml
services:
  kafka:
    image: bitnami/kafka:latest
    ports:
      - '9092:9092'
      - '9093:9093'
    environment:
      - KAFKA_CFG_NODE_ID=0
      - KAFKA_CFG_PROCESS_ROLES=controller,broker
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093
      - KAFKA_CFG_LISTENERS=PLAINTEXT://kafka:9092,CONTROLLER://kafka:9093
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
      - KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true
docker-compose.yaml

Next, launch the Kafka service using Docker Compose:

 
bash
docker compose up
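Kafka can take a few seconds to start accepting connections. As a quick sanity check before moving on, the minimal sketch below tries to open a TCP connection to localhost:9092, the advertised listener from the compose file. The brokerReachable helper and its two-second timeout are illustrative choices, not part of kafka-go, and a successful dial only shows that the port is open, not that the broker is fully ready.

```go
package main

import (
	"fmt"
	"net"
	"time"
)

// brokerReachable reports whether a TCP connection to addr can be opened
// within two seconds. It is a hypothetical helper for this guide and uses
// only the standard library.
func brokerReachable(addr string) bool {
	conn, err := net.DialTimeout("tcp", addr, 2*time.Second)
	if err != nil {
		return false
	}
	conn.Close()
	return true
}

func main() {
	// localhost:9092 matches the port mapping and advertised listener
	// in docker-compose.yaml.
	if brokerReachable("localhost:9092") {
		fmt.Println("Kafka port is reachable")
	} else {
		fmt.Println("Kafka port is not reachable yet")
	}
}
```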

3. Setting up a Go Project

Start by initializing a new Go project with the following commands:

 
bash
mkdir kafka-go-hello-world
cd kafka-go-hello-world
go mod init github.com/gurleensethi/kafka-go-hello-world

To interact with Kafka, we'll use the popular kafka-go package. Install it in your project:

 
bash
go get github.com/segmentio/kafka-go

4. Creating a Producer in Go

Create a folder named producer and within it, create a file called main.go.

Start by opening a connection with kafka.DialLeader. It connects to the leader of a specific partition, so you have to specify the topic name and the partition ID.

 
go
package main

import (
	"context"
	"log"

	"github.com/segmentio/kafka-go"
)

func main() {
	ctx := context.Background()

	// Connect to the leader of partition 0 of "my-topic".
	conn, err := kafka.DialLeader(ctx, "tcp", "localhost:9092", "my-topic", 0)
	if err != nil {
		log.Fatalf("failed to connect to Kafka: %v", err)
	}
	defer conn.Close()
}
producer/main.go

Note on Topic Creation

There's no need to manually create the my-topic topic.
With KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true set in the docker-compose.yaml file, Kafka will automatically create a topic if it doesn't already exist.

Send a "hello world" message using the conn.WriteMessages function.

 
go
package main

import (
	"context"
	"log"

	"github.com/segmentio/kafka-go"
)

func main() {
	ctx := context.Background()

	// Connect to the leader of partition 0 of "my-topic".
	conn, err := kafka.DialLeader(ctx, "tcp", "localhost:9092", "my-topic", 0)
	if err != nil {
		log.Fatalf("failed to connect to Kafka: %v", err)
	}
	defer conn.Close()

	// Write a single message to the topic.
	_, err = conn.WriteMessages(kafka.Message{
		Value: []byte("hello world"),
	})
	if err != nil {
		log.Fatalf("failed to write messages: %v", err)
	}
}
producer/main.go

Run the program.

 
bash
go run producer/main.go

5. Creating a Consumer in Go

Create a folder named consumer and within it, create a file called main.go.

As with the producer, start by opening a connection with kafka.DialLeader, again specifying the topic name and the partition ID.

 
go
package main

import (
	"context"
	"log"

	"github.com/segmentio/kafka-go"
)

func main() {
	ctx := context.Background()

	// Connect to the leader of partition 0 of "my-topic".
	conn, err := kafka.DialLeader(ctx, "tcp", "localhost:9092", "my-topic", 0)
	if err != nil {
		log.Fatalf("failed to connect to Kafka: %v", err)
	}
	defer conn.Close()
}
consumer/main.go

Use the conn.ReadBatch function to fetch a batch of messages; its two arguments are the minimum and maximum number of bytes to fetch.
To read a message from the batch, use the batch.ReadMessage function.

 
go
package main

import (
	"context"
	"fmt"
	"log"

	"github.com/segmentio/kafka-go"
)

func main() {
	ctx := context.Background()

	// Connect to the leader of partition 0 of "my-topic".
	conn, err := kafka.DialLeader(ctx, "tcp", "localhost:9092", "my-topic", 0)
	if err != nil {
		log.Fatalf("failed to connect to Kafka: %v", err)
	}
	defer conn.Close()

	// ReadBatch returns only a batch, not an error; read errors surface
	// from ReadMessage and Close below.
	batch := conn.ReadBatch(1, 1024) // fetch at least 1 byte, at most 1024 bytes

	// Read the first message from the batch.
	msg, err := batch.ReadMessage()
	if err != nil {
		log.Fatalf("failed to read message: %v", err)
	}

	fmt.Println("Received message:", string(msg.Value))

	err = batch.Close()
	if err != nil {
		log.Fatalf("failed to close message batch: %v", err)
	}
}
consumer/main.go

Reading entire batch

Since the batch can contain multiple messages, you can call ReadMessage back-to-back in a loop to read all the messages.

Run the consumer and you should see the message sent by the producer.

 
bash
go run consumer/main.go
Received message: hello world

Thank you for reading the article, and don't forget to check out the kafka-go-hello-world repository.


TheDeveloperCafe © 2022-2024