Go语言学习之context包的用法详解

Phaedra ·
更新时间:2024-09-20
· 1393 次阅读

目录

前言

需求一

需求二

Context 接口

emptyCtx

valueCtx

类型定义

WithValue

cancelCtx

类型定义

cancelCtx

WithCancel

timerCtx

类型定义

WithDeadline

WithTimeout

总结

前言

日常 Go 开发中,Context 包是用的最多的一个了,几乎所有函数的第一个参数都是 ctx,那么我们为什么要传递 Context 呢,Context 又有哪些用法,底层实现是如何呢?相信你也一定会有探索的欲望,那么就跟着本篇文章,一起来学习吧!

需求一

开发中肯定会调用别的函数,比如 A 调用 B,在调用过程中经常会设置超时时间,比如超过2s 就不等待 B 的结果了,直接返回,那么我们需要怎么做呢?

// 睡眠5s,模拟长时间操作 func FuncB() (interface{}, error) {  time.Sleep(5 * time.Second)  return struct{}{}, nil } func FuncA() (interface{}, error) {  var res interface{}  var err error  ch := make(chan interface{})   // 调用FuncB(),将结果保存至 channel 中  go func() {   res, err = FuncB()   ch <- res  }()   // 设置一个2s的定时器  timer := time.NewTimer(2 * time.Second)   // 监测是定时器先结束,还是 FuncB 先返回结果  select {     // 超时,返回默认值  case <-timer.C:   return "default", err     // FuncB 先返回结果,关闭定时器,返回 FuncB 的结果  case r := <-ch:   if !timer.Stop() {    <-timer.C   }   return r, err  } } func main() {  res, err := FuncA()  fmt.Println(res, err) }

上面我们的实现,可以实现超过等待时间后,A 不等待 B,但是 B 并没有感受到取消信号,如果 B 是个计算密度型的函数,我们也希望B 感知到取消信号,及时取消计算并返回,减少资源浪费。

另一种情况,如果存在多层调用,比如A 调用 B、C,B 调用 D、E,C调用 E、F,在超过 A 的超时时间后,我们希望取消信号能够一层层的传递下去,后续所有被调用到的函数都能感知到,及时返回。

需求二

在多层调用的时候,A->B->C->D,有些数据需要固定传输,比如 LogID,通过打印相同的 LogID,我们就能够追溯某一次调用,方便问题的排查。如果每次都需要传参的话,未免太麻烦了,我们可以使用 Context 来保存。通过设置一个固定的 Key,打印日志时从中取出 value 作为 LogID。

const LogKey = "LogKey" // 模拟一个日志打印,每次从 Context 中取出 LogKey 对应的 Value 作为LogID type Logger struct{} func (logger *Logger) info(ctx context.Context, msg string) {  logId, ok := ctx.Value(LogKey).(string)  if !ok {   logId = uuid.New().String()  }  fmt.Println(logId + " " + msg) } var logger Logger // 日志打印 并 调用 FuncB func FuncA(ctx context.Context) {  logger.info(ctx, "FuncA")  FuncB(ctx) } func FuncB(ctx context.Context) {  logger.info(ctx, "FuncB") } // 获取初始化的,带有 LogID 的 Context,一般在程序入口做 func getLogCtx(ctx context.Context) context.Context {  logId, ok := ctx.Value(LogKey).(string)  if ok {   return ctx  }  logId = uuid.NewString()  return context.WithValue(ctx, LogKey, logId) } func main() {  ctx = getLogCtx(context.Background())  FuncA(ctx) }

这利用到了本篇文章讲到的 valueCtx,继续往下看,一起来学习 valueCtx 是怎么实现的吧!

Context 接口 type Context interface {  Deadline() (deadline time.Time, ok bool)  Done() <-chan struct{}  Err() error  Value(key interface{}) interface{} }

Context 接口比较简单,定义了四个方法:

Deadline() 方法返回两个值,deadline 表示 Context 将会在什么时间点取消,ok 表示是否设置了deadline。当 ok=false 时,表示没有设置deadline,那么此时 deadline 将会是个零值。多次调用这个方法返回同样的结果。

