2025年golang中的select原理解析

golang中的select原理解析基本用法 检查 ch 中有没有数据 select case d lt ch default 读取已经被 close 掉的 ch 时会返回零值 不会报错 因此在使用 for select 的时候要格外注意 for select case d lt ch

大家好,我是讯享网,很高兴认识大家。
基本用法

检查 ch 中有没有数据

select { 
    case d <- ch: default: } 

讯享网

读取已经被 close 掉的 ch 时会返回零值,不会报错。因此在使用for + select的时候要格外注意。

讯享网for { 
    select { 
    case d <- ch: default: } } 

d会永远都有值,for会永远执行下去。需要做一些优化。

outer: for { 
    select { 
    case d, ok := <-ch1: if !ok { 
    break outer } case d, ok := <-ch2: if !ok { 
    break outer } } } 

对于监听单个 ch 的,可以使用for + range代替。因为一旦 ch 被关闭,它会退出循环。

讯享网for d := range ch { 
    } 

select 监听多个 ch 的时候,执行顺序时随机的。

源码部分

关于channel源码解析 golang的channel实现原理。

src/runtime/chan.go

// compiler implements // // select { 
    // case c <- v: // ... foo // default: // ... bar // } // // as // // if selectnbsend(c, v) { 
    // ... foo // } else { 
    // ... bar // } // func selectnbsend(c *hchan, elem unsafe.Pointer) (selected bool) { 
    return chansend(c, elem, false, getcallerpc()) } // compiler implements // // select { 
    // case v, ok = <-c: // ... foo // default: // ... bar // } // // as // // if selected, ok = selectnbrecv(&v, c); selected { 
    // ... foo // } else { 
    // ... bar // } // func selectnbrecv(elem unsafe.Pointer, c *hchan) (selected, received bool) { 
    return chanrecv(c, elem, false) } func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) 

src/runtime/select.go

