概述
Go语言的协程(Goroutine)是一种相对线程而言更廉价的方式,虽然是轻量级的,但Goroutine太多仍会导致调度性能下降、GC 频繁、内存暴涨, 引发一系列问题。在面临这样的场景时, 限制 Goroutine的数量、重用显然很有价值。
解决方案
要解决这个问题,首先要考虑的是以下几点:
- Goroutine的数量如何限制
- Goroutine如何重用,不要频繁创建
- Goroutine如何执行,管理等
关于限制数量和重用
说到限制和重用, 那么最先想到的就是池化(如 JDBC池、线程池等都是最佳实践)所以我们也来造一个轮子,来实现一个轻量级的协程池(暂且不管市面上已有很成熟的项目,本文只是为了技术研究)
任务如何执行
在使用原生goroutine的场景中, 运行一个任务直接启动一个goroutine来运行, 在池化的场景而言, 任务也是要在goroutine中执行, 但是任务需要任务池来放入 goroutine。
使用生产者消费者模型
在一般连接池中, 连接在使用时从池中取出, 用完后放入池中即可。
但是对于goroutine而言, 其通过语言关键字启动, 无法像其他语言那么方便了。那么如何让goroutine可以执行任务, 且执行后可以重新用来执行其它任务呢?这里就需要使用生产者消费者模型了:
生产者 –(生产任务)–> 队列 –(消费任务)–> 消费者
用来执行任务的goroutine可以作为消费者, 操作任务池的goroutine作为生产者, 而队列则可以使用buffer channel。
具体实现
定义 Task
1
2
3
4
5
6
7
8
|
type Task struct {
// 任务名
Name string
// 任务回调函数
Handler func(v ...interface{})
// 任务回调函数参数(如果有)
Params []interface{}
}
|
定义 TaskPool
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
type State int32
const (
StateRunning State = iota
StateStopped
)
type TaskPool struct {
// 最大容量
capacity int32
// 协程池状态
state State
// 运行中的任务个数
runningTasks int32
// 任务管道
taskChannel chan *Task
}
|
构造函数
1
2
3
4
5
6
7
8
9
10
|
func NewTaskPool(capacity int32) (*TaskPool, error) {
if capacity <= 0 {
return nil, errors.New("capacity less than 0")
}
return &TaskPool{
capacity: capacity,
state: StateRunning,
taskChannel: make(chan *Task, capacity),
}, nil
}
|
启动 worker
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
func (tp *TaskPool) run() {
// 运行中的数量+1
atomic.AddInt32(&tp.runningTasks, 1)
go func() {
defer func() {
// 运行中的任务-1
atomic.AddInt32(&tp.runningTasks, -1)
// 错误收集(暂时只打印)
if err := recover(); err != nil {
log.Printf("Worker error: %sn", err)
}
}()
for {
select {
case task, ok := <-tp.taskChannel:
if ok {
log.Printf("Task[%s] start execution...n", task.Name)
task.Handler(task.Params...)
}
}
}
}()
}
|
上述代码中, runningTasks的加减运算使用sync.atomic包来保证其自增操作是原子的。
生产任务
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
func (tp *TaskPool) AddTask(task *Task) error {
if tp.state == StateStopped {
return errors.New("task pool is closed")
}
runningTasks := atomic.LoadInt32(&tp.runningTasks)
capacity := atomic.LoadInt32(&tp.capacity)
// 如果当前运行的任务小于协程池最大限制,则通知消费者开始消费
if runningTasks < capacity {
tp.run() // 消费者启动
}
// 生产者将 task 放入管道
tp.taskChannel <- task
return nil
}
|
安全(优雅)关闭
1
2
3
4
5
6
7
8
9
|
func (tp *TaskPool) Close() {
tp.state = StateStopped
for { // 一直阻塞,直到协程池中的所有 Task 被消费完毕,再关闭管道
if len(tp.taskChannel) <= 0 {
close(tp.taskChannel)
return
}
}
}
|
使用
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
func TestTaskPool(t *testing.T) {
// 新建协程池
taskPool, err := NewTaskPool(10)
if err != nil {
panic(err)
}
// 提交 100 个任务,等待执行完成
waitGroup := &sync.WaitGroup{}
for i := 0; i < 100; i++ {
waitGroup.Add(1)
_ = taskPool.AddTask(&Task{
Name: fmt.Sprintf("Demo-%d", i),
Handler: func(v ...interface{}) {
defer waitGroup.Done()
fmt.Printf("Hello, %s n", v[0])
},
Params: []interface{}{fmt.Sprintf("name-%d", i)},
})
}
waitGroup.Wait()
taskPool.Close() // 安全关闭协程池
}
|
TODO
- 协程的复用(在实现中没有做到,目前来说只做到了限流)
- 性能测试(对比原生Goroutine性能如何,暂且未知,后续可以着重测试一下)
以上就是协程池的极简封装了,当然就目前来说,只是简单按照原理实现了一遍,可能还有很多细节需要完善的,这里就不再继续下去了。
评论