Esta pagina se ve mejor con JavaScript habilitado

Go routines

 ·  🎃 kr0m

Go routines es una manera de ejecutar funciones en background mientras el programa principal sigue su curso. Esto hace posible la ejecución de código de forma asíncrona, resultando muy útil para la ejecución de tareas en paralelo.

El artículo se compone de varias secciones:


Si eres nuevo en el mundo de Go te recomiendo los siguientes artículos anteriores:


Go routine:

En el siguiente código vamos a llamar a una función del modo tradicional y acto seguido a la misma mediante goroutine, pero veremos como aparece el going posterior a la go routine ya que el hilo principal de ejecución sigue adelante, no queda bloqueado mientras se ejecuta la go routine.

package main

import (
    "fmt"
    "time"
)

func f(from string) {
    for i := 0; i < 4; i++ {
        fmt.Println(from, ":", i)
    }
}

func main() {
    // Regular function call:
    f("direct")
    // GoRoutine function call:
    go f("goroutine")
    fmt.Println("going")
    // Sleep 1s
    time.Sleep(time.Second)
    fmt.Println("done")
}

Si ejecutamos el código veremos la siguiente salida:

direct : 0
direct : 1
direct : 2
direct : 3
going
goroutine : 0
goroutine : 1
goroutine : 2
goroutine : 3
done

Main no espera:

El problema de este enfoque es que hemos tenido que poner un time.Sleep(time.Second) para permitir que la función f() terminase antes de que main() llegase al final. Si la función f() requiere de mucho tiempo, main() termina sin esperar a las go routines que haya llamado, podemos ver un ejemplo en el código siguiente:

package main

import (
    "fmt"
    "time"
)

func f(from string) {
    for i := 0; i < 4; i++ {
        fmt.Println(from, ":", i)
        time.Sleep(time.Second)
    }
}

func main() {
    // Regular function call:
    f("direct")
    // GoRoutine function call:
    go f("goroutine")
    fmt.Println("going")
    // Sleep 1s
    time.Sleep(time.Second)
    fmt.Println("done")
}

Si ejecutamos el código podemos ver que se ha llamado a f() mediante la go routine, pero esta vez solo le ha dado tiempo a ejecutar hasta la segunda iteración del bucle, ya que main() no espera a nadie:

direct : 0
direct : 1
direct : 2
direct : 3
going
goroutine : 0
goroutine : 1
done

Las go routines no soportan valor de retorno:

Un aspecto a tener en cuenta es que no se puede llamar a una función mediante go routines y almacenar el valor de retorno ya que estas rutinas se ejecutan en otro hilo de ejecución y no se sabe cuando terminarán. Modificando el ejemplo anterior de la siguiente manera podemos ver el error que nos muestra al intentarlo.

package main

import (
    "fmt"
    "time"
)

func f(from string) string{
    return from
}

func main() {
    // Regular function call:
    f("direct")
    // GoRoutine function call:
    answer := go f("goroutine")
    fmt.Println("going")
    // Sleep 1s
    time.Sleep(time.Second)
    fmt.Println("done")
}
./goroutine2.go:16:15: syntax error: unexpected go, expected expression

Channels:

La manera de sincronizar hilos de ejecución y obtener datos desde las go routines es mediante los channels de Go, mediante estos podrán intercambiar mensajes a fin de coordinar acciones.

Debemos tener en cuenta que cada canal debe ser de un tipo ya sea primitivo o mas complejo como una estructura, además los canales quedan bloqueados hasta que se lea/escriba de ellos, aunque se puede evitar parcialmente el problema mediante los buffered channels.

Tanto la recepción como la transmisión en un canal, bloquea ambas parte hasta que se reciba en el otro lado, es decir que si en main() tengo una recepción, el programa no seguirá adelante hasta recibir por el canal. En cambio si en main() tengo una emisión, no avanzará hasta que alguien lea del canal.

Podemos ver un ejemplo en el que la función sendEmail() queda bloqueada esperando a que main() lea las confirmaciones de envío de emails del canal.

package main

import (
    "fmt"
    "strconv"
    "time"
)

func sendEmail(email string, myChannel chan string) {
    for i := 0; i < 3; i++ {
        fmt.Println("Email sent to:" + email + " - iteration: " + strconv.Itoa(i) + " - " + time.Now().String())
        message := "Email: " + strconv.Itoa(i) + " sent successfully"
        myChannel <- message
    }
}

