在后端服务中,我们经常需要异步执行那些耗时任务——发送邮件、生成缩略图、导出报表、调用外部 API,把这些操作同步阻塞住,用户体验砸了,服务器资源也跟着白费。
这时候,异步任务队列就成了标配。而在 Go 语言里,asynq 这个库就是干这件事的——基于 Redis,可靠性高,用起来还特别顺手。
一、什么是 Asynq?为什么要用它?
简单说,它就是一个生产级的异步任务队列库,专门用来处理那些你不想让请求等着的活儿。比如:
发送用户激活邮件、图片或视频处理、定时任务调度,还有一些需要延迟执行的操作(像订单超时取消这种)。
二、Asynq 的基本工作原理
Asynq 的设计里,三个角色缺一不可:
Client(生产者)——创建任务并扔进队列;Server(消费者/worker)——从队列里取出任务再去执行;Redis——充当消息中介,存任务、协调两边通信。
整个流程大致是这样:
Client → Redis(任务入队)→ Server 从 Redis 取出任务 → Worker 处理任务
三、准备工作:Redis 环境与依赖安装
用 Asynq 前,你得先有个 Redis 服务器(版本 ≥ 4.0)。不想手动装?用 Docker 一把梭:
docker run -d -p 6379:6379 redis:latest
然后初始化 Go 项目:
mkdir asynq-demo
cd asynq-demo
go mod init github.com/yourname/asynq-demo
装库:
go get github.com/hibiken/asynq
四、定义任务与创建任务
1. 任务的概念
在 Asynq 里,任务由“类型”和“负载”组成。类型用来区分任务,比如 "email:send";负载就是任务数据,通常用 JSON 编码,带上用户 ID、文件 URL 这些信息。
2. 示例:创建一个发送邮件的任务
在 tasks/tasks.go 里写:
package tasks
import ( "encoding/json" "github.com/hibiken/asynq" )
// 任务类型
const TypeEmailSend = "email:send"
// 邮件发送所需数据
type EmailPayload struct { To string Subject string Body string }
// 创建一个发送邮件任务
func NewEmailTask(to, subject, body string) (*asynq.Task, error) {
p := EmailPayload{ To: to, Subject: subject, Body: body }
data, err := json.Marshal(p)
if err != nil { return nil, err }
return asynq.NewTask(TypeEmailSend, data), nil
}
这个函数里,数据先序列化成 JSON,然后交给 asynq.NewTask 生成任务对象。
五、将任务加入队列(Client 端)
写 main.go:
package main
import ( "log" "github.com/hibiken/asynq" "yourmodule/tasks" )
func main() {
client := asynq.NewClient(asynq.RedisClientOpt{ Addr: "127.0.0.1:6379" })
defer client.Close()
task, err := tasks.NewEmailTask("alice@example.com", "Hello", "Welcome to Asynq!")
if err != nil { log.Fatalf("无法创建任务: %v", err) }
info, err := client.Enqueue(task)
if err != nil { log.Fatalf("无法入队任务: %v", err) }
log.Printf("任务已入队 ID=%s 队列=%s", info.ID, info.Queue)
}
跑一下 go run main.go,正常的话会看到任务成功入队的日志。
六、处理任务(Server 端)
任务入队只是第一步,还得有 worker 从队列里取出来并执行真正的逻辑。
1. Handler 定义
在 worker/worker.go 中:
package main
import ( "context" "encoding/json" "log" "github.com/hibiken/asynq" "yourmodule/tasks" )
func handleEmailSend(ctx context.Context, t *asynq.Task) error {
var payload tasks.EmailPayload
if err := json.Unmarshal(t.Payload(), &payload); err != nil {
log.Println("Payload 解析失败:", err)
return err
}
log.Printf("发送邮件给: %s 标题: %s", payload.To, payload.Subject)
// TODO: 真实邮件发送逻辑
return nil
}
func main() {
srv := asynq.NewServer(
asynq.RedisClientOpt{ Addr: "127.0.0.1:6379" },
asynq.Config{
Concurrency: 5, // 并发 worker 数量
Queues: map[string]int{ "default": 1 }, // 队列优先级
},
)
mux := asynq.NewServeMux()
mux.HandleFunc(tasks.TypeEmailSend, handleEmailSend)
if err := srv.Run(mux); err != nil {
log.Fatal("Worker 运行失败:", err)
}
}
运行 go run worker/worker.go,worker 就会从 Redis 中取出任务,执行 handleEmailSend。
七、调度任务(延迟 / 定时)
Asynq 支持延迟执行,写法很直观:
info, err := client.Enqueue(task, asynq.ProcessIn(10*time.Second))
上面这段让任务在 10 秒后 执行。也可以用 asynq.ProcessAt(time) 指定绝对时间。
八、失败重试与超时控制
默认情况下,handler 返回错误时任务会自动重试(最多 25 次,指数退避)。你完全可以自定义:
client.Enqueue(task,
asynq.MaxRetry(5),
asynq.Timeout(30*time.Second),
)
这样就能控制重试次数和单次执行的最长时间,避免某个任务卡死整个 worker。
九、任务优先级队列
你还可以定义多个队列,并设置不同权重:
srv := asynq.NewServer(
asynq.RedisClientOpt{ Addr: "127.0.0.1:6379" },
asynq.Config{
Queues: map[string]int{
"critical": 6,
"default": 3,
"low": 1,
},
},
)
权重高的队列会被更频繁地消费,特别适合区分紧急任务和普通任务。
十、可视化与监控
Asynq 提供了专门的 Web UI 和 CLI 工具,方便你实时查看队列状态:
go install github.com/hibiken/asynq/tools/asynq@latest
asynq stats # 显示队列统计信息
asynq dash # 启动交互式监控界面
Web UI 可以直观地看到任务分布和处理情况。

十一、其他常用功能
任务重试(Retry)
默认 handler 返回错误就自动重试,默认 25 次,指数退避。你可以通过 asynq.MaxRetry(3) 修改,或用 asynq.SkipRetry 错误来跳过那些重试也没意义的任务(比如数据格式错误)。
唯一任务(Unique)
防止任务重复入队。比如确保 1 分钟内不给同一个用户发两封相同邮件:asynq.Unique(1 * time.Minute)
超时控制(Timeout)
任务执行太久?直接强制超时:asynq.Timeout(10 * time.Second)
如果你正在构建微服务、后台任务系统,或者需要提升系统性能和可伸缩性,Asynq 是一个非常值得尝试的工具。
