如何监听 N 个频道? (动态选择语句)

要开始执行两个 goroutine 的无止境循环,我可以使用下面的代码:

在收到短信后,它将开始一个新的常规程序,并永远继续下去。

c1 := make(chan string)
c2 := make(chan string)


go DoStuff(c1, 5)
go DoStuff(c2, 2)


for ; true;  {
select {
case msg1 := <-c1:
fmt.Println("received ", msg1)
go DoStuff(c1, 1)
case msg2 := <-c2:
fmt.Println("received ", msg2)
go DoStuff(c2, 9)
}
}

我现在希望 N 个 goroutines 具有相同的行为,但是在这种情况下 select 语句看起来如何呢?

这是我开始使用的代码位,但是我对如何编写 select 语句感到困惑

numChans := 2


//I keep the channels in this slice, and want to "loop" over them in the select statemnt
var chans = [] chan string{}


for i:=0;i<numChans;i++{
tmp := make(chan string);
chans = append(chans, tmp);
go DoStuff(tmp, i + 1)


//How shall the select statment be coded for this case?
for ; true;  {
select {
case msg1 := <-c1:
fmt.Println("received ", msg1)
go DoStuff(c1, 1)
case msg2 := <-c2:
fmt.Println("received ", msg2)
go DoStuff(c2, 9)
}
}
53873 次浏览

您可以使用 反思包中的 Select函数来完成这项工作:

func Select(cases []SelectCase) (chosen int, recv Value, recvOK bool)

选择执行由案例列表描述的选择操作 Go select 语句,它会阻塞,直到至少有一种情况可以 然后执行一个统一的伪随机选择 它返回所选大小写的索引,如果该大小写为 接收操作、接收到的值和一个指示是否 该值对应于通道上的一个发送(与零相对) 值,因为通道已关闭)。

传入一个由 SelectCase结构组成的数组,该数组标识要选择的通道、操作的方向以及在发送操作的情况下要发送的值。

所以你可以这样做:

cases := make([]reflect.SelectCase, len(chans))
for i, ch := range chans {
cases[i] = reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(ch)}
}
chosen, value, ok := reflect.Select(cases)
// ok will be true if the channel has not been closed.
ch := chans[chosen]
msg := value.String()

您可以在这里试验一个更具体的示例: http://play.golang.org/p/8zwvSk4kjx

您可以通过在 goroutine 中包装每个通道来实现这一点,goroutine 将消息“转发”到一个共享的“聚合”通道。例如:

agg := make(chan string)
for _, ch := range chans {
go func(c chan string) {
for msg := range c {
agg <- msg
}
}(ch)
}


select {
case msg <- agg:
fmt.Println("received ", msg)
}

如果需要知道消息来自哪个通道,可以在将其转发到聚合通道之前将其包装在带有任何额外信息的结构中。

在我(有限的)测试中,这种方法的性能大大优于使用反射包:

$ go test dynamic_select_test.go -test.bench=.
...
BenchmarkReflectSelect         1    5265109013 ns/op
BenchmarkGoSelect             20      81911344 ns/op
ok      command-line-arguments  9.463s

基准代码 给你

为了扩展对以前答案的一些评论,并在这里提供一个更清晰的比较,这里给出了两种方法的一个例子,这两种方法都给出了相同的输入,一个要从中读取的通道片段,以及一个要调用每个值的函数,这个函数还需要知道值来自哪个通道。

这两种方法有三个主要区别:

  • 复杂性。虽然这可能部分是读者的偏好,但我发现通道方法更加地道、直接和可读。

  • 表演。在我的至强 amd64系统上,goroutines + 的反射解决方案要比 goroutines + 的反射解决方案多出大约两个数量级(一般而言,Go 中的反射通常比较慢,只有在绝对必要的情况下才能使用)。当然,如果在处理结果的函数或将值写入输入通道的过程中有任何显著的延迟,那么这种性能差异很容易变得微不足道。

  • 阻塞/缓冲语义。这一点的重要性取决于用例。大多数情况下,要么无关紧要,要么 goroutine 合并解决方案中的轻微额外缓冲可能有助于提高吞吐量。但是,如果希望具有这样的语义,即只有一个写入器是未阻塞的,并且它的值完全处理了 之前,那么其他任何写入器都是未阻塞的,那么这只能通过反射解决方案来实现。

注意,如果不需要发送通道的“ id”或者源通道永远不会关闭,这两种方法都可以简化。

Goroutine 合并频道:

// Process1 calls `fn` for each value received from any of the `chans`
// channels. The arguments to `fn` are the index of the channel the
// value came from and the string value. Process1 returns once all the
// channels are closed.
func Process1(chans []<-chan string, fn func(int, string)) {
// Setup
type item struct {
int    // index of which channel this came from
string // the actual string item
}
merged := make(chan item)
var wg sync.WaitGroup
wg.Add(len(chans))
for i, c := range chans {
go func(i int, c <-chan string) {
// Reads and buffers a single item from `c` before
// we even know if we can write to `merged`.
//
// Go doesn't provide a way to do something like:
//     merged <- (<-c)
// atomically, where we delay the read from `c`
// until we can write to `merged`. The read from
// `c` will always happen first (blocking as
// required) and then we block on `merged` (with
// either the above or the below syntax making
// no difference).
for s := range c {
merged <- item{i, s}
}
// If/when this input channel is closed we just stop
// writing to the merged channel and via the WaitGroup
// let it be known there is one fewer channel active.
wg.Done()
}(i, c)
}
// One extra goroutine to watch for all the merging goroutines to
// be finished and then close the merged channel.
go func() {
wg.Wait()
close(merged)
}()


// "select-like" loop
for i := range merged {
// Process each value
fn(i.int, i.string)
}
}

反射选择:

// Process2 is identical to Process1 except that it uses the reflect
// package to select and read from the input channels which guarantees
// there is only one value "in-flight" (i.e. when `fn` is called only
// a single send on a single channel will have succeeded, the rest will
// be blocked). It is approximately two orders of magnitude slower than
// Process1 (which is still insignificant if their is a significant
// delay between incoming values or if `fn` runs for a significant
// time).
func Process2(chans []<-chan string, fn func(int, string)) {
// Setup
cases := make([]reflect.SelectCase, len(chans))
// `ids` maps the index within cases to the original `chans` index.
ids := make([]int, len(chans))
for i, c := range chans {
cases[i] = reflect.SelectCase{
Dir:  reflect.SelectRecv,
Chan: reflect.ValueOf(c),
}
ids[i] = i
}


// Select loop
for len(cases) > 0 {
// A difference here from the merging goroutines is
// that `v` is the only value "in-flight" that any of
// the workers have sent. All other workers are blocked
// trying to send the single value they have calculated
// where-as the goroutine version reads/buffers a single
// extra value from each worker.
i, v, ok := reflect.Select(cases)
if !ok {
// Channel cases[i] has been closed, remove it
// from our slice of cases and update our ids
// mapping as well.
cases = append(cases[:i], cases[i+1:]...)
ids = append(ids[:i], ids[i+1:]...)
continue
}


// Process each value
fn(ids[i], v.String())
}
}

[完整代码 在围棋场上. ]

为什么这种方法在假设有人正在发送事件的情况下不起作用?

func main() {
numChans := 2
var chans = []chan string{}


for i := 0; i < numChans; i++ {
tmp := make(chan string)
chans = append(chans, tmp)
}


for true {
for i, c := range chans {
select {
case x = <-c:
fmt.Printf("received %d \n", i)
go DoShit(x, i)
default: continue
}
}
}
}

也许更简单的选择:

与其拥有一个信道数组,为什么不仅仅将一个信道作为参数传递给在不同 goroutine 上运行的函数,然后在使用者 goroutine 中侦听信道呢?

这允许您在您的侦听器中只选择一个通道,从而实现简单的选择,并避免创建新的 goroutine 来聚合来自多个通道的消息?

实际上,我们对这个课题进行了一些研究,找到了最佳的解决方案。我们使用 reflect.Select一段时间,它是一个伟大的问题的解决方案。它比每个通道的 goroutine 轻得多,操作简单。但不幸的是,它实际上并不支持大量的频道,这就是我们的情况,所以我们发现了一些有趣的东西,并写了一篇关于它的博客文章: https://cyolo.io/blog/how-we-enabled-dynamic-channel-selection-at-scale-in-go/

我将总结一下上面写的内容: 我们静态地创建了选择的批处理。.Case 语句的每个结果的指数幂最多可达32,还有一个函数,该函数路由到不同的情况,并通过一个聚合通道聚合结果。

这种批处理的一个例子:

func select4(ctx context.Context, chanz []chan interface{}, res chan *r, r *r, i int) {
select {
case r.v, r.ok = <-chanz[0]:
r.i = i + 0
res <- r
case r.v, r.ok = <-chanz[1]:
r.i = i + 1
res <- r
case r.v, r.ok = <-chanz[2]:
r.i = i + 2
res <- r
case r.v, r.ok = <-chanz[3]:
r.i = i + 3
res <- r
case <-ctx.Done():
break
}
}

以及使用这些 select..case批次从任意数量的通道聚合第一个结果的逻辑:

    for i < len(channels) {
l = len(channels) - i
switch {
case l > 31 && maxBatchSize >= 32:
go select32(ctx, channels[i:i+32], agg, rPool.Get().(*r), i)
i += 32
case l > 15 && maxBatchSize >= 16:
go select16(ctx, channels[i:i+16], agg, rPool.Get().(*r), i)
i += 16
case l > 7 && maxBatchSize >= 8:
go select8(ctx, channels[i:i+8], agg, rPool.Get().(*r), i)
i += 8
case l > 3 && maxBatchSize >= 4:
go select4(ctx, channels[i:i+4], agg, rPool.Get().(*r), i)
i += 4
case l > 1 && maxBatchSize >= 2:
go select2(ctx, channels[i:i+2], agg, rPool.Get().(*r), i)
i += 2
case l > 0:
go select1(ctx, channels[i], agg, rPool.Get().(*r), i)
i += 1
}
}

根据詹姆斯 · 亨斯特里奇的回答, 我创建了这个通用的(go > = 1.18) Select函数,它接受一个上下文和一段通道,并返回选中的通道:

func Select[T any](ctx context.Context, chs []chan T) (int, T, error) {
var zeroT T
cases := make([]reflect.SelectCase, len(chs)+1)
for i, ch := range chs {
cases[i] = reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(ch)}
}
cases[len(chs)] = reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(ctx.Done())}
// ok will be true if the channel has not been closed.
chosen, value, ok := reflect.Select(cases)
if !ok {
if ctx.Err() != nil {
return -1, zeroT, ctx.Err()
}
return chosen, zeroT, errors.New("channel closed")
}
if ret, ok := value.Interface().(T); ok {
return chosen, ret, nil
}
return chosen, zeroT, errors.New("failed to cast value")
}

下面是一个如何使用它的例子:

func TestSelect(t *testing.T) {
c1 := make(chan int)
c2 := make(chan int)
c3 := make(chan int)
chs := []chan int{c1, c2, c3}
go func() {
time.Sleep(time.Second)
//close(c2)
c2 <- 42
}()
ctx, _ := context.WithTimeout(context.Background(), 5*time.Second)


chosen, val, err := Select(ctx, chs)


assert.Equal(t, 1, chosen)
assert.Equal(t, 42, val)
assert.NoError(t, err)
}