channel底层实现
数据结构
Channel在运行时使用hchan结构体,结构如下
1 | type hchan struct { |
recvq 和 sendq 的作用是存储那些“因为无法立即完成操作”而被迫阻塞(睡眠)的 Goroutine
- recvq:想读数据但读不到的 Goroutine
- sendq:想发数据但发不出去的 Goroutine
waitq:阻塞的协程队列
1 | type waitq struct { |
1 | //sudog是对goruntine的再封装 |
channel的三种类型
- 无缓冲区
- 有缓冲区
- 缓冲区为指针类型的
底层通过makechan创建channel
1 | func makechan(t *chantype, size int) *hchan { |
工作流程
-
写数据 ch <- data
在开始写数据之前,会先处理特殊情况
- 如果向一个空Channel写数据,会直接调用runtime.gopark,让出处理器的使用(也就是永远挂起)
- 如果channel已经关闭,立刻返回
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15//向一个没有经过初始化的channel写数据,会永远挂起
if c == nil {
if !block {
return false
}
gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
throw("unreachable")
}
///...
lock(&c.lock)
//向一个已经关闭的写数据,会panic
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("send on closed channel"))
}
-
存在阻塞等待读goruntine
意味着缓冲区是空的或者就是没有缓冲区,否则不会有阻塞等待的goruntine
加锁->将写的数据直接给等待读的goruntine -> 解锁 -> 唤醒已经得到数据的goruntine1
2
3
4
5
6
7
8
9
10lock(&c.lock)
//从阻塞的读队列取出一个被封装为sudog的读goruntine
if sg := c.recvq.dequeue(); sg != nil {
// Found a waiting receiver. We pass the value we want to send
// directly to the receiver, bypassing the channel buffer (if any).
//send方法会基于memmove方法,将元素直接拷贝给sudog对应的goruntine
//并且send方法会完成解锁动作
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
} -
没有阻塞但是环形缓冲区仍然有空间
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16if c.qcount < c.dataqsiz {
// Space is available in the channel buffer. Enqueue the element to send.
qp := chanbuf(c, c.sendx)
if raceenabled {
racenotify(c, c.sendx, nil)
}
typedmemmove(c.elemtype, qp, ep)
c.sendx++
//环形
if c.sendx == c.dataqsiz {
c.sendx = 0
}
c.qcount++
unlock(&c.lock)
return true
} -
写时没有阻塞读goruntine,且环形缓冲区没有空间
把当前写的goruntine放到阻塞的队列
1 | // Block on the channel. Some receiver will complete our operation for us. |
- 读操作
如果如果向一个空Channel读数据,会直接调用runtime.gopark,让出处理器的使用(也就是永远挂起)
读一个已经关闭的channel,并且里面没有元素了,会读到零值1
- 如果读得时候有阻塞的写协程
上锁 -> 把缓冲区的第一个goruntine读出来 ->把阻塞等待的写goruntine转移到环形缓冲区的尾部 -> 解锁 -> 把写goruntine唤醒 - 读没有阻塞的,并且缓冲区有空间
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19if c.qcount > 0 {
// Receive directly from queue
qp := chanbuf(c, c.recvx)
if raceenabled {
racenotify(c, c.recvx, nil)
}
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
typedmemclr(c.elemtype, qp)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount--
unlock(&c.lock)
return true, true
} - 读时没有阻塞写goruntine并且缓冲区没有元素
把当前的goruntine封装为sudog,然后这个读goruntine将会被放到阻塞等待的队列,然后gopark挂起,等待写goruntine向里面写数据唤醒
- 如果读得时候有阻塞的写协程
阻塞模式与非阻塞模式
多路复用情况下,会变成非阻塞模式,不能因为一个分支就把goruntine挂起,所以用block标识,在挂起操作前进行判断
关闭
一道题目
要求实现一个 map:
(1)面向高并发;
(2)只存在插入和查询操作 O(1);
(3)查询时,若 key 存在,直接返回 val;若 key 不存在,阻塞直到 key val 对被放入后,获取 val 返回; 等待指定时长仍未放入,返回超时错误;
(4)写出真实代码,不能有死锁或者 panic 风险.
解答
1 | type MyConcurrentMap struct { |
本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来源 EurekaYu!
评论