func main() {
    // Create a channel with 3 slots
    myChannel := make(chan string)
    // GoRoutine function call:
    go sendEmail("kr0m@alfaexploit.com", myChannel)
    for i := 0; i < 3; i++ {
        time.Sleep(time.Second * 3)
        fmt.Println(<-myChannel)
    }
    fmt.Println("done")
}
Email sent to:kr0m@alfaexploit.com - iteration: 0 - 2024-08-14 18:37:59.676379163 +0200 CEST m=+0.000062471
Email: 0 sent successfully
Email sent to:kr0m@alfaexploit.com - iteration: 1 - 2024-08-14 18:38:02.677226485 +0200 CEST m=+3.000910002
Email: 1 sent successfully
Email sent to:kr0m@alfaexploit.com - iteration: 2 - 2024-08-14 18:38:05.67955186 +0200 CEST m=+6.003235309
Email: 2 sent successfully
done

Podemos ver como todos los emails han sido enviados con una separación entre ellos de 3s.


Buffered channels:

Mediante los buffered channels evitaremos hasta cierto punto los bloqueos mencionados en el apartado anterior, ya que se trata de canales con buffer los cuales son capaces de almacenar X elementos antes de quedar bloqueados.

El ejemplo anterior quedaría del siguiente modo.

package main

import (
    "fmt"
    "strconv"
    "time"
)

func sendEmail(email string, myChannel chan string) {
    for i := 0; i < 3; i++ {
        fmt.Println("Email sent to:" + email + " - iteration: " + strconv.Itoa(i) + " - " + time.Now().String())
        message := "Email: " + strconv.Itoa(i) + " sent successfully"
        myChannel <- message
    }
}

func main() {
    // Create a channel
    myChannel := make(chan string, 3)
    // GoRoutine function call:
    go sendEmail("kr0m@alfaexploit.com", myChannel)
    for i := 0; i < 3; i++ {
        time.Sleep(time.Second * 3)
        fmt.Println(<-myChannel)
    }
    fmt.Println("done")
}
Email sent to:kr0m@alfaexploit.com - iteration: 0 - 2024-08-14 18:38:55.339534415 +0200 CEST m=+0.000037323
Email sent to:kr0m@alfaexploit.com - iteration: 1 - 2024-08-14 18:38:55.339616778 +0200 CEST m=+0.000119668
Email sent to:kr0m@alfaexploit.com - iteration: 2 - 2024-08-14 18:38:55.339618977 +0200 CEST m=+0.000121874
Email: 0 sent successfully
Email: 1 sent successfully
Email: 2 sent successfully
done

Podemos ver como todos los envíos de email han sido realizados en el mismo segundo, posteriormente la función main() va leyendo los resultados a su ritmo, pero el envío de los emails no ha bloqueado a main() ni main() ha bloqueado el envío.


Length vs Capacity:

La capacidad de un buffered channel es el número de valores que el canal puede llegar a almacenar antes de quedar bloqueado, mientras que la longitud es el número de elementos que el canal está almacenando actualmente.

package main

import (
    "fmt"
)

func main() {
    ch := make(chan string, 3)
    ch <- "naveen"
    ch <- "paul"
    fmt.Println("capacity is", cap(ch))
    fmt.Println("length is", len(ch))
    fmt.Println("read value", <-ch)
    fmt.Println("new length is", len(ch))
}
capacity is 3
length is 2
read value naveen
new length is 1

Closing channels:

Incluso con los buffered channels seguimos teniendo el problema de cuanto tiempo debe esperar la función main() antes de llegar al final del programa. Afortunadamente Go implementa una forma de controlar cuando un canal se ha cerrado, de este modo podemos llamar a una go routine, esperar los datos y cuando haya cerrado el canal, finalizar la ejecución de main().

La forma de leer tanto el valor del canal como el estado es la siguiente.

v, ok := <-myChannel

El programa anterior quedaría de la siguiente manera.

package main

import (
    "fmt"
    "strconv"
    "time"
)

func sendEmail(email string, myChannel chan string) {
    for i := 0; i < 3; i++ {
        fmt.Println("Email sent to:" + email + " - iteration: " + strconv.Itoa(i) + " - " + time.Now().String())
        message := "Email: " + strconv.Itoa(i) + " sent successfully"
        myChannel <- message
    }
    close(myChannel)
}

