首页>>后端>>Golang->图文并茂:彻底理解Go中的Chan同步原理.

图文并茂:彻底理解Go中的Chan同步原理.

时间:2023-11-30 本站 点击:0

深入Chan底层源码进行分析Chan工作原理。

学习比较枯草,但贵在坚持. 有关于源码的理解我都已经写在源码中的注释中了。Go源码看起来要比Java舒服多了,仅仅只是对于环境上,哈哈哈,Java 看个Spring源码麻烦的一批。

一、向Chan中发送数据

1.1 总览全局:chan 发送数据的几个步骤

第一种情况:chan关闭状态,直接panic,表示不能向已经关闭状态的chan读取数据

第二种情况:直接查看当前chan中等待接受数据的等待队列是否为空,如果不为空直接发送

第三种情况:查看当前与chan相关的环形队列是否为空,如果不为空则将当前要进行发送的数据放入进去

第四种情况:如果是非阻塞状态直接进行return,如果是阻塞状态,那么就将当前的goroutine添加到chan关联的等待发送数据的等待队列中

1.2 源码分析.

不管是通过x<-1进行发送还是通过select关键字发送,其最后调用runtime.chansend1,而该方法内部就是对函数chansend的一次调用. 所以说chan发送数据最终就是通过chansend函数来进行发送。

funcchansend1(c*hchan,elemunsafe.Pointer){chansend(c,elem,true,getcallerpc())}

下面我们先看看整体的源码。

下面就是我们的源码分析, 这里我会把无关的源码和注释清理掉

