Go 并发设计模式¶
屏障模式(Barrier Mode)¶
屏障模式(Barrier Mode),用来阻塞goroutine直到聚合所有goroutine返回结果,可以用通道实现。使用场景:
多个网络请求并发,聚合结果
粗粒度任务拆分并发执行,聚合结果
package main
import (
"fmt"
"io/ioutil"
"net/http"
"time"
)
// 封装屏障模式响应的结构体(一般就是返回值和错误)
type BarrierResponse struct {
Err error
Resp string
Status int
}
// 执行单个请求,结果放入 channel
func doRequest(out chan<- BarrierResponse, url string) {
res := BarrierResponse{}
client := http.Client{Timeout: time.Duration(3 * time.Second)}
resp, err := client.Get(url)
if resp != nil {
res.Status = resp.StatusCode
}
if err != nil {
res.Err = err
out <- res
return
}
b, err := ioutil.ReadAll(resp.Body)
defer resp.Body.Close()
if err != nil {
res.Err = err
out <- res
return
}
res.Resp = string(b)
out <- res // 结果放入通道
}
// 并发请求并聚合结果
func Barrier(urls ...string) {
n := len(urls)
in := make(chan BarrierResponse, n)
response := make([]BarrierResponse, n)
defer close(in)
for _, url := range urls {
go doRequest(in, url)
}
var hasError bool
for i := 0; i < n; i++ {
resp := <-in
if resp.Err != nil {
fmt.Println("Error: ", resp.Err, resp.Status)
hasError = true
}
response[i] = resp
}
if !hasError {
for _, resp := range response {
fmt.Println(resp.Status)
}
}
}
func main() {
urls := []string{
"https://www.baidu.com",
"https://www.weibo.com",
"https://www.zhihu.com",
}
Barrier(urls...)
}
未来模式(Future Mode)¶
Future模式(也称为Promise Mode)。使用 fire-and-forget
方式,主进程不等子进程执行完就直接返回,然后等到未来执行完的时候再去获取结果。
未来模式中主goroutine不用等待子goroutine返回的结果,可以先去做其他事情,等未来需要子goroutine结果的时候再来取。
如果子goroutine还没有返回结果,则一直等待。以下简单的代码示例说明了该模式的原理:
c := make(chan int) // future
go func() { c <- f() }() // async
value := <-c // await
可以针对 future 模式做一个统一的封装,方便后续使用,代码示例如下:
/* https://github.com/golang-collections/go-datastructures/blob/59788d5eb259/futures/futures.go
Package futures is useful for broadcasting an identical message to a multitude
of listeners as opposed to channels which will choose a listener at random
if multiple listeners are listening to the same channel. The future will
also cache the result so any future interest will be immediately returned
to the consumer.
*/
package main
import (
"fmt"
"sync"
"time"
)
// Completer is a channel that the future expects to receive
// a result on. The future only receives on this channel.
type Completer <-chan interface{}
// Future represents an object that can be used to perform asynchronous
// tasks. The constructor of the future will complete it, and listeners
// will block on getresult until a result is received. This is different
// from a channel in that the future is only completed once, and anyone
// listening on the future will get the result, regardless of the number
// of listeners.
type Future struct {
triggered bool // because item can technically be nil and still be valid
item interface{}
err error
lock sync.Mutex
wg sync.WaitGroup
}
// GetResult will immediately fetch the result if it exists
// or wait on the result until it is ready.
func (f *Future) GetResult() (interface{}, error) {
f.lock.Lock()
if f.triggered {
f.lock.Unlock()
return f.item, f.err
}
f.lock.Unlock()
f.wg.Wait()
return f.item, f.err
}
func (f *Future) setItem(item interface{}, err error) {
f.lock.Lock()
f.triggered = true
f.item = item
f.err = err
f.lock.Unlock()
f.wg.Done()
}
func listenForResult(f *Future, ch Completer, timeout time.Duration, wg *sync.WaitGroup) {
wg.Done()
select {
case item := <-ch:
f.setItem(item, nil)
case <-time.After(timeout):
f.setItem(nil, fmt.Errorf(`Timeout after %f seconds.`, timeout.Seconds()))
}
}
// New is the constructor to generate a new future. Pass the completed
// item to the toComplete channel and any listeners will get
// notified. If timeout is hit before toComplete is called,
// any listeners will get passed an error.
func New(completer Completer, timeout time.Duration) *Future {
f := &Future{}
f.wg.Add(1)
var wg sync.WaitGroup
wg.Add(1)
go listenForResult(f, completer, timeout, &wg)
wg.Wait()
return f
}
// 使用示例
func main() {
c := make(chan interface{})
go func() {
time.Sleep(time.Second)
c <- "hehe"
}()
f := New(c, time.Second*3)
res, err := f.GetResult()
fmt.Println(res, err)
}
管道模式(Pipeline Mode)¶
也称作流水线模式,一般有以下几个步骤:
流水线由一道道工序构成,每道工序通过通道把数据传递到下一个工序
每道工序一般会对应一个函数,函数里有协程和通道,协程一般用于处理数据并把它放入通道中,每道工序会返回这个通道以供下一道工序使用
最终要有一个组织者(示例中的main()函数)把这些工序串起来,这样就形成了一个完整的流水线,对于数据来说就是数据流
// 以组装计算机为例。三道工序:配件采购(Buy)-> 配件组装(Build) -> 打包成品(Pack)
func Buy(n int) <-chan string {
out := make(chan string)
go func() {
defer close(out)
for i := 1; i <= n; i++ {
out <- fmt.Sprintf("配件%d", i)
}
}()
return out
}
func Build(in <-chan string) <-chan string {
out := make(chan string)
go func() {
defer close(out)
for c := range in {
out <- fmt.Sprintf("组装(%s)", c)
}
}()
return out
}
func Pack(in <-chan string) <-chan string {
out := make(chan string)
go func() {
defer close(out)
for c := range in {
out <- fmt.Sprintf("打包(%s)", c)
}
}()
return out
}
func main() {
accessories := Buy(6)
computers := Build(accessories)
packs := Pack(computers)
for p := range packs {
fmt.Println(p)
}
}
package main
import "fmt"
// 工序 1:数组生成器
func Generator(max int) <-chan int {
out := make(chan int, 100)
go func() {
for i := 1; i <= max; i++ {
out <- i
}
close(out)
}()
return out
}
// 工序 2:求整数的平方
func Square(in <-chan int) <-chan int {
out := make(chan int, 100)
go func() {
for v := range in {
out <- v * v
}
close(out)
}()
return out
}
// 工序 3:求和
func Sum(in <-chan int) <-chan int {
out := make(chan int, 100)
go func() {
var sum int
for v := range in {
sum += v
}
out <- sum
close(out)
}()
return out
}
func main() {
arr := Generator(5)
squ := Square(arr)
sum := <-Sum(squ)
fmt.Println(sum)
}
扇出和扇入模式(Fan-out Fan-in)¶
扇出(Fan-out)是指多个函数可以从同一个通道读取数据,直到该通道关闭。扇入(Fan-in)是指一个函数可以从多个输入中读取数据并继续进行, 直到所有输入都关闭。扇出和扇入模式的方法是将输入通道多路复用到一个通道上,当所有输入都关闭时,该通道才关闭。 扇出的数据流向是发散传递出去,是输出流;扇入的数据流向是汇聚进来,是输入流。
// 扇入函数,把多个channel 中的数据发送到一个 channel 中
func Merge(ins ...<-chan string) <-chan string {
var wg sync.WaitGroup
out := make(chan string)
p := func(in <-chan string) {
defer wg.Done()
for c := range in {
out <- c
}
}
wg.Add(len(ins))
// 扇入
for _, cs := range ins {
go p(cs)
}
go func() {
wg.Wait()
close(out)
}()
return out
}
func main() {
accessories := Buy(12)
computers1 := Build(accessories)
computers2 := Build(accessories)
computers3 := Build(accessories)
computers := Merge(computers1, computers2, computers3)
packs := Pack(computers)
for p := range packs {
fmt.Println(p)
}
}
协程池模式¶
即便 go 的协程比较轻量,但是当需要操作大量 goroutine 的时候,依然有内存开销和 GC 的压力。可以考虑使用协程池减少频繁创建销毁协程的开销。
package main
import (
"fmt"
"sync"
"sync/atomic"
)
// 任务处理器
type TaskHandler func(interface{})
// 任务结构体
type Task struct {
Param interface{}
Handler TaskHandler
}
// 协程池接口
type WorkerPoolImpl interface {
AddWorker()
SendTask(Task)
Release()
}
// 协程池
type WorkerPool struct {
wg sync.WaitGroup
inCh chan Task
}
func (d *WorkerPool) AddWorker() {
d.wg.Add(1)
go func() {
defer d.wg.Done()
for task := range d.inCh {
task.Handler(task.Param)
}
}()
}
func (d *WorkerPool) Release() {
close(d.inCh)
d.wg.Wait()
}
func (d *WorkerPool) SendTask(t Task) {
d.inCh <- t
}
func NewWorkerPool(buffer int) WorkerPoolImpl {
return &WorkerPool{
inCh: make(chan Task, buffer),
}
}
func main() {
bufferSize := 100
var workerPool = NewWorkerPool(bufferSize)
workers := 4
for i := 0; i < workers; i++ {
workerPool.AddWorker()
}
var sum int32
testFunc := func(i interface{}) {
n := i.(int32)
atomic.AddInt32(&sum, n)
}
var i, n int32
n = 100
for ; i < n; i++ {
task := Task{
i,
testFunc,
}
workerPool.SendTask(task)
}
workerPool.Release()
fmt.Println(sum) // 4950
}
发布订阅模式¶
基于消息通知的并发设计模式。发送者发送消息,订阅者通过订阅感兴趣的主题(Topic) 接收消息。
package main
import (
"fmt"
"strings"
"time"
)
import (
"sync"
)
type (
//订阅者通道
Subscriber chan interface{}
//主题函数
TopicFunc func(v interface{}) bool
)
//发布者结构体
type Publisher struct {
// subscribers 是程序的核心,订阅者都会注册在这里,
// publisher发布消息的时候也会从这里开始
subscribers map[Subscriber]TopicFunc
buffer int // 订阅者的缓冲区长度
timeout time.Duration // publisher 发送消息的超时时间
// m 用来保护 subscribers
// 当修改 subscribers 的时候(即新加订阅者或删除订阅者)使用写锁
// 当向某个订阅者发送消息的时候(即向某个 Subscriber channel 中写入数据),使用读锁
m sync.RWMutex
}
//实例化
func NewPublisher(publishTimeout time.Duration, buffer int) *Publisher {
return &Publisher{
buffer: buffer,
timeout: publishTimeout,
subscribers: make(map[Subscriber]TopicFunc),
}
}
//发布者订阅方法
func (p *Publisher) Subscribe() Subscriber {
return p.SubscribeTopic(nil)
}
//发布者订阅主题
func (p *Publisher) SubscribeTopic(topic TopicFunc) Subscriber {
ch := make(Subscriber, p.buffer)
p.m.Lock()
p.subscribers[ch] = topic
p.m.Unlock()
return ch
}
//Delete 删除掉某个订阅者
func (p *Publisher) Delete(sub Subscriber) {
p.m.Lock()
defer p.m.Unlock()
delete(p.subscribers, sub)
close(sub)
}
//发布者发布
func (p *Publisher) Publish(v interface{}) {
p.m.RLock()
defer p.m.RUnlock()
var wg sync.WaitGroup
// 同时向所有订阅者写消息,订阅者利用 topic 过滤消息
for sub, topic := range p.subscribers {
wg.Add(1)
go p.sendTopic(sub, topic, v, &wg)
}
wg.Wait()
}
//Close 关闭 Publisher,删除所有订阅者
func (p *Publisher) Close() {
p.m.Lock()
defer p.m.Unlock()
for sub := range p.subscribers {
delete(p.subscribers, sub)
close(sub)
}
}
//发送主题
func (p *Publisher) sendTopic(sub Subscriber, topic TopicFunc, v interface{}, wg *sync.WaitGroup) {
defer wg.Done()
if topic != nil && !topic(v) {
return
}
select {
case sub <- v:
case <-time.After(p.timeout):
}
}
func main() {
//实例化
p := NewPublisher(100*time.Millisecond, 10)
defer p.Close()
// 订阅者订阅所有消息
all := p.Subscribe()
//订阅者仅订阅包含 golang 的消息
golang := p.SubscribeTopic(func(v interface{}) bool {
if s, ok := v.(string); ok {
return strings.Contains(s, "golang")
}
return false
})
//发布消息
p.Publish("hello, world!")
p.Publish("hello, golang!")
//加锁
var wg sync.WaitGroup
wg.Add(2)
//开启goroutine
go func() {
for msg := range all {
_, ok := msg.(string)
fmt.Println(ok)
}
wg.Done()
}()
//开启goroutine
go func() {
for msg := range golang {
v, ok := msg.(string)
fmt.Println(v)
fmt.Println(ok)
}
wg.Done()
}()
p.Close()
wg.Wait()
}
参考:《Go 语言高级开发与实战》