func main() {
    // Create a channel
    myChannel := make(chan string, 3)
    // GoRoutine function call:
    go sendEmail("kr0m@alfaexploit.com", myChannel)
    for {
        v, open := <-myChannel
        if open == false {
            break
        }
        fmt.Println("Received ", v, open)
    }
    fmt.Println("done")
}

También podemos utilizar el for range de Go sin tener que preocuparnos de si el canal está cerrado.

package main

import (
    "fmt"
    "strconv"
    "time"
)

func sendEmail(email string, myChannel chan string) {
    for i := 0; i < 3; i++ {
        fmt.Println("Email sent to:" + email + " - iteration: " + strconv.Itoa(i) + " - " + time.Now().String())
        message := "Email: " + strconv.Itoa(i) + " sent successfully"
        myChannel <- message
    }
    close(myChannel)
}

func main() {
    // Create a channel
    myChannel := make(chan string, 3)
    // GoRoutine function call:
    go sendEmail("kr0m@alfaexploit.com", myChannel)
    for v := range myChannel {
        fmt.Println("Received ", v)
    }
    fmt.Println("done")
}
Email sent to:kr0m@alfaexploit.com - iteration: 0 - 2024-08-18 19:09:04.03698939 +0200 CEST m=+0.000032195
Email sent to:kr0m@alfaexploit.com - iteration: 1 - 2024-08-18 19:09:04.03704383 +0200 CEST m=+0.000086620
Email sent to:kr0m@alfaexploit.com - iteration: 2 - 2024-08-18 19:09:04.037045945 +0200 CEST m=+0.000088735
Received  Email: 0 sent successfully
Received  Email: 1 sent successfully
Received  Email: 2 sent successfully
done

La idea es dejar todas las go routines funcionando, seguir ejecutando código en main() y al final de main() justo antes de salir comprobar todos los canales que hemos dejado abiertos.


Unidirectional channels:

Los canales pueden ser unidireccionales, pero estos no resultan de mucha utilidad si solo puede leerse o escribirse de ellos. No obstante los canales bidireccionales pueden pasar a unidireccionales si una función lo requiere, por ejemplo si solo necesita escribir en el canal y nunca leer, se puede forzar para que nunca se cometa el error de leer.

Un sencillo ejemplo donde el canal es bidireccional en la función main() pero unidireccional en sendData().

package main

import "fmt"

func sendData(sendch chan<- int) {
    sendch <- 10
}

func main() {
    chnl := make(chan int)
    go sendData(chnl)
    fmt.Println(<-chnl)
}

Si intentamos leer desde sendData() obtendremos el siguiente error:

./test.go:6:12: invalid operation: cannot receive from send-only channel sendch (variable of type chan<- int)

Wait groups:

Un wait group no es mas que un contador de go routines, mediante el método Add() añadiremos rutinas pendientes mientras que con Done() las eliminaremos, incrementando y decrementando el contador respectivamente.

El método Wait() bloqueará la ejecución de código hasta que el contador sea igual a cero.

wg.Add(N) -> Increment counter.
wg.Done() -> Decrement counter.
wg.Wait() -> Wait until counter == 0.

Veamos un sencillo ejemplo en el que se añaden 3 go routines desde la función main() y se espera a que se procesen antes de salir. Mientrastanto en la función process() se eliminan al terminar el procesado de datos, permitiendo que Wait() sea finalmente desbloqueado.

package main

import (
    "fmt"
    "sync"
    "time"
)

func process(i int, wg *sync.WaitGroup) {
    fmt.Println("started Goroutine ", i)
    time.Sleep(2 * time.Second)
    fmt.Printf("Goroutine %d ended\n", i)
    wg.Done()
}

func main() {
    no := 3
    var wg sync.WaitGroup
    for i := 0; i < no; i++ {
        wg.Add(1)
        go process(i, &wg)
    }
    wg.Wait()
    fmt.Println("All go routines finished executing")
}

Worker pools:

EL concepto de worker pool consiste en tener varios procesos en paralelo haciendo el mismo trabajo, en este caso se trata de ir procesando jobs almacenados en una canal que actúa a modo de cola.

De forma muy resumida el código encola jobs en un canal, deja una go routine leyendo del canal de resultados, arranca varios workers que leen del canal de jobs, procesan los datos y encolan los resultados en el canal de resultados.

