如何检查频道是否关闭而不读它?

这是由@Jimt 编写的 Go 中 worker & controller 模式的一个很好的例子 "是否有一些优雅的方式来暂停和恢复任何其他戈兰程序?"

package main


import (
"fmt"
"runtime"
"sync"
"time"
)


// Possible worker states.
const (
Stopped = 0
Paused  = 1
Running = 2
)


// Maximum number of workers.
const WorkerCount = 1000


func main() {
// Launch workers.
var wg sync.WaitGroup
wg.Add(WorkerCount + 1)


workers := make([]chan int, WorkerCount)
for i := range workers {
workers[i] = make(chan int)


go func(i int) {
worker(i, workers[i])
wg.Done()
}(i)
}


// Launch controller routine.
go func() {
controller(workers)
wg.Done()
}()


// Wait for all goroutines to finish.
wg.Wait()
}


func worker(id int, ws <-chan int) {
state := Paused // Begin in the paused state.


for {
select {
case state = <-ws:
switch state {
case Stopped:
fmt.Printf("Worker %d: Stopped\n", id)
return
case Running:
fmt.Printf("Worker %d: Running\n", id)
case Paused:
fmt.Printf("Worker %d: Paused\n", id)
}


default:
// We use runtime.Gosched() to prevent a deadlock in this case.
// It will not be needed of work is performed here which yields
// to the scheduler.
runtime.Gosched()


if state == Paused {
break
}


// Do actual work here.
}
}
}


// controller handles the current state of all workers. They can be
// instructed to be either running, paused or stopped entirely.
func controller(workers []chan int) {
// Start workers
for i := range workers {
workers[i] <- Running
}


// Pause workers.
<-time.After(1e9)
for i := range workers {
workers[i] <- Paused
}


// Unpause workers.
<-time.After(1e9)
for i := range workers {
workers[i] <- Running
}


// Shutdown workers.
<-time.After(1e9)
for i := range workers {
close(workers[i])
}
}

但是这段代码也有一个问题: 如果您想在 worker()退出时删除 workers中的工作通道,就会发生死锁。

如果你的 close(workers[i]),下次控制器写入它将造成恐慌,因为去不能写入一个关闭的通道。如果你使用一些互斥体来保护它,那么它将被卡在 workers[i] <- Running上,因为 worker没有从通道读取任何东西,写将被阻塞,互斥体将导致死锁。您还可以提供一个更大的缓冲区作为通道的解决方案,但这还不够好。

所以我认为最好的解决方法是 worker()关闭通道时退出,如果控制器发现一个通道关闭,它将跳过它,什么也不做。但在这种情况下,我找不到如何检查频道是否已经关闭。如果我尝试读取控制器中的通道,控制器可能会被阻塞。所以我现在很困惑。

附言: 我已经尝试过恢复恐慌,但是它会关闭引起恐慌的常规程序。在这种情况下,它将是控制器,所以它是没有用的。

尽管如此,我认为围棋团队在围棋的下一个版本中实现这个功能是有用的。

193051 次浏览

如果你听这个频道,你总能发现那个频道是关闭的。

