
# Tracing Asynchronous Messaging with OpenTelemetry in Go

In the previous post, we illuminated the “dark matter” of our microservices: the database layer. We saw how a simple SQL query could be the bottleneck in our request pipeline and how the N+1 problem manifests visually in a trace.

But modern distributed systems rarely stop at just HTTP and SQL. To build resilient, decoupled systems, we often turn to asynchronous messaging using tools like RabbitMQ, Kafka, or SQS.

In our OrderFlow demo app, we have a common pattern:

  1. A user places an order (HTTP).
  2. The Orders service persists the transaction to the database.
  3. It then fires off an order.created event to Kafka.
  4. The Worker service consumes that event and triggers a confirmation email.

Without instrumentation, our trace stops dead after step 2. The Orders service fires the event, but the send never shows up in the trace, and we have no visibility into what happens afterwards. The Worker picks it up, but as far as Jaeger is concerned, that’s a completely new, unrelated trace.

Today, we’re going to bridge that gap. We’ll learn how to propagate OpenTelemetry context across the asynchronous boundary of a message queue, creating a single, unbroken trace from the user’s click to the final email delivery.

## The Challenge: Context Propagation

In HTTP, context propagation is standardized. We use the traceparent header to pass the TraceID and SpanID from one service to another. Libraries like otelhttp handle this automatically.
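
As a quick refresher, the header defined by the W3C Trace Context spec packs a version, the trace ID, the parent span ID, and trace flags into a single value; the example below is the one from the spec itself:

```
traceparent: 00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01
```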

Kafka doesn’t have a built-in “traceparent” standard that everyone agrees on, but it does support message headers. Our job is to:

  1. Inject the trace context into the Kafka headers when producing a message.
  2. Extract the trace context from the Kafka headers when consuming a message.

If you want to follow along, clone the repo and checkout the starting branch:

```bash
git clone https://github.com/joao-fontenele/orderflow-otel-demo.git
cd orderflow-otel-demo
git checkout 02-database-instrumentation
```

Or jump straight to 03-messaging-instrumentation to see the final code.

## Step 1: The Carrier

OpenTelemetry has an interface called TextMapCarrier that abstracts the storage mechanism. It essentially says: “I don’t care if you are an HTTP header, gRPC metadata, or a carrier pigeon; just give me a way to Get, Set, and list Keys.”
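
For reference, the interface lives in go.opentelemetry.io/otel/propagation and is only three methods (reproduced here with the doc comments trimmed):

```go
// TextMapCarrier is the storage medium used by a TextMapPropagator.
type TextMapCarrier interface {
    // Get returns the value associated with the passed key.
    Get(key string) string
    // Set stores the key-value pair.
    Set(key string, value string)
    // Keys lists the keys stored in this carrier.
    Keys() []string
}
```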

The kafka-go library we’re using stores headers as a slice of structs, which doesn’t automatically fit that interface. So, we need to write a simple adapter.
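
Concretely, each kafka-go header is a small key/value struct along these lines (simplified from the library’s own type):

```go
// A Kafka record header: a string key with a raw byte-slice value.
type Header struct {
    Key   string
    Value []byte
}
```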

Create internal/messaging/carrier.go:

```go
package messaging

import "github.com/segmentio/kafka-go"

// MessageCarrier adapts a kafka.Message's headers to the
// propagation.TextMapCarrier interface.
type MessageCarrier struct {
    msg *kafka.Message
}

func NewMessageCarrier(msg *kafka.Message) *MessageCarrier {
    return &MessageCarrier{msg: msg}
}

// Get retrieves the value for a given key.
func (c *MessageCarrier) Get(key string) string {
    for _, h := range c.msg.Headers {
        if h.Key == key {
            return string(h.Value)
        }
    }
    return ""
}

// Set stores a key-value pair.
func (c *MessageCarrier) Set(key, value string) {
    // If the key exists, update it
    for i, h := range c.msg.Headers {
        if h.Key == key {
            c.msg.Headers[i].Value = []byte(value)
            return
        }
    }
    // Otherwise, append it
    c.msg.Headers = append(c.msg.Headers, kafka.Header{
        Key:   key,
        Value: []byte(value),
    })
}

// Keys lists the header keys present on the message.
func (c *MessageCarrier) Keys() []string {
    keys := make([]string, len(c.msg.Headers))
    for i, h := range c.msg.Headers {
        keys[i] = h.Key
    }
    return keys
}
```

This tiny struct is the Rosetta Stone that allows the OpenTelemetry propagator to talk to Kafka messages.

## Step 2: Instrumenting the Producer

When we publish a message, we want to start a span to track the “send” operation. More importantly, we want to inject the context of that span into the message so the consumer knows where it came from.

In internal/messaging/producer.go:

```go
func (p *Producer) Publish(ctx context.Context, key string, event any) error {
    // ... marshaling code ...

    msg := kafka.Message{
        Key:   []byte(key),
        Value: data,
    }

    // 1. Start a PRODUCER span
    ctx, span := producerTracer.Start(ctx, "send "+p.topic,
        trace.WithSpanKind(trace.SpanKindProducer),
        trace.WithAttributes(
            semconv.MessagingSystemKafka,
            semconv.MessagingOperationName("send"),
            semconv.MessagingOperationTypePublish,
            semconv.MessagingDestinationName(p.topic),
            semconv.MessagingKafkaMessageKey(key),
        ),
    )
    defer span.End()

    // 2. Inject the context into the message headers using our Carrier
    otel.GetTextMapPropagator().Inject(ctx, NewMessageCarrier(&msg))

    // 3. Send the message
    if err := p.writer.WriteMessages(ctx, msg); err != nil {
        span.RecordError(err)
        span.SetStatus(codes.Error, err.Error())
        return err
    }
    return nil
}
```

Notice trace.SpanKindProducer. This tells Jaeger that this span is sending a message, which helps it draw those nice directional arrows in the UI.
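
For the parent/child relationship to show up, the caller has to hand Publish a context that already carries the active span. A hypothetical call site in the Orders HTTP handler might look like this (the OrderCreatedEvent type and its fields are illustrative, not taken from the repo):

```go
// r.Context() carries the server span created by otelhttp, so the
// producer span started inside Publish becomes its child.
event := OrderCreatedEvent{OrderID: order.ID, Status: "created"}
if err := producer.Publish(r.Context(), order.ID, event); err != nil {
    http.Error(w, "failed to publish event", http.StatusInternalServerError)
    return
}
```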

## Step 3: Instrumenting the Consumer

On the receiving end, the logic is reversed. We pull the message off the wire, wrap it in our MessageCarrier, and ask the propagator to Extract any trace context hidden in the headers.

If it finds a context, our new “process” span becomes a child of the original “send” span. If not, it just starts a new root trace.

In internal/messaging/consumer.go:

```go
func (c *Consumer) processMessage(ctx context.Context, msg kafka.Message, handler func(ctx context.Context, payload []byte) error) error {
    // 1. Extract context from headers
    parentCtx := otel.GetTextMapPropagator().Extract(ctx, NewMessageCarrier(&msg))

    // 2. Start a CONSUMER span using the extracted parent context
    spanCtx, span := consumerTracer.Start(parentCtx, "process "+c.topic,
        trace.WithSpanKind(trace.SpanKindConsumer),
        trace.WithAttributes(
            semconv.MessagingSystemKafka,
            semconv.MessagingOperationName("process"),
            semconv.MessagingOperationTypeDeliver,
            semconv.MessagingDestinationName(c.topic),
            semconv.MessagingKafkaConsumerGroup(c.groupID),
            semconv.MessagingKafkaMessageOffset(int(msg.Offset)),
        ),
    )
    defer span.End()

    // 3. Call the actual business logic handler with the new span context
    if err := handler(spanCtx, msg.Value); err != nil {
        span.RecordError(err)
        span.SetStatus(codes.Error, err.Error())
        return err
    }
    return nil
}
```
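
For completeness, processMessage is driven by the consumer’s read loop. Here’s a minimal sketch of what that loop could look like, assuming the Consumer holds a kafka.Reader in a reader field (the Run method is illustrative, not the repo’s exact code):

```go
func (c *Consumer) Run(ctx context.Context, handler func(ctx context.Context, payload []byte) error) error {
    for {
        // Blocks until a message arrives, the context is cancelled,
        // or the reader is closed.
        msg, err := c.reader.ReadMessage(ctx)
        if err != nil {
            return err
        }
        // Errors are already recorded on the span inside processMessage,
        // so we just keep consuming.
        _ = c.processMessage(ctx, msg, handler)
    }
}
```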

## Step 4: The Worker’s Outbound Calls

We’re almost there. The Worker service receives the message (now instrumented), but it then needs to make HTTP calls to the Email and Inventory services.

For the trace to continue flowing out of the worker, we need to ensure its HTTP client is also instrumented.

In cmd/worker/main.go:

```go
// Initialize the TracerProvider for the worker
shutdownTracer, err := telemetry.InitTracerProvider(ctx, "worker", "0.1.0")
// ...

// Use otelhttp to wrap the HTTP client
httpClient := &http.Client{
    Timeout:   10 * time.Second,
    Transport: otelhttp.NewTransport(http.DefaultTransport),
}

notificationHandler := worker.NewNotificationHandler(..., httpClient, logger)
```

Because we passed spanCtx (which contains the trace ID from Kafka) into our handler in Step 3, and because our handler uses this context when making HTTP requests with httpClient, the propagation chain continues!
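
Inside the handler, continuing the chain just means building outbound requests with the context it was given. A rough sketch (emailServiceURL and the h.httpClient field are assumptions for illustration):

```go
// ctx is the spanCtx handed over by processMessage, so otelhttp's
// transport injects a traceparent header pointing at the consumer span.
req, err := http.NewRequestWithContext(ctx, http.MethodPost, emailServiceURL, bytes.NewReader(payload))
if err != nil {
    return err
}
resp, err := h.httpClient.Do(req)
if err != nil {
    return err
}
defer resp.Body.Close()
```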

## The Result: A Unified Trace

Let’s fire it up.

  1. make all-up to start the stack.
  2. ./scripts/generate-traffic.sh to create an order.

Open Jaeger. You will no longer see disjointed traces. Instead, you get this masterpiece:

Full distributed trace showing HTTP, Database, and Kafka spans

Let’s trace the path:

  1. Gateway receives the POST request.
  2. Orders service processes it and commits to the DB (that cluster of yellow spans on the left).
  3. Orders service sends order.created (the brown span).
  4. Worker service processes order.created (the long orange span starting in the middle).
  5. Worker calls Inventory and Email services (the spans on the right).

The gap in the middle? That’s the time the message sat in the Kafka topic waiting to be picked up. We can now visualize queue latency!

## Wrapping Up

We have now instrumented the three main pillars of backend communication:

  • Synchronous HTTP (Service to Service)
  • Database SQL (Service to Data)
  • Asynchronous Messaging (Service to Queue to Service)

We have full observability. If a user complains “I didn’t get my order confirmation email,” you can find the trace for their order ID and see exactly where it failed—was the Gateway slow? Did the DB lock up? Did the Worker crash? Or did the Email service return a 500?

It’s all there, in one single view.

In the next part of this series, we’ll look at how to leverage all this data we’re collecting to build metrics and dashboards, turning our individual traces into high-level system insights.


Thanks for reading! Feel free to check out my other posts or reach out via GitHub.

