同步的优点是什么?

我正在研究一个并发 Go 库,偶然发现了两种不同的 goroutine 同步模式,它们的结果是相似的:

等待小组

package main


import (
"fmt"
"sync"
"time"
)


var wg sync.WaitGroup


func main() {
words := []string{"foo", "bar", "baz"}


for _, word := range words {
wg.Add(1)
go func(word string) {
time.Sleep(1 * time.Second)
defer wg.Done()
fmt.Println(word)
}(word)
}
// do concurrent things here


// blocks/waits for waitgroup
wg.Wait()
}

频道

package main


import (
"fmt"
"time"
)


func main() {
words := []string{"foo", "bar", "baz"}
done := make(chan bool)
// defer close(done)
for _, word := range words {
// fmt.Println(len(done), cap(done))
go func(word string) {
time.Sleep(1 * time.Second)
fmt.Println(word)
done <- true
}(word)
}
// Do concurrent things here


// This blocks and waits for signal from channel
for range words {
<-done
}
}


有人告诉我,sync.WaitGroup的性能稍微好一点,而且我已经看到它被广泛使用。然而,我发现频道更加地道。在通道上使用 sync.WaitGroup的真正优势是什么? 当它更好的时候,情况会是怎样?

38080 次浏览

与第二个例子的正确性无关(正如评论中所解释的,你没有做你认为的事情,但它很容易修复) ,我倾向于认为第一个例子更容易理解。

现在,我甚至不能说频道更加地道。作为 Go 语言的一个标志性特征,通道不应该意味着只要有可能就使用它们。在围棋中惯用的方法是使用最简单、最容易理解的解决方案: 在这里,WaitGroup既传达了意思(你的主要功能是让工人完成 Waiting) ,也传达了机械师(工人在他们是 Done时会通知他们)。

除非您处于非常特殊的情况下,否则我不建议在此使用通道解决方案。

如果你特别喜欢只使用通道,那么就需要用不同的方式来做(如果我们使用你的例子,就像@Not _ a _ Golfer 指出的那样,它会产生不正确的结果)。

一种方法是创建一个 int 类型的通道。在工作进程中,每次完成作业时都发送一个数字(如果需要,也可以是唯一的作业 ID,可以在接收器中跟踪它)。

在接收器的 main go 例行程序(它将知道提交的确切数量的作业)-在一个通道上做一个范围循环,指望直到提交的作业数量没有完成,当所有作业完成时打破循环。如果您想要跟踪每个作业的完成情况,这是一个很好的方法(如果需要,也许可以做一些事情)。

这是你参考的代码。递减 total JobsLeft 将是安全的,因为它将永远只在通道的范围循环中完成!

//This is just an illustration of how to sync completion of multiple jobs using a channel
//A better way many a times might be to use wait groups


package main


import (
"fmt"
"math/rand"
"time"
)


func main() {


comChannel := make(chan int)
words := []string{"foo", "bar", "baz"}


totalJobsLeft := len(words)


//We know how many jobs are being sent


for j, word := range words {
jobId := j + 1
go func(word string, jobId int) {


fmt.Println("Job ID:", jobId, "Word:", word)
//Do some work here, maybe call functions that you need
//For emulating this - Sleep for a random time upto 5 seconds
randInt := rand.Intn(5)
//fmt.Println("Got random number", randInt)
time.Sleep(time.Duration(randInt) * time.Second)
comChannel <- jobId
}(word, jobId)
}


for j := range comChannel {
fmt.Println("Got job ID", j)
totalJobsLeft--
fmt.Println("Total jobs left", totalJobsLeft)
if totalJobsLeft == 0 {
break
}
}
fmt.Println("Closing communication channel. All jobs completed!")
close(comChannel)


}

这取决于用例。如果您正在分派要并行运行的一次性作业,而不需要知道每个作业的结果,那么可以使用 WaitGroup。但是如果你需要从 goroutine 中收集结果,那么你应该使用一个通道。

因为通道是双向的,所以我几乎总是使用通道。

另一方面,正如注释中指出的那样,您的通道示例没有正确实现。您将需要一个单独的通道来表明没有更多的工作要做(一个例子是 给你)。在您的示例中,因为您事先知道单词的数量,所以只需使用一个缓冲通道并接收固定的次数,就可以避免声明一个关闭通道。

也建议使用等待组,但仍然你想做的渠道,然后下面我提到一个简单的渠道使用

package main


import (
"fmt"
"time"
)


func main() {
c := make(chan string)
words := []string{"foo", "bar", "baz"}


go printWordrs(words, c)


for j := range c {
fmt.Println(j)
}
}




func printWordrs(words []string, c chan string) {
defer close(c)
for _, word := range words {
time.Sleep(1 * time.Second)
c <- word
}
}

我经常使用通道从 goroutine 收集可能产生错误的错误消息,下面是一个简单的例子:

