数据结构

Channel在运行时使用hchan结构体,结构如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
type hchan struct {
qcount uint //队列中当前元素的个数
dataqsiz kuint //环形队列的总容量
buf unsafe.Pointer //指向底层环形数组的指针

elemsize uint16 //元素的大小
elemtype *_type //元素的类型信息

sendx uint //发送索引,下一次发送写入的位置
recvq uint //接受索引,下一次接收读取的位置

recvq waitq //双向链表,等待接收的goruntine队列
sendq waitq //等待发送的goruntine队列

lock mutex //互斥锁,保护结构体
}

recvq 和 sendq 的作用是存储那些“因为无法立即完成操作”而被迫阻塞(睡眠)的 Goroutine

  • recvq:想读数据但读不到的 Goroutine
  • sendq:想发数据但发不出去的 Goroutine

waitq:阻塞的协程队列

1
2
3
4
type waitq struct {
first *sudog //队列头部
last *sudog //尾部
}
1
2
3
4
5
6
7
8
9
10
11
12
//sudog是对goruntine的再封装
type sudog struct {
g *g
//双向链表
next *sudog
prev *sudog

elem unsafe.Pointer
isSelect bool//是不是select的标识,如果处于select里,即使条件不满足,也不要直接阻塞

c *hchan//指向它所从属的channel
}

channel的三种类型

  • 无缓冲区
  • 有缓冲区
  • 缓冲区为指针类型的
    底层通过makechan创建channel
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
func makechan(t *chantype, size int) *hchan {  
elem := t.Elem

// compiler checks this but be safe.
if elem.Size_ >= 1<<16 {
throw("makechan: invalid channel element type")
}
if hchanSize%maxAlign != 0 || elem.Align_ > maxAlign {
throw("makechan: bad alignment")
}

mem, overflow := math.MulUintptr(elem.Size_, uintptr(size))
if overflow || mem > maxAlloc-hchanSize || size < 0 {
panic(plainError("makechan: size out of range"))
}

// Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.
// buf points into the same allocation, elemtype is persistent. // SudoG's are referenced from their owning thread so they can't be collected. // TODO(dvyukov,rlh): Rethink when collector can move allocated objects.

var c *hchan
switch {
case mem == 0:
// Queue or element size is zero.
c = (*hchan)(mallocgc(hchanSize, nil, true))
// Race detector uses this location for synchronization.
c.buf = c.raceaddr()

case !elem.Pointers():
// Elements do not contain pointers.
// Allocate hchan and buf in one call.
c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
//
c.buf = add(unsafe.Pointer(c), hchanSize)
default:
// Elements contain pointers.
c = new(hchan)
c.buf = mallocgc(mem, elem, true)
}

c.elemsize = uint16(elem.Size_)
c.elemtype = elem
c.dataqsiz = uint(size)
if b := getg().bubble; b != nil {
c.bubble = b
}
lockInit(&c.lock, lockRankHchan)

if debugChan {
print("makechan: chan=", c, "; elemsize=", elem.Size_, "; dataqsiz=", size, "\n")
}
return c
}

工作流程

  1. 写数据 ch <- data

    在开始写数据之前,会先处理特殊情况

    1. 如果向一个空Channel写数据,会直接调用runtime.gopark,让出处理器的使用(也就是永远挂起)
    2. 如果channel已经关闭,立刻返回
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
       //向一个没有经过初始化的channel写数据,会永远挂起
    if c == nil {
    if !block {
    return false
    }
    gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
    throw("unreachable")
    }
    ///...
    lock(&c.lock)
    //向一个已经关闭的写数据,会panic
    if c.closed != 0 {
    unlock(&c.lock)
    panic(plainError("send on closed channel"))
    }
  • 存在阻塞等待读goruntine
    意味着缓冲区是空的或者就是没有缓冲区,否则不会有阻塞等待的goruntine
    加锁->将写的数据直接给等待读的goruntine -> 解锁 -> 唤醒已经得到数据的goruntine

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
     lock(&c.lock)
    //从阻塞的读队列取出一个被封装为sudog的读goruntine
    if sg := c.recvq.dequeue(); sg != nil {
    // Found a waiting receiver. We pass the value we want to send
    // directly to the receiver, bypassing the channel buffer (if any).
    //send方法会基于memmove方法,将元素直接拷贝给sudog对应的goruntine
    //并且send方法会完成解锁动作
    send(c, sg, ep, func() { unlock(&c.lock) }, 3)
    return true
    }
  • 没有阻塞但是环形缓冲区仍然有空间

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
     if c.qcount < c.dataqsiz {  
    // Space is available in the channel buffer. Enqueue the element to send.
    qp := chanbuf(c, c.sendx)
    if raceenabled {
    racenotify(c, c.sendx, nil)
    }
    typedmemmove(c.elemtype, qp, ep)
    c.sendx++
    //环形
    if c.sendx == c.dataqsiz {
    c.sendx = 0
    }
    c.qcount++
    unlock(&c.lock)
    return true
    }
  • 写时没有阻塞读goruntine,且环形缓冲区没有空间
    把当前写的goruntine放到阻塞的队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
