Multiple goroutines listening on one channel

I have multiple goroutines trying to receive on the same channel simultaneously. It seems like the last goroutine that starts receiving on the channel gets the value. Is this somewhere in the language spec or is it undefined behaviour?

c := make(chan string)
for i := 0; i < 5; i++ {
go func(i int) {
<-c
c <- fmt.Sprintf("goroutine %d", i)
}(i)
}
c <- "hi"
fmt.Println(<-c)

Output:

goroutine 4

Example On Playground

EDIT:

I just realized that it's more complicated than I thought. The message gets passed around all the goroutines.

c := make(chan string)
for i := 0; i < 5; i++ {
go func(i int) {
msg := <-c
c <- fmt.Sprintf("%s, hi from %d", msg, i)
}(i)
}
c <- "original"
fmt.Println(<-c)

Output:

original, hi from 0, hi from 1, hi from 2, hi from 3, hi from 4

NOTE: the above output is outdated in more recent versions of Go (see comments)

Example On Playground

98000 次浏览

这很复杂。

另外,看看 GOMAXPROCS = NumCPU+1会发生什么。例如,

package main


import (
"fmt"
"runtime"
)


func main() {
runtime.GOMAXPROCS(runtime.NumCPU() + 1)
fmt.Print(runtime.GOMAXPROCS(0))
c := make(chan string)
for i := 0; i < 5; i++ {
go func(i int) {
msg := <-c
c <- fmt.Sprintf("%s, hi from %d", msg, i)
}(i)
}
c <- ", original"
fmt.Println(<-c)
}

产出:

5, original, hi from 4

然后,看看使用缓冲通道会发生什么。例如,

package main


import "fmt"


func main() {
c := make(chan string, 5+1)
for i := 0; i < 5; i++ {
go func(i int) {
msg := <-c
c <- fmt.Sprintf("%s, hi from %d", msg, i)
}(i)
}
c <- "original"
fmt.Println(<-c)
}

Output:

original

你也应该能解释这些案子。

是的,这很复杂,但是有一些经验法则可以让事情看起来更简单。

  • 比起访问全局范围内的通道,我们更喜欢使用形式参数来表示传递给 go 例程的通道 。你可以通过这种方式得到更多的编译器检查,而且模块性也更好。
  • 避免在同一频道阅读和写作(包括主频道)。否则,死锁的风险要大得多。

下面是程序的另一个版本,它应用了这两条原则。这个案例展示了一个频道中的多个作者和一个读者:

c := make(chan string)


for i := 1; i <= 5; i++ {
go func(i int, co chan<- string) {
for j := 1; j <= 5; j++ {
co <- fmt.Sprintf("hi from %d.%d", i, j)
}
}(i, c)
}


for i := 1; i <= 25; i++ {
fmt.Println(<-c)
}

Http://play.golang.org/p/quqn7xeplw

It creates the five go-routines writing to a single channel, each one writing five times. The main go-routine reads all twenty five messages - you may notice that the order they appear in is often not sequential (i.e. the concurrency is evident).

这个示例演示了 Go 频道的一个特性: 可以让多个作者共享一个频道; Go 将自动交织消息。

这同样适用于一个频道上的一个作者和多个读者,如下面的第二个示例所示:

c := make(chan int)
var w sync.WaitGroup
w.Add(5)


for i := 1; i <= 5; i++ {
go func(i int, ci <-chan int) {
j := 1
for v := range ci {
time.Sleep(time.Millisecond)
fmt.Printf("%d.%d got %d\n", i, j, v)
j += 1
}
w.Done()
}(i, c)
}


for i := 1; i <= 25; i++ {
c <- i
}
close(c)
w.Wait()