Cada una de las funciones se encarga de una parte del proceso:

  • queueJobs: Encola los jobs en el canal jobs, el canal tiene un máximo de 10 jobs, quedando bloqueado hasta que la función worker() vaya leyendo los jobs, vaciando la cola y dejando hueco para nuevos jobs.
  • getResults: Se queda leyendo del canal results, cuando haya terminado de iterar, asigna true al canal done.
  • createWorkerPool: Arranca 100 procesos worker qunque como máximo habrán 10 funcionando simultáneamente ya que es el tamaño del buffered channel del que leen. Incrementa el contador de un workgroup por cada proceso arrancado, se queda a la espera del workGroup.
  • worker: Obtiene la salida de la función digits y guarda el resultado en una estructura que mete en el canal results.
    Todos los workers leen tareas del canal jobs, cuando no queden jobs por hacer, todos los procesos worker ejecutarán el wg.Done() liberando el bloqueo de la función createWorkerPool().
    Al ser liberada la función createWorkerPool(), esta cierra el canal results que a su vez es detectado por la función getResults(), terminando el bucle y ejecutando done <- true.
    En main() se detecta el <-done ejecutando el resto de instrucciones.

Hay que tener en cuenta que conforme queueJobs() va encolando trabajos, worker() los va desencolando y ejecutando. El número de workers que hacen esto los ha arrancado createWorkerPool() pero se ven limitados por el tamaño del canal jobs, variando numberOfWorkers podemos obtener una concurrencia mayor o menor ya que variará el tamaño de dicho canal, quizás deberíamos de obtener el número de cores del sistema y configurar dicho parámetro con N-1.

package main

import (
    "fmt"
    "math/rand"
    "sync"
    "time"
)

type job struct {
    id            int
    randomNumber1 int
    randomNumber2 int
}

type result struct {
    job          job
    sumOfNumbers int
}

var numberOfWorkers = 10
var noOfJobs = 100
var jobs = make(chan job, numberOfWorkers)
var results = make(chan result, numberOfWorkers)
var done = make(chan bool)

func addNumbers(randomNumber1, randomNumber2 int) int {
    sum := randomNumber1 + randomNumber2
    time.Sleep(100 * time.Millisecond)
    return sum
}

func worker(wg *sync.WaitGroup) {
    for job := range jobs {
        output := result{job, addNumbers(job.randomNumber1, job.randomNumber2)}
        results <- output
    }
    wg.Done()
}

func createWorkerPool(numberOfWorkers int) {
    var wg sync.WaitGroup
    for i := 0; i < numberOfWorkers; i++ {
        wg.Add(1)
        go worker(&wg)
    }
    wg.Wait()
    close(results)
}

func queueJobs(noOfJobs int) {
    for i := 0; i < noOfJobs; i++ {
        randomNumber1 := rand.Intn(999)
        randomNumber2 := rand.Intn(999)
        job := job{i, randomNumber1, randomNumber2}
        jobs <- job
    }
    close(jobs)
}

func getResults(done chan bool) {
    for result := range results {
        fmt.Printf("Job id %d, %d + %d = %d\n", result.job.id, result.job.randomNumber1, result.job.randomNumber2, result.sumOfNumbers)
    }
    done <- true
}

func main() {
    startTime := time.Now()
    go queueJobs(noOfJobs)
    go getResults(done)
    createWorkerPool(numberOfWorkers)
    <-done
    endTime := time.Now()
    diff1 := endTime.Sub(startTime)
    fmt.Println("--------------------------------")

    startTime = time.Now()
    for i := 0; i < noOfJobs; i++ {
        randomNumber1 := rand.Intn(999)
        randomNumber2 := rand.Intn(999)
        result := addNumbers(randomNumber1, randomNumber2)
        fmt.Printf("Job id %d, %d + %d = %d\n", i, randomNumber1, randomNumber2, result)
    }
    endTime = time.Now()
    diff2 := endTime.Sub(startTime)
    fmt.Println("--------------------------------")

    fmt.Println("Worker Pool execution, total time taken: ", diff1.Seconds(), "seconds")
    fmt.Println("Sequential execution, total time taken: ", diff2.Seconds(), "seconds")
}

Podemos ver una difecencia abismal entre ejecutar el código mediante go routines o secuencialmente, del orden de 10X mas rápido.

Worker Pool execution, total time taken:  1.071072607 seconds
Sequential execution, total time taken:  10.580277014 seconds

Select:

La sentencia select resulta muy útil cuando queremos dejar varias go routines en background y leer la respuesta de la primera en responder, esta forma de operar es muy utilizada en aplicaciones donde el rendimiento es fundamental.

