深入理解Golangchannel的應(yīng)用_第1頁
深入理解Golangchannel的應(yīng)用_第2頁
深入理解Golangchannel的應(yīng)用_第3頁
深入理解Golangchannel的應(yīng)用_第4頁
深入理解Golangchannel的應(yīng)用_第5頁
已閱讀5頁,還剩13頁未讀, 繼續(xù)免費閱讀

下載本文檔

版權(quán)說明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請進行舉報或認領(lǐng)

文檔簡介

第深入理解Golangchannel的應(yīng)用目錄前言整體結(jié)構(gòu)創(chuàng)建發(fā)送接收關(guān)閉

前言

channel是用于goroutine之間的同步、通信的數(shù)據(jù)結(jié)構(gòu)

channel的底層是通過mutex來控制并發(fā)的,但它為程序員提供了更高一層次的抽象,封裝了更多的功能,這樣并發(fā)編程變得更加容易和安全,得以讓程序員把注意力留到業(yè)務(wù)上去,提升開發(fā)效率

channel的用途包括但不限于以下幾點:

協(xié)程間通信,同步定時任務(wù):和timer結(jié)合解耦生產(chǎn)方和消費方,實現(xiàn)阻塞隊列控制并發(fā)數(shù)

本文將介紹channel的底層原理,包括數(shù)據(jù)結(jié)構(gòu),channel的創(chuàng)建,發(fā)送,接收,關(guān)閉的實現(xiàn)邏輯

整體結(jié)構(gòu)

Gochannel的數(shù)據(jù)結(jié)構(gòu)如下所示:

typehchanstruct{

qcountuint//totaldatainthequeue

dataqsizuint//sizeofthecircularqueue

bufunsafe.Pointer//pointstoanarrayofdataqsizelements

elemsizeuint16

closeduint32

elemtype*_type//elementtype

sendxuint//sendindex

recvxuint//receiveindex

recvqwaitq//listofrecvwaiters

sendqwaitq//listofsendwaiters

lockmutex

qcount:已經(jīng)存儲了多少個元素

dataqsie:最多存儲多少個元素,即緩沖區(qū)容量

buf:指向緩沖區(qū)的位置,實際上是一個數(shù)組

elemsize:每個元素占多大空間

closed:channel能夠關(guān)閉,這里記錄其關(guān)閉狀態(tài)

elemtype:保存數(shù)據(jù)的類型信息,用于go運行時使用

sendx,recvx:

記錄下一個要發(fā)送到的位置,下一次從哪里還是接收這里用數(shù)組模擬隊列,這兩個變量即表示隊列的隊頭,隊尾因此channel的緩沖也被稱為環(huán)形緩沖區(qū)

recvq,sendq:

當發(fā)送個接收不能立即完成時,需要讓協(xié)程在channel上等待,所以有兩個等待隊列,分別針對接收和發(fā)送

lock:channel支持協(xié)程間并發(fā)訪問,因此需要一把鎖來保護

創(chuàng)建

創(chuàng)建channel會被編譯器編譯為調(diào)用makechan函數(shù)

//無緩沖通道

ch1:=make(chanint)

//有緩沖通道

ch2:=make(chanint,10)

會根據(jù)創(chuàng)建的是帶緩存,還是無緩沖,決定第二個參數(shù)size的值

可以看出,創(chuàng)建出來的是hchan指針,這樣就能在函數(shù)間直接傳遞channel,而不用傳遞channel的指針

funcmakechan(t*chantype,sizeint)*hchan{

elem:=t.elem

//mem:緩沖區(qū)大小

mem,overflow:=math.MulUintptr(elem.size,uintptr(size))

ifoverflow||memmaxAlloc-hchanSize||size0{

panic(plainError("makechan:sizeoutofrange"))

varc*hchan

switch{

//緩沖區(qū)大小為空,只申請hchanSize大小的內(nèi)存

casemem==0:

c=(*hchan)(mallocgc(hchanSize,nil,true))

c.buf=c.raceaddr()

//元素類型不包含指針,一次性分配hchanSize+mem大小的內(nèi)存

caseelem.ptrdata==0:

c=(*hchan)(mallocgc(hchanSize+mem,nil,true))

c.buf=add(unsafe.Pointer(c),hchanSize)

//否則就是帶緩存,且有指針,分配兩次內(nèi)存

default:

//Elementscontainpointers.

c=new(hchan)

c.buf=mallocgc(mem,elem,true)

//保存元素類型,元素大小,容量

c.elemsize=uint16(elem.size)

c.elemtype=elem

c.dataqsiz=uint(size)

lockInit(c.lock,lockRankHchan)

returnc

發(fā)送

執(zhí)行以下代碼時:

ch-3

編譯器會轉(zhuǎn)化為對chansend的調(diào)用

funcchansend(c*hchan,epunsafe.Pointer,blockbool,callerpcuintptr)bool{

//如果channel是空

ifc==nil{

//非阻塞,直接返回

if!block{

returnfalse

//否則阻塞當前協(xié)程

gopark(nil,nil,waitReasonChanSendNilChan,traceEvGoStop,2)

throw("unreachable")

//非阻塞,沒有關(guān)閉,且容量滿了,無法發(fā)送,直接返回

if!blockc.closed==0full(c){

returnfalse

//加鎖

lock(c.lock)

//如果已經(jīng)關(guān)閉,無法發(fā)送,直接panic

ifc.closed!=0{

unlock(c.lock)

panic(plainError("sendonclosedchannel"))

//從接收隊列彈出一個協(xié)程的包裝結(jié)構(gòu)sudog

ifsg:=c.recvq.dequeue();sg!=nil{

//如果能彈出,即有等到接收的協(xié)程,說明:

//該channel要么是無緩沖,要么緩沖區(qū)為空,不然不可能有協(xié)程在等待

//將要發(fā)送的數(shù)據(jù)拷貝到該協(xié)程的接收指針上

send(c,sg,ep,func(){unlock(c.lock)},3)

returntrue

//緩沖區(qū)還有空間

ifc.qcountc.dataqsiz{

//qp:計算要發(fā)送到的位置的地址

qp:=chanbuf(c,c.sendx)

//將數(shù)據(jù)從ep拷貝到qp

typedmemmove(c.elemtype,qp,ep)

//待發(fā)送位置移動

c.sendx++

//由于是數(shù)組模擬隊列,sendx到頂了需要歸零

ifc.sendx==c.dataqsiz{

c.sendx=0

//緩沖區(qū)數(shù)量++

c.qcount++

unlock(c.lock)

returntrue

//往下就是緩沖區(qū)無數(shù)據(jù),也沒有等到接收協(xié)程的情況了

//如果是非阻塞模式,直接返回

if!block{

unlock(c.lock)

returnfalse

//將當前協(xié)程包裝成sudog,阻塞到channel上

gp:=getg()

mysg:=acquireSudog()

mysg.releasetime=0

ift0!=0{

mysg.releasetime=-1

mysg.elem=ep

mysg.waitlink=nil

mysg.g=gp

mysg.isSelect=false

mysg.c=c

gp.waiting=mysg

gp.param=nil

//當前協(xié)程進入發(fā)送等待隊列

c.sendq.enqueue(mysg)

atomic.Store8(gp.parkingOnChan,1)

gopark(chanparkcommit,unsafe.Pointer(c.lock),waitReasonChanSend,traceEvGoBlockSend,2)

//被喚醒后從這里開始執(zhí)行

KeepAlive(ep)

ifmysg!=gp.waiting{

throw("Gwaitinglistiscorrupted")

gp.waiting=nil

gp.activeStackChans=false

closed:=!mysg.success

gp.param=nil

ifmysg.releasetime0{

blockevent(mysg.releasetime-t0,2)

mysg.c=nil

releaseSudog(mysg)

//被喚醒后發(fā)現(xiàn)channel關(guān)閉了,panic

ifclosed{

ifc.closed==0{

throw("chansend:spuriouswakeup")

panic(plainError("sendonclosedchannel"))

returntrue

整體流程為:

如果當前操作為非阻塞,channel沒有關(guān)閉,且容量滿了,無法發(fā)送,直接返回

從接收隊列彈出一個協(xié)程的包裝結(jié)構(gòu)sudog,如果能彈出,即有等到接收的協(xié)程,說明:

該channel要么是無緩沖,要么緩沖區(qū)為空,不然不可能有協(xié)程在等待將要發(fā)送的數(shù)據(jù)拷貝到該協(xié)程的接收指針上,返回這里直接從發(fā)送者拷貝到接收者的內(nèi)存,而不是先把數(shù)據(jù)拷貝到緩沖區(qū),再從緩沖區(qū)拷貝到接收者,節(jié)約了一次內(nèi)存拷貝

否則看看緩沖區(qū)還有空間,如果有,將數(shù)據(jù)拷貝到緩沖區(qū)上,也返回

接下來就是既沒有接收者等待,緩沖區(qū)也為空的情況,就需要將當前協(xié)程包裝成sudog,阻塞到channel上

將協(xié)程阻塞到channel的等待隊列時,將其包裝成了sudog結(jié)構(gòu):

typesudogstruct{

//協(xié)程

g*g

//前一個,后一個指針

next*sudog

prev*sudog

//等到發(fā)送的數(shù)據(jù)在哪,等待從哪個位置接收數(shù)據(jù)

elemunsafe.Pointer

acquiretimeint64

releasetimeint64

ticketuint32

isSelectbool

successbool

parent*sudog//semaRootbinarytree

waitlink*sudog//g.waitinglistorsemaRoot

waittail*sudog//semaRoot

//在哪個channel上等待

c*hchan//channel

其目的是:

g本身沒有存儲前一個,后一個指針,需要用sudog結(jié)構(gòu)包裝才能加入隊列elem字段存儲等到發(fā)送的數(shù)據(jù)在哪,等待從哪個位置接收數(shù)據(jù),用于從數(shù)據(jù)能從協(xié)程到協(xié)程的直接拷貝

來看看一些子函數(shù):

1.判斷channel是否是滿的

funcfull(c*hchan)bool{

//無緩沖

ifc.dataqsiz==0{

//并且沒有其他協(xié)程在等待

returnc.recvq.first==nil

//有緩沖,但容量裝滿了

returnc.qcount==c.dataqsiz

2.send方法:

/**

c:要操作的channel

sg:彈出的接收者協(xié)程

ep:要發(fā)送的數(shù)據(jù)在的位置

funcsend(c*hchan,sg*sudog,epunsafe.Pointer,unlockffunc(),skipint){

//如果接收者指針不為空,直接把數(shù)據(jù)從ep拷貝到sg.elem

ifsg.elem!=nil{

sendDirect(c.elemtype,sg,ep)

sg.elem=nil

gp:=sg.g

unlockf()

gp.param=unsafe.Pointer(sg)

sg.success=true

ifsg.releasetime!=0{

sg.releasetime=cputicks()

//喚醒該接收者協(xié)程

goready(gp,skip+1)

接收

從channel中接收數(shù)據(jù)有幾種寫法:

帶不帶ok接不接收返回值

根據(jù)帶不帶ok,決定用下面哪個方法

funcchanrecv1(c*hchan,elemunsafe.Pointer){

chanrecv(c,elem,true)

funcchanrecv2(c*hchan,elemunsafe.Pointer)(receivedbool){

_,received=chanrecv(c,elem,true)

return

根據(jù)接不接收返回值,決定elem是不是nil

最終都會調(diào)用chanrecv方法:

funcchanrecv(c*hchan,epunsafe.Pointer,blockbool)(selected,receivedbool){

//如果channel為nil,根據(jù)參數(shù)中是否阻塞來決定是否阻塞

ifc==nil{

if!block{

return

gopark(nil,nil,waitReasonChanReceiveNilChan,traceEvGoStop,2)

throw("unreachable")

//非阻塞,并且channel為空

if!blockempty(c){

//如果還沒關(guān)閉,直接返回

ifatomic.Load(c.closed)==0{

return

//否則已經(jīng)關(guān)閉,

//如果為空,返回該類型的零值

ifempty(c){

ifep!=nil{

typedmemclr(c.elemtype,ep)

returntrue,false

lock(c.lock)

//同樣,如果channel已經(jīng)關(guān)閉,且緩沖區(qū)沒有元素,返回該類型零值

ifc.closed!=0c.qcount==0{

unlock(c.lock)

ifep!=nil{

typedmemclr(c.elemtype,ep)

returntrue,false

//如果有發(fā)送者正在阻塞,說明:

//1.無緩沖

//2.有緩沖,但緩沖區(qū)滿了。因為只有緩沖區(qū)滿了,才可能有發(fā)送者在等待

ifsg:=c.sendq.dequeue();sg!=nil{

//將數(shù)據(jù)從緩沖區(qū)拷貝到ep,再將sg的數(shù)據(jù)拷貝到緩沖區(qū),該函數(shù)詳細流程可看下文

recv(c,sg,ep,func(){unlock(c.lock)},3)

returntrue,true

//如果緩存區(qū)有數(shù)據(jù),

ifc.qcount0{

//qp為緩沖區(qū)中下一次接收的位置

qp:=chanbuf(c,c.recvx)

//將數(shù)據(jù)從qp拷貝到ep

ifep!=nil{

typedmemmove(c.elemtype,ep,qp)

typedmemclr(c.elemtype,qp)

c.recvx++

ifc.recvx==c.dataqsiz{

c.recvx=0

c.qcount--

unlock(c.lock)

returntrue,true

//接下來就是既沒有發(fā)送者在等待,也緩沖區(qū)也沒數(shù)據(jù)

if!block{

unlock(c.lock)

returnfalse,false

//將當前協(xié)程包裝成sudog,阻塞到channel中

gp:=getg()

mysg:=acquireSudog()

mysg.releasetime=0

ift0!=0{

mysg.releasetime=-1

//記錄接收地址

mysg.elem=ep

mysg.waitlink=nil

gp.waiting=mysg

mysg.g=gp

mysg.isSelect=false

mysg.c=c

gp.param=nil

c.recvq.enqueue(mysg)

atomic.Store8(gp.parkingOnChan,1)

gopark(chanparkcommit,unsafe.Pointer(c.lock),waitReasonChanReceive,traceEvGoBlockRecv,2)

//從這里喚醒

ifmysg!=gp.waiting{

throw("Gwaitinglistiscorrupted")

gp.waiting=nil

gp.activeStackChans=false

ifmysg.releasetime0{

blockevent(mysg.releasetime-t0,2)

success:=mysg.success

gp.param=nil

mysg.c=nil

releaseSudog(mysg)

returntrue,success

接收流程如為:

如果channel為nil,根據(jù)參數(shù)中是否阻塞來決定是否阻塞

如果channel已經(jīng)關(guān)閉,且緩沖區(qū)沒有元素,返回該類型零值

如果有發(fā)送者正在阻塞,說明:

要么是無緩沖有緩沖,但緩沖區(qū)滿了。因為只有緩沖區(qū)滿了,才可能有發(fā)送者在等待將數(shù)據(jù)從緩沖區(qū)拷貝到ep,再將發(fā)送者的數(shù)據(jù)拷貝到緩沖區(qū),并喚該發(fā)送者

如果緩存區(qū)有數(shù)據(jù),則從緩沖區(qū)將數(shù)據(jù)復(fù)制到ep,返回

接下來就是既沒有發(fā)送者在等待,也緩沖區(qū)也沒數(shù)據(jù)的情況:

將當前協(xié)程包裝成sudog,阻塞到channel中

來看其中的子函數(shù)recv():

/**

c:操作的channel

sg:阻塞的發(fā)送協(xié)程

ep:接收者接收數(shù)據(jù)的地址

funcrecv(c*hchan,sg*sudog,epunsafe.Pointer,unlockffunc(),skipint){

//如果是無緩沖channel,直接將數(shù)據(jù)從發(fā)送者sg拷貝到ep

ifc.dataqsiz==0{

ifep!=nil{

recvDirect(c.elemtype,sg,ep)

//接下來是有緩沖,且緩沖區(qū)滿的情況

}else{

//qp為channel緩沖區(qū)中,接收者下一次接收的地址

qp:=chanbuf(c,c.recvx)

//將數(shù)據(jù)從qp拷貝到ep

ifep!=nil{

typedmemmove(c.elemtype,ep,qp)

//將發(fā)送者的數(shù)據(jù)從sg.elem拷貝到qp

typedmemmove(c.elemtype,qp,sg.elem)

c.recvx++

ifc.recvx==c.dataqsiz{

c.recvx=0

//由于一接收已發(fā)送,緩沖區(qū)還是滿的,因此c.sendx=c.recvx

c.sendx=c.recvx

sg.elem=nil

gp:=sg.g

unlockf()

gp.param=unsafe.Pointer(sg)

sg.success=true

ifsg.releasetime!=0{

sg.releasetime=cputicks()

//喚醒發(fā)送者

goready(gp,skip+1)

關(guān)閉

funcclosechan(c*hchan){

//不能關(guān)閉空channel

ifc==nil{

panic(plainError("closeofnilchannel"))

lock(c.lock)

//不能重復(fù)關(guān)閉

ifc.closed!=0{

unlock(c.lock)

panic(plainError("closeofclosedchannel"))

//修改關(guān)閉狀態(tài)

c.close

溫馨提示

  • 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
  • 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
  • 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁內(nèi)容里面會有圖紙預(yù)覽,若沒有圖紙預(yù)覽就沒有圖紙。
  • 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
  • 5. 人人文庫網(wǎng)僅提供信息存儲空間,僅對用戶上傳內(nèi)容的表現(xiàn)方式做保護處理,對用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對任何下載內(nèi)容負責。
  • 6. 下載文件中如有侵權(quán)或不適當內(nèi)容,請與我們聯(lián)系,我們立即糾正。
  • 7. 本站不保證下載資源的準確性、安全性和完整性, 同時也不承擔用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。

評論

0/150

提交評論