// Block on the channel. Some receiver will complete our operation for us.  
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
//封装构造goruntine的sudog
mysg.elem = ep
mysg.waitlink = nil
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.waiting = mysg
gp.param = nil
c.sendq.enqueue(mysg)

gp.parkingOnChan.Store(true)
reason := waitReasonChanSend
if c.bubble != nil {
reason = waitReasonSynctestChanSend
}
//现在应该陷入一个waiting的状态,等待读goruntine的读取
gopark(chanparkcommit, unsafe.Pointer(&c.lock), reason, traceBlockChanSend, 2)
// Ensure the value being sent is kept alive until the
// receiver copies it out. The sudog has a pointer to the
// stack object, but sudogs aren't considered as roots of the
// stack tracer.
KeepAlive(ep)

// someone woke us up.
// sudog被唤醒
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
gp.activeStackChans = false
closed := !mysg.success
gp.param = nil
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
mysg.c = nil
releaseSudog(mysg)
if closed {
if c.closed == 0 {
throw("chansend: spurious wakeup")
}
panic(plainError("send on closed channel"))
}
return true
  1. 读操作
    如果如果向一个空Channel读数据,会直接调用runtime.gopark,让出处理器的使用(也就是永远挂起)
    读一个已经关闭的channel,并且里面没有元素了,会读到零值
    1
    	
    • 如果读得时候有阻塞的写协程
      上锁 -> 把缓冲区的第一个goruntine读出来 ->把阻塞等待的写goruntine转移到环形缓冲区的尾部 -> 解锁 -> 把写goruntine唤醒
    • 读没有阻塞的,并且缓冲区有空间
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      	if c.qcount > 0 {  
      // Receive directly from queue
      qp := chanbuf(c, c.recvx)
      if raceenabled {
      racenotify(c, c.recvx, nil)
      }
      if ep != nil {
      typedmemmove(c.elemtype, ep, qp)
      }
      typedmemclr(c.elemtype, qp)
      c.recvx++
      if c.recvx == c.dataqsiz {
      c.recvx = 0
      }
      c.qcount--
      unlock(&c.lock)
      return true, true
      }

    • 读时没有阻塞写goruntine并且缓冲区没有元素
      把当前的goruntine封装为sudog,然后这个读goruntine将会被放到阻塞等待的队列,然后gopark挂起,等待写goruntine向里面写数据唤醒

阻塞模式与非阻塞模式

多路复用情况下,会变成非阻塞模式,不能因为一个分支就把goruntine挂起,所以用block标识,在挂起操作前进行判断

关闭

一道题目

要求实现一个 map:
(1)面向高并发;
(2)只存在插入和查询操作 O(1);
(3)查询时,若 key 存在,直接返回 val;若 key 不存在,阻塞直到 key val 对被放入后,获取 val 返回; 等待指定时长仍未放入,返回超时错误;
(4)写出真实代码,不能有死锁或者 panic 风险.
解答

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
type MyConcurrentMap struct {
sync.Mutex
mp map[int]int
keyToCh map[int]chan struct{}
}
func NewMyConcurrentMap() *MyConcurrentMap{
return &MyConcurrentMap {
mp:make(map[int]int),
keyToCh: make(map[int]chan struct{])
}
}
func (m *MyConcurrentMap)Put(k, v int) {
m.Lock()
defer m.UnLock()
m.mp[k] = v
ch, ok := m.keyToCh[k]
if !ok {
return
}
close(ch)
}
func (m *MyConcurrentMap)Get(k int, maxwaitingDuration time.Duration) (int, error) {
m.Lock()
v, ok := m.mp[k]
if ok {
m.UnLock()
return v, nil
}
ch, ok := m.keyToCh[k]
if !ok{
ch = make(chan struct{})
m.keyToCh[k] = ch
}
tCtx, cancel := context.WithTineout(context.Background(), maxWaitingDuration)
defer cancel()
m.Unlock()
select {
case <- tCtx.DOne():
return -1, tCtx.Err()
case <- ch:
}
m.Lock()
v = m.mp[k]
m.Unlock()
return v, nil
}