:mannotop async – manno的博客

标签: async

Manage Child Goroutines With context.Context

context.Context

As described in official Go documentation,context.Context carries deadlines, cancelation signals, and other scoped values across boundaries and between processes.

Basic usage

Context represents scope or the lifetime of the attached goroutines. When you see a function signature that looks like the following:

// Task is a function that can be and should be run as goroutine,
// it can be cancelled by cancelling the input context.
func Task(context.Context, ...args interface{})

It means that you can use the function as following:

func main() {
    // Create a new context being cancelled in 5 seconds.
    ctx, _ := context.WithTimeout(context.Background(), 5 * time.Second)
    // Start a new goroutine whose lifetime's attached to ctx.
    go task(ctx, args...)
}

The above code means that if the task function lasts over 5 seconds, it will be canceled, which helps avoid leaking goroutines.

You should design your own API in the manner shown above. When you have some long-running functions, consider attaching their lifetime to the context.

Create and derive context

To create a new and empty context, use:

// Create a new, empty, and unexecuted context.
context.Background()

Contexts can be derived, child contexts are complete once parent context is finished.

func main() {
    // Create a new context.
    parent, cancelParent := context.WithCancel(context.Background())
    // Derive child contexts from parent.
    childA, _ := context.WithTimeout(parent, 5 * time.Secound)
    childB, _ := context.WithDeadline(parent, time.Now().Add(1 * time.Minute)
    go func() {
        <-childA.Done()
        <-childB.Done()
        fmt.Println("All children are done")
    }()
    // Cancel parent make all children are cancelled.
    cancelParent()
}
// -> Result: All children are done
  • context.WithCancel(parentContext) creates a new context which completes when the returned cancel function is called or when the parent’s context finishes, whichever happens first.
  • context.WithTimeout(contextContext, 5 * time.Second) creates a new context which finishes when the returned cancel function is called or when it exceeds timeout or when the parent’s context finishes, whichever happens first.
  • context.WithDeadline(parentContext, time.Now().Add(1 * time.Minute) creates a new context which finishes when the returned cancel function deadline expires or when the parent’s context completes, whichever happens first.

There are some other methods to derive context. Check out here for further details.

Manage Child Goroutines

Now’s let solve a real-world common problem with context.Context.

The following is a very common use case in concurrency programming:

“You have 2 tasks A and B. Your main goroutine forks N goroutines (workers) running task A and M goroutines (workers) running task B. How do you gracefully shut down all child tasks when your main goroutine finished (e.g. user requests to close application)?”

package main

import (
	"context"
	"fmt"
	"os"
	"os/signal"
)

func taskA(ctx context.Context, index int) {
	done := false
	go func() {
		// Keep doing something.
		for i := 0; !done; i++ {
			fmt.Printf("A%d%d\n", index, i)
		}
	}()

	// Wait util context is cancelled.
	<-ctx.Done()
	// Turn on closing flag.
	done = true
}

func taskB(ctx context.Context, index int) {
loop:
	for i := 0; ; i++ {
		select {
		// Try pushing some message to some channel.
		case someChannel <- fmt.Sprintf("B%d%d\n", index, i):
			continue loop
			// Or when context is cancelled, finish the task.
		case <-ctx.Done():
			break loop
		}
	}
}

func main() {
	// Create application context.
	ctx, cancel := context.WithCancel(context.Background())

	// Fork n of task A with application context.
	for i := 0; i < N; i++ {
		go taskA(ctx, i)
	}

	// Fork m of task B with application context.
	for i := 0; i < M; i++ {
		go taskB(ctx, i)
	}

	// Wait for SIGINT.
	sig := make(chan os.Signal, 1)
	signal.Notify(sig, os.Interrupt)
	<-sig

	// Shutdown. Cancel application context will kill all attached tasks.
	cancel()
}

Use Go Channels as Promises and Async/Await

Promise And Async/Await in other languages

If you’ve ever programmed with Javascript, you definitely know about Promise and async/awaitC#Java, Python, and some other programming languages apply the same pattern but with other names such as Task or Future.

On the contrary, Go doesn’t follow the pattern at all. Instead, it introduces goroutines and channels. However, it isn’t difficult to replicate the pattern with goroutines and channels.

Single async/await

//javascript

const longRunningTask = async () => {
    // Simulate a workload.
    sleep(3000)
    return Math.floor(Math.random() * Math.floor(100))
}

const r = await longRunningTask()
console.log(r)
package main

import (
	"fmt"
        "math/rand"
	"time"
)

//golang

func longRunningTask() <-chan int32 {
	r := make(chan int32)

	go func() {
		defer close(r)
		
		// Simulate a workload.
		time.Sleep(time.Second * 3)
		r <- rand.Int31n(100)
	}()

	return r
}

func main() {
	r := <-longRunningTask()
	fmt.Println(r)
}

To declare an “async” function in Go:

  • The return type is <-chan ReturnType.
  • Within the function, create a channel by make(chan ReturnType) and return the created channel at the end of the function.
  • Start an anonymous goroutine by go func() {...} and implement the function’s logic inside that anonymous function.
  • Return the result by sending the value to channel.
  • At the beginning of the anonymous function, add defer close(r) to close the channel once done.

To “await” the result, simply read the value from channel by v := <- fn().

Promise.all()

It’s very common that we start multiple async tasks then wait for all of them to finish and gather their results. Doing that is quite simple in both Javascript and Golang.

//javascript

const longRunningTask = async () => {
    // Simulate a workload.
    sleep(3000)
    return Math.floor(Math.random() * Math.floor(100))
}

const [a, b, c] = await Promise.all(longRunningTask(), longRunningTask(), longRunningTask())
console.log(a, b, c)
package main

import (
	"fmt"
        "math/rand"
	"time"
)

//golang

func longRunningTask() <-chan int32 {
	r := make(chan int32)

	go func() {
		defer close(r)
		
		// Simulate a workload.
		time.Sleep(time.Second * 3)
		r <- rand.Int31n(100)
	}()

	return r
}

func main() {
	aCh, bCh, cCh := longRunningTask(), longRunningTask(), longRunningTask()
	a, b, c := <-aCh, <-bCh, <-cCh
	
	fmt.Println(a, b, c)
}

In Golang, we have to do it in 2 lines of code and introduce 3 more variables, but it’s clean and simple enough.

We can not do <-longRun(), <-longRun(), <-longRun(), which will longRun() one by one instead all in once.

Promise.race()

Sometimes, a piece of data can be received from several sources to avoid high latencies, or there’re cases that multiple results are generated but they’re equivalent and the only first response is consumed. This first-response-win pattern, therefore, is quite popular. Achieving that in both Javascript and Go is very simple.

//javascript

const one = async () => {
    // Simulate a workload.
    sleep(Math.floor(Math.random() * Math.floor(2000)))
    return 1
}

const two = async () => {
    // Simulate a workload.
    sleep(Math.floor(Math.random() * Math.floor(1000)))
    sleep(Math.floor(Math.random() * Math.floor(1000)))
    return 2
}

const r = await Promise.race(one(), two())
console.log(r)
package main

import (
	"fmt"
	"math/rand"
	"time"
)

//golang

func one() <-chan int32 {
	r := make(chan int32)

	go func() {
		defer close(r)

		// Simulate a workload.
		time.Sleep(time.Millisecond * time.Duration(rand.Int63n(2000)))
		r <- 1
	}()

	return r
}

func two() <-chan int32 {
	r := make(chan int32)

	go func() {
		defer close(r)

		// Simulate a workload.
		time.Sleep(time.Millisecond * time.Duration(rand.Int63n(1000)))
		time.Sleep(time.Millisecond * time.Duration(rand.Int63n(1000)))
		r <- 2
	}()

	return r
}

func main() {
	var r int32
	select {
	case r = <-one():
	case r = <-two():
	}

	fmt.Println(r)
}

select-case is the pattern that Go designed specifically for racing channel operations. We can even do more stuff within each case, but we’re focusing only on the result so we just leave them all empty.

Promise.then() and Promise.catch()

Because Go’s error propagation model is very different from Javascript, there’s any clean way to replicate Promise.then() and Promise.catch(). In Go, error is returned along with return values instead of being thrown as exception. Therefore, if your function can fail, you can consider changing your return <-chan ReturnType into <-chan ReturnAndErrorType, which is a struct holding both the result and error.

Go style Promise&then

type Promise struct {
    wg  sync.WaitGroup
    res string
    err error
}

func NewPromise(f func() (string, error)) *Promise {
    p := &Promise{}
    p.wg.Add(1)
    go func() {
        p.res, p.err = f()
        p.wg.Done()
    }()
    return p
}

func (p *Promise) Then(r func(string), e func(error)) {
    go func() {
        p.wg.Wait()
        if p.err != nil {
            e(p.err)
            return
        }
        r(p.res)
    }()
}