funcchansend(c*hchan,epunsafe.Pointer,blockbool,callerpcuintptr)bool{//chan发送数据时一定会调用的方法.ifc==nil{if!block{returnfalse}gopark(nil,nil,waitReasonChanSendNilChan,traceEvGoStop,2)throw("unreachable")}if!block&&c.closed==0&&full(c){//非阻塞.returnfalse}lock(&c.lock)//0应该是未关闭状态.ifc.closed!=0{unlock(&c.lock)panic(plainError("sendonclosedchannel"))}//第一步:不管环形队列中是否有空闲的位置,我现在进行发送,如果有等待接收的goroutine,那么我直接将当前要进行发送的数据发送到等待队列中的G.ifsg:=c.recvq.dequeue();sg!=nil{send(c,sg,ep,func(){unlock(&c.lock)},3)returntrue}//第二步:当不存在等待接收的G时,并且环形队列中存在剩余的空间,这个时候直接写入Chan的缓冲区.ifc.qcount<c.dataqsiz{//Spaceisavailableinthechannelbuffer.Enqueuetheelementtosend.//计算可以进行存储的下标.qp:=chanbuf(c,c.sendx)ifraceenabled{racenotify(c,c.sendx,nil)}//将发送的数据拷贝到缓冲区.//ep就是本次要进行发送的数据.typedmemmove(c.elemtype,qp,ep)//移动下标.c.sendx++ifc.sendx==c.dataqsiz{c.sendx=0}//增加count.c.qcount++unlock(&c.lock)returntrue}//是否为阻塞.//使用select关键字可以向Chan非阻塞的发送消息.//select关键字触发的chan写block为false.//阻塞的话,会让出CPU等待时间.if!block{//select关键字会走这个.//非阻塞的话,CPU不会让出执行时间.unlock(&c.lock)returnfalse}//等待队列中既没有可以接受的goroutine.//环形队列又没有可以存储数据的剩余空间,这个时候将当前的goroutine添加到等待发送队列.//然后进行gopack.//下面实现起来就很复杂.//Blockonthechannel.Somereceiverwillcompleteouroperationforus.//getg获取当前发送数据使用的Goroutine.gp:=getg()//获取表示当前G的结构体:runtime.sudog.并设置这一次阻塞发送的相关消息.mysg:=acquireSudog()mysg.releasetime=0ift0!=0{mysg.releasetime=-1}mysg.elem=epmysg.waitlink=nilmysg.g=gpmysg.isSelect=falsemysg.c=c//将刚刚创建的sudog加入发送等待队列,并设置当前的Goroutine的waiting上.gp.waiting=mysggp.param=nil//将表示当前的Goroutine加入到当前Chan中的等待发送队列中.c.sendq.enqueue(mysg)atomic.Store8(&gp.parkingOnChan,1)//将当前的goroutine陷入沉睡并且等待唤醒.gopark(chanparkcommit,unsafe.Pointer(&c.lock),waitReasonChanSend,traceEvGoBlockSend,2)//Ensurethevaluebeingsentiskeptaliveuntilthe//receivercopiesitout.Thesudoghasapointertothe//stackobject,butsudogsaren'tconsideredasrootsofthe//stacktracer.KeepAlive(ep)//表示当前的goroutine被唤醒之后汇之星一些收尾工作,将一些属性置为零值,并且释放runtime.sudog结构体.//sudog结构体应该和goroutine是有关联的.//someonewokeusup.ifmysg!=gp.waiting{throw("Gwaitinglistiscorrupted")}//下面就是无关代码.gp.waiting=nilgp.activeStackChans=falseclosed:=!mysg.successgp.param=nilifmysg.releasetime>0{blockevent(mysg.releasetime-t0,2)}mysg.c=nilreleaseSudog(mysg)ifclosed{ifc.closed==0{throw("chansend:spuriouswakeup")}panic(plainError("sendonclosedchannel"))}returntrue}

二、从Chan中读取数据.

2.1总览全局:从chan中接受数据

第一种情况:非阻塞,缓存为空,直接return.

第二种情况:chan关闭,且缓存为空 return.

第三种情况:如果chan上,待发送的队列队头不为nil,直接获取该对头上的数据.

第四种情况:如果环形队列qcount不为0,则直接从环形队列上进行获取.

第五种情况:如果是非阻塞,那么直接return 如果是阻塞状态,那么将会进入等待接受数据的阻塞等待队列.

2.2 源码分析

Go 中通过chan发送数据,最终都会调用chanrecv函数来完成发送. 下面的代码我会删掉无用代码以及无用注释.

funcchanrecv(c*hchan,epunsafe.Pointer,blockbool)(selected,receivedbool){//chan接受数据.//chan结构体为nil和chan结构体关闭两种情况.ifc==nil{//block为true表示是阻塞状态.if!block{//非阻塞状态直接return,这里直接返回默认零值.return}//gopark表示睡眠.gopark(nil,nil,waitReasonChanReceiveNilChan,traceEvGoStop,2)throw("unreachable")}//block为true表示阻塞.//empty判断的是chan中的环形队列中的容器.//Fastpath:checkforfailednon-blockingoperationwithoutacquiringthelock.//如果是非阻塞,且缓存为空,那么直接return.if!block&&empty(c){//非阻塞且为空那么剩余的就应该是:非阻塞且不为空.阻塞的其他状态.//CAS加载chan是否为关闭状态.ifatomic.Load(&c.closed)==0{//非阻塞环形队列为空,且未关闭,直接returnfalse.return}//如果chan关闭.ifempty(c){//chan中的环形队列容量为0或者就是容量不为0,但是qcount为0.//Thechannelisirreversiblyclosedandempty.ifraceenabled{raceacquire(c.raceaddr())}//发送的数据不为nil.ifep!=nil{typedmemclr(c.elemtype,ep)}//有个问题:这里也没有判断发送队列是不是为空.//Go中应该只有select才会触发非阻塞.returntrue,false}}//代码能运行到这里就说要么是非阻塞且缓存不为空要么就是阻塞状态.vart0int64ifblockprofilerate>0{t0=cputicks()}lock(&c.lock)//closed为0表示未关闭.//chan关闭,但是缓存为空.ifc.closed!=0&&c.qcount==0{ifraceenabled{raceacquire(c.raceaddr())}unlock(&c.lock)ifep!=nil{typedmemclr(c.elemtype,ep)}returntrue,false}//能执行到这里的说明一定可以接受数据.ifsg:=c.sendq.dequeue();sg!=nil{//接受数据.recv(c,sg,ep,func(){unlock(&c.lock)},3)returntrue,true}//缓存不为空.ifc.qcount>0{//Receivedirectlyfromqueueqp:=chanbuf(c,c.recvx)ifraceenabled{racenotify(c,c.recvx,nil)}ifep!=nil{typedmemmove(c.elemtype,ep,qp)}typedmemclr(c.elemtype,qp)c.recvx++ifc.recvx==c.dataqsiz{c.recvx=0}c.qcount--unlock(&c.lock)returntrue,true}//如果是非阻塞状态,直接return.if!block{unlock(&c.lock)returnfalse,false}//nosenderavailable:blockonthischannel.gp:=getg()mysg:=acquireSudog()mysg.releasetime=0ift0!=0{mysg.releasetime=-1}//将要接受数据的goroutine添加到要接受数据的等待队列中.//Nostacksplitsbetweenassigningelemandenqueuingmysg//ongp.waitingwherecopystackcanfindit.mysg.elem=epmysg.waitlink=nilgp.waiting=mysgmysg.g=gpmysg.isSelect=falsemysg.c=cgp.param=nilc.recvq.enqueue(mysg)//Signaltoanyonetryingtoshrinkourstackthatwe'reabout//toparkonachannel.ThewindowbetweenwhenthisG'sstatus//changesandwhenwesetgp.activeStackChansisnotsafefor//stackshrinking.atomic.Store8(&gp.parkingOnChan,1)gopark(chanparkcommit,unsafe.Pointer(&c.lock),waitReasonChanReceive,traceEvGoBlockRecv,2)//someonewokeusupifmysg!=gp.waiting{throw("Gwaitinglistiscorrupted")}gp.waiting=nilgp.activeStackChans=falseifmysg.releasetime>0{blockevent(mysg.releasetime-t0,2)}success:=mysg.successgp.param=nilmysg.c=nilreleaseSudog(mysg)returntrue,success}

公众号:小马教你写Bug


本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:/Golang/4578.html