Go routines are a way to execute functions in the background while the main program continues running. This lets code run asynchronously, which is very useful for running tasks concurrently.
The article consists of several sections:
- Go routine.
- Main doesn’t wait.
- Go routines do not support return values.
- Channels.
- Buffered channels.
- Length vs Capacity.
- Closing channels.
- Unidirectional channels.
- Wait groups.
- Worker pools.
- Select.
- Mutex.
Go routine:
In the following code, we will call a function in the traditional way and then call the same function via a go routine. We will see that going is printed right after the go routine is launched and before its output, because the main execution thread continues; it is not blocked while the go routine is running.
package main
import (
"fmt"
"time"
)
func f(from string) {
for i := 0; i < 4; i++ {
fmt.Println(from, ":", i)
}
}
func main() {
// Regular function call:
f("direct")
// GoRoutine function call:
go f("goroutine")
fmt.Println("going")
// Sleep 1s
time.Sleep(time.Second)
fmt.Println("done")
}
If we run the code, we will see the following output:
direct : 0
direct : 1
direct : 2
direct : 3
going
goroutine : 0
goroutine : 1
goroutine : 2
goroutine : 3
done
Main doesn’t wait:
The problem with this approach is that we had to add a time.Sleep(time.Second) to allow the function f() to finish before main() reaches the end. If the function f() takes a long time, main() finishes without waiting for the go routines it called. We can see an example in the following code:
package main
import (
"fmt"
"time"
)
func f(from string) {
for i := 0; i < 4; i++ {
fmt.Println(from, ":", i)
time.Sleep(time.Second)
}
}
func main() {
// Regular function call:
f("direct")
// GoRoutine function call:
go f("goroutine")
fmt.Println("going")
// Sleep 1s
time.Sleep(time.Second)
fmt.Println("done")
}
If we run the code, we can see that f() has been called via the go routine, but this time it only had time to execute the first two iterations of the loop because main() doesn’t wait for anyone:
direct : 0
direct : 1
direct : 2
direct : 3
going
goroutine : 0
goroutine : 1
done
Go routines do not support return values:
One thing to keep in mind is that you cannot call a function using go routines and store the return value since these routines run on another execution thread, and it’s not known when they will finish. Modifying the previous example as follows, we can see the error it shows when attempting this.
package main
import (
"fmt"
"time"
)
func f(from string) string {
return from
}
func main() {
// Regular function call:
f("direct")
// GoRoutine function call:
answer := go f("goroutine")
fmt.Println("going")
// Sleep 1s
time.Sleep(time.Second)
fmt.Println("done")
}
./goroutine2.go:16:15: syntax error: unexpected go, expected expression
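A common workaround, sketched below, is to have the go routine deliver its result through a channel (channels are covered in the next section). The callAsync() helper is a hypothetical name introduced only for this illustration; it is not part of the original example.

```go
package main

import "fmt"

// f is the same function as in the example above: it returns its argument.
func f(from string) string {
	return from
}

// callAsync is a hypothetical helper: it runs f in a go routine and
// delivers the return value through a channel instead of an assignment.
func callAsync(from string) chan string {
	// Buffer of 1 so the go routine can send and exit even if nobody reads.
	result := make(chan string, 1)
	go func() {
		result <- f(from)
	}()
	return result
}

func main() {
	answer := callAsync("goroutine")
	fmt.Println("going")
	// Receiving blocks until the go routine has sent its value.
	fmt.Println("answer:", <-answer)
}
```

With this approach main() decides when it needs the value and only blocks at that point.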
Channels:
The way to synchronize execution threads and obtain data from go routines is through Go channels, through which they can exchange messages to coordinate actions.
We must keep in mind that each channel must be of a type, either primitive or more complex like a structure. Additionally, channels remain blocked until they are read from or written to, although this problem can be partially avoided through buffered channels.
Both sending and receiving on an unbuffered channel block until the other side is ready. That is, if main() receives from a channel, the program will not continue until a value arrives through it. On the other hand, if main() sends on a channel, it won’t proceed until someone reads from it.
We can see an example where the sendEmail() function is blocked, waiting for main() to read the email send confirmations from the channel.
package main
import (
"fmt"
"strconv"
"time"
)
func sendEmail(email string, myChannel chan string) {
for i := 0; i < 3; i++ {
fmt.Println("Email sent to:" + email + " - iteration: " + strconv.Itoa(i) + " - " + time.Now().String())
message := "Email: " + strconv.Itoa(i) + " sent successfully"
myChannel <- message
}
}
func main() {
// Create an unbuffered channel
myChannel := make(chan string)
// GoRoutine function call:
go sendEmail("kr0m@alfaexploit.com", myChannel)
for i := 0; i < 3; i++ {
time.Sleep(time.Second * 3)
fmt.Println(<-myChannel)
}
fmt.Println("done")
}
Email sent to:kr0m@alfaexploit.com - iteration: 0 - 2024-08-14 18:37:59.676379163 +0200 CEST m=+0.000062471
Email: 0 sent successfully
Email sent to:kr0m@alfaexploit.com - iteration: 1 - 2024-08-14 18:38:02.677226485 +0200 CEST m=+3.000910002
Email: 1 sent successfully
Email sent to:kr0m@alfaexploit.com - iteration: 2 - 2024-08-14 18:38:05.67955186 +0200 CEST m=+6.003235309
Email: 2 sent successfully
done
We can see how all the emails have been sent with a 3s interval between them.
Buffered channels:
With buffered channels, we can avoid the blockages mentioned in the previous section to some extent, as these are channels with a buffer capable of storing X elements before becoming blocked.
The previous example would look like this.
package main
import (
"fmt"
"strconv"
"time"
)
func sendEmail(email string, myChannel chan string) {
for i := 0; i < 3; i++ {
fmt.Println("Email sent to:" + email + " - iteration: " + strconv.Itoa(i) + " - " + time.Now().String())
message := "Email: " + strconv.Itoa(i) + " sent successfully"
myChannel <- message
}
}
func main() {
// Create a buffered channel with 3 slots
myChannel := make(chan string, 3)
// GoRoutine function call:
go sendEmail("kr0m@alfaexploit.com", myChannel)
for i := 0; i < 3; i++ {
time.Sleep(time.Second * 3)
fmt.Println(<-myChannel)
}
fmt.Println("done")
}
Email sent to:kr0m@alfaexploit.com - iteration: 0 - 2024-08-14 18:38:55.339534415 +0200 CEST m=+0.000037323
Email sent to:kr0m@alfaexploit.com - iteration: 1 - 2024-08-14 18:38:55.339616778 +0200 CEST m=+0.000119668
Email sent to:kr0m@alfaexploit.com - iteration: 2 - 2024-08-14 18:38:55.339618977 +0200 CEST m=+0.000121874
Email: 0 sent successfully
Email: 1 sent successfully
Email: 2 sent successfully
done
We can see how all the email sends were performed in the same second. Later, the main() function reads the results at its own pace, but the email sends did not block main() and main() did not block the sends.
Length vs Capacity:
The capacity of a buffered channel is the number of values the channel can store before becoming blocked, while the length is the number of elements the channel is currently storing.
package main
import (
"fmt"
)
func main() {
ch := make(chan string, 3)
ch <- "naveen"
ch <- "paul"
fmt.Println("capacity is", cap(ch))
fmt.Println("length is", len(ch))
fmt.Println("read value", <-ch)
fmt.Println("new length is", len(ch))
}
capacity is 3
length is 2
read value naveen
new length is 1
Closing channels:
Even with buffered channels, we still have the problem of how long main() should wait before reaching the end of the program. Fortunately, Go provides a way to detect when a channel has been closed, allowing us to call a go routine, wait for the data, and finish executing main() once the channel is closed.
The way to read both the channel value and the state is as follows:
v, ok := <-myChannel
The previous program would be modified as follows:
package main
import (
"fmt"
"strconv"
"time"
)
func sendEmail(email string, myChannel chan string) {
for i := 0; i < 3; i++ {
fmt.Println("Email sent to:" + email + " - iteration: " + strconv.Itoa(i) + " - " + time.Now().String())
message := "Email: " + strconv.Itoa(i) + " sent successfully"
myChannel <- message
}
close(myChannel)
}
func main() {
// Create a channel
myChannel := make(chan string, 3)
// GoRoutine function call:
go sendEmail("kr0m@alfaexploit.com", myChannel)
for {
v, open := <-myChannel
if !open {
break
}
fmt.Println("Received ", v, open)
}
fmt.Println("done")
}
We can also use Go’s for range, which ends the loop on its own once the channel is closed, so we do not need to check the state ourselves.
package main
import (
"fmt"
"strconv"
"time"
)
func sendEmail(email string, myChannel chan string) {
for i := 0; i < 3; i++ {
fmt.Println("Email sent to:" + email + " - iteration: " + strconv.Itoa(i) + " - " + time.Now().String())
message := "Email: " + strconv.Itoa(i) + " sent successfully"
myChannel <- message
}
close(myChannel)
}
func main() {
// Create a channel
myChannel := make(chan string, 3)
// GoRoutine function call:
go sendEmail("kr0m@alfaexploit.com", myChannel)
for v := range myChannel {
fmt.Println("Received ", v)
}
fmt.Println("done")
}
Email sent to:kr0m@alfaexploit.com - iteration: 0 - 2024-08-18 19:09:04.03698939 +0200 CEST m=+0.000032195
Email sent to:kr0m@alfaexploit.com - iteration: 1 - 2024-08-18 19:09:04.03704383 +0200 CEST m=+0.000086620
Email sent to:kr0m@alfaexploit.com - iteration: 2 - 2024-08-18 19:09:04.037045945 +0200 CEST m=+0.000088735
Received Email: 0 sent successfully
Received Email: 1 sent successfully
Received Email: 2 sent successfully
done
The idea is to let all the go routines run, continue executing code in main(), and at the end of main(), just before exiting, read from all the channels that we have left open.
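As a minimal sketch of that idea, the following program launches a go routine, keeps working in main(), and only drains the channel right before exiting. The produce() and drain() functions and their messages are hypothetical names chosen for this illustration.

```go
package main

import "fmt"

// produce is a hypothetical background task: it sends n messages and then
// closes the channel to signal that no more values will arrive.
func produce(n int, out chan string) {
	for i := 0; i < n; i++ {
		out <- fmt.Sprintf("task %d finished", i)
	}
	close(out)
}

// drain reads from the channel until it is closed and returns
// everything that was received.
func drain(in chan string) []string {
	var received []string
	for v := range in {
		received = append(received, v)
	}
	return received
}

func main() {
	out := make(chan string, 3)
	go produce(3, out)

	// main keeps doing its own work while the go routine runs.
	fmt.Println("main is busy with other work")

	// Just before exiting, collect whatever the go routine produced.
	for _, v := range drain(out) {
		fmt.Println("Received", v)
	}
	fmt.Println("done")
}
```

Because produce() closes the channel, drain() returns as soon as the last value has been read, so main() never needs a time.Sleep().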
Unidirectional channels:
Channels can be unidirectional, although a channel that can only ever be read from, or only ever be written to, is not very useful on its own. However, a bidirectional channel can be converted to a unidirectional one when it is passed to a function. For example, if a function only needs to write to the channel and never read, its parameter can be declared send-only so that reading by mistake becomes a compile-time error.
A simple example where the channel is bidirectional in the main() function but unidirectional in sendData().
package main
import "fmt"
func sendData(sendch chan<- int) {
sendch <- 10
}
func main() {
chnl := make(chan int)
go sendData(chnl)
fmt.Println(<-chnl)
}
If we try to read from the channel inside sendData(), we will get the following error:
./test.go:6:12: invalid operation: cannot receive from send-only channel sendch (variable of type chan<- int)
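The opposite restriction is also possible. The sketch below extends the example with a hypothetical receiveData() function whose parameter is receive-only (<-chan int), so sending from inside it would be rejected by the compiler. Closing the channel in sendData() is an addition made here so that the range loop can finish.

```go
package main

import "fmt"

// sendData can only send on the channel (chan<- int).
func sendData(sendch chan<- int) {
	sendch <- 10
	// Closing is allowed on a send-only channel and ends the range below.
	close(sendch)
}

// receiveData is a hypothetical counterpart that can only receive
// (<-chan int). It adds up every value until the channel is closed.
func receiveData(recvch <-chan int) int {
	total := 0
	for v := range recvch {
		total += v
	}
	return total
}

func main() {
	// The channel is bidirectional here and is converted implicitly
	// to the restricted type expected by each function.
	chnl := make(chan int)
	go sendData(chnl)
	fmt.Println(receiveData(chnl))
}
```

Declaring the direction in the function signature documents who is the producer and who is the consumer, and lets the compiler enforce it.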
Wait groups:
A wait group is nothing more than a counter of go routines. The Add() method adds pending routines and Done() removes them, incrementing and decrementing the counter respectively.
The Wait() method will block code execution until the counter equals zero.
wg.Add(N) -> Increment counter.
wg.Done() -> Decrement counter.
wg.Wait() -> Wait until counter == 0.
Let’s see a simple example where main() starts 3 go routines, registering each one with Add(1), and waits for them to finish before exiting. Meanwhile, each process() call invokes Done() when it finishes processing its data, which finally unblocks Wait().
package main
import (
"fmt"
"sync"
"time"
)
func process(i int, wg *sync.WaitGroup) {
fmt.Println("started Goroutine ", i)
time.Sleep(2 * time.Second)
fmt.Printf("Goroutine %d ended\n", i)
wg.Done()
}
func main() {
no := 3
var wg sync.WaitGroup
for i := 0; i < no; i++ {
wg.Add(1)
go process(i, &wg)
}
wg.Wait()
fmt.Println("All go routines finished executing")
}
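A frequent variation, sketched here under the assumption that each go routine produces a value we want to keep, is to call Done() through defer so the counter is decremented even if the function returns early. The squareAll() helper is hypothetical and only serves the illustration.

```go
package main

import (
	"fmt"
	"sync"
)

// squareAll is a hypothetical helper: it squares every number in its own
// go routine and uses a wait group to wait for all of them.
func squareAll(nums []int) []int {
	results := make([]int, len(nums))
	var wg sync.WaitGroup
	for i, n := range nums {
		wg.Add(1)
		go func(i, n int) {
			// defer guarantees the counter is decremented when this
			// function returns, whichever path it takes.
			defer wg.Done()
			// Each go routine writes to its own index, so no lock is needed.
			results[i] = n * n
		}(i, n)
	}
	wg.Wait()
	return results
}

func main() {
	fmt.Println(squareAll([]int{1, 2, 3}))
}
```

Note that Add(1) is called in the loop before the go routine starts; calling it inside the go routine could let Wait() return too early.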
Worker pools:
The concept of a worker pool involves having several processes running in parallel doing the same work. In this case, the job is to process jobs stored in a channel that acts as a queue.
In a very summarized way, the code enqueues jobs in a channel, leaves a go routine reading from the results channel, starts several workers who read from the jobs channel, process the data, and enqueue the results in the results channel.
Each of the functions is responsible for a part of the process:
- queueJobs: Enqueues the jobs in the jobs channel. The channel holds a maximum of 10 jobs, so the function blocks until the worker() function reads jobs, emptying the queue and making space for new ones.
- getResults: Keeps reading from the results channel. When it finishes iterating, it sends true on the done channel.
- createWorkerPool: Starts 10 worker go routines (numberOfWorkers), so at most 10 jobs are processed simultaneously. It increments the wait group counter for each worker started, waits for the wait group, and then closes the results channel.
- worker: Gets the output of the addNumbers function and stores the result in a structure that it puts in the results channel.
All the workers read tasks from the jobs channel. When there are no more jobs to do, all the worker processes will execute wg.Done(), releasing the block of the createWorkerPool() function.
When the createWorkerPool() function is released, it closes the results channel, which in turn is detected by the getResults() function, ending the loop and executing done <- true.
In main(), the <-done is detected, and the remaining instructions are executed.
Keep in mind that as queueJobs() enqueues tasks, worker() dequeues and executes them. The workers are started by createWorkerPool(), and their number determines how many jobs are processed at the same time, while the size of the jobs channel only limits how many jobs can be queued before queueJobs() blocks. In this code both values come from numberOfWorkers, so by varying it we can achieve greater or lesser concurrency. Perhaps we should obtain the number of system cores and configure this parameter with N-1.
package main
import (
"fmt"
"math/rand"
"sync"
"time"
)
type job struct {
id int
randomNumber1 int
randomNumber2 int
}
type result struct {
job job
sumOfNumbers int
}
var numberOfWorkers = 10
var noOfJobs = 100
var jobs = make(chan job, numberOfWorkers)
var results = make(chan result, numberOfWorkers)
var done = make(chan bool)
func addNumbers(randomNumber1, randomNumber2 int) int {
sum := randomNumber1 + randomNumber2
time.Sleep(100 * time.Millisecond)
return sum
}
func worker(wg *sync.WaitGroup) {
for job := range jobs {
output := result{job, addNumbers(job.randomNumber1, job.randomNumber2)}
results <- output
}
wg.Done()
}
func createWorkerPool(numberOfWorkers int) {
var wg sync.WaitGroup
for i := 0; i < numberOfWorkers; i++ {
wg.Add(1)
go worker(&wg)
}
wg.Wait()
close(results)
}
func queueJobs(noOfJobs int) {
for i := 0; i < noOfJobs; i++ {
randomNumber1 := rand.Intn(999)
randomNumber2 := rand.Intn(999)
job := job{i, randomNumber1, randomNumber2}
jobs <- job
}
close(jobs)
}
func getResults(done chan bool) {
for result := range results {
fmt.Printf("Job id %d, %d + %d = %d\n", result.job.id, result.job.randomNumber1, result.job.randomNumber2, result.sumOfNumbers)
}
done <- true
}
func main() {
startTime := time.Now()
go queueJobs(noOfJobs)
go getResults(done)
createWorkerPool(numberOfWorkers)
<-done
endTime := time.Now()
diff1 := endTime.Sub(startTime)
fmt.Println("--------------------------------")
startTime = time.Now()
for i := 0; i < noOfJobs; i++ {
randomNumber1 := rand.Intn(999)
randomNumber2 := rand.Intn(999)
result := addNumbers(randomNumber1, randomNumber2)
fmt.Printf("Job id %d, %d + %d = %d\n", i, randomNumber1, randomNumber2, result)
}
endTime = time.Now()
diff2 := endTime.Sub(startTime)
fmt.Println("--------------------------------")
fmt.Println("Worker Pool execution, total time taken: ", diff1.Seconds(), "seconds")
fmt.Println("Sequential execution, total time taken: ", diff2.Seconds(), "seconds")
}
We can see a huge difference between executing the code using go routines and executing it sequentially: the worker pool is approximately 10X faster.
Worker Pool execution, total time taken: 1.071072607 seconds
Sequential execution, total time taken: 10.580277014 seconds
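The idea mentioned earlier of sizing the pool from the number of system cores could be sketched as follows, using runtime.NumCPU() from the standard library. The suggestedWorkers() helper and the N-1 rule are only the heuristic proposed above, not a general recommendation; it mostly makes sense for CPU-bound work, whereas the jobs in this example spend their time sleeping.

```go
package main

import (
	"fmt"
	"runtime"
)

// suggestedWorkers is a hypothetical helper implementing the N-1 heuristic:
// leave one core free, but never return fewer than one worker.
func suggestedWorkers(cores int) int {
	if cores <= 1 {
		return 1
	}
	return cores - 1
}

func main() {
	// NumCPU reports the number of logical CPUs usable by the process.
	cores := runtime.NumCPU()
	fmt.Println("logical CPUs:", cores)
	fmt.Println("suggested workers:", suggestedWorkers(cores))
}
```

The returned value could then be used in place of the fixed numberOfWorkers = 10.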
Select:
The select statement is very useful when we want to leave several go routines running in the background and read the response of the first one to respond. This way of operating is often used in applications where performance is crucial.
For example, we could have a distributed database, replicated in a cluster in various geographical locations, query each of the nodes in a go routine, and use the response from the fastest machine.
package main
import (
"fmt"
"time"
)
func tryServer(serverNumber int, output chan string) {
// Real case could be like this, instead of the switch/case:
// output <- databaseConnect(serverNumber)
switch serverNumber {
case 1:
time.Sleep(6 * time.Second)
output <- "server1"
case 2:
time.Sleep(1 * time.Second)
output <- "server2"
default:
time.Sleep(2 * time.Second)
output <- "server3"
}
}
func main() {
servers := []string{"server1", "server2", "server3"}
output := make(chan string)
for i := range servers {
// Server numbers start at 1, so shift the zero-based index.
go tryServer(i+1, output)
}
select {
case fastServer := <-output:
fmt.Println("fastServer: ", fastServer)
}
}
fastServer: server2
With select, you can also attempt to read from a channel and, if no value is ready, move to the default option.
package main
import (
"fmt"
"time"
)
func writeChan(channel chan string) {
time.Sleep(1 * time.Second)
channel <- "AAAA"
}
func main() {
channel := make(chan string)
go writeChan(channel)
select {
case v := <-channel:
fmt.Println("Received value: ", v)
default:
fmt.Println("Default case executed.")
}
}
Default case executed.
Now we use the same example, but make the writeChan() function write to the channel immediately so that the main() function can read the data.
package main
import (
"fmt"
"time"
)
func writeChan(channel chan string) {
//time.Sleep(1 * time.Second)
channel <- "AAAA"
}
func main() {
channel := make(chan string)
go writeChan(channel)
time.Sleep(1 * time.Second)
select {
case v := <-channel:
fmt.Println("Received value: ", v)
default:
fmt.Println("Default case executed.")
}
}
Received value: AAAA
When all select options are available at the same time, one of them will be chosen at random.
package main
import (
"fmt"
"time"
)
func tryServer(serverNumber int, output chan string) {
// Real case could be like this, instead of the switch/case:
// output <- databaseConnect(serverNumber)
switch serverNumber {
case 1:
time.Sleep(1 * time.Second)
output <- "server1"
case 2:
time.Sleep(1 * time.Second)
output <- "server2"
default:
time.Sleep(1 * time.Second)
output <- "server3"
}
}
func main() {
servers := []string{"server1", "server2", "server3"}
output := make(chan string)
for i := range servers {
// Server numbers start at 1, so shift the zero-based index.
go tryServer(i+1, output)
}
select {
case fastServer := <-output:
fmt.Println("fastServer: ", fastServer)
}
}
It could be any of the three servers:
fastServer: server3
fastServer: server1
fastServer: server2
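The random choice can also be observed with a select that has several cases ready at once. The following is a minimal sketch with two hypothetical buffered channels, a and b, that both already hold a value when select runs.

```go
package main

import "fmt"

// pickOne fills two buffered channels so that both cases are ready,
// then lets select choose which one to receive from.
func pickOne() string {
	a := make(chan string, 1)
	b := make(chan string, 1)
	a <- "from a"
	b <- "from b"
	select {
	case v := <-a:
		return v
	case v := <-b:
		return v
	}
}

func main() {
	// Repeat the choice many times and count how often each case wins.
	counts := map[string]int{}
	for i := 0; i < 1000; i++ {
		counts[pickOne()]++
	}
	fmt.Println(counts)
}
```

Over many repetitions both cases should be picked a similar number of times, regardless of the order in which they are written.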
Mutex:
When working with concurrent programs, you may encounter a race condition, meaning several parts of the code are modifying a variable simultaneously. Let’s see this example where 1000 go routines are executed, each calling the increment() function. The variable x should be incremented 1000 times, but instead, each execution gives a different result.
package main
import (
"fmt"
"sync"
)
var x = 0
func increment(wg *sync.WaitGroup) {
x = x + 1
wg.Done()
}
func main() {
var w sync.WaitGroup
for i := 0; i < 1000; i++ {
w.Add(1)
go increment(&w)
}
w.Wait()
fmt.Println("Final value of x: ", x)
}
Final value of x: 968
Final value of x: 960
Final value of x: 965
Final value of x: 956
To avoid race conditions, we must use mutex locks, so the variable x can only be modified by one go routine at a time.
package main
import (
"fmt"
"sync"
)
var x = 0
func increment(wg *sync.WaitGroup, m *sync.Mutex) {
m.Lock()
x = x + 1
m.Unlock()
wg.Done()
}
func main() {
var w sync.WaitGroup
var m sync.Mutex
for i := 0; i < 1000; i++ {
w.Add(1)
go increment(&w, &m)
}
w.Wait()
fmt.Println("Final value of x:", x)
}
Final value of x: 1000
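As a variation on the example above, the mutex can be kept next to the value it protects instead of being passed around as a separate argument. This is only a sketch; the counter type and the run() function are hypothetical names that do not appear in the original code.

```go
package main

import (
	"fmt"
	"sync"
)

// counter is a hypothetical type that bundles the value with its mutex.
type counter struct {
	mu sync.Mutex
	x  int
}

// increment adds one to the counter while holding the lock.
func (c *counter) increment() {
	c.mu.Lock()
	defer c.mu.Unlock() // released when the method returns
	c.x++
}

// value reads the counter while holding the lock.
func (c *counter) value() int {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.x
}

// run starts n go routines that each increment the counter once,
// waits for all of them and returns the final value.
func run(n int) int {
	var c counter
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			c.increment()
		}()
	}
	wg.Wait()
	return c.value()
}

func main() {
	fmt.Println("Final value of x:", run(1000))
}
```

Since every access goes through a method that takes the lock, callers cannot forget to lock before touching x. Running the unprotected version with Go's race detector (go run -race) should also report the data race.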