深入解析Go语言Channel的底层实现与最佳实践
本文基于Go 1.24版本,从汇编和源码层面深入剖析Channel的内部机制,并探讨在实际开发中的正确使用方式。
前景提要
上文我们了解了golang中channel分类和用法,以及hchan数据结构。
但我们还不知道这些元素都有什么用,需要了解更多源码。
接下来我们会频繁用到一条查看汇编代码的指令:
go tool compile -S .xx.go
go tool compile
:- 这是 Go 工具链中的编译器命令
- 它负责将 Go 源代码编译成目标文件(通常是 .o 文件)
-S
标志:- 这个选项告诉编译器输出汇编代码
- 生成的汇编代码会显示在控制台中
- 不会生成最终的二进制可执行文件
.xx.go
:- 这是要编译的 Go 源文件
- 在 Windows 系统上使用反斜杠路径分隔符
- 文件名为 xx.go,位于当前目录
执行这条命令后,你会在控制台看到xx.go 文件编译生成的汇编代码输出,包括:
- 函数调用的汇编实现
- 数据结构的底层表示
- 各种操作的机器级指令
如果你想将汇编代码保存到文件,可以使用重定向:
go tool compile -S .xx.go >##.s
channel的创建机制
我们编写这样一段代码make.go
package main
func main() {
ch2 := make(chan int, 10)
ch2 <- 2
println(<-ch2)
}
在当前目录下使用命令:
go tool compile -S .make.go
会得到整个文件的汇编代码
我们先看前五行,也就是channel的创建
0x0000 00000 make.go:3) TEXT main.main(SB), ABIInternal, $40-0
0x0000 00000 make.go:3) CMPQ SP, 16(R14)
0x0004 00004 make.go:3) PCDATA $0, $-2
0x0004 00004 make.go:3) JLS 107
0x0006 00006 make.go:3) PCDATA $0, $-1
0x0006 00006 make.go:3) PUSHQ BP
0x0007 00007 make.go:3) MOVQ SP, BP
0x000a 00010 make.go:3) SUBQ $32, SP
0x000e 00014 make.go:3) FUNCDATA $0, gclocals·ISb46fRPFoZ9pIfykFK/kQ==(SB)
0x000e 00014 make.go:3) FUNCDATA $1, gclocals·jCgrU8XAg0ifiSJZPFgpKw==(SB)
0x000e 00014 make.go:5) LEAQ type:chan int(SB), AX
0x0015 00021 make.go:5) MOVL $10, BX
0x001a 00026 make.go:5) PCDATA $1, $0
0x001a 00026 make.go:5) CALL runtime.makechan(SB)
0x001f 00031 make.go:5) MOVQ AX, main.ch2+24(SP)
TEXT main.main(SB), ABIInternal, $40-0
:定义main.main
函数,栈帧大小为 40 字节($40),返回值大小为 0 字节(-0)。SUBQ $32, SP
:为局部变量分配 32 字节栈空间。以上是函数入口func main
LEAQ type:chan int(SB), AX
:将chan int
类型的指针加载到 AX 寄存器。MOVL $10, BX
:将通道容量 10 加载到 BX 寄存器。CALL runtime.makechan(SB)
:调用 Go 运行时的makechan
函数创建通道。MOVQ AX, main.ch2+24(SP)
:将创建的通道指针(AX)保存到栈上的ch2
变量位置。
关键就在于runtime.makechan函数,我们进入到源码找到这个函数,在https://github.com/golang/go/blob/master/src/runtime/chan.go中
func makechan(t *chantype, size int) *hchan
核心代码:
func makechan(t *chantype, size int) *hchan {
elem := t.Elem // channel类型信息
// 此处省略安全性检查代码
mem, overflow := math.MulUintptr(elem.Size_, uintptr(size)) // 计算缓冲区需要的总内存
var c *hchan
switch {
case mem == 0: // 无缓冲区
c = (*hchan)(mallocgc(hchanSize, nil, true))
/
c.buf = c.raceaddr()
case !elem.Pointers(): // 元素不包含指针
// 一次性分配hchan结构体和缓冲区内存
c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
c.buf = add(unsafe.Pointer(c), hchanSize)
default: // 元素包含指针情况
c = new(hchan)
c.buf = mallocgc(mem, elem, true) //GC跟踪缓冲区中指针
}
// 初始化hchan字段
c.elemsize = uint16(elem.Size_) // 元素大小
c.elemtype = elem // 通道元素类型
c.dataqsiz = uint(size) // 循环数组长度
if b := getg().bubble; b != nil {
c.bubble = b
}
lockInit(&c.lock, lockRankHchan) // 初始化锁
return c
}
runtime.makechan
函数是Channel创建的核心,其主要逻辑包括:
- 计算所需内存大小
- 根据元素类型选择不同的内存分配策略:
- 无缓冲区:仅分配hchan结构体
- 元素不含指针:一次性分配hchan和缓冲区
- 元素含指针:分别分配hchan和缓冲区
- 初始化hchan各字段
- 初始化锁等同步原语
这种精细的内存管理策略确保了Channel在各种使用场景下的高效性。
channel如何发送数据
汇编代码:找第七行
0x0024 00036 make.go:7) LEAQ main..stmp_0(SB), BX
0x002b 00043 make.go:7) PCDATA $1, $1
0x002b 00043 make.go:7) CALL runtime.chansend1(SB)
发现主要是runtime的chansend1函数,源码中找到这个函数
// entry point for c <- x from compiled code.
//
//go:nosplit
func chansend1(c *hchan, elem unsafe.Pointer) {
chansend(c, elem, true, sys.GetCallerPC())
}
调用了chansend函数,实现了 ch <- value 的逻辑
chansend函数:
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
// 此处忽略一系列的安全性检查和快速路径检查
var t0 int64
if blockprofilerate > 0 {
t0 = cputicks()
}
// 加锁,锁住channel,并发安全
lock(&c.lock)
// 如果channel关闭
if c.closed != 0 {
unlock(&c.lock) // 解锁
panic(plainError("send on closed channel"))
}
// case1:有等待的接受者,接收队列不为空
if sg := c.recvq.dequeue(); sg != nil {
// 直接将数据传递给接收者
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}
// case2:缓冲区有空间
if c.qcount < c.dataqsiz {
// 将数据拷贝到缓冲区,从 ep 处拷贝到 qp
typedmemmove(c.elemtype, qp, ep)// qp 指向 buf 的 sendx 位置
// 更新缓冲区索引和计数
c.sendx++
c.qcount++
unlock(&c.lock) // 解锁
return true
}
// case3:channel不满足以上两个case,说明需要阻塞等待
gp := getg() // 获取当前 goroutine 的指针
mysg := acquireSudog() // 构造一个sudog
mysg.releasetime = 0
// 初始化sudog里面元素
mysg.elem = ep
mysg.waitlink = nil
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.waiting = mysg
gp.param = nil
// 加入发送队列
c.sendq.enqueue(mysg)
// 挂起当前goroutine
gp.parkingOnChan.Store(true)
gopark(chanparkcommit, unsafe.Pointer(&c.lock), reason, traceBlockChanSend, 2)
KeepAlive(ep)
// 从这里开始被唤醒了(channel 有机会可以发送了)
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
gp.activeStackChans = false
closed := !mysg.success
gp.param = nil
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
mysg.c = nil // 去掉 mysg 上绑定的 channel
releaseSudog(mysg) // 释放sudog
if closed {
if c.closed == 0 {
throw("chansend: spurious wakeup")
} // goroutine唤醒,但channel突发关闭(耍猴)
panic(plainError("send on closed channel"))
}
return true
}
- 三种发送场景处理:
- 直接传递(有等待接收者)
- 缓冲写入(缓冲区有空位)
- 阻塞等待(缓冲区满)
- 内存安全:
typedmemmove
处理类型安全的拷贝KeepAlive
确保发送值在传输完成前不被回收
- 并发控制:
- 通过
lock
保护 channel 内部状态 - 使用
sudog
队列管理等待的 goroutines
- 通过
从channel接收数据的过程
接收操作有两种写法,
一种带 “ok”,反应 channel 是否关闭;
一种不带 “ok”,这种写法,当接收到相应类型的零值时无法知道是真实的发送者发送过来的值,还是 channel 被关闭后,返回给接收者的默认类型的零值。
val := <-ch // 形式1
val, ok := <-ch // 形式2
从汇编代码分析接收数据主要对应两个函数
// entry points for <- c from compiled code.
// 不带ok,无返回值
func chanrecv1(c *hchan, elem unsafe.Pointer) {
chanrecv(c, elem, true)
}
//带ok,返回bool类型的received
func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {
_, received = chanrecv(c, elem, true)
return
}
都调用了另一个主要函数chanrecv:
c *hchan
:目标 channel。ep unsafe.Pointer
:接收数据的目标内存地址,可为 nil(忽略接收到的数据)。block bool
:是否阻塞模式接收。
返回值 (selected, received bool)
含义分别为:
selected
:本次操作是否被选中(即操作是否已完成)。received
:是否真的收到了数据(false 可能是因为 channel 被关闭)。
// 负责从 channel c 接收数据,将数据写入 ep 指向的内存,或者根据条件决定是否阻塞、返回 channel 状态等
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
// c是目标channel,c为nil
if c == nil {
if !block {
return // 非阻塞模式
}
// 阻塞模式,goroutinue挂起
gopark(nil, nil, waitReasonChanReceiveNilChan, traceBlockForever, 2)
throw("unreachable")
}
// 此处忽略同步逻辑,不分析
// 如果 channel 关联了定时器,先触发可能的定时操作。
if c.timer != nil {
c.timer.maybeRunChan(c)
}
// 快速路径: 非阻塞接收
// empty(c)能识别无缓存通道、定时通道、有缓存通道是否为空
if !block && empty(c) {
// 原子检查channel是否关闭
if atomic.Load(&c.closed) == 0 {
return // flase, false
}
// 若channel关闭且仍然无数据,清零 ep 所指数据,返回 (true, false)
if empty(c) {
...
if ep != nil {
typedmemclr(c.elemtype, ep) // 记住这里
}
return true, false
}
}
var t0 int64
if blockprofilerate > 0 {
t0 = cputicks()
}
// 加锁进入主进程
lock(&c.lock)
// closed == 1 表明通道已经关闭
if c.closed != 0 {
// channel关闭且循环数组buf中没有元素
if c.qcount == 0 {
if raceenabled {
raceacquire(c.raceaddr())
}
unlock(&c.lock)
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}
// channel已经关闭,但是buf里面还有数据
} else {
// 如果存在等待发送方(sendq非空),与其配对
if sg := c.sendq.dequeue(); sg != nil {
// 无缓冲则直接收发,有缓冲则从缓冲头部读取,发送方的数据补到缓冲尾部
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true, true
}
}
// 缓冲区有数据
if c.qcount > 0 {
// 直接读取
qp := chanbuf(c, c.recvx)
if raceenabled {
racenotify(c, c.recvx, nil)
}
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
typedmemclr(c.elemtype, qp)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount--
unlock(&c.lock)
return true, true
}
// 非阻塞,也没有数据,直接返回俩false
if !block {
unlock(&c.lock)
return false, false
}
// no sender available: block on this channel.
// 阻塞情况,构造sudog
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
// 把等待接收的数据地址保存下来
mysg.elem = ep
mysg.waitlink = nil
gp.waiting = mysg
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.param = nil
// 把新增的sudog加入到等待接收队列中(就是hchan结构体的recvq)
c.recvq.enqueue(mysg)
if c.timer != nil {
blockTimerChan(c)
}
// 声明即将阻塞在 channel 上,
//这样做的目的是让调度器、栈收缩、死锁检测等机制
//知道这个 goroutine 正在等待 channel,
//不要在这期间移动它的栈或做其他危险操作
gp.parkingOnChan.Store(true)
// 准备阻塞原因
reason := waitReasonChanReceive
if c.bubble != nil {
reason = waitReasonSynctestChanReceive
}
// 挂起当前 goroutine,等待唤醒
gopark(chanparkcommit, unsafe.Pointer(&c.lock), reason, traceBlockChanRecv, 2)
// someone woke us up,被唤醒,开始收尾工作
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
if c.timer != nil {
unblockTimerChan(c)
}
gp.waiting = nil
gp.activeStackChans = false
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
success := mysg.success
gp.param = nil
mysg.c = nil
releaseSudog(mysg)
return true, success
}
接收操作核心流程
- 非阻塞、无数据、未关闭 → 立即返回。
- 已关闭、无数据 → 返回 selected=true, received=false。
- 优先与等待发送方直接配对。
- 若缓冲区有数据,直接从缓冲区读取。
- 阻塞模式、无数据、无发送方 → 当前 goroutine 入队、挂起,等待唤醒。
疑问
不是向已经关闭的channel读取会返回0值吗?以上代码并没有出现0的赋值,这是如何做到的?
答:回看以上的typedmemclr(c.elemtype, ep)函数
typedmemclr
是 Go runtime(运行时)中的一个底层函数,主要用于将一块指定类型的内存区域清零,即填充为该类型的“零值”,不仅仅是简单的 memset
,还要考虑 Go 类型系统(如内存屏障、垃圾回收相关)。
func typedmemclr(typ *_type, ptr unsafe.Pointer)
typ
:类型描述符(Go 运行时类型信息)。ptr
:要清理的内存地址。
关闭一个channel过程
关闭某个 channel,会执行函数 closechan
:
func closechan(c *hchan) {
// 关闭一个为空的channel,会触发panic
if c == nil {
panic(plainError("close of nil channel"))
}
if c.bubble != nil && getg().bubble != c.bubble {
panic(plainError("close of synctest channel from outside bubble"))
}
lock(&c.lock)
// 知识点:关闭一个已经关闭的channel会触发panic
// 这里的c.closed是一个uint32类型的字段,0表示未关闭,1表示已关闭
if c.closed != 0 { // channel已经关闭
unlock(&c.lock)
panic(plainError("close of closed channel"))
}
//
if raceenabled {
callerpc := sys.GetCallerPC()
racewritepc(c.raceaddr(), callerpc, abi.FuncPCABIInternal(closechan))
racerelease(c.raceaddr())
}
// 关闭channel
c.closed = 1
var glist gList
// 将 channel 所有等待接收队列的里 sudog 释放
for {
sg := c.recvq.dequeue()
if sg == nil { // 空了,跳出循环
break
}
if sg.elem != nil { // 如果接收的元素不为nil,清空它
typedmemclr(c.elemtype, sg.elem)
sg.elem = nil
}
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g // 获取等待接收的 goroutine
gp.param = unsafe.Pointer(sg)
sg.success = false
if raceenabled {
raceacquireg(gp, c.raceaddr())
}
glist.push(gp) // 将等待接收的 goroutine 添加到 glist 中
}
// release all writers (they will panic)
for {
sg := c.sendq.dequeue()
if sg == nil {
break
}
sg.elem = nil
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = unsafe.Pointer(sg)
sg.success = false
if raceenabled {
raceacquireg(gp, c.raceaddr())
}
glist.push(gp)
}
unlock(&c.lock)
// Ready all Gs now that we've dropped the channel lock.
for !glist.empty() {
gp := glist.pop()
gp.schedlink = 0
goready(gp, 3)
}
}
close 逻辑比较简单,对于一个 channel,recvq 和 sendq 队列中分别保存了阻塞的发送者和接收者。关闭 channel 后,对于等待接收者而言,会收到一个相应类型的零值。对于等待发送者,会直接 panic。所以,在不了解 channel 还有没有接收者的情况下,不能贸然关闭 channel。
close 函数先上一把大锁,接着把所有挂在这个 channel 上的 sender 和 receiver 全都连成一个 sudog 链表,再解锁。最后,再将所有的 sudog 全都唤醒。
疑问
问:向已经关闭的channel读取出来的0,是读取时产生的还是channel关闭时放置的0
答:根据以上代码可以看到,关闭 channel 只是把 channel 的状态标记为“已关闭”,并唤醒所有等待的接收者/发送者,当你调用
close(ch)
时,channel 的缓冲区如果还有数据,数据会保留,不会被清空也不会被置零。
关闭channel的正确姿势
关闭一个channel是有点麻烦的,如果对一个closed channel关闭,触发panic,那是不是要外置一个变量去描述channel状态呢?那这个变量又要进行加锁,这也太麻烦了吧。
能不能不关闭channel?
不能。可能出现内存泄漏。
看下面例子
ich := make(chan int)
// sender:两个
go func() {
for i := 0; i < 2; i++ {
ich <- i
}
}()
// receiver:三个
go func() {
for i := 0; i < 3; i++ {
fmt.Println(<-ich)
}
}()
接收者 goroutine 由于等待发送者发送一直阻塞。因此接收者 go routine 一直未退出,ich 也由于一直被接收者使用无法被垃圾回收。未退出的 go routine 和未被回收的 channel 都造成了内存泄漏的问题。
如果发送者 == 接收者数量,是可以不关闭的,sender goroutine在发送后没有东西再发而结束goroutine,接收者也同样接收完成,channel也因为没有被代码使用而自动gc回收,不关闭channel没有副作用,那么我们认为sender == recver时,可以不关闭。
有两个不那么优雅地关闭 channel 的方法:
- 使用 defer-recover 机制,放心大胆地关闭 channel 或者向 channel 发送数据。即使发生了 panic,有 defer-recover 在兜底。
- 使用 sync.Once 来保证只关闭一次。
关闭原则
原则是:只允许发送方关闭 channel,并且只能关闭一次。
- 关闭者原则:由发送方负责关闭channel
- 单一关闭原则:确保channel只被关闭一次
- 根据场景选择策略:
- 单发送者多接收者:由发送者直接关闭
- 多发送者单接收者:使用辅助channel通知关闭
- 多发送者多接收者:使用sync.WaitGroup协调
case1:sender = 1, recver = M
关闭权利交给sender就行了,没有数据了直接关闭,剩余在channel里面数据也能被正确接收。
case2:sender = N,recver = 1
这就麻烦了,sender是无法决定关闭channel的,因为sender2可能没发完数据,recver也没法决定呀,不知道哪个sender还有数据要发送。
解决方案就是增加一个传递关闭信号的 channel,recver通过信号 channel 下达关闭数据 channel 指令。senders 监听到关闭信号后,停止发送数据。
package main
import (
"fmt"
"time"
)
func main() {
dataCh := make(chan int)
stopCh := make(chan struct{})
const closeSemph = 100 // 手动设置
const N = 10000
// 启动N个发送协程
for i := 1; i <= N; i++ {
go func(id int) {
for {
select {
case <-stopCh:
return // 选中这个分支会直接跳出发送
case dataCh <- id:
time.Sleep(100 * time.Millisecond)
}
}
}(i)
}
// 接收协程
go func() {
for v := range dataCh {
fmt.Println("Receiver got:", v)
if v == closeSemph { // 某个条件下要求关闭
close(stopCh)
return
}
}
}()
<-stopCh
close(dataCh)
time.Sleep(time.Second) // 等待所有发送协程退出
fmt.Println("All done")
}
这里的 stopCh 就是信号 channel,它本身只有一个 sender,因此可以直接关闭它。senders 收到了关闭信号后,select 分支 “case <- stopCh” 被选中,退出函数,不再发送数据。
需要说明的是,上面的代码并没有明确关闭 dataCh。在 Go 语言中,对于一个 channel,如果最终没有任何 goroutine 引用它,不管 channel 有没有被关闭,最终都会被 gc 回收。所以,在这种情形下,所谓的优雅地关闭 channel 就是不关闭 channel,让 gc 代劳。
case4:sender = N, recver = M
如果用上面case3的方法,那么会有多个recver去关闭stopChan,这就造成了“关闭一个closed chan”的错误,触发panic。
利用计数器
- 发送方数量已知:使用 sync.WaitGroup 或计数器,全部 sender 退出后,由最后一个 sender 关闭 channel。
- 接收方数量任意:receiver 只需 for range 读取 channel,直到 channel 被关闭并且数据读完。
package main
import (
"fmt"
"sync"
"time"
)
func main() {
dataCh := make(chan int)
var wgSend sync.WaitGroup
var wgRecv sync.WaitGroup
senderCount := 3
receiverCount := 2
// 启动sender
for i := 1; i <= senderCount; i++ {
wgSend.Add(1)
go func(id int) {
defer wgSend.Done()
for j := 0; j < 5; j++ {
dataCh <- id*100 + j
time.Sleep(50 * time.Millisecond)
}
}(i)
}
// 启动receiver
for i := 1; i <= receiverCount; i++ {
wgRecv.Add(1)
go func(id int) {
defer wgRecv.Done()
for v := range dataCh {
fmt.Printf("Receiver %d got: %dn", id, v)
}
}(i)
}
// 等所有sender结束,主协程关闭channel
go func() {
wgSend.Wait()
close(dataCh)
}()
wgRecv.Wait()
fmt.Println("All done")
}
- 只有 sender 关闭 channel,receiver 只负责读。
- 每个 sender 结束时调用
wgSend.Done()
,全部 sender 结束后close(dataCh)
。 - receiver 用
for v := range dataCh
读取,直到 channel 关闭。 - 这样不会发生 panic,也能保证所有数据被消费。
常见问题深度解析
Problem1 :已关闭Channel的零值机制
向已经关闭的channel读取到0,这个0实在读取的时候设置的还是 channel关闭时设置的?
向已关闭的 channel 读取到的“零值”是在“读取的时候”设置的,而不是在 channel 关闭时就设置好的。
1. channel 关闭时
- 当你调用
close(ch)
时,channel 的缓冲区如果还有数据,数据会保留,不会被清空也不会被置零。 - 关闭 channel 只是把 channel 的状态标记为“已关闭”,并唤醒所有等待的接收者/发送者。
2. 读取时
- 当你从 channel 读取时:
- 如果缓冲区还有数据,会正常读取这个数据,返回
ok == true
。 - 如果缓冲区已经空了,且 channel 已关闭,Go runtime 会把目标变量(ep 指向的内存)用typedmemclr清零,把类型的零值赋给它,并且返回ok == false。
- 这正是你在源码看到的
typedmemclr(c.elemtype, ep)
的作用。 - 也就是说,这个“零值”是在你读取的那一刻由 runtime 代码写入的,并不是在
close
的时候就写好的。
- 这正是你在源码看到的
- 如果缓冲区还有数据,会正常读取这个数据,返回
Problem2:channel使用中的内存泄漏
泄漏场景主要发生在:
- goroutine因channel操作永久阻塞
- channel被阻塞的goroutine引用而无法被GC回收
防范措施:
- 确保channel最终会被关闭或不再使用
- 使用context等机制设置超时
- 合理设计并发架构,避免永久阻塞
最佳实践建议
- 明确通信范式:设计清晰的channel使用规范
- 资源管理:确保channel最终被正确关闭或释放
- 错误处理:合理处理channel操作可能引发的panic
- 性能考量:根据场景选择带缓冲或无缓冲channel
- 调试技巧:利用runtime包检查channel状态
通过深入理解这些底层机制,开发者可以编写出更高效、更可靠的Go并发程序。