Contents

Go: concurrency patterns

This article is the result of me taking notes and trying out basic concurrency concepts presented by Rob Pike in his talk available here: YT video by Rob Pike.

Spawning a goroutine

When you just want to run a goroutine, it does not make the caller wait. The program behaves as if you just spawn a shell command with & at the end.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14

package main

import (
	"fmt"
	"math/rand"
)

func idea(text string) {
	fmt.Printf("Idea #%d: %s", rand.Int31n(1000), text)
}
func main() {
	go idea("Where is the spring?")
}

We could wait some time for the goroutine to finish:

1
2
3
4
func main() {
	go idea("Where is the spring?")
	time.Sleep(2 * time.Second)
}

…but there is a better way.

Channels

Here I will create a signalling channel which will notify the main program when the goroutine completes.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
func idea(text string, end chan bool) {
	fmt.Printf("Idea #%d: %s\n", rand.Int31n(1000), text)
	end <- true
}

func main() {
	sig := make(chan bool)
	go idea("Where is the spring?", sig)
	<-sig
}

Generator pattern

There is a pattern - called generator pattern - where you return a channel which can be used to communicate with a spawned goroutine:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
func idea(text string) chan bool {
	sig := make(chan bool)
	go func() {
		fmt.Printf("Idea #%d: %s\n", rand.Int31n(1000), text)
		sig <- true
	}()
	return sig
}

func main() {
	sig := idea("Where is the spring?")
	<-sig
}
Receive-only channel

Here the function idea returns a channel on which a caller main can receive data:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
func idea(topic string) <-chan string {
	c := make(chan string)
	// this routine forever generates ideas and sends them to c
	go func() {
		for i := 0; ; i++ {
			c <- fmt.Sprintf("Idea #%d: %s\n", i, topic)
			time.Sleep(time.Duration(200))
		}
	}()
	return c
}

func main() {
	spring := idea("Spring activity")

	// we need just 5
	for range 5 {
		fmt.Printf("> %s", <-spring)
	}
}

Fan-in

This pattern allows to gather several channels into a single channel by

  • creating a new target channel
  • for each channel, spawning a routine that inifinitely copies values to a target channel
  • reading values from target

Values read from target channel are not coupled to any of the source channels in a way that they don’t wait for each other: if one is faster, it will generate more messages to the target channel in the unit of time.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26

func fanIn(inp1, inp2 <-chan string) <-chan string {
	c := make(chan string)
	go func() {
		for {
			c <- <-inp1
		}
	}()
	go func() {
		for {
			c <- <-inp2
		}
	}()
	return c
}

func main() {
	springChan := idea("Spring activity")
	winterChan := idea("Winter play")

	joined := fanIn(springChan, winterChan)
	// we need just 5
	for range 100 {
		fmt.Printf("> %s", <-joined)
	}
}

Each source waits for go ahead

Let’s say we want to keep the sequencing, i.e. need to make routines wait for each other. We need to send a special wait channel which will be part of a message. The message will not be a string anymore, but a struct with both string value and a wait channel.

Interestingly, all messages will share the same wait chan.

1
2
3
4
type Message struct {
	value string
	wait  chan bool
}

Now, we let only single message be processed from each of faned-out source channels until its wait chan is notified we can process more.

  • when a message from a source channel is sent to a fan-out, it immediatelly reads wait chan
  • messages can be created and added to other source channels (possibly with this blocked wait chan)
  • wait chan will be unblocked when messages from all other sources are processed
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
func idea(topic string, wait int) <-chan Message {
	c := make(chan Message)
	waitchan := make(chan bool)
	// this routine forever generates ideas and sends them to c
	go func() {
		for i := 0; ; i++ {
			c <- Message{fmt.Sprintf("%s - %d\n", topic, i), waitchan}
			time.Sleep(time.Duration(wait))
			<-waitchan
		}
	}()
	return c
}

func withFanIn(cnt int, springDelay int, winterDelay int) {
	{
		spring := idea("Spring activity", springDelay)
		winter := idea("Winter activity", winterDelay)

		fan := fanIn(spring, winter)
		for range cnt {
			m1 := <-fan
			fmt.Printf("> %s", m1.value)
			m2 := <-fan
			fmt.Printf("> %s", m2.value)
			m1.wait <- true
			m2.wait <- true
		}
	}
}

func main() {
	withFanIn(10, 200, 1000)
}

The above solution is a bit tricky: it requires putting a channel into each message as a means to synchronize reading and writing of messages to preserve ordering (or better: sequencing) of messages.

Select

Rules:

  • all channels are evaluated
  • selection blicks until one communication can proceed
  • it proceeds; if many can, the one to proceed is selected randomly
  • if default clause exists, it executes immediately if no channels are ready
  • ** cases are comminication** - either a send or a receive

fan-in pattern using select

With select, only one goroutine is needed:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
func fanIn[T comparable](inp1, inp2 <-chan T) <-chan T {
	c := make(chan T)
	go func() {
		for {
			select {
			case s := <-inp1:
				c <- s
			case s := <-inp2:
				c <- s
			}
		}
	}()

	return c
}

Timeout using select

time.After function returns a channel that blocks for a specified duration. After this interval, the channel delivers current time, once.

Below, we create spring messages after a random duration less than 5s, however, we don’t wait for any of the messages for longer than 2s:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
type Message struct {
	value string
}

func idea(topic string, randMs int) <-chan Message {
	c := make(chan Message)
	// this routine forever generates ideas and sends them to c
	go func() {
		for i := 0; ; i++ {
			sleeps := time.Duration(rand.Intn(randMs)) * time.Millisecond
			fmt.Printf("Will think for %v [ms]...", sleeps)
			time.Sleep(sleeps)
			c <- Message{fmt.Sprintf("%s - %d [ms %s]\n", topic, i, sleeps)}
		}
	}()
	return c
}

func waitWithTimeout() {
	// we generate each spring idea within random time of less than 5 s ...
	spring := idea("Spring activity", 5000)

	for {
		select {
		case s := <-spring:
			fmt.Printf("> %s", s.value)
		case <-time.After(2000 * time.Millisecond):
			// ... and if the thinking takes longer than 2 s, we quit
			fmt.Println("too slow!")
			return
		}

	}
}

func main() {
	waitWithTimeout()
}

We can also contrain whole idea generation process to take no longer than 5s:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
func waitWithTimeout() {
	// we generate each spring idea within random time of less than 5 s ...
	spring := idea("Spring activity", 5000)

	// ... and we don't let the whole generation take more than 5s
	timeout := time.After(5000 * time.Millisecond)

	for {
		select {
		case <-time.Tick(1 * time.Second):
			fmt.Println("[Second passed]")
		case s := <-spring:
			fmt.Printf("> %s", s.value)
		case <-timeout:
			fmt.Println("Timeout!")
			return
		}

	}
}

Resources