case state, opened := <-ws:
if !opened {
// channel was closed
// return or made some final work
}
switch state {
case Stopped:

但是记住,你不能关闭一个频道两次,这会引起恐慌。

通过一种古怪的方式,它可以用于通过恢复已经引起的恐慌来尝试写信的频道。但是,如果不从读取通道进行读取,则无法检查该读取通道是否已关闭。

你也一样

  • 最终从中读取“ true”值(v <- c)
  • 读取“真”值和“未关闭”指示器(v, ok <- c)
  • 读取 零价值和“关闭”指示器(v, ok <- c)(例子)
  • will block in the channel read forever (v <- c)

只有最后一个从技术上讲不是从频道读取的,但是没什么用。

没有办法编写一个安全的应用程序,您需要知道一个通道是否打开,而不与它进行交互。

完成您想要完成的任务的最佳方法是使用两个通道——一个用于工作,另一个用于表示希望更改状态(以及完成状态更改(如果这很重要))。

通道很便宜,但复杂的设计过载语义就不便宜了。

[也]

<-time.After(1e9)

是一种非常混乱和不明显的写作方式

time.Sleep(time.Second)

让事情保持简单,每个人(包括你)都能理解它们。

首先检查通道是否有元素会更容易,这样可以确保通道是活的。

func isChanClosed(ch chan interface{}) bool {
if len(ch) == 0 {
select {
case _, ok := <-ch:
return !ok
}
}
return false
}

我知道这个答案太晚了,我已经写了这个解决方案,黑进 运行时间,它不安全,它可能会崩溃:

import (
"unsafe"
"reflect"
)




func isChanClosed(ch interface{}) bool {
if reflect.TypeOf(ch).Kind() != reflect.Chan {
panic("only channels!")
}
    

// get interface value pointer, from cgo_export
// typedef struct { void *t; void *v; } GoInterface;
// then get channel real pointer
cptr := *(*uintptr)(unsafe.Pointer(
unsafe.Pointer(uintptr(unsafe.Pointer(&ch)) + unsafe.Sizeof(uint(0))),
))
    

// this function will return true if chan.closed > 0
// see hchan on https://github.com/golang/go/blob/master/src/runtime/chan.go
// type hchan struct {
// qcount   uint           // total data in the queue
// dataqsiz uint           // size of the circular queue
// buf      unsafe.Pointer // points to an array of dataqsiz elements
// elemsize uint16
// closed   uint32
// **
    

cptr += unsafe.Sizeof(uint(0))*2
cptr += unsafe.Sizeof(unsafe.Pointer(uintptr(0)))
cptr += unsafe.Sizeof(uint16(0))
return *(*uint32)(unsafe.Pointer(cptr)) > 0
}

根据文件:

通道可以关闭与内置功能关闭。接收运算符的多值赋值表单报告在通道关闭之前是否发送了接收值。

Https://golang.org/ref/spec#receive_operator

Example by Golang in Action shows this case:

// This sample program demonstrates how to use an unbuffered
// channel to simulate a game of tennis between two goroutines.
package main


import (
"fmt"
"math/rand"
"sync"
"time"
)


// wg is used to wait for the program to finish.
var wg sync.WaitGroup


func init() {
rand.Seed(time.Now().UnixNano())
}


// main is the entry point for all Go programs.
func main() {
// Create an unbuffered channel.
court := make(chan int)
// Add a count of two, one for each goroutine.
wg.Add(2)
// Launch two players.
go player("Nadal", court)
go player("Djokovic", court)
// Start the set.
court <- 1
// Wait for the game to finish.
wg.Wait()
}


// player simulates a person playing the game of tennis.
func player(name string, court chan int) {
// Schedule the call to Done to tell main we are done.
defer wg.Done()
for {
// Wait for the ball to be hit back to us.
ball, ok := <-court
fmt.Printf("ok %t\n", ok)
if !ok {
// If the channel was closed we won.
fmt.Printf("Player %s Won\n", name)
return
}
// Pick a random number and see if we miss the ball.
n := rand.Intn(100)
if n%13 == 0 {
fmt.Printf("Player %s Missed\n", name)
// Close the channel to signal we lost.
close(court)
return
}


// Display and then increment the hit count by one.
fmt.Printf("Player %s Hit %d\n", name, ball)
ball++
// Hit the ball back to the opposing player.
court <- ball
}
}

那么,你可以用 default分支来检测它,对于一个封闭的通道将被选中,例如: 下面的代码将选择 defaultchannelchannel,第一个选择是不封闭的。

func main() {
ch := make(chan int)


go func() {
select {
case <-ch:
log.Printf("1.channel")
default:
log.Printf("1.default")
}
select {
case <-ch:
log.Printf("2.channel")
}
close(ch)
select {
case <-ch:
log.Printf("3.channel")
default:
log.Printf("3.default")
}
}()
time.Sleep(time.Second)
ch <- 1
time.Sleep(time.Second)
}

指纹

2018/05/24 08:00:00 1.default
2018/05/24 08:00:01 2.channel
2018/05/24 08:00:01 3.channel

注意,请参考@Angad 在这个答案下面的评论:

It doesn't work if you're using a Buffered Channel and it contains 未读数据

除了关闭频道之外,您还可以将频道设置为 Nil。这样你就可以检查它是否为零。

例如: Https://play.golang.org/p/v0f3d4discz

edit: This is actually a bad solution as demonstrated in the next example, 因为在函数中将通道设置为 nil 会破坏它: Https://play.golang.org/p/yve2-lv9top

我经常遇到多个并发 goroutine 的问题。

这可能是一个好的模式,也可能不是,但是我为 worker 定义了一个 struct,其中包含一个退出通道和 worker 状态的字段:

type Worker struct {
data chan struct
quit chan bool
stopped bool
}

然后你可以让控制器为 worker 调用 stop 函数:

func (w *Worker) Stop() {
w.quit <- true
w.stopped = true
}


func (w *Worker) eventloop() {
for {
if w.Stopped {
return
}
select {
case d := <-w.data:
//DO something
if w.Stopped {
return
}
case <-w.quit:
return
}
}
}

这为您提供了一种非常好的方法,可以在不挂起任何东西或不产生错误的情况下对工人进行干净的停止,这在容器中运行时尤其有效。

ch1 := make(chan int)
ch2 := make(chan int)
go func(){
for i:=0; i<10; i++{
ch1 <- i
}
close(ch1)
}()
go func(){
for i:=10; i<15; i++{
ch2 <- i
}
close(ch2)
}()
ok1, ok2 := false, false
v := 0
for{
ok1, ok2 = true, true
select{
case v,ok1 = <-ch1:
if ok1 {fmt.Println(v)}
default:
}
select{
case v,ok2 = <-ch2:
if ok2 {fmt.Println(v)}
default:
}
if !ok1 && !ok2{return}
    

}

}