这个 第二个例子包括一个强加在主要 goroutine 上的等待,否则这个主要 goroutine 会迅速退出并导致其他五个 goroutine 在 (感谢 < a href = “ https://stackoverflow. com/users/1001457/olov”> olov )早期被终止。

在这两个示例中,都不需要缓冲。通常,将缓冲区仅视为性能增强器是一个很好的原则。如果你的程序没有死锁 没有缓冲区,它也不会死锁 缓冲区(但是相反的 不是总是真的)。因此,作为 另一条经验法则是,启动时不要使用缓冲,然后根据需要再添加

很晚才回复,但我希望这能帮助其他人在未来喜欢 长期投票,“全球”按钮,广播给每个人?

《有效的围棋》解释了这个问题:

接收器总是阻塞,直到有数据接收。

这意味着不能让多于1个 goroutine 监听1个通道,并期望所有 goroutine 接收相同的值。

运行这个 代码示例

package main


import "fmt"


func main() {
c := make(chan int)


for i := 1; i <= 5; i++ {
go func(i int) {
for v := range c {
fmt.Printf("count %d from goroutine #%d\n", v, i)
}
}(i)
}


for i := 1; i <= 25; i++ {
c<-i
}


close(c)
}

即使有5个 Goroutine 正在监听这个频道,你也不会多次看到“ count 1”。这是因为当第一个 goroutine 阻塞通道时,所有其他 goroutine 必须排队等待。当通道被解除阻塞时,计数已经被接收并从通道中移除,因此行中的下一个 goroutine 将获得下一个 count 值。

我研究了现有的解决方案,并创建了简单的广播库 https://github.com/grafov/bcast

    group := bcast.NewGroup() // you created the broadcast group
go bcast.Broadcasting(0) // the group accepts messages and broadcast it to all members


member := group.Join() // then you join member(s) from other goroutine(s)
member.Send("test message") // or send messages of any type to the group


member1 := group.Join() // then you join member(s) from other goroutine(s)
val := member1.Recv() // and for example listen for messages

对于多个 Goroutine 监听一个频道,是的,这是可能的。关键是信息本身,你可以定义一些这样的信息:

package main


import (
"fmt"
"sync"
)


type obj struct {
msg string
receiver int
}


func main() {
ch := make(chan *obj) // both block or non-block are ok
var wg sync.WaitGroup
receiver := 25 // specify receiver count


sender := func() {
o := &obj {
msg: "hello everyone!",
receiver: receiver,
}
ch <- o
}
recv := func(idx int) {
defer wg.Done()
o := <-ch
fmt.Printf("%d received at %d\n", idx, o.receiver)
o.receiver--
if o.receiver > 0 {
ch <- o // forward to others
} else {
fmt.Printf("last receiver: %d\n", idx)
}
}


go sender()
for i:=0; i<reciever; i++ {
wg.Add(1)
go recv(i)
}


wg.Wait()
}

输出是随机的:

5 received at 25
24 received at 24
6 received at 23
7 received at 22
8 received at 21
9 received at 20
10 received at 19
11 received at 18
12 received at 17
13 received at 16
14 received at 15
15 received at 14
16 received at 13
17 received at 12
18 received at 11
19 received at 10
20 received at 9
21 received at 8
22 received at 7
23 received at 6
2 received at 5
0 received at 4
1 received at 3
3 received at 2
4 received at 1
last receiver 4

这是个老问题了,但我想没人提过这个。

首先,如果多次运行代码,这两个示例的 can be different输出。这与 Go 版本无关。

第一个例子的输出可以是 goroutine 4goroutine 0goroutine 1,... 实际上 所有 goroutine 可以是一个将字符串发送到主 goroutine 的函数。

Main goroutine 是 goroutine 之一,因此它也在等待来自通道的数据。 哪个 goroutine 应该接收数据? 没人知道。这不在语言规范中。

另外,第二个例子的输出也可以是任意值:

(为了清楚起见,我添加了方括号)

// [original, hi from 4]
// [[[[[original, hi from 4], hi from 0], hi from 2], hi from 1], hi from 3]
// [[[[[original, hi from 4], hi from 1], hi from 0], hi from 2], hi from 3]
// [[[[[original, hi from 0], hi from 2], hi from 1], hi from 3], hi from 4]
// [[original, hi from 4], hi from 1]
// [[original, hi from 0], hi from 4]
// [[[original, hi from 4], hi from 1], hi from 0]
// [[[[[original, hi from 4], hi from 1], hi from 0], hi from 3], hi from 2]
// [[[[original, hi from 0], hi from 2], hi from 1], hi from 3]
//
// ......anything can be the output.

这不是魔法,也不是什么神秘的现象。

如果有多个线程正在执行,没有人确切地知道哪个线程将获取资源。语言不能决定一切。这就是多线程编程相当复杂的原因。

Goroutine is not OS thread, but it behaves somewhat similarly.