讯享网type scase struct { 
    c *hchan // chan elem unsafe.Pointer // data element } type selectDir int const ( _ selectDir = iota selectSend // case Chan <- Send selectRecv // case <-Chan: selectDefault // default ) func reflect_rselect(cases []runtimeSelect) (int, bool) { 
    if len(cases) == 0 { 
    block() } sel := make([]scase, len(cases)) orig := make([]int, len(cases)) nsends, nrecvs := 0, 0 dflt := -1 for i, rc := range cases { 
    var j int switch rc.dir { 
    case selectDefault: dflt = i continue case selectSend: j = nsends nsends++ case selectRecv: nrecvs++ j = len(cases) - nrecvs } // sel 中,将 send 排在前面,recv 排在后面 // 因此需要通过 orig 来辅助定位 sel[j] = scase{ 
   c: rc.ch, elem: rc.val} orig[j] = i } // 只有 default case. if nsends+nrecvs == 0 { 
    return dflt, false } // 只保留 send 和 recv if nsends+nrecvs < len(cases) { 
    copy(sel[nsends:], sel[len(cases)-nrecvs:]) copy(orig[nsends:], orig[len(cases)-nrecvs:]) } // order 为空切片,长度是 2*(nsends+nrecvs) order := make([]uint16, 2*(nsends+nrecvs)) // 竞争探测用的 var pc0 *uintptr if raceenabled { 
    pcs := make([]uintptr, nsends+nrecvs) for i := range pcs { 
    selectsetpc(&pcs[i]) } pc0 = &pcs[0] } chosen, recvOK := selectgo(&sel[0], &order[0], pc0, nsends, nrecvs, dflt == -1) // Translate chosen back to caller's ordering. if chosen < 0 { 
    chosen = dflt } else { 
    chosen = orig[chosen] } return chosen, recvOK } // selectgo implements the select statement. // // cas0 points to an array of type [ncases]scase, and order0 points to // an array of type [2*ncases]uint16 where ncases must be <= 65536. // Both reside on the goroutine's stack (regardless of any escaping in // selectgo). // // For race detector builds, pc0 points to an array of type // [ncases]uintptr (also on the stack); for other builds, it's set to // nil. // // selectgo returns the index of the chosen scase, which matches the // ordinal position of its respective select{recv,send,default} call. // Also, if the chosen scase was a receive operation, it reports whether // a value was received. func selectgo(cas0 *scase, order0 *uint16, pc0 *uintptr, nsends, nrecvs int, block bool) (int, bool) { 
    if debugSelect { 
    print("select: cas0=", cas0, "\n") } // NOTE: In order to maintain a lean stack size, the number of scases // is capped at 65536. // cas0 为指向切片[]scase的首地址,通过unsafe.Pointer将其转化为 *[65536]scase,超出部分被舍弃 cas1 := (*[1 << 16]scase)(unsafe.Pointer(cas0)) // 同样的道理,order1为 *[]uint16 order1 := (*[1 << 17]uint16)(unsafe.Pointer(order0)) // 实际长度 ncases := nsends + nrecvs // 截取实际的长度,从0开始,截取ncases个,并且cap为ncases scases := cas1[:ncases:ncases] // 截取实际的长度 pollorder := order1[:ncases:ncases] // 首先取 order1[ncases:],也就是 order1 的后半部分,得到新切片后,再来取 [:ncases:ncases] lockorder := order1[ncases:][:ncases:ncases] // NOTE: pollorder/lockorder's underlying array was not zero-initialized by compiler. // Even when raceenabled is true, there might be select // statements in packages compiled without -race (e.g., // ensureSigM in runtime/signal_unix.go). var pcs []uintptr if raceenabled && pc0 != nil { 
    pc1 := (*[1 << 16]uintptr)(unsafe.Pointer(pc0)) pcs = pc1[:ncases:ncases] } casePC := func(casi int) uintptr { 
    if pcs == nil { 
    return 0 } return pcs[casi] } var t0 int64 if blockprofilerate > 0 { 
    t0 = cputicks() } // The compiler rewrites selects that statically have // only 0 or 1 cases plus default into simpler constructs. // The only way we can end up with such small sel.ncase // values here is for a larger select in which most channels // have been nilled out. The general code handles those // cases correctly, and they are rare enough not to bother // optimizing (and needing to test). // generate permuted order norder := 0 for i := range scases { 
    cas := &scases[i] // Omit cases without channels from the poll and lock orders. // 如果 cas 中没有 ch,就过滤掉 if cas.c == nil { 
    cas.elem = nil // allow GC continue } // 随机打乱顺序 j := fastrandn(uint32(norder + 1)) pollorder[norder] = pollorder[j] // 给 pollorder 赋值 pollorder[j] = uint16(i) norder++ } // 重新截取切片 pollorder = pollorder[:norder] lockorder = lockorder[:norder] // sort the cases by Hchan address to get the locking order. // simple heap sort, to guarantee n log n time and constant stack footprint. // 使用 Hchan 的地址来对 cases 进行排序 // 简单的堆排序,确保查找时间为 logn,和恒定的堆栈占用量 for i := range lockorder { 
    j := i // Start with the pollorder to permute cases on the same channel. c := scases[pollorder[i]].c for j > 0 && scases[lockorder[(j-1)/2]].c.sortkey() < c.sortkey() { 
    k := (j - 1) / 2 lockorder[j] = lockorder[k] j = k } lockorder[j] = pollorder[i] } for i := len(lockorder) - 1; i >= 0; i-- { 
    o := lockorder[i] c := scases[o].c lockorder[i] = lockorder[0] j := 0 for { 
    k := j*2 + 1 if k >= i { 
    break } if k+1 < i && scases[lockorder[k]].c.sortkey() < scases[lockorder[k+1]].c.sortkey() { 
    k++ } if c.sortkey() < scases[lockorder[k]].c.sortkey() { 
    lockorder[j] = lockorder[k] j = k continue } break } lockorder[j] = o } if debugSelect { 
    for i := 0; i+1 < len(lockorder); i++ { 
    if scases[lockorder[i]].c.sortkey() > scases[lockorder[i+1]].c.sortkey() { 
    print("i=", i, " x=", lockorder[i], " y=", lockorder[i+1], "\n") throw("select: broken sort") } } } // lock all the channels involved in the select // 锁住所有的 chan sellock(scases, lockorder) var ( gp *g sg *sudog c *hchan k *scase sglist *sudog sgnext *sudog qp unsafe.Pointer nextp **sudog ) // pass 1 - look for something already waiting // 看是否有就绪的读或写chan var casi int var cas *scase var caseSuccess bool var caseReleaseTime int64 = -1 var recvOK bool for _, casei := range pollorder { 
    casi = int(casei) cas = &scases[casi] c = cas.c if casi >= nsends { 
    sg = c.sendq.dequeue() if sg != nil { 
    goto recv } if c.qcount > 0 { 
    goto bufrecv } if c.closed != 0 { 
    goto rclose } } else { 
    if raceenabled { 
    racereadpc(c.raceaddr(), casePC(casi), chansendpc) } if c.closed != 0 { 
    goto sclose } sg = c.recvq.dequeue() if sg != nil { 
    goto send } if c.qcount < c.dataqsiz { 
    goto bufsend } } } if !block { 
    selunlock(scases, lockorder) casi = -1 goto retc } // pass 2 - enqueue on all chans // 没有就绪的chan,那么就需要将当前G加入所有chan的sendq或recvq中去,并挂起 gp = getg() if gp.waiting != nil { 
    throw("gp.waiting != nil") } nextp = &gp.waiting for _, casei := range lockorder { 
    casi = int(casei) cas = &scases[casi] c = cas.c sg := acquireSudog() sg.g = gp sg.isSelect = true // No stack splits between assigning elem and enqueuing // sg on gp.waiting where copystack can find it. sg.elem = cas.elem sg.releasetime = 0 if t0 != 0 { 
    sg.releasetime = -1 } sg.c = c // Construct waiting list in lock order. *nextp = sg nextp = &sg.waitlink if casi < nsends { 
    c.sendq.enqueue(sg) } else { 
    c.recvq.enqueue(sg) } } // wait for someone to wake us up gp.param = nil // Signal to anyone trying to shrink our stack that we're about // to park on a channel. The window between when this G's status // changes and when we set gp.activeStackChans is not safe for // stack shrinking. atomic.Store8(&gp.parkingOnChan, 1) gopark(selparkcommit, nil, waitReasonSelect, traceEvGoBlockSelect, 1) gp.activeStackChans = false sellock(scases, lockorder) gp.selectDone = 0 sg = (*sudog)(gp.param) gp.param = nil // pass 3 - dequeue from unsuccessful chans // otherwise they stack up on quiet channels // record the successful case, if any. // We singly-linked up the SudoGs in lock order. casi = -1 cas = nil caseSuccess = false sglist = gp.waiting // Clear all elem before unlinking from gp.waiting. for sg1 := gp.waiting; sg1 != nil; sg1 = sg1.waitlink { 
    sg1.isSelect = false sg1.elem = nil sg1.c = nil } gp.waiting = nil for _, casei := range lockorder { 
    k = &scases[casei] if sg == sglist { 
    // sg has already been dequeued by the G that woke us up. casi = int(casei) cas = k caseSuccess = sglist.success if sglist.releasetime > 0 { 
    caseReleaseTime = sglist.releasetime } } else { 
    c = k.c if int(casei) < nsends { 
    c.sendq.dequeueSudoG(sglist) } else { 
    c.recvq.dequeueSudoG(sglist) } } sgnext = sglist.waitlink sglist.waitlink = nil releaseSudog(sglist) sglist = sgnext } if cas == nil { 
    throw("selectgo: bad wakeup") } c = cas.c if debugSelect { 
    print("wait-return: cas0=", cas0, " c=", c, " cas=", cas, " send=", casi < nsends, "\n") } if casi < nsends { 
    if !caseSuccess { 
    goto sclose } } else { 
    recvOK = caseSuccess } if raceenabled { 
    if casi < nsends { 
    raceReadObjectPC(c.elemtype, cas.elem, casePC(casi), chansendpc) } else if cas.elem != nil { 
    raceWriteObjectPC(c.elemtype, cas.elem, casePC(casi), chanrecvpc) } } if msanenabled { 
    if casi < nsends { 
    msanread(cas.elem, c.elemtype.size) } else if cas.elem != nil { 
    msanwrite(cas.elem, c.elemtype.size) } } if asanenabled { 
    if casi < nsends { 
    asanread(cas.elem, c.elemtype.size) } else if cas.elem != nil { 
    asanwrite(cas.elem, c.elemtype.size) } } selunlock(scases, lockorder) goto retc bufrecv: // can receive from buffer if raceenabled { 
    if cas.elem != nil { 
    raceWriteObjectPC(c.elemtype, cas.elem, casePC(casi), chanrecvpc) } racenotify(c, c.recvx, nil) } if msanenabled && cas.elem != nil { 
    msanwrite(cas.elem, c.elemtype.size) } if asanenabled && cas.elem != nil { 
    asanwrite(cas.elem, c.elemtype.size) } recvOK = true qp = chanbuf(c, c.recvx) if cas.elem != nil { 
    typedmemmove(c.elemtype, cas.elem, qp) } typedmemclr(c.elemtype, qp) c.recvx++ if c.recvx == c.dataqsiz { 
    c.recvx = 0 } c.qcount-- selunlock(scases, lockorder) goto retc bufsend: // can send to buffer if raceenabled { 
    racenotify(c, c.sendx, nil) raceReadObjectPC(c.elemtype, cas.elem, casePC(casi), chansendpc) } if msanenabled { 
    msanread(cas.elem, c.elemtype.size) } if asanenabled { 
    asanread(cas.elem, c.elemtype.size) } typedmemmove(c.elemtype, chanbuf(c, c.sendx), cas.elem) c.sendx++ if c.sendx == c.dataqsiz { 
    c.sendx = 0 } c.qcount++ selunlock(scases, lockorder) goto retc recv: // can receive from sleeping sender (sg) recv(c, sg, cas.elem, func() { 
    selunlock(scases, lockorder) }, 2) if debugSelect { 
    print("syncrecv: cas0=", cas0, " c=", c, "\n") } recvOK = true goto retc rclose: // read at end of closed channel selunlock(scases, lockorder) recvOK = false if cas.elem != nil { 
    typedmemclr(c.elemtype, cas.elem) } if raceenabled { 
    raceacquire(c.raceaddr()) } goto retc send: // can send to a sleeping receiver (sg) if raceenabled { 
    raceReadObjectPC(c.elemtype, cas.elem, casePC(casi), chansendpc) } if msanenabled { 
    msanread(cas.elem, c.elemtype.size) } if asanenabled { 
    asanread(cas.elem, c.elemtype.size) } send(c, sg, cas.elem, func() { 
    selunlock(scases, lockorder) }, 2) if debugSelect { 
    print("syncsend: cas0=", cas0, " c=", c, "\n") } goto retc retc: if caseReleaseTime > 0 { 
    blockevent(caseReleaseTime-t0, 1) } return casi, recvOK sclose: // send on closed channel selunlock(scases, lockorder) panic(plainError("send on closed channel")) } 

