Go协程泄漏详解
什么是协程泄漏?
协程泄漏(goroutine leak)是指协程启动后无法正常退出,持续占用内存和CPU资源,最终导致程序性能下降甚至崩溃。
常见泄漏场景及解决方案
1. 无缓冲通道阻塞泄漏
// ❌ 错误示例:发送/接收方不匹配导致泄漏
func leak1() {
ch := make(chan int)
go func() {
val := <-ch // 阻塞等待数据,但无人发送
fmt.Println(val)
}()
// 主函数退出,协程永远阻塞
}
// ✅ 正确方案:使用context控制超时
func safe1(ctx context.Context) {
ch := make(chan int)
go func() {
select {
case val := <-ch:
fmt.Println(val)
case <-ctx.Done(): // 可被取消
fmt.Println("goroutine exited")
}
}()
}2. 通道未关闭导致接收方阻塞
// ❌ 错误示例
func leak2() {
ch := make(chan int)
go func() {
for val := range ch { // 等待通道关闭
fmt.Println(val)
}
}()
// 忘记关闭通道,协程永远等待
}
// ✅ 正确方案:明确关闭通道
func safe2() {
ch := make(chan int)
done := make(chan struct{})
go func() {
defer close(done)
for val := range ch {
fmt.Println(val)
}
}()
// 业务逻辑...
close(ch) // 明确关闭通道
<-done // 等待协程退出
}3. 死循环无退出条件
// ❌ 错误示例
func leak3() {
go func() {
for { // 无限循环,无退出机制
// 处理任务...
}
}()
}
// ✅ 正确方案:使用退出信号
func safe3() {
quit := make(chan struct{})
go func() {
defer fmt.Println("goroutine exited")
for {
select {
case <-quit: // 收到退出信号
return
default:
// 处理任务
time.Sleep(time.Second)
}
}
}()
// 需要退出时
close(quit)
time.Sleep(time.Second) // 等待协程退出
}4. WaitGroup使用不当
// ❌ 错误示例:Add和Done不匹配
func leak4() {
var wg sync.WaitGroup
for i := 0; i < 3; i++ {
wg.Add(1)
go func(id int) {
if id == 1 {
return // 直接返回,没有调用wg.Done()
}
defer wg.Done()
// 处理任务
}(i)
}
wg.Wait() // 永远等待
}
// ✅ 正确方案:确保Done被调用
func safe4() {
var wg sync.WaitGroup
for i := 0; i < 3; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done() // 使用defer确保执行
if id == 1 {
return // 即使return,defer也会执行
}
// 处理任务
}(i)
}
wg.Wait()
}5. 使用context进行超时控制
// ✅ 推荐方案:使用context管理协程生命周期
func processWithTimeout(ctx context.Context) error {
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
result := make(chan error, 1)
go func() {
// 模拟耗时操作
time.Sleep(10 * time.Second)
result <- nil
}()
select {
case <-ctx.Done():
return ctx.Err() // 超时或取消
case err := <-result:
return err // 正常完成
}
}检测和调试协程泄漏
1. 使用runtime包监控
func monitorGoroutines() {
go func() {
for {
time.Sleep(10 * time.Second)
num := runtime.NumGoroutine()
fmt.Printf("当前协程数量: %d\n", num)
if num > 100 { // 设置阈值报警
log.Printf("警告: 协程数量过多: %d", num)
// 可以打印堆栈信息
buf := make([]byte, 1<<16)
stackSize := runtime.Stack(buf, true)
fmt.Printf("堆栈信息:\n%s\n", buf[:stackSize])
}
}
}()
}2. 使用pprof分析
import (
_ "net/http/pprof"
"net/http"
)
func startProfiling() {
go func() {
http.ListenAndServe(":6060", nil)
}()
}
// 使用方式:
// 1. 访问 http://localhost:6060/debug/pprof/goroutine?debug=1
// 2. 使用go tool pprof分析3. 编写测试检测泄漏
func TestGoroutineLeak(t *testing.T) {
initial := runtime.NumGoroutine()
// 执行被测函数
myFunction()
// 等待一段时间让协程有机会退出
time.Sleep(100 * time.Millisecond)
// 强制GC,清理已完成的协程
runtime.GC()
final := runtime.NumGoroutine()
if final > initial {
t.Errorf("检测到协程泄漏: 初始%d, 最终%d", initial, final)
}
}最佳实践
1. 使用worker池模式
type WorkerPool struct {
tasks chan func()
quit chan struct{}
wg sync.WaitGroup
}
func NewWorkerPool(size int) *WorkerPool {
pool := &WorkerPool{
tasks: make(chan func(), 100),
quit: make(chan struct{}),
}
for i := 0; i < size; i++ {
pool.wg.Add(1)
go pool.worker()
}
return pool
}
func (p *WorkerPool) worker() {
defer p.wg.Done()
for {
select {
case task := <-p.tasks:
task()
case <-p.quit:
return
}
}
}
func (p *WorkerPool) Close() {
close(p.quit)
p.wg.Wait()
}2. 统一的协程管理
type GoroutineManager struct {
goroutines []func(context.Context)
ctx context.Context
cancel context.CancelFunc
}
func NewGoroutineManager() *GoroutineManager {
ctx, cancel := context.WithCancel(context.Background())
return &GoroutineManager{
ctx: ctx,
cancel: cancel,
}
}
func (gm *GoroutineManager) Launch(name string, fn func(context.Context)) {
gm.goroutines = append(gm.goroutines, fn)
go func() {
defer func() {
if r := recover(); r != nil {
log.Printf("goroutine %s panicked: %v", name, r)
}
}()
fn(gm.ctx)
}()
}
func (gm *GoroutineManager) Shutdown(timeout time.Duration) {
gm.cancel()
// 等待所有协程退出
done := make(chan struct{})
go func() {
time.Sleep(timeout)
close(done)
}()
<-done
}3. 使用errgroup管理相关协程
import "golang.org/x/sync/errgroup"
func processConcurrently(ctx context.Context) error {
g, ctx := errgroup.WithContext(ctx)
// 启动多个协程
for i := 0; i < 5; i++ {
i := i
g.Go(func() error {
select {
case <-ctx.Done():
return ctx.Err()
default:
return doWork(i)
}
})
}
// 等待所有协程完成,或第一个错误
return g.Wait()
}总结
协程泄漏的预防需要:
- 明确生命周期:每个协程都应有清晰的退出路径
- 使用context:统一管理取消和超时
- 资源清理:确保通道关闭、WaitGroup匹配
- 监控和测试:定期检查协程数量,编写泄漏测试
- 代码审查:特别注意协程的退出逻辑
通过良好的设计和规范,可以显著减少协程泄漏的发生。
