Go routines es una manera de ejecutar funciones en background mientras el programa principal sigue su curso. Esto hace posible la ejecución de código de forma asÃncrona, resultando muy útil para la ejecución de tareas en paralelo.
El artÃculo se compone de varias secciones:
- Go routine.
- Main no espera.
- Las go routines no soportan valor de retorno.
- Channels.
- Buffered channels.
- Length vs Capacity.
- Closing channels.
- Unidirectional channels.
- Wait groups.
- Worker pools.
- Select.
- Mutex.
Si eres nuevo en el mundo de Go te recomiendo los siguientes artÃculos anteriores:
Go routine:
En el siguiente código vamos a llamar a una función del modo tradicional y acto seguido a la misma mediante goroutine, pero veremos como aparece el going
posterior a la go routine ya que el hilo principal de ejecución sigue adelante, no queda bloqueado mientras se ejecuta la go routine.
package main
import (
"fmt"
"time"
)
func f(from string) {
for i := 0; i < 4; i++ {
fmt.Println(from, ":", i)
}
}
func main() {
// Regular function call:
f("direct")
// GoRoutine function call:
go f("goroutine")
fmt.Println("going")
// Sleep 1s
time.Sleep(time.Second)
fmt.Println("done")
}
Si ejecutamos el código veremos la siguiente salida:
direct : 0
direct : 1
direct : 2
direct : 3
going
goroutine : 0
goroutine : 1
goroutine : 2
goroutine : 3
done
Main no espera:
El problema de este enfoque es que hemos tenido que poner un time.Sleep(time.Second) para permitir que la función f() terminase antes de que main() llegase al final. Si la función f() requiere de mucho tiempo, main() termina sin esperar a las go routines que haya llamado, podemos ver un ejemplo en el código siguiente:
package main
import (
"fmt"
"time"
)
func f(from string) {
for i := 0; i < 4; i++ {
fmt.Println(from, ":", i)
time.Sleep(time.Second)
}
}
func main() {
// Regular function call:
f("direct")
// GoRoutine function call:
go f("goroutine")
fmt.Println("going")
// Sleep 1s
time.Sleep(time.Second)
fmt.Println("done")
}
Si ejecutamos el código podemos ver que se ha llamado a f() mediante la go routine, pero esta vez solo le ha dado tiempo a ejecutar hasta la segunda iteración del bucle, ya que main() no espera a nadie:
direct : 0
direct : 1
direct : 2
direct : 3
going
goroutine : 0
goroutine : 1
done
Las go routines no soportan valor de retorno:
Un aspecto a tener en cuenta es que no se puede llamar a una función mediante go routines y almacenar el valor de retorno ya que estas rutinas se ejecutan en otro hilo de ejecución y no se sabe cuando terminarán. Modificando el ejemplo anterior de la siguiente manera podemos ver el error que nos muestra al intentarlo.
package main
import (
"fmt"
"time"
)
func f(from string) string{
return from
}
func main() {
// Regular function call:
f("direct")
// GoRoutine function call:
answer := go f("goroutine")
fmt.Println("going")
// Sleep 1s
time.Sleep(time.Second)
fmt.Println("done")
}
./goroutine2.go:16:15: syntax error: unexpected go, expected expression
Channels:
La manera de sincronizar hilos de ejecución y obtener datos desde las go routines es mediante los channels de Go, mediante estos podrán intercambiar mensajes a fin de coordinar acciones.
Debemos tener en cuenta que cada canal debe ser de un tipo ya sea primitivo o mas complejo como una estructura, además los canales quedan bloqueados hasta que se lea/escriba de ellos, aunque se puede evitar parcialmente el problema mediante los buffered channels.
Tanto la recepción como la transmisión en un canal, bloquea ambas parte hasta que se reciba en el otro lado, es decir que si en main() tengo una recepción, el programa no seguirá adelante hasta recibir por el canal. En cambio si en main() tengo una emisión, no avanzará hasta que alguien lea del canal.
Podemos ver un ejemplo en el que la función sendEmail() queda bloqueada esperando a que main() lea las confirmaciones de envÃo de emails del canal.
package main
import (
"fmt"
"strconv"
"time"
)
func sendEmail(email string, myChannel chan string) {
for i := 0; i < 3; i++ {
fmt.Println("Email sent to:" + email + " - iteration: " + strconv.Itoa(i) + " - " + time.Now().String())
message := "Email: " + strconv.Itoa(i) + " sent successfully"
myChannel <- message
}
}
func main() {
// Create a channel with 3 slots
myChannel := make(chan string)
// GoRoutine function call:
go sendEmail("kr0m@alfaexploit.com", myChannel)
for i := 0; i < 3; i++ {
time.Sleep(time.Second * 3)
fmt.Println(<-myChannel)
}
fmt.Println("done")
}
Email sent to:kr0m@alfaexploit.com - iteration: 0 - 2024-08-14 18:37:59.676379163 +0200 CEST m=+0.000062471
Email: 0 sent successfully
Email sent to:kr0m@alfaexploit.com - iteration: 1 - 2024-08-14 18:38:02.677226485 +0200 CEST m=+3.000910002
Email: 1 sent successfully
Email sent to:kr0m@alfaexploit.com - iteration: 2 - 2024-08-14 18:38:05.67955186 +0200 CEST m=+6.003235309
Email: 2 sent successfully
done
Podemos ver como todos los emails han sido enviados con una separación entre ellos de 3s.
Buffered channels:
Mediante los buffered channels evitaremos hasta cierto punto los bloqueos mencionados en el apartado anterior, ya que se trata de canales con buffer los cuales son capaces de almacenar X elementos antes de quedar bloqueados.
El ejemplo anterior quedarÃa del siguiente modo.
package main
import (
"fmt"
"strconv"
"time"
)
func sendEmail(email string, myChannel chan string) {
for i := 0; i < 3; i++ {
fmt.Println("Email sent to:" + email + " - iteration: " + strconv.Itoa(i) + " - " + time.Now().String())
message := "Email: " + strconv.Itoa(i) + " sent successfully"
myChannel <- message
}
}
func main() {
// Create a channel
myChannel := make(chan string, 3)
// GoRoutine function call:
go sendEmail("kr0m@alfaexploit.com", myChannel)
for i := 0; i < 3; i++ {
time.Sleep(time.Second * 3)
fmt.Println(<-myChannel)
}
fmt.Println("done")
}
Email sent to:kr0m@alfaexploit.com - iteration: 0 - 2024-08-14 18:38:55.339534415 +0200 CEST m=+0.000037323
Email sent to:kr0m@alfaexploit.com - iteration: 1 - 2024-08-14 18:38:55.339616778 +0200 CEST m=+0.000119668
Email sent to:kr0m@alfaexploit.com - iteration: 2 - 2024-08-14 18:38:55.339618977 +0200 CEST m=+0.000121874
Email: 0 sent successfully
Email: 1 sent successfully
Email: 2 sent successfully
done
Podemos ver como todos los envÃos de email han sido realizados en el mismo segundo, posteriormente la función main() va leyendo los resultados a su ritmo, pero el envÃo de los emails no ha bloqueado a main() ni main() ha bloqueado el envÃo.
Length vs Capacity:
La capacidad de un buffered channel es el número de valores que el canal puede llegar a almacenar antes de quedar bloqueado, mientras que la longitud es el número de elementos que el canal está almacenando actualmente.
package main
import (
"fmt"
)
func main() {
ch := make(chan string, 3)
ch <- "naveen"
ch <- "paul"
fmt.Println("capacity is", cap(ch))
fmt.Println("length is", len(ch))
fmt.Println("read value", <-ch)
fmt.Println("new length is", len(ch))
}
capacity is 3
length is 2
read value naveen
new length is 1
Closing channels:
Incluso con los buffered channels seguimos teniendo el problema de cuanto tiempo debe esperar la función main() antes de llegar al final del programa. Afortunadamente Go implementa una forma de controlar cuando un canal se ha cerrado, de este modo podemos llamar a una go routine, esperar los datos y cuando haya cerrado el canal, finalizar la ejecución de main().
La forma de leer tanto el valor del canal como el estado es la siguiente.
v, ok := <-myChannel
El programa anterior quedarÃa de la siguiente manera.
package main
import (
"fmt"
"strconv"
"time"
)
func sendEmail(email string, myChannel chan string) {
for i := 0; i < 3; i++ {
fmt.Println("Email sent to:" + email + " - iteration: " + strconv.Itoa(i) + " - " + time.Now().String())
message := "Email: " + strconv.Itoa(i) + " sent successfully"
myChannel <- message
}
close(myChannel)
}
func main() {
// Create a channel
myChannel := make(chan string, 3)
// GoRoutine function call:
go sendEmail("kr0m@alfaexploit.com", myChannel)
for {
v, open := <-myChannel
if open == false {
break
}
fmt.Println("Received ", v, open)
}
fmt.Println("done")
}
También podemos utilizar el for range
de Go sin tener que preocuparnos de si el canal está cerrado.
package main
import (
"fmt"
"strconv"
"time"
)
func sendEmail(email string, myChannel chan string) {
for i := 0; i < 3; i++ {
fmt.Println("Email sent to:" + email + " - iteration: " + strconv.Itoa(i) + " - " + time.Now().String())
message := "Email: " + strconv.Itoa(i) + " sent successfully"
myChannel <- message
}
close(myChannel)
}
func main() {
// Create a channel
myChannel := make(chan string, 3)
// GoRoutine function call:
go sendEmail("kr0m@alfaexploit.com", myChannel)
for v := range myChannel {
fmt.Println("Received ", v)
}
fmt.Println("done")
}
Email sent to:kr0m@alfaexploit.com - iteration: 0 - 2024-08-18 19:09:04.03698939 +0200 CEST m=+0.000032195
Email sent to:kr0m@alfaexploit.com - iteration: 1 - 2024-08-18 19:09:04.03704383 +0200 CEST m=+0.000086620
Email sent to:kr0m@alfaexploit.com - iteration: 2 - 2024-08-18 19:09:04.037045945 +0200 CEST m=+0.000088735
Received Email: 0 sent successfully
Received Email: 1 sent successfully
Received Email: 2 sent successfully
done
La idea es dejar todas las go routines funcionando, seguir ejecutando código en main() y al final de main() justo antes de salir comprobar todos los canales que hemos dejado abiertos.
Unidirectional channels:
Los canales pueden ser unidireccionales, pero estos no resultan de mucha utilidad si solo puede leerse o escribirse de ellos. No obstante los canales bidireccionales pueden pasar a unidireccionales si una función lo requiere, por ejemplo si solo necesita escribir en el canal y nunca leer, se puede forzar para que nunca se cometa el error de leer.
Un sencillo ejemplo donde el canal es bidireccional en la función main() pero unidireccional en sendData().
package main
import "fmt"
func sendData(sendch chan<- int) {
sendch <- 10
}
func main() {
chnl := make(chan int)
go sendData(chnl)
fmt.Println(<-chnl)
}
Si intentamos leer desde sendData() obtendremos el siguiente error:
./test.go:6:12: invalid operation: cannot receive from send-only channel sendch (variable of type chan<- int)
Wait groups:
Un wait group no es mas que un contador de go routines, mediante el método Add() añadiremos rutinas pendientes mientras que con Done() las eliminaremos, incrementando y decrementando el contador respectivamente.
El método Wait() bloqueará la ejecución de código hasta que el contador sea igual a cero.
wg.Add(N) -> Increment counter.
wg.Done() -> Decrement counter.
wg.Wait() -> Wait until counter == 0.
Veamos un sencillo ejemplo en el que se añaden 3 go routines desde la función main() y se espera a que se procesen antes de salir. Mientrastanto en la función process() se eliminan al terminar el procesado de datos, permitiendo que Wait() sea finalmente desbloqueado.
package main
import (
"fmt"
"sync"
"time"
)
func process(i int, wg *sync.WaitGroup) {
fmt.Println("started Goroutine ", i)
time.Sleep(2 * time.Second)
fmt.Printf("Goroutine %d ended\n", i)
wg.Done()
}
func main() {
no := 3
var wg sync.WaitGroup
for i := 0; i < no; i++ {
wg.Add(1)
go process(i, &wg)
}
wg.Wait()
fmt.Println("All go routines finished executing")
}
Worker pools:
EL concepto de worker pool consiste en tener varios procesos en paralelo haciendo el mismo trabajo, en este caso se trata de ir procesando jobs almacenados en una canal que actúa a modo de cola.
De forma muy resumida el código encola jobs en un canal, deja una go routine leyendo del canal de resultados, arranca varios workers que leen del canal de jobs, procesan los datos y encolan los resultados en el canal de resultados.
Cada una de las funciones se encarga de una parte del proceso:
- queueJobs: Encola los jobs en el canal jobs, el canal tiene un máximo de 10 jobs, quedando bloqueado hasta que la función worker() vaya leyendo los jobs, vaciando la cola y dejando hueco para nuevos jobs.
- getResults: Se queda leyendo del canal results, cuando haya terminado de iterar, asigna true al canal done.
- createWorkerPool: Arranca 100 procesos
worker
qunque como máximo habrán 10 funcionando simultáneamente ya que es el tamaño del buffered channel del que leen. Incrementa el contador de un workgroup por cada proceso arrancado, se queda a la espera del workGroup. - worker: Obtiene la salida de la función digits y guarda el resultado en una estructura que mete en el canal results.
Todos los workers leen tareas del canal jobs, cuando no queden jobs por hacer, todos los procesosworker
ejecutarán el wg.Done() liberando el bloqueo de la función createWorkerPool().
Al ser liberada la función createWorkerPool(), esta cierra el canal results que a su vez es detectado por la función getResults(), terminando el bucle y ejecutando done <- true.
En main() se detecta el <-done ejecutando el resto de instrucciones.
Hay que tener en cuenta que conforme queueJobs() va encolando trabajos, worker() los va desencolando y ejecutando. El número de workers que hacen esto los ha arrancado createWorkerPool() pero se ven limitados por el tamaño del canal jobs, variando numberOfWorkers podemos obtener una concurrencia mayor o menor ya que variará el tamaño de dicho canal, quizás deberÃamos de obtener el número de cores del sistema y configurar dicho parámetro con N-1.
package main
import (
"fmt"
"math/rand"
"sync"
"time"
)
type job struct {
id int
randomNumber1 int
randomNumber2 int
}
type result struct {
job job
sumOfNumbers int
}
var numberOfWorkers = 10
var noOfJobs = 100
var jobs = make(chan job, numberOfWorkers)
var results = make(chan result, numberOfWorkers)
var done = make(chan bool)
func addNumbers(randomNumber1, randomNumber2 int) int {
sum := randomNumber1 + randomNumber2
time.Sleep(100 * time.Millisecond)
return sum
}
func worker(wg *sync.WaitGroup) {
for job := range jobs {
output := result{job, addNumbers(job.randomNumber1, job.randomNumber2)}
results <- output
}
wg.Done()
}
func createWorkerPool(numberOfWorkers int) {
var wg sync.WaitGroup
for i := 0; i < numberOfWorkers; i++ {
wg.Add(1)
go worker(&wg)
}
wg.Wait()
close(results)
}
func queueJobs(noOfJobs int) {
for i := 0; i < noOfJobs; i++ {
randomNumber1 := rand.Intn(999)
randomNumber2 := rand.Intn(999)
job := job{i, randomNumber1, randomNumber2}
jobs <- job
}
close(jobs)
}
func getResults(done chan bool) {
for result := range results {
fmt.Printf("Job id %d, %d + %d = %d\n", result.job.id, result.job.randomNumber1, result.job.randomNumber2, result.sumOfNumbers)
}
done <- true
}
func main() {
startTime := time.Now()
go queueJobs(noOfJobs)
go getResults(done)
createWorkerPool(numberOfWorkers)
<-done
endTime := time.Now()
diff1 := endTime.Sub(startTime)
fmt.Println("--------------------------------")
startTime = time.Now()
for i := 0; i < noOfJobs; i++ {
randomNumber1 := rand.Intn(999)
randomNumber2 := rand.Intn(999)
result := addNumbers(randomNumber1, randomNumber2)
fmt.Printf("Job id %d, %d + %d = %d\n", i, randomNumber1, randomNumber2, result)
}
endTime = time.Now()
diff2 := endTime.Sub(startTime)
fmt.Println("--------------------------------")
fmt.Println("Worker Pool execution, total time taken: ", diff1.Seconds(), "seconds")
fmt.Println("Sequential execution, total time taken: ", diff2.Seconds(), "seconds")
}
Podemos ver una difecencia abismal entre ejecutar el código mediante go routines o secuencialmente, del orden de 10X mas rápido.
Worker Pool execution, total time taken: 1.071072607 seconds
Sequential execution, total time taken: 10.580277014 seconds
Select:
La sentencia select
resulta muy útil cuando queremos dejar varias go routines en background y leer la respuesta de la primera en responder, esta forma de operar es muy utilizada en aplicaciones donde el rendimiento es fundamental.
Por ejemplo podrÃamos tener una base de datos distribuida, replicada en cluster en varias localizaciones goegráficas, consultar a cada uno de los nodos en una go routine y utilizar la respuesta de la máquina mas rápida.
package main
import (
"fmt"
"time"
)
func tryServer(serverNumber int, output chan string) {
// Real case could be like this, instead of the switch/case:
// output <- databaseConnect(serverNumber)
switch serverNumber {
case 1:
time.Sleep(6 * time.Second)
output <- "server1"
case 2:
time.Sleep(1 * time.Second)
output <- "server2"
default:
time.Sleep(2 * time.Second)
output <- "server3"
}
}
func main() {
servers := []string{"server1", "server2", "server3"}
output := make(chan string)
for i, _ := range servers {
go tryServer(i, output)
}
select {
case fastServer := <-output:
fmt.Println("fastServer: ", fastServer)
}
}
fastServer: server2
Mediante select
también se puede hacer un intento de lectura de un canal y si no lo consigue pasar a la opción default
.
package main
import (
"fmt"
"time"
)
func writeChan(channel chan string) {
time.Sleep(1 * time.Second)
channel <- "AAAA"
}
func main() {
channel := make(chan string)
go writeChan(channel)
select {
case v := <-channel:
fmt.Println("Received value: ", v)
default:
fmt.Println("Default case executed.")
}
}
Default case executed.
Ahora utilizando el mismo ejemplo, pero hacemos que la función writeChan() escriba de inmediato en el canal de forma que en la función main() pueda leer el dato.
package main
import (
"fmt"
"time"
)
func writeChan(channel chan string) {
//time.Sleep(1 * time.Second)
channel <- "AAAA"
}
func main() {
channel := make(chan string)
go writeChan(channel)
time.Sleep(1 * time.Second)
select {
case v := <-channel:
fmt.Println("Received value: ", v)
default:
fmt.Println("Default case executed.")
}
}
Received value: AAAA
Cuando todas las opciones del select
están disponibles al mismo tiempo se elegirá una de ellas de forma aleatoria.
package main
import (
"fmt"
"time"
)
func tryServer(serverNumber int, output chan string) {
// Real case could be like this, instead of the switch/case:
// output <- databaseConnect(serverNumber)
switch serverNumber {
case 1:
time.Sleep(1 * time.Second)
output <- "server1"
case 2:
time.Sleep(1 * time.Second)
output <- "server2"
default:
time.Sleep(1 * time.Second)
output <- "server3"
}
}
func main() {
servers := []string{"server1", "server2", "server3"}
output := make(chan string)
for i, _ := range servers {
go tryServer(i, output)
}
select {
case fastServer := <-output:
fmt.Println("fastServer: ", fastServer)
}
}
Puede salir cualquiera de los tres servers:
fastServer: server3
fastServer: server1
fastServer: server2
Mutex:
Cuando trabajamos con programas concurrentes puede que tengamos una race condition, es decir partes del código modificando una variable de forma simultánea. Veamos este ejemplo en el que se ejecutan 100 go routines llamando a la función increment(), la variable x deberÃa de incrementar su valor 100 veces, en cambio en cada ejecución obtenemos un resultado distinto.
package main
import (
"fmt"
"sync"
)
var x = 0
func increment(wg *sync.WaitGroup) {
x = x + 1
wg.Done()
}
func main() {
var w sync.WaitGroup
for i := 0; i < 1000; i++ {
w.Add(1)
go increment(&w)
}
w.Wait()
fmt.Println("Final value of x: ", x)
}
Final value of x: 968
Final value of x: 960
Final value of x: 965
Final value of x: 956
Para evitar las race condition debemos utilizar bloqueos mutex de este modo la variable x solo podrá ser modificada por una go routine en cada momento.
package main
import (
"fmt"
"sync"
)
var x = 0
func increment(wg *sync.WaitGroup, m *sync.Mutex) {
m.Lock()
x = x + 1
m.Unlock()
wg.Done()
}
func main() {
var w sync.WaitGroup
var m sync.Mutex
for i := 0; i < 1000; i++ {
w.Add(1)
go increment(&w, &m)
}
w.Wait()
fmt.Println("Final value of x:", x)
}
Final value of x: 1000