The Go programming language has become one of the most popular languages in the modern cloud infrastructure stack. For Java developers, it’s easy to get started with Go because both languages share many similarities. However, it can sometimes be challenging to find a good way to express something in Go.

The best way to learn a programming language is to read code. I recently came across golang.org/x/exp/winfsnotify and found an interesting piece of code that I would like to share in this blog post.

Exploring the producer-consumer pattern in Go

The producer-consumer pattern—with a single producer and a single consumer—is one of the simplest patterns in parallel computing. In Go, you can implement it like this:

package main

import (
    "fmt"
    "time"
)

func calculateNextInt(prev int) int {
    time.Sleep(1 * time.Second) // pretend this is an expensive operation
    return prev + 1
}

func main() {
    data := make(chan int)

    // producer
    go func() {
        var i = 0
        for {
            i = calculateNextInt(i)
            data <- i
        }
    }()

    // consumer
    for i := range data {
        fmt.Printf("i=%v\n", i)
    }
}

Go’s channels and goroutines allow for a simple and straightforward implementation. In real-world applications, you might add an error channel so the producer can send errors to the consumer, but we leave that aside for now.
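For readers curious what such an error channel might look like, here is one possible sketch. The calculateNextIntChecked helper and its “overflow” failure are made up for illustration; they are not part of the original example:

```go
package main

import (
	"errors"
	"fmt"
)

// calculateNextIntChecked is a hypothetical variant of calculateNextInt
// that can fail; the error condition here is purely illustrative.
func calculateNextIntChecked(prev int) (int, error) {
	if prev >= 3 {
		return 0, errors.New("overflow reached") // pretend failure
	}
	return prev + 1, nil
}

func main() {
	data := make(chan int)
	errc := make(chan error, 1) // buffered so the producer never blocks on it

	// producer: sends values on data, reports a failure on errc, then stops
	go func() {
		defer close(data)
		i := 0
		for {
			next, err := calculateNextIntChecked(i)
			if err != nil {
				errc <- err
				return
			}
			i = next
			data <- i
		}
	}()

	// consumer: drains data, then checks whether an error is pending
	for i := range data {
		fmt.Printf("i=%v\n", i)
	}
	select {
	case err := <-errc:
		fmt.Printf("producer failed: %v\n", err)
	default:
	}
}
```

Because the producer writes to the buffered errc channel before closing data, the error is guaranteed to be available by the time the consumer’s range loop ends.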

What if we want to stop the producer loop?

The golden rule of Go channels is that a channel should be closed by the goroutine writing into it, not by the goroutine reading from it. Go enforces this rule by making a program panic if a goroutine writes into a closed channel. Reading from a closed channel, on the other hand, succeeds immediately and returns the zero value of the channel’s element type, which is why a for … range loop over a channel terminates cleanly once the channel is closed.

What we need is a way to signal to the producer loop that it should terminate and close the channel. A common way to do it is to create an additional channel for that signal. We call that channel quit. The modified main() function looks like this:

func main() {
    data := make(chan int)
    quit := make(chan struct{}) // closed to signal the producer to stop

    // producer
    go func() {
        var i = 0
        for {
            i = calculateNextInt(i)
            select {
            case data <- i:
            case <-quit:
                close(data)
                return
            }
        }
    }()

    // consumer
    for i := range data {
        fmt.Printf("i=%v\n", i)
        if i == 5 { // == guards against closing quit twice, which would panic
            close(quit)
        }
    }
}

After the consumer closes the quit channel, the receive case <-quit in the producer’s select becomes ready immediately, so the producer closes the data channel and terminates. Note that because the select may also still be able to send on data, the consumer might receive one more value before the producer notices the closed quit channel.

While this method is a good solution for most scenarios, it has one drawback: closing the producer is an asynchronous, fire-and-forget operation. After the consumer closes the quit channel, there’s no way to know when the producer has actually stopped. That’s a problem if the producer holds system resources and the consumer needs to wait until those resources have been released.

Implementing a synchronous Close() function

Our goal is to implement a Close() function for the producer as follows:

  • Synchronous operation: When Close() returns, the producer is actually terminated.
  • Error handling: When the producer fails to shut down cleanly, Close() returns an error.

The solution I came across and that I want to share in this blog post is to create a channel of channels:

type producer struct {
    data chan int
    quit chan chan error
}
 
func (p *producer) Close() error {
    ch := make(chan error)
    p.quit <- ch
    return <-ch
}
 
func main() {
    prod := &producer{
        data: make(chan int),
        quit: make(chan chan error),
    }
 
    // producer
    go func() {
        var i = 0
        for {
            i = calculateNextInt(i)
            select {
            case prod.data <- i:
            case ch := <-prod.quit:
                close(prod.data)
                // If the producer had an error while shutting down,
                // we could write the error to the ch channel here.
                close(ch)
                return
            }
        }
    }()
 
    // consumer
    for i := range prod.data {
        fmt.Printf("i=%v\n", i)
        if i >= 5 {
            err := prod.Close()
            if err != nil {
                // cannot happen in this example
                fmt.Printf("unexpected error: %v\n", err)
            }
        }
    }
}

The Close() function creates a temporary channel, ch, which the producer uses to signal that shutdown is complete and to report any error that occurred during shutdown.
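To make the comment in the quit case concrete, here is a minimal, self-contained sketch of a producer whose shutdown can fail. The releaseResources function is a hypothetical cleanup step, not part of the original example:

```go
package main

import (
	"errors"
	"fmt"
)

type producer struct {
	data chan int
	quit chan chan error
}

func (p *producer) Close() error {
	ch := make(chan error)
	p.quit <- ch
	return <-ch
}

// releaseResources is a hypothetical cleanup step that can fail.
func releaseResources() error {
	return errors.New("failed to release resource")
}

func main() {
	prod := &producer{
		data: make(chan int),
		quit: make(chan chan error),
	}

	go func() {
		for {
			select {
			case prod.data <- 1:
			case ch := <-prod.quit:
				close(prod.data)
				// report the shutdown error before closing ch,
				// so Close() returns it to the caller
				if err := releaseResources(); err != nil {
					ch <- err
				}
				close(ch)
				return
			}
		}
	}()

	<-prod.data // consume one value, then shut down
	if err := prod.Close(); err != nil {
		fmt.Printf("shutdown error: %v\n", err)
	}
}
```

The key point is the ordering in the quit case: any error is sent on ch before ch is closed, so the blocked receive inside Close() observes the error rather than the zero value.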

Where to go from here

In this blog post, we showed you how to implement a synchronous shutdown of a producer goroutine in Go. One thing we left out is how to interrupt the actual work of the producer—in our case, simulated by the calculateNextInt() function.

This method is highly application-specific. Some operations can be interrupted by closing a file handle, some by sending a signal. You need to know what your producer is doing to come up with a way to interrupt that operation.
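One common Go idiom for interruptible work is to thread a context.Context through it. The following is a sketch under that assumption; calculateNextIntCtx is a made-up interruptible variant of calculateNextInt, not code from the article’s examples:

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// calculateNextIntCtx is an interruptible variant of calculateNextInt:
// it aborts the simulated work as soon as ctx is canceled.
func calculateNextIntCtx(ctx context.Context, prev int) (int, error) {
	select {
	case <-time.After(1 * time.Second): // pretend this is an expensive operation
		return prev + 1, nil
	case <-ctx.Done():
		return 0, ctx.Err()
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())

	// cancel the work after 100ms, long before the "calculation" finishes
	go func() {
		time.Sleep(100 * time.Millisecond)
		cancel()
	}()

	if _, err := calculateNextIntCtx(ctx, 0); err != nil {
		fmt.Printf("interrupted: %v\n", err)
	}
}
```

Real expensive operations (file I/O, network calls) would need to check ctx at their own natural interruption points, but the selection-on-Done pattern shown here is the standard building block.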

Why IBM Instana observability is crucial in developing concurrent systems in Go

IBM Instana™ observability features help monitor the health and performance of the producer-consumer system in real time. Instana provides detailed metrics, logs and traces that enable developers to analyze the system’s behavior and identify any performance issues or bottlenecks. With the IBM Instana platform, developers can set up alerts based on predefined thresholds or anomalies, helping to ensure timely responses to critical issues.

In summary, combining the power of Go concurrency primitives with IBM Instana observability capabilities enhances the development and monitoring of producer-consumer systems. It allows developers to efficiently manage the flow of data and tasks while also gaining deep insights into the system’s performance and health, leading to optimized and reliable applications.

Get started and sign up for a free trial today