Por ejemplo podríamos tener una base de datos distribuida, replicada en cluster en varias localizaciones goegráficas, consultar a cada uno de los nodos en una go routine y utilizar la respuesta de la máquina mas rápida.

package main

import (
    "fmt"
    "time"
)

func tryServer(serverNumber int, output chan string) {
    // Real case could be like this, instead of the switch/case:
    // output <- databaseConnect(serverNumber)
    switch serverNumber {
    case 1:
        time.Sleep(6 * time.Second)
        output <- "server1"
    case 2:
        time.Sleep(1 * time.Second)
        output <- "server2"
    default:
        time.Sleep(2 * time.Second)
        output <- "server3"
    }
}

func main() {
    servers := []string{"server1", "server2", "server3"}
    output := make(chan string)

    for i, _ := range servers {
        go tryServer(i, output)
    }

    select {
    case fastServer := <-output:
        fmt.Println("fastServer: ", fastServer)
    }
}
fastServer:  server2

Mediante select también se puede hacer un intento de lectura de un canal y si no lo consigue pasar a la opción default.

package main

import (
    "fmt"
    "time"
)

func writeChan(channel chan string) {
    time.Sleep(1 * time.Second)
    channel <- "AAAA"
}

func main() {
    channel := make(chan string)
    go writeChan(channel)
    select {
    case v := <-channel:
        fmt.Println("Received value: ", v)
    default:
        fmt.Println("Default case executed.")
    }
}
Default case executed.

Ahora utilizando el mismo ejemplo, pero hacemos que la función writeChan() escriba de inmediato en el canal de forma que en la función main() pueda leer el dato.

package main

import (
    "fmt"
    "time"
)

func writeChan(channel chan string) {
    //time.Sleep(1 * time.Second)
    channel <- "AAAA"
}

func main() {
    channel := make(chan string)
    go writeChan(channel)
    time.Sleep(1 * time.Second)
    select {
    case v := <-channel:
        fmt.Println("Received value: ", v)
    default:
        fmt.Println("Default case executed.")
    }
}
Received value:  AAAA

Cuando todas las opciones del select están disponibles al mismo tiempo se elegirá una de ellas de forma aleatoria.

package main

import (
    "fmt"
    "time"
)

func tryServer(serverNumber int, output chan string) {
    // Real case could be like this, instead of the switch/case:
    // output <- databaseConnect(serverNumber)
    switch serverNumber {
    case 1:
        time.Sleep(1 * time.Second)
        output <- "server1"
    case 2:
        time.Sleep(1 * time.Second)
        output <- "server2"
    default:
        time.Sleep(1 * time.Second)
        output <- "server3"
    }
}

func main() {
    servers := []string{"server1", "server2", "server3"}
    output := make(chan string)

    for i, _ := range servers {
        go tryServer(i, output)
    }

    select {
    case fastServer := <-output:
        fmt.Println("fastServer: ", fastServer)
    }
}

Puede salir cualquiera de los tres servers:

fastServer:  server3
fastServer:  server1
fastServer:  server2

Mutex:

Cuando trabajamos con programas concurrentes puede que tengamos una race condition, es decir partes del código modificando una variable de forma simultánea. Veamos este ejemplo en el que se ejecutan 100 go routines llamando a la función increment(), la variable x debería de incrementar su valor 100 veces, en cambio en cada ejecución obtenemos un resultado distinto.

package main

import (
    "fmt"
    "sync"
)

var x = 0

func increment(wg *sync.WaitGroup) {
    x = x + 1
    wg.Done()
}

func main() {
    var w sync.WaitGroup
    for i := 0; i < 1000; i++ {
        w.Add(1)
        go increment(&w)
    }
    w.Wait()
    fmt.Println("Final value of x: ", x)
}
Final value of x: 968
Final value of x: 960
Final value of x: 965
Final value of x: 956

Para evitar las race condition debemos utilizar bloqueos mutex de este modo la variable x solo podrá ser modificada por una go routine en cada momento.

package main

import (
    "fmt"
    "sync"
)

var x = 0

func increment(wg *sync.WaitGroup, m *sync.Mutex) {
    m.Lock()
    x = x + 1
    m.Unlock()
    wg.Done()
}

func main() {
    var w sync.WaitGroup
    var m sync.Mutex
    for i := 0; i < 1000; i++ {
        w.Add(1)
        go increment(&w, &m)
    }
    w.Wait()
    fmt.Println("Final value of x:", x)
}
Final value of x: 1000
Si te ha gustado el artículo puedes invitarme a un RedBull aquí