1. Introduction #
This guide will walk you through:
- Setting up Kafka using Docker Compose.
- Writing and reading messages to and from a Kafka topic using Go, functioning as both a producer and a consumer.
2. Setting Up Kafka with Docker Compose #
To run Kafka locally, use Docker Compose. Begin by creating a docker-compose.yaml
file with the following configuration:
yamlservices: kafka: image: bitnami/kafka:latest ports: - '9092:9092' - '9093:9093' environment: - KAFKA_CFG_NODE_ID=0 - KAFKA_CFG_PROCESS_ROLES=controller,broker - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093 - KAFKA_CFG_LISTENERS=PLAINTEXT://kafka:9092,CONTROLLER://kafka:9093 - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT - KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true
docker-compose.yaml
Next, launch the Kafka service using Docker Compose:
bashdocker compose up
3. Setting up a Go Project #
Start by initializing a new Go project with the following commands:
bashmkdir kafka-go-hello-world cd kafka-go-hello-world go mod init github.com/gurleensethi/kafka-go-hello-world
To interact with Kafka, we'll use the popular kafka-go package. Install it in your project:
bashgo get github.com/segmentio/kafka-go
4. Creating a Producer in Go #
Create a folder named producer
and within it, create a file called main.go
.
Start by creating a new connection. When creating a new connection you have to specify the topic name and partition id.
gopackage main import ( "context" "log" "github.com/segmentio/kafka-go" ) func main() { ctx := context.Background() conn, err := kafka.DialLeader(ctx, "tcp", "localhost:9092", "my-topic", 0) if err != nil { log.Fatalf("failed to connect to Kafka: %v", err) } defer conn.Close() }
producer/main.go
Note on Topic Creation
There's no need to manually create the my-topic
topic.
With KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true
set in the docker-compose.yaml file, Kafka will automatically create a topic if it doesn't already exist.
Send a "hello world" message using conn.WriteMessages
function.
gopackage main import ( "context" "log" "github.com/segmentio/kafka-go" ) func main() { ctx := context.Background() conn, err := kafka.DialLeader(ctx, "tcp", "localhost:9092", "my-topic", 0) if err != nil { log.Fatalf("failed to connect to Kafka: %v", err) } defer conn.Close() _, err = conn.WriteMessages(kafka.Message{ Value: []byte("hello world"), }) if err != nil { log.Fatalf("failed to write messages: %v", err) } }
producer/main.go
Run the program.
bashgo run producer/main.go
5. Creating a Consumer in Go #
Create a folder named consumer
and within it, create a file called main.go
.
Similar to what you did with the producer, start by creating a new connection. When creating a new connection you have to specify the topic name and partition id.
gopackage main import ( "context" "fmt" "log" "github.com/segmentio/kafka-go" ) func main() { ctx := context.Background() conn, err := kafka.DialLeader(ctx, "tcp", "localhost:9092", "my-topic", 0) if err != nil { log.Fatalf("failed to connect to Kafka: %v", err) } defer conn.Close() }
consumer/main.go
You are going to use the ReadBatch
function to read a batch of messages.
To read a message from the batch, use the batch.ReadMessage
function.
gopackage main import ( "context" "fmt" "log" "github.com/segmentio/kafka-go" ) func main() { ctx := context.Background() conn, err := kafka.DialLeader(ctx, "tcp", "localhost:9092", "my-topic", 0) if err != nil { log.Fatalf("failed to connect to Kafka: %v", err) } defer conn.Close() batch := conn.ReadBatch(1, 1024) if err != nil { log.Fatalf("failed to create a message batch: %v", err) } msg, err := batch.ReadMessage() if err != nil { log.Fatalf("failed to read message: %v", err) } fmt.Println("Received message:", string(msg.Value)) err = batch.Close() if err != nil { log.Fatalf("failed to close message batch: %v", err) } }
consumer/main.go
Reading entire batch
Since the batch can contain multiple messages, you can call ReadMessage
back-to-back in a loop to read all the messages.
Run the program and you should see the output.
bashgo run producer/main.go hello world
Thank you for reading the article and don't forget to check out kafka-go-hello-world repository.