Setting Up Apache Kafka with Go

Vishal Rana
3 min read · Feb 13, 2023

Apache Kafka is a widely used message broker. In this blog, we'll cover how you can set up Apache Kafka with Go, and we'll try to understand each step with explanation and attention to detail.

A little context on Apache Kafka.

Apache Kafka is a distributed event store and stream-processing platform. It helps you decouple your services. It serves as a buffer store for your messages.

Kafka is essentially a collection of topics, each split into one or more partitions. A Kafka partition is a linearly ordered sequence of messages, where each message is identified by its index (called an offset).
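To make the topic/partition/offset model concrete, here is a minimal sketch using kafka-go (the Go client we'll use below) that reads one message from a single partition and prints where it came from. The broker address localhost:9092 and the topic name "example" are just placeholders:

package main

import (
    "context"
    "fmt"
    "log"

    "github.com/segmentio/kafka-go"
)

func main() {
    // a reader pinned to one partition: a linearly ordered sequence of messages
    r := kafka.NewReader(kafka.ReaderConfig{
        Brokers:   []string{"localhost:9092"},
        Topic:     "example",
        Partition: 0,
    })
    defer r.Close()

    m, err := r.ReadMessage(context.Background())
    if err != nil {
        log.Fatal(err)
    }
    // every message carries the partition it came from and its index (offset)
    fmt.Printf("partition=%d offset=%d value=%s\n", m.Partition, m.Offset, m.Value)
}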

Integration

I’m using Docker to set up Kafka and ZooKeeper.

  • Create a file named docker-compose.yml:
version: "3"
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.3.0
    container_name: zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-kafka:7.3.0
    container_name: broker
    ports:
      # To learn about configuring Kafka for access across networks see
      # https://www.confluent.io/blog/kafka-client-cannot-connect-to-broker-on-aws-on-docker-etc/
      - "9092:9092"
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,PLAINTEXT_INTERNAL://broker:29092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
  • Start the containers with docker-compose up -d

This will start the Kafka broker and the ZooKeeper server.
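Depending on the broker configuration, the topic may be auto-created on first use; if not, you can create it explicitly. Here is a minimal sketch using kafka-go, assuming a hypothetical topic name "notifications":

package main

import (
    "log"
    "net"
    "strconv"

    "github.com/segmentio/kafka-go"
)

func main() {
    conn, err := kafka.Dial("tcp", "localhost:9092")
    if err != nil {
        log.Fatal("failed to dial broker:", err)
    }
    defer conn.Close()

    // topic creation must go through the cluster controller
    controller, err := conn.Controller()
    if err != nil {
        log.Fatal("failed to get controller:", err)
    }
    ctrlConn, err := kafka.Dial("tcp", net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port)))
    if err != nil {
        log.Fatal("failed to dial controller:", err)
    }
    defer ctrlConn.Close()

    // one partition and replication factor 1 match our single-broker setup
    err = ctrlConn.CreateTopics(kafka.TopicConfig{
        Topic:             "notifications",
        NumPartitions:     1,
        ReplicationFactor: 1,
    })
    if err != nil {
        log.Fatal("failed to create topic:", err)
    }
}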

The end goal for this setup:

  • I should be able to push messages to a topic.
  • I should be able to receive messages from a topic.

For integration with Go, we are going to use:

 github.com/segmentio/kafka-go

Install the package using

go get github.com/segmentio/kafka-go

Once the package is installed, let’s set up our consumer to start consuming messages.


import (
    "context"
    "fmt"
    "log"
    "os"

    "github.com/segmentio/kafka-go"
)

func ReadMessages(ctx context.Context) {
    brokerTopic := os.Getenv("BROKER_TOPIC") // topic name from .env
    brokers := os.Getenv("BROKER_HOST")      // broker host from .env, e.g. localhost:9092

    groupId := os.Getenv("BROKER_GROUP_ID") // group id to manage offsets and consume each message only once

    // make a new reader that consumes from the topic as part of a consumer group.
    // Note: kafka-go does not allow setting both Partition and GroupID; with a
    // GroupID, partitions are assigned to group members automatically.
    r := kafka.NewReader(kafka.ReaderConfig{
        Brokers:     []string{brokers},
        Topic:       brokerTopic,
        GroupID:     groupId,
        MinBytes:    10e3, // 10KB
        MaxBytes:    10e6, // 10MB
        StartOffset: kafka.LastOffset,
    })

    // Continuously poll for new messages
    for {
        // FetchMessage does not commit the offset automatically,
        // so we can commit explicitly after processing
        m, err := r.FetchMessage(ctx)
        if err != nil {
            break
        }

        fmt.Printf("Received message %s\n", string(m.Value))

        // Commit the offset
        if err := r.CommitMessages(ctx, m); err != nil {
            log.Println("failed to commit message:", err)
        }
    }

    if err := r.Close(); err != nil {
        log.Fatal("failed to close reader:", err)
    }
}

We have set up our consumer, which will read messages from our topic:

  • We have set up a new Reader with the broker address and topic.
  • We have set up a groupId so that once we commit a message, we don’t read it again (see the sketch below for wiring this into main).
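Here is a minimal sketch of calling the consumer from a main function, assuming ReadMessages lives in the same package and the environment variables above are set; the context lets us stop the read loop cleanly:

package main

import (
    "context"
    "os"
    "os/signal"
)

func main() {
    // cancel the context on Ctrl+C so ReadMessages returns and the reader closes
    ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt)
    defer stop()

    ReadMessages(ctx)
}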

Now, let’s set up our producer, which will push messages to the topic.

import (
    "context"
    "log"
    "os"
    "time"

    "github.com/segmentio/kafka-go"
)

// creating a variable to store the connection
var Connections = connectionVariables{}

type connectionVariables struct {
    NotificationConn *kafka.Conn
}

// setting up the broker for the system
func SetupBroker() {
    brokerTopic := os.Getenv("BROKER_TOPIC")
    brokers := os.Getenv("BROKER_HOST")

    ConfigureNotificationBroker(brokers, brokerTopic)
}

// configuring the notification broker topic connection
// (a producer does not need a consumer group id, so none is passed here)
func ConfigureNotificationBroker(kafkaBrokerUrl string, topic string) {
    // setting up the connection to the leader of partition 0
    // so that we can use it later to push new messages
    conn, err := kafka.DialLeader(context.Background(), "tcp", kafkaBrokerUrl, topic, 0)
    if err != nil {
        log.Fatal("failed to dial leader:", err)
    }

    // storing the connection to be used for later reference
    Connections.NotificationConn = conn
}

// function to push a message to the topic
func PushMessage(message string) {
    // setting a deadline so the write cannot block forever
    if err := Connections.NotificationConn.SetWriteDeadline(time.Now().Add(10 * time.Second)); err != nil {
        log.Fatal("failed to set write deadline:", err)
    }

    // writing a message to the topic
    _, err := Connections.NotificationConn.WriteMessages(
        kafka.Message{Value: []byte(message)},
    )
    if err != nil {
        log.Fatal("failed to write messages:", err)
    }
}

We have now set up both the consumer and the producer. You can try pushing messages to the topic, and the consumer will receive them and commit the offsets.
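For reference, here is a minimal sketch of using the producer, assuming both functions live in the same package and BROKER_HOST and BROKER_TOPIC are set in the environment:

package main

func main() {
    // dial the partition leader once at startup
    SetupBroker()

    // write a message to the topic; the running consumer should print it
    PushMessage("hello from the producer")
}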

This is a very basic setup, and the code can certainly be refactored; it’s just meant to give you an idea of what the setup will look like. If you have any questions, do leave a comment.

Happy Learning!!