select执行过程就是输入cases数组,输出选中的case索引,然后程序流程转到选中的case块。

如果cases为空,那么当前协程将会被永远forever挂起,让出执行权利。因为Golang自带死锁检测机制,当发现当前协程再也没有机会被唤醒时,则会panic。


讯享网

func block() { 
    gopark(nil, nil, waitReasonSelectNoCases, traceEvGoStop, 1) // forever } 

也就是说,select{}的写法是会报错的,但是特殊情况

讯享网func main() { 
    go func() { 
    for i := 0; i < 10; i++ { 
    fmt.Println(i) } os.Exit(0) }() select{ 
   } } 

程序可以正常运行。

cas0为scase数组的首地址。

order0为一个两倍cas0数组长度的buffer,pollorder和lockorder
1、pollorder:每次selectgo执行都会把pollorder序列打乱,以达到随机检测case的目的。
2、lockorder:所有case语句中channel的地址来排序,构成一个简单的堆排序,也可以达到去重防止对channel加锁时重复加锁的目的,所以 sellock() 函数要带上 lockorder 参数。

函数返回值:

1、int:选中case的编号,这个case编号跟代码一致。

2、bool:如果是读操作,则表示是否成功从channle中读取了数据。

大致原理:

从语法层面,将所有的case存到一个数组中,然后抽象出两个轻量级的数组pollorder和lockorder,用来遍历,然后就是判断读操作是否能读(缓冲队列是否有值,sendq是否有sudog),判断写操作是否可写(缓冲队列是否有值,recvq是否有sudog),如果都没有,那就将当前G加入到所有通道的sendq或者recvq;然后gopack挂起,直到被唤醒了,则说明,有一条case被激活了,然后去找到是哪条case,返回对于的ID。

小讯
上一篇 2025-03-31 07:39
下一篇 2025-03-31 10:38

相关推荐

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容,请联系我们,一经查实,本站将立刻删除。
如需转载请保留出处:https://51itzy.com/kjqy/129064.html