Go Concurrency Patterns

Go's concurrency model empowers developers to build responsive, efficient applications that make the most of modern hardware.


Introduction to Concurrency in Go

Understanding Concurrency and Parallelism

Let's talk about concurrency and parallelism - two terms that get thrown around a lot, sometimes as if they are the same thing. But they are actually quite different concepts in how systems handle multiple tasks.

Think of concurrency as being like a juggler keeping several balls in the air. The juggler isn't actually touching all the balls at the same time, but by quickly switching attention between them, they create the impression of handling multiple things at once. It's all about structure and how we organize our code to handle multiple tasks.

Concurrency ≠ Parallelism

Parallelism, on the other hand, is more like having multiple jugglers each handling their own balls. It's about actually doing multiple things at the exact same time, which usually requires multiple processors or CPU cores. In simple terms: parallelism is when our program is literally doing multiple things simultaneously.

Goroutines and Channels

At the heart of Go's concurrency model are two powerful tools: goroutines and channels. Goroutines are like super lightweight threads that let our functions run side by side without the heavy overhead of traditional OS threads. The Go runtime manages them for us, so we can spin up thousands without breaking a sweat.

But goroutines by themselves aren't that useful without a way to communicate. That's where channels come in - they're the messaging system that lets our goroutines talk to each other safely. Think of them as typed pipes where data flows in one direction.

Data flows through channels using the <- operator, which looks a bit like an arrow (which makes sense since data is flowing in a direction). We can make channels one-way streets if we want, either receive-only (<-chan T) or send-only (chan<- T).

In Go, we'll work with two flavors of channels:

  • Unbuffered channels: These are like a relay race baton handoff - the sender and receiver need to be ready at the same time, or one will wait for the other.
  • Buffered channels (created with a capacity like ch := make(chan int, 10)): These are more like a mailbox with limited space. The sender can drop off messages until the mailbox is full, and the receiver can pick them up whenever they're ready (until the mailbox is empty).
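
To make the two flavors concrete, here is a minimal sketch (the channel names are illustrative):

func main() {
	// Unbuffered: the send blocks until main is ready to receive.
	unbuffered := make(chan string)
	go func() { unbuffered <- "baton" }()
	fmt.Println(<-unbuffered)

	// Buffered: sends complete immediately while the buffer has room.
	buffered := make(chan int, 2)
	buffered <- 1
	buffered <- 2
	fmt.Println(<-buffered, <-buffered)
}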

Why Concurrency Matters in Modern Applications

So why should we care about all this concurrency stuff? Because it's the difference between an app that feels snappy and one that leaves users drumming their fingers.

Think about it - modern apps need to juggle multiple things at once: processing user input, making API calls, updating the UI, and more. Without concurrency, each task would need to finish before the next one starts, creating those frustrating moments where everything freezes.

Take a web server handling user requests - with concurrency, each request gets its own goroutine, so a slow database query for one user doesn't mean everyone else has to wait. Or imagine a data processing pipeline that needs to crunch through gigabytes of information - break it into chunks, process them concurrently, and watch our program fly.

Fundamental Go Concurrency Patterns

1. The Generator Pattern


The Generator pattern in Go is a concurrency pattern that involves a function returning a channel, which subsequently produces a sequence of values over time. This pattern is particularly valuable for generating a stream of data lazily, meaning the values are computed and emitted only when they are requested by the consumer. This on-demand generation offers significant memory efficiency, especially when dealing with potentially infinite or exceptionally large sequences of data.

// generator returns a receive-only channel that emits an infinite
// sequence of integers. Each send blocks until a consumer is ready,
// so values are produced lazily, on demand.
func generator() <-chan int {
	ch := make(chan int)
	go func() {
		for i := 0; ; i++ {
			ch <- i
		}
	}()
	return ch
}
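
A short usage sketch; the loop bound is arbitrary:

func main() {
	nums := generator()
	for range 5 {
		fmt.Println(<-nums) // prints 0 through 4, one value per receive
	}
	// Note: this sketch leaves the generator goroutine blocked on its
	// next send; real code would add a done channel or context to stop it.
}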

Practical use cases:

  • Producing streams of data: When we're dealing with large datasets or continuous streams, generators can produce data on-demand, so we don't have to load everything into memory at once. Think reading lines from a massive file or fetching records from a database in chunks.
  • Implementing iterators: Generators make great custom iterators.
  • Simulating real-time data sources: Need to test with data that looks like it's coming in real-time? Generators are perfect for simulating data feeds.
  • As the first stage in a pipeline: Generators often kick off a pipeline of concurrent processing stages, feeding data into the next steps.

2. The Fan-In Pattern


The fan-in pattern in Go is employed to consolidate multiple independent input channels into a single output channel. This enables a single consumer to process data arriving from various concurrent sources in a unified manner. Essentially, it acts as a multiplexer.

It is important to note that the order in which values from the different input channels appear on the output channel is generally not guaranteed. The fan-in pattern is frequently combined with the fan-out pattern: work is first distributed to multiple worker goroutines (fan-out), and fan-in then gathers the results from these workers into a single channel for subsequent processing or aggregation.

func fanIn(input1, input2 <-chan int, output chan<- int) {
	// Nil out each input once it closes; a nil channel blocks forever,
	// so select stops spinning on already-closed channels.
	for input1 != nil || input2 != nil {
		select {
		case v, ok := <-input1:
			if !ok {
				input1 = nil
				continue
			}
			output <- v
		case v, ok := <-input2:
			if !ok {
				input2 = nil
				continue
			}
			output <- v
		}
	}
}
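
A usage sketch for fanIn; the emit helper is hypothetical, included only to make the example self-contained:

func emit(vals ...int) <-chan int {
	ch := make(chan int)
	go func() {
		defer close(ch)
		for _, v := range vals {
			ch <- v
		}
	}()
	return ch
}

func main() {
	output := make(chan int)
	go func() {
		defer close(output)
		fanIn(emit(1, 3, 5), emit(2, 4, 6), output)
	}()
	for v := range output {
		fmt.Println(v) // interleaving across the two sources is not guaranteed
	}
}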

Practical use cases:

  • Aggregating data: Combining data from multiple sources, such as different APIs or databases, into a single stream.
  • Parallel processing: Collecting results from multiple worker goroutines that are processing different parts of a task in parallel.
  • Event handling: Merging event streams from multiple listeners or sources into a single channel for centralized event processing.
  • Load balancing: Collecting responses from multiple worker processes or servers in a load-balancing system.
  • Pipeline stages: Combining outputs from multiple parallel instances of a processing stage in a pipeline.

3. The Fan-Out Pattern


The fan-out pattern in Go involves distributing data from a single input source channel to multiple output channels. This is commonly used to distribute work items from a central source to multiple worker goroutines for concurrent processing. Fan-out can be implemented in several ways. One common approach is to have multiple goroutines independently read from the same input channel until that channel is closed. Another method involves a single goroutine reading from the input channel and then selectively sending each data item to one of the many output channels.

// fanOut starts a worker goroutine that consumes from the shared input
// channel. Calling it multiple times distributes the input's items
// across the resulting output channels.
func fanOut(input <-chan int) <-chan int {
	output := make(chan int)
	go func() {
		defer close(output)
		for data := range input {
			output <- data
		}
	}()
	return output
}
 
func main() {
	...
	out1 := fanOut(in)
	out2 := fanOut(in)
	...
}

Practical use cases:

  • Parallel task processing: Distributing a set of independent tasks to multiple goroutines to be executed concurrently.
  • Event broadcasting: Sending the same event or message to multiple independent handlers or subscribers.
  • Distributing read operations: Allowing multiple consumers to independently read data from the same source by providing each with its own channel.
  • Generating multiple outputs: Processing a single input stream to produce different types of outputs or logs concurrently.
  • Load balancing: Distributing incoming requests or tasks across multiple identical service instances to improve performance and resilience.

4. The Pipeline Pattern

The pipeline pattern organizes data processing into a sequence of stages, where each stage is typically implemented as a function (often running in its own goroutine) connected to the next stage via channels. The output of one stage serves as the input of the next, creating a flow of data through the pipeline.

func main() {
	ch := generate(1, 2, 3, 4, 5)
	evenOut := even(ch)
	sqOut := sq(evenOut)
	...
}
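
The generate, even, and sq stages are not defined above; minimal sketches consistent with how main uses them might look like this:

func generate(nums ...int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for _, n := range nums {
			out <- n
		}
	}()
	return out
}

func even(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for n := range in {
			if n%2 == 0 { // filter: pass only even values downstream
				out <- n
			}
		}
	}()
	return out
}

func sq(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for n := range in {
			out <- n * n // transform: square each value
		}
	}()
	return out
}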

Practical use cases:

  • Data transformation: Applying a sequence of transformations to data, such as filtering, mapping, and reducing.
  • Stream processing: Handling continuous streams of data in a step-by-step manner, where each step performs a specific operation.
  • Image and video processing: Breaking down complex processing tasks into manageable stages like decoding, applying filters, and encoding.
  • Web request handling: Processing incoming requests through stages like authentication, request parsing, and response generation.
  • Complex processing workflows: Dividing complex tasks into a series of simpler, interconnected steps.

5. The Worker Pool Pattern

The worker pool pattern involves a fixed number of worker goroutines that concurrently process tasks from a shared queue. This pattern is particularly useful for managing and parallelizing the execution of a large number of independent tasks while controlling the number of goroutines running concurrently. In a worker pool, a central task queue, typically implemented as a channel, holds the tasks to be processed. A fixed number of worker goroutines continuously monitor this queue. When a worker becomes available, it picks up a task from the queue, executes it, and then sends the result to another channel or performs some other action. By limiting the number of worker goroutines, this pattern avoids the overhead of creating a new goroutine for each task, which can be inefficient when dealing with a very high volume of short-lived tasks.

// worker doubles each job it receives and sends the result on. The id
// parameter identifies the worker (useful for logging).
func worker(id int, jobs <-chan int, results chan<- int) {
	for j := range jobs {
		results <- j * 2
	}
}
 
func main() {
	const numJobs = 6
	jobs := make(chan int, numJobs)
	results := make(chan int, numJobs)
 
	for w := range 3 { // start a fixed pool of 3 workers
		go worker(w, jobs, results)
	}
	for j := range numJobs { // enqueue the jobs
		jobs <- j
	}
	close(jobs) // no more jobs: lets the workers' range loops exit
	for range numJobs { // collect all results
		<-results
	}
}

Practical use cases:

  • Processing a large number of independent tasks: Such as file processing, handling web requests, or performing batch operations.
  • Limiting the number of concurrent operations: To prevent resource exhaustion and ensure system stability.
  • Improving throughput: By parallelizing the execution of tasks across multiple workers.
  • Managing I/O bound operations: Allowing other tasks to proceed while waiting for I/O to complete.
  • Implementing background job processing: In web applications or other systems where certain tasks can be performed asynchronously.

6. The Queuing Pattern

The queuing pattern limits how many items are processed concurrently by using a buffered channel. The buffered channel acts as a queue with a fixed capacity, and sending to it blocks only when the buffer is full. This mechanism effectively limits the number of goroutines or operations that can proceed simultaneously. Once a processing unit finishes its task, it receives from the channel, freeing up a slot in the buffer for the next item to be processed.

func process(w int, queue chan struct{}, wg *sync.WaitGroup) {
	queue <- struct{}{} // Acquire a slot
	go func() {
		defer wg.Done()
		... // Perform tasks
		<-queue // Release the slot
	}()
}
 
func main() {
	limit, work := 2, 8
 
	var wg sync.WaitGroup
	queue := make(chan struct{}, limit) // Buffered channel with a capacity of `limit`
	wg.Add(work)
	for w := range work {
		process(w, queue, &wg)
	}
	wg.Wait()
	close(queue)
}

Practical use cases:

  • Rate limiting: Controlling the number of requests or operations that can be performed within a certain time frame.
  • Task batching: Accumulating a certain number of tasks in the queue before processing them as a batch.
  • Sequential processing with limited parallelism: Ensuring tasks are processed in order while still allowing a degree of concurrency.

Advanced Go Concurrency Patterns

1. The Tee Channel Pattern

The tee channel pattern allows for duplication of values received from a single input channel to two or more distinct output channels. This is similar to a physical tee fitting in plumbing, which splits a single pipe into two, allowing water to flow through both.

A typical implementation of the tee channel pattern involves a function that takes a read-only input channel and returns two output channels, each of which receives a copy of every value. When working with unbuffered channels, careful consideration must be given to the order of sends and the readiness of receivers to avoid potential blocking and deadlocks.

func tee(in <-chan int) (<-chan int, <-chan int) {
	out1, out2 := make(chan int), make(chan int)
	go func() {
		defer close(out1)
		defer close(out2)
		for val := range in {
			// Both sends must complete before the next value is read,
			// so a slow consumer on either output delays the other.
			out1 <- val
			out2 <- val
		}
	}()
 
	return out1, out2
}
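
A usage sketch: both outputs must be drained concurrently, otherwise the tee goroutine blocks on whichever channel has no receiver:

func main() {
	in := make(chan int)
	go func() {
		defer close(in)
		for i := range 3 {
			in <- i
		}
	}()
 
	out1, out2 := tee(in)
 
	var wg sync.WaitGroup
	wg.Add(2)
	go func() {
		defer wg.Done()
		for v := range out1 {
			fmt.Println("out1:", v)
		}
	}()
	go func() {
		defer wg.Done()
		for v := range out2 {
			fmt.Println("out2:", v)
		}
	}()
	wg.Wait()
}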

Practical use cases:

  • Mirroring data streams: Sending a stream of data to a primary processing pipeline while simultaneously sending a copy to a logging and monitoring service.
  • Dual processing: Performing two different operations on the same input data concurrently.
  • Conditional branching: Directing the same data to different processing paths based on certain conditions evaluated after the tee function.
  • Debugging and diagnostics: Observing the flow of data through a particular channel without interrupting the primary processing pipeline.

2. The Bridge Channel Pattern

The bridge channel pattern serves to flatten a channel of channels (<-chan <-chan T) into a single channel (<-chan T) that emits all the values from the inner channels sequentially. This pattern is particularly useful when a function or process returns a channel that, in turn, produces other channels, and we need to consume all the values from these inner channels in a unified, linear manner.

A bridge function typically launches a goroutine that continuously reads from the outer channel (the channel of channels). For each inner channel received, it reads all the values from that inner channel and sends them onto the bridge's single output channel. This effectively creates a "bridge" that simplifies the consumption of data from a potentially complex, nested channel structure.

func bridge(chanStream <-chan <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		// Drain each inner channel in the order it arrives on the
		// outer channel, forwarding every value to the flat output.
		for ch := range chanStream {
			for v := range ch {
				out <- v
			}
		}
	}()
	return out
}
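
A usage sketch with a hypothetical source that yields three inner channels:

func main() {
	streams := make(chan (<-chan int))
	go func() {
		defer close(streams)
		for i := range 3 {
			ch := make(chan int, 2)
			ch <- i * 10
			ch <- i*10 + 1
			close(ch)
			streams <- ch
		}
	}()
 
	for v := range bridge(streams) {
		fmt.Println(v) // 0 1 10 11 20 21
	}
}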

Practical use cases:

  • Handling dynamic data sources: When the number or source of data channels is not known beforehand or changes over time.
  • Simplifying complex workflows: Where different parts of a system produce results on separate channels.

3. The Ring Buffer Channel Pattern

The ring buffer channel pattern utilizes a channel with a fixed capacity that, when full, overwrites the oldest item with the newest one. This behaviour mirrors a circular buffer data structure. This pattern is particularly advantageous when maintaining a fixed-size history of the most recent data points is desired, and discarding older data is acceptable when the buffer reaches its capacity.

This pattern can be beneficial for throttling data streams from fast producers to slower consumers, preventing the producer from being blocked indefinitely while ensuring that the consumer always has access to the latest data. However, the trade-off is potential data loss.

func ringBuffer(in <-chan int, size int) <-chan int {
	out := make(chan int)
 
	go func() {
		defer close(out)
		buffer := make([]int, 0, size)
 
		for {
			if len(buffer) == 0 {
				// Nothing buffered yet: block until a value arrives.
				v, ok := <-in
				if !ok {
					return
				}
				buffer = append(buffer, v)
				continue
			}
 
			select {
			case v, ok := <-in:
				if !ok {
					// Input closed: flush what remains, then exit.
					for _, item := range buffer {
						out <- item
					}
					return
				}
				if len(buffer) == size {
					buffer = buffer[1:] // full: overwrite the oldest value
				}
				buffer = append(buffer, v)
			case out <- buffer[0]:
				buffer = buffer[1:] // consumer took the oldest value
			}
		}
	}()
 
	return out
}
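
A usage sketch with a fast producer and a deliberately slow consumer; with size 3, the consumer only ever sees values near the most recent window:

func main() {
	in := make(chan int)
	go func() {
		defer close(in)
		for i := range 100 {
			in <- i
		}
	}()
 
	for v := range ringBuffer(in, 3) {
		time.Sleep(10 * time.Millisecond) // slow consumer: older values get dropped
		fmt.Println(v)
	}
}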

Practical use cases:

  • Maintaining a history of recent events: Such as the last N log messages or sensor readings.
  • Throttling data streams: Preventing fast producers from overwhelming slow consumers while ensuring the consumer receives the most recent data.
  • Implementing bounded queues: Where a fixed-size buffer is needed and older data can be discarded.
  • Real-time data processing: Where only the most up-to-date info is relevant.

4. The Bounded Parallelism Pattern

Bounded parallelism is a concurrency pattern that limits the number of goroutines executing a specific task concurrently. This control over the level of concurrency is essential for managing resource utilization, preventing system overload, and maintaining stability, especially when dealing with a large number of potential tasks.

This pattern is commonly implemented using a fixed number of worker goroutines that process tasks from a shared queue. Another approach involves using a semaphore, often used with a buffered channel, to restrict the number of goroutines that can access a particular resource or execute a specific section of code simultaneously.

func boundedParallel(works []int, parallelism int, fn func(int)) {
	var wg sync.WaitGroup
	jobs := make(chan int, len(works))
 
	// Start workers
	wg.Add(parallelism)
	for range parallelism {
		go func() {
			defer wg.Done()
			for job := range jobs {
				fn(job)
			}
		}()
	}
 
	// Send work to workers
	for _, work := range works {
		jobs <- work
	}
	close(jobs)
 
	wg.Wait()
}
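
The semaphore-based alternative mentioned above can be sketched with a buffered channel (the function name is illustrative):

func boundedParallelSem(works []int, parallelism int, fn func(int)) {
	var wg sync.WaitGroup
	sem := make(chan struct{}, parallelism) // counting semaphore
 
	for _, work := range works {
		wg.Add(1)
		go func() {
			defer wg.Done()
			sem <- struct{}{}        // acquire a slot (blocks when full)
			defer func() { <-sem }() // release the slot
			fn(work)
		}()
	}
	wg.Wait()
}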

Practical use cases:

  • Preventing resource exhaustion: By limiting the number of concurrent operations, applications can avoid overwhelming CPU, memory, or network resources.
  • Controlling access to external services: When interacting with APIs that have rate limits, bounded parallelism can help ensure that the application does not exceed these limits.

Essential Additional Go Concurrency Patterns

1. The Context Pattern

The context pattern in Go provides a standardized way to manage the lifecycle of operations, especially in concurrent and distributed systems. A context.Context carries deadlines, cancellation signals, and other request-scoped values across API boundaries and between processes. Contexts form a tree structure: a context derived from a parent inherits its properties, and cancellation signals propagate down the tree, allowing for graceful shutdown of related goroutines.

The context package also offers functions for creating derived contexts with specific behaviours:

  • WithCancel(): returns a context that can be manually cancelled.
  • WithDeadline() and WithTimeout(): return contexts that will be cancelled after a specified time or duration.
  • WithValue(): creates a new context carrying a specific key-value pair.

func worker(ctx context.Context, id int) {
	for {
		select {
		case <-ctx.Done():
			fmt.Printf("Worker %d: shutting down\n", id)
			return
		default:
			fmt.Printf("Worker %d: working...\n", id)
			time.Sleep(500 * time.Millisecond)
		}
	}
}
 
func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()
 
	for i := range 3 {
		go worker(ctx, i)
	}
 
	time.Sleep(3 * time.Second)
	fmt.Println("Main: exiting")
}

Practical use cases:

  • Managing request lifecycle: In server applications, to control the execution of goroutines spawned to handle incoming requests.
  • Passing request-scoped data: Such as authentication tokens, tracing information, or user IDs, down the call chain.
  • Ensuring resource cleanup: By using defer cancel(), resources associated with a context are released when the context's scope ends.

2. The Errgroup Pattern

The errgroup pattern, provided by the golang.org/x/sync/errgroup package, offers robust synchronization and error handling for collections of goroutines working on a common task. It builds upon the functionality of sync.WaitGroup by adding the capability to propagate errors and manage the lifecycle of goroutines through context cancellation.

Using errgroup, we can launch multiple goroutines to perform subtasks and then wait for all of them to complete. If any of the goroutines return a non-nil error, the errgroup captures this error and, if a context is associated with the group (using errgroup.WithContext), it cancels that context.

The errgroup package also allows for limiting the number of goroutines that can be active concurrently within the group using the SetLimit() method.
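
A hedged sketch combining the two features, where doTask stands in for an arbitrary unit of work:

func main() {
	g, ctx := errgroup.WithContext(context.Background())
	g.SetLimit(3) // at most 3 goroutines run at once
 
	for i := range 10 {
		g.Go(func() error {
			select {
			case <-ctx.Done():
				return ctx.Err() // another task already failed; skip work
			default:
				return doTask(i) // doTask is hypothetical
			}
		})
	}
	if err := g.Wait(); err != nil {
		fmt.Println("first error:", err)
	}
}

The example below uses the plain errgroup.Group form: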

func main() {
	var g errgroup.Group
	urls := []string{
		"https://google.com",
		"https://tasnimzotder.com",
		"https://invalid-url",
	}
 
	for _, url := range urls {
		g.Go(func() error {
			return fetchData(url)
		})
	}
 
	if err := g.Wait(); err != nil {
		fmt.Println("Error fetching URLs:", err)
	} else {
		fmt.Println("Successfully fetched all URLs")
	}
}
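
The fetchData helper is not shown here; a minimal version might issue a GET request and report failures (assuming net/http):

func fetchData(url string) error {
	resp, err := http.Get(url)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	if resp.StatusCode >= 400 {
		return fmt.Errorf("fetching %s: %s", url, resp.Status)
	}
	return nil
}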

Practical use cases:

  • Making concurrent API calls: Where the result depends on the success of all calls, and an error in one can cancel the others.
  • Implementing parallel data processing: Where multiple goroutines process different parts of a dataset, and errors need to be aggregated and handled.

3. The Rate Limiting Pattern

Rate limiting is a concurrency pattern used to control the number of requests or operations performed within a specific time period. It is a critical mechanism for protecting systems from abuse, ensuring fair usage among users, and preventing overloads that can lead to performance degradation or service outages.

Several algorithms exist for implementing rate limiting, including the Token Bucket, Leaky Bucket, Fixed Window Counter, and Sliding Window. These algorithms vary in how they track and enforce limits on the number of allowed requests over time.

Go provides tools in its standard library, such as the time package for basic rate limiting using tickers, and the golang.org/x/time/rate package for more sophisticated implementations like the Token Bucket.

func main() {
	rateLimit := time.Second / 10 // 10 calls per second
 
	requests := make(chan int, 5)
	for i := range 5 {
		requests <- i
	}
	close(requests)
 
	throttle := time.Tick(rateLimit)
	for req := range requests {
		<-throttle
		fmt.Println("request", req, time.Now())
	}
}

Another example, using the golang.org/x/time/rate limiter:

func main() {
	// rate limiter: 5 req per second with burst of 10
	limiter := rate.NewLimiter(5, 10)
 
	var wg sync.WaitGroup
	for i := range 20 {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			if err := limiter.Wait(context.Background()); err != nil {
				fmt.Printf("Rate limiter error: %v\n", err)
				return
			}
 
			fmt.Printf("Request %d processed at %s\n", id, time.Now())
		}(i)
	}
 
	wg.Wait()
}

Practical use cases:

  • Protecting APIs: From malicious attacks like denial-of-service (DoS) and from abuse by limiting the number of requests a user or client can make.
  • Ensuring fair usage: In multi-tenant systems or shared resources, to prevent one user from consuming all available capacity.
  • Complying with external service limits: When interacting with third-party APIs that have their own rate limiting policies.

Conclusion

Go's concurrency model is one of its standout features that makes the language so powerful for modern applications. We've explored a range of patterns from simple generators to more complex approaches like contexts and error groups.

What makes these patterns special isn't just their technical design, but how practical they are for real-world problems. Each pattern we've covered solves common challenges we'll face when building responsive, efficient applications.