Done() 返回一个只读的 channel,类型为 chan struct{},如果当前的 Context 不支持取消,Done 返回 nil。我们知道,如果一个 channel 中没有数据,读取数据会阻塞;而如果channel被关闭,则可以读取到数据,因此可以监听 Done 返回的 channel,来获取 Context 取消的信号。

Err() 返回 Done 返回的 channel 被关闭的原因。当 channel 未被关闭时,Err() 返回 nil;channel 被关闭时则返回相应的值,比如 Canceled 、DeadlineExceeded。Err() 返回一个非 nil 值之后,后面再次调用会返回相同的值。

Value() 返回 Context 保存的键值对中,key 对应的 value,如果 key 不存在则返回 nil。

Done() 是一个比较常用的方法,下面是一个比较经典的流式处理任务的示例:监听 ctx.Done() 是否被关闭来判断任务是否需要取消,需要取消则返回相应的原因;没有取消则将计算的结果写入到 out channel中。

 func Stream(ctx context.Context, out chan<- Value) error {   for {     // 处理数据    v, err := DoSomething(ctx)    if err != nil {     return err    }     // ctx.Done() 读取到数据,说明获取到了任务取消的信号    select {    case <-ctx.Done():     return ctx.Err()     // 否则将结果输出,继续计算    case out <- v:    }   }  }

Value() 也是一个比较常用的方法,用于在上下文中传递一些数据。使用 context.WithValue() 方法存入 key 和 value,通过 Value() 方法则可以根据 key 拿到 value。

func main() {  ctx := context.Background()  c := context.WithValue(ctx, "key", "value")  v, ok := c.Value("key").(string)  fmt.Println(v, ok) } emptyCtx

Context 接口并不需要我们自己去手动实现,一般我们都是直接使用 context 包中提供的 Background() 方法和 TODO() 方法,来获取最基础的 Context。

var (  background = new(emptyCtx)  todo       = new(emptyCtx) ) func Background() Context {  return background } func TODO() Context {  return todo }

Background() 方法一般用在 main 函数,或者程序的初始化方法中;在我们不知道使用哪个 Context,或者上文没有传递 Context时,可以使用 TODO()。

Background() 和 TODO() 都是基于 emptyCtx 生成的,从名字可以看出来,emptyCtx 是一个空的Context,没有 deadline、不能被取消、没有键值对。

type emptyCtx int func (*emptyCtx) Deadline() (deadline time.Time, ok bool) {  return } func (*emptyCtx) Done() <-chan struct{} {  return nil } func (*emptyCtx) Err() error {  return nil } func (*emptyCtx) Value(key interface{}) interface{} {  return nil } func (e *emptyCtx) String() string {  switch e {  case background:   return "context.Background"  case todo:   return "context.TODO"  }  return "unknown empty Context" }

除了上面两个最基本的 Context 外,context 包中提供了功能更加丰富的 Context,包括 valueCtx、cancelCtx、timerCtx,下面我们就挨个来看下。

valueCtx

使用示例

我们一般使用 context.WithValue() 方法向 Context 存入键值对,然后通过 Value() 方法根据 key 得到 value,此种功能的实现就依赖 valueCtx。

func main() {  ctx := context.Background()  c := context.WithValue(ctx, "myKey", "myValue")  v1 := c.Value("myKey")  fmt.Println(v1.(string))  v2 := c.Value("hello")  fmt.Println(v2) //  nil } 类型定义

valueCtx 结构体中嵌套了 Context,使用 key 、value 来保存键值对:

type valueCtx struct {  Context  key, val interface{} } WithValue

context包 对外暴露了 WithValue 方法,基于一个 parent context 来创建一个 valueCtx。从下面的源码中可以看出,key 必须是可比较的!

func WithValue(parent Context, key, val interface{}) Context {  if parent == nil {   panic("cannot create context from nil parent")  }  if key == nil {   panic("nil key")  }  if !reflectlite.TypeOf(key).Comparable() {   panic("key is not comparable")  }  return &valueCtx{parent, key, val} }

*valueCtx 实现了 Value(),可以根据 key 得到 value。这是一个向上递归寻找的过程,如果 key 不在当前 valueCtx 中,会继续向上找 parent Context,直到找到最顶层的 Context,一般最顶层的是 emptyCtx,而 emtpyCtx.Value() 返回 nil。

func (c *valueCtx) Value(key interface{}) interface{} {  if c.key == key {   return c.val  }  return c.Context.Value(key) } cancelCtx

cancelCtx 是一个用于取消任务的 Context,任务通过监听 Context 是否被取消,来决定是否继续处理任务还是直接返回。

如下示例中,我们在 main 函数定义了一个 cancelCtx,并在 2s 后调用 cancel() 取消 Context,即我们希望 doSomething() 在 2s 内完成任务,否则就可以直接返回,不需要再继续计算浪费资源了。

doSomething() 方法内部,我们使用 select 监听任务是否完成,以及 Context 是否已经取消,哪个先到就执行哪个分支。方法模拟了一个 5s 的任务,main 函数等待时间是2s,因此没有完成任务;如果main函数等待时间改为10s,则任务完成并会返回结果。

这只是一层调用,真实情况下可能会有多级调用,比如 doSomething 可能又会调用其他任务,一旦 parent Context 取消,后续的所有任务都应该取消。

func doSomething(ctx context.Context) (interface{}, error) {  res := make(chan interface{})  go func() {   fmt.Println("do something")   time.Sleep(time.Second * 5)   res <- "done"  }()  select {  case <-ctx.Done():   return nil, ctx.Err()  case value := <-res:   return value, nil  } } func main() {  ctx, cancel := context.WithCancel(context.Background())  go func() {   time.Sleep(time.Second * 2)   cancel()  }()  res, err := doSomething(ctx)  fmt.Println(res, err) // nil , context canceled }

接下来就让我们来研究下,cancelCtx 是如何实现取消的吧

类型定义

canceler 接口包含 cancel() 和 Done() 方法,*cancelCtx 和 *timerCtx 均实现了这个接口。

closedchan 是一个被关闭的channel,可以用于后面 Done() 返回

canceled 是一个 err,用于 Context 被取消的原因

type canceler interface {  cancel(removeFromParent bool, err error)  Done() <-chan struct{} } // closedchan is a reusable closed channel. var closedchan = make(chan struct{}) func init() {  close(closedchan) } var Canceled = errors.New("context canceled")

CancelFunc 是一个函数类型定义,是一个取消函数,有如下规范:

CancelFunc 告诉一个任务停止工作

CancelFunc 不会等待任务结束

CancelFunc 支持并发调用

第一次调用后,后续的调用不会产生任何效果

type CancelFunc func()

&cancelCtxKey 是一个固定的key,用来返回 cancelCtx 自身

var cancelCtxKey int cancelCtx

cancelCtx 是可以被取消的,它嵌套了 Context 接口,实现了 canceler 接口。cancelCtx 使用 children 字段保存同样实现 canceler 接口的子节点,当 cancelCtx 被取消时,所有的子节点也会取消。

type cancelCtx struct {  Context  mu       sync.Mutex            // 保护如下字段,保证线程安全  done     atomic.Value          // 保存 channel,懒加载,调用 cancel 方法时会关闭这个 channel  children map[canceler]struct{} // 保存子节点,第一次调用 cancel 方法时会置为 nil  err      error                 // 保存为什么被取消,默认为nil,第一次调用 cancel 会赋值 }

*cancelCtx 的 Value() 方法 和 *valueCtx 的 Value() 方法类似,只不过加了个固定的key: &cancelCtxKey。当key 为 &cancelCtxKey 时返回自身

func (c *cancelCtx) Value(key interface{}) interface{} {  if key == &cancelCtxKey {   return c  }  return c.Context.Value(key) }

*cancelCtx 的 done 字段是懒加载的,只有在调用 Done() 方法 或者 cancel() 时才会赋值。

func (c *cancelCtx) Done() <-chan struct{} {  d := c.done.Load()   // 如果已经有值了,直接返回  if d != nil {   return d.(chan struct{})  }   // 没有值,加锁赋值  c.mu.Lock()  defer c.mu.Unlock()  d = c.done.Load()  if d == nil {   d = make(chan struct{})   c.done.Store(d)  }  return d.(chan struct{}) }

Err 方法返回 cancelCtx 的 err 字段

func (c *cancelCtx) Err() error {    c.mu.Lock()    err := c.err    c.mu.Unlock()    return err } WithCancel

那么我们如何新建一个 cancelCtx呢?context 包提供了 WithCancel() 方法,让我们基于一个 Context 来创建一个 cancelCtx。WithCancel() 方法返回两个字段,一个是基于传入的 Context 生成的 cancelCtx,另一个是 CancelFunc。

func WithCancel(parent Context) (ctx Context, cancel CancelFunc) {  if parent == nil {   panic("cannot create context from nil parent")  }  c := newCancelCtx(parent)  propagateCancel(parent, &c)  return &c, func() { c.cancel(true, Canceled) } }

WithCancel 调用了两个外部方法:newCancelCtx 、propagateCancel。newCancelCtx 比较简单,根据传入的 context,返回了一个 cancelCtx 结构体。

func newCancelCtx(parent Context) cancelCtx {  return cancelCtx{Context: parent} }

propagateCancel 从名字可以看出,就是将 cancel 传播。如果父Context支持取消,那么我们需要建立一个通知机制,这样父节点取消的时候,通知子节点也取消,层层传播。

在 propagateCancel 中,如果 父Context 是 cancelCtx 类型且未取消,会将 子Context 挂在它下面,形成一个树结构;其余情况都不会挂载。

func propagateCancel(parent Context, child canceler) {   // 如果 parent 不支持取消,那么就不支持取消传播,直接返回  done := parent.Done()  if done == nil {   return   }   // 到这里说明 done 不为 nil,parent 支持取消  select {  case <-done:   // 如果 parent 此时已经取消了,那么直接告诉子节点也取消   child.cancel(false, parent.Err())   return  default:  }   // 到这里说明此时 parent 还未取消   // 如果 parent 是未取消的 cancelCtx   if p, ok := parentCancelCtx(parent); ok {     // 加锁,防止并发更新   p.mu.Lock()     // 再次判断,因为有可能上一个获得锁的进行了取消操作。     // 如果 parent 已经取消了,那么子节点也直接取消   if p.err != nil {    child.cancel(false, p.err)   } else {       // 把子Context 挂到父节点 parent cancelCtx 的 children字段下       // 之后 parent cancelCtx 取消时,能通知到所有的 子Context     if p.children == nil {     p.children = make(map[canceler]struct{})    }    p.children[child] = struct{}{}   }   p.mu.Unlock()  } else {    // parent 不是 cancelCtx 类型,可能是用户自己实现的Context   atomic.AddInt32(&goroutines, +1)     // 启动一个协程监听,如果 parent 取消了,子 Context 也取消   go func() {    select {    case <-parent.Done():     child.cancel(false, parent.Err())    case <-child.Done():    }   }()  } }

cancel 方法就是来取消 cancelCtx,主要的工作是:关闭c.done 中的channel,给 err 赋值,然后级联取消所有 子Context。如果 removeFromParent 为 true,会从父节点中删除以该节点为树顶的树。

cancel() 方法只负责自己管辖的范围,即自己以及自己的子节点,然后根据配置判断是否需要从父节点中移除自己为顶点的树。如果子节点还有子节点,那么由子节点负责处理,不用自己负责了。

propagateCancel() 中有三处调用了 cancel() 方法,传入的 removeFromParent 都为 false,是因为当时根本没有挂载,不需要移除。而 WithCancel 返回的 CancelFunc ,传入的 removeFromParent 为 true,是因为调用 propagateCancel 有可能产生挂载,当产生挂载时,调用 cancel() 就需要移除了。

func (c *cancelCtx) cancel(removeFromParent bool, err error) {   // err 是指取消的原因,必传,cancelCtx 中是 errors.New("context canceled")  if err == nil {   panic("context: internal error: missing cancel error")  }   // 涉及到保护字段值的修改,都需要加锁  c.mu.Lock()   // 如果该Context已经取消过了,直接返回。多次调用cancel,不会产生额外效果  if c.err != nil {   c.mu.Unlock()   return   }   // 给 err 赋值,这里 err 一定不为 nil  c.err = err   // close channel  d, _ := c.done.Load().(chan struct{})   // 因为c.done 是懒加载,有可能存在 nil 的情况   // 如果 c.done 中没有值,直接赋值 closedchan;否则直接 close  if d == nil {   c.done.Store(closedchan)  } else {   close(d)  }   // 遍历当前 cancelCtx 所有的子Context,让子节点也 cancel   // 因为当前的Context 会主动把子Context移除,子Context 不用主动从parent中脱离   // 因此 child.cancel 传入的 removeFromParent 为false  for child := range c.children {   child.cancel(false, err)  }   // 将 children 置空,相当于移除自己的所有子Context  c.children = nil  c.mu.Unlock()   // 如果当前 cancelCtx 需要从上层的 cancelCtx移除,调用removeChild方法   // c.Context 就是自己的父Context  if removeFromParent {   removeChild(c.Context, c)  } }

从propagateCancel方法中可以看到,只有parent 属于 cancelCtx 类型 ,才会将自己挂载。因此 removeChild 会再次判断 parent 是否为 cancelCtx,和之前的逻辑保持一致。找到的话,再将自己移除,需要注意的是,移除会把自己及其自己下面的所有子节点都移除。

如果上一步 propagateCancel 方法将自己挂载到了 A 上,但是在调用 cancel() 时,A 已经取消过了,此时 parentCancelCtx() 会返回 false。不过这没有关系,A 取消时已经将挂载的子节点移除了,当前的子节点不用将自己从 A 中移除了。

func removeChild(parent Context, child canceler) {   // parent 是否为未取消的 cancelCtx  p, ok := parentCancelCtx(parent)  if !ok {   return  }   // 获取 parent cancelCtx 的锁,修改保护字段 children  p.mu.Lock()   // 将自己从 parent cancelCtx 的 children 中删除  if p.children != nil {   delete(p.children, child)  }  p.mu.Unlock() }

parentCancelCtx 判断 parent 是否为 未取消的 *cancelCtx。取消与否容易判断,难判断的是 parent 是否为  *cancelCtx,因为有可能其他结构体内嵌了 cancelCtx,比如 timerCtx,会通过比对 channel 来确定。

func parentCancelCtx(parent Context) (*cancelCtx, bool) {   // 如果 parent context 的 done 为 nil, 说明不支持 cancel,那么就不可能是 cancelCtx  // 如果 parent context 的 done 为 closedchan, 说明 parent context 已经 cancel 了  done := parent.Done()  if done == closedchan || done == nil {   return nil, false  }   // 到这里说明支持取消,且没有被取消  // 如果 parent context 属于原生的 *cancelCtx 或衍生类型,需要继续进行后续判断  // 如果 parent context 无法转换到 *cancelCtx,则认为非 cancelCtx,返回 nil,fasle  p, ok := parent.Value(&cancelCtxKey).(*cancelCtx)  if !ok {   return nil, false  }   // 经过上面的判断后,说明 parent context 可以被转换为 *cancelCtx,这时存在多种情况:  //   - parent context 就是 *cancelCtx  //   - parent context 是标准库中的 timerCtx  //   - parent context 是个自己自定义包装的 cancelCtx  //  // 针对这 3 种情况需要进行判断,判断方法就是:   //   判断 parent context 通过 Done() 方法获取的 done channel 与 Value 查找到的 context 的 done channel 是否一致  //   // 一致情况说明 parent context 为 cancelCtx 或 timerCtx 或 自定义的 cancelCtx 且未重写 Done(),  // 这种情况下可以认为拿到了底层的 *cancelCtx  //   // 不一致情况说明 parent context 是一个自定义的 cancelCtx 且重写了 Done() 方法,并且并未返回标准 *cancelCtx 的  // 的 done channel,这种情况需要单独处理,故返回 nil, false  pdone, _ := p.done.Load().(chan struct{})  if pdone != done {   return nil, false  }  return p, true } timerCtx

简介

timerCtx 嵌入了 cancelCtx,并新增了一个 timer 和 deadline 字段。timerCtx 的取消能力是复用 cancelCtx 的,只是在这个基础上增加了定时取消而已。

在我们的使用过程中,有可能还没到 deadline,任务就提前完成了,此时需要手动调用 CancelFunc。

func slowOperationWithTimeout(ctx context.Context) (Result, error) {   ctx, cancel := context.WithTimeout(ctx, 100*time.Millisecond)     defer cancel()  // 如果未到截止时间,slowOperation就完成了,尽早调用 cancel() 释放资源   return slowOperation(ctx) } 类型定义 type timerCtx struct {    cancelCtx // 内嵌 cancelCtx    timer *time.Timer // 受 cancelCtx.mu 互斥锁的保护    deadline time.Time // 截止时间 }

Deadline() 返回 deadline 字段的值

func (c *timerCtx) Deadline() (deadline time.Time, ok bool) {  return c.deadline, true } WithDeadline

WithDeadline 基于parent Context 和 时间点 d,返回了一个定时取消的 Context,以及一个 CancelFunc。返回的Context 有三种情况被取消:1. 到达了指定时间,就会主动取消;2. 手动调用了 CancelFunc;3. 父Context取消,导致该Context被取消。这三种情况哪种先到,就会首次触发取消操作,后续的再次取消不会产生任何效果。

如果传入 parent Context 的 deadline 比指定的时间 d 还要早,此时 d 就没用处了,直接依赖 parent 取消传播就可以了。

func WithDeadline(parent Context, d time.Time) (Context, CancelFunc) {   // 传入的 parent 不能为 nil  if parent == nil {   panic("cannot create context from nil parent")  }   // parent 也有 deadline,并且比 d 还要早,直接依赖 parent 的取消传播即可  if cur, ok := parent.Deadline(); ok && cur.Before(d) {   // The current deadline is already sooner than the new one.   return WithCancel(parent)  }   // 定义 timerCtx 接口  c := &timerCtx{   cancelCtx: newCancelCtx(parent),   deadline:  d,  }   // 设置传播,如果parent 属于 cancelCtx,会挂载到 children 字段上  propagateCancel(parent, c)   // 距离截止时间 d 还有多久  dur := time.Until(d)  if dur <= 0 {     // 已经到了截止时间,直接取消,同时从 parent 中取消挂载     // 由于是超时,取消时的 err 是 DeadlineExceeded   c.cancel(true, DeadlineExceeded)      // 再返回 c 和 CancelFunc,已经取消挂载了,此时的 CancelFunc 不会从 parent 中取消挂载     // 后面再次调用 CancelFunc 不会产生任何效果了     // 主动取消的话,err 是 Canceled   return c, func() { c.cancel(false, Canceled) }  }   // 还没有到截止时间,定义一个定时器,过了 dur 会自动取消  c.mu.Lock()  defer c.mu.Unlock()  if c.err == nil {   c.timer = time.AfterFunc(dur, func() {       // 由于是到了截止时间才取消,err 是 DeadlineExceeded    c.cancel(true, DeadlineExceeded)   })  }   // 返回 c 和 cancelFunc,主动取消的 err 是 Canceled  return c, func() { c.cancel(true, Canceled) } }

接下来我们看下 cancel 方法,timerCtx 的 cancel 方法 就是调用内嵌 cancelCtx 的 cancel() 方法,默认是不从父节点移除

func (c *timerCtx) cancel(removeFromParent bool, err error) {  c.cancelCtx.cancel(false, err)   // 从父节点中移除  if removeFromParent {   removeChild(c.cancelCtx.Context, c)  }   // 把定时器停了,释放资源   // 有可能还没到deadline,手动触发了 CancelFunc,此时把 timer 停了  c.mu.Lock()  if c.timer != nil {   c.timer.Stop()   c.timer = nil  }  c.mu.Unlock() } WithTimeout

WithTimeout 就是基于 WithDeadline,deadline 就是基于当前时间计算的

func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc) {  return WithDeadline(parent, time.Now().Add(timeout)) } 总结

本篇文章,我们通过源码+示例的方式,一起学习了 context 包相关的结构以及实现逻辑,包括如下内容

Context 接口:定义了一些接口方法和规范

emptyCtx:空的Context,Background() 和 TODO() 方法就是使用的 emptyCtx

valueCtx:用于保存键值对,查询时是递归查询,可以用于 LogID 这种全局 id 的保存

cancelCtx:可以取消的Context,用于取消信号的传递

timerCtx:定时取消的 cancelCtx

以上就是Go语言学习之context包的用法详解的详细内容,更多关于Go语言 context包的资料请关注软件开发网其它相关文章!



GO 学习 go语言 context

需要 登录 后方可回复, 如果你还没有账号请 注册新账号