func couldGoWrong() (err error) {
errorChannel := make(chan error, 3)


// start a go routine
go func() (err error) {
defer func() { errorChannel <- err }()


for c := 0; c < 10; c++ {
_, err = fmt.Println(c)
if err != nil {
return
}
}


return
}()


// start another go routine
go func() (err error) {
defer func() { errorChannel <- err }()


for c := 10; c < 100; c++ {
_, err = fmt.Println(c)
if err != nil {
return
}
}


return
}()


// start yet another go routine
go func() (err error) {
defer func() { errorChannel <- err }()


for c := 100; c < 1000; c++ {
_, err = fmt.Println(c)
if err != nil {
return
}
}


return
}()


// synchronize go routines and collect errors here
for c := 0; c < cap(errorChannel); c++ {
err = <-errorChannel
if err != nil {
return
}
}


return
}

对于您的简单示例(发出完成作业的信号) ,WaitGroup是显而易见的选择。Go 编译器非常友好,不会因为使用通道完成简单的信号任务而责怪您,但是一些代码审查员会这样做。

  1. ”WaitGroup 等待 goroutine 集合完成。 主要的 goroutine 调用 Add(n)来设置 然后每一个 goroutine 运行并在完成时调用 Done()。同时, 等待可以用来阻止,直到所有的 goroutine 完成。”
words := []string{"foo", "bar", "baz"}
var wg sync.WaitGroup
for _, word := range words {
wg.Add(1)
go func(word string) {
defer wg.Done()
time.Sleep(100 * time.Millisecond) // a job
fmt.Println(word)
}(word)
}
wg.Wait()

可能性仅仅受限于你的想象力:

  1. 频道可以是 缓冲:
words := []string{"foo", "bar", "baz"}
done := make(chan struct{}, len(words))
for _, word := range words {
go func(word string) {
time.Sleep(100 * time.Millisecond) // a job
fmt.Println(word)
done <- struct{}{} // not blocking
}(word)
}
for range words {
<-done
}
  1. 信道可以是 没有缓冲,你可以只使用一个信令信道(例如 chan struct{}) :
words := []string{"foo", "bar", "baz"}
done := make(chan struct{})
for _, word := range words {
go func(word string) {
time.Sleep(100 * time.Millisecond) // a job
fmt.Println(word)
done <- struct{}{} // blocking
}(word)
}
for range words {
<-done
}
  1. 您可以使用 缓冲通道容量限制并发作业的数量:
t0 := time.Now()
var wg sync.WaitGroup
words := []string{"foo", "bar", "baz"}
done := make(chan struct{}, 1) // set the number of concurrent job here
for _, word := range words {
wg.Add(1)
go func(word string) {
done <- struct{}{}
time.Sleep(100 * time.Millisecond) // job
fmt.Println(word, time.Since(t0))
<-done
wg.Done()
}(word)
}
wg.Wait()
  1. 你可使用以下途径发送讯息:
done := make(chan string)
go func() {
for _, word := range []string{"foo", "bar", "baz"} {
done <- word
}
close(done)
}()
for word := range done {
fmt.Println(word)
}

基准:

    go test -benchmem -bench . -args -n 0
# BenchmarkEvenWaitgroup-8  1827517   652 ns/op    0 B/op  0 allocs/op
# BenchmarkEvenChannel-8    1000000  2373 ns/op  520 B/op  1 allocs/op
go test -benchmem -bench .
# BenchmarkEvenWaitgroup-8  1770260   678 ns/op    0 B/op  0 allocs/op
# BenchmarkEvenChannel-8    1560124  1249 ns/op  158 B/op  0 allocs/op

代码(main_test.go) :

package main


import (
"flag"
"fmt"
"os"
"sync"
"testing"
)


func BenchmarkEvenWaitgroup(b *testing.B) {
evenWaitgroup(b.N)
}
func BenchmarkEvenChannel(b *testing.B) {
evenChannel(b.N)
}
func evenWaitgroup(n int) {
if n%2 == 1 { // make it even:
n++
}
for i := 0; i < n; i++ {
wg.Add(1)
go func(n int) {
select {
case ch <- n: // tx if channel is empty
case i := <-ch: // rx if channel is not empty
// fmt.Println(n, i)
_ = i
}
wg.Done()
}(i)
}
wg.Wait()
}
func evenChannel(n int) {
if n%2 == 1 { // make it even:
n++
}
for i := 0; i < n; i++ {
go func(n int) {
select {
case ch <- n: // tx if channel is empty
case i := <-ch: // rx if channel is not empty
// fmt.Println(n, i)
_ = i
}
done <- struct{}{}
}(i)
}
for i := 0; i < n; i++ {
<-done
}
}
func TestMain(m *testing.M) {
var n int // We use TestMain to set up the done channel.
flag.IntVar(&n, "n", 1_000_000, "chan cap")
flag.Parse()
done = make(chan struct{}, n)
fmt.Println("n=", n)
os.Exit(m.Run())
}


var (
done chan struct{}
ch   = make(chan int)
wg   sync.WaitGroup
)