定时任务
youlai-django 支持 XXL-JOB 分布式定时任务调度。
XXL-JOB 简介
XXL-JOB 是一款轻量级分布式任务调度平台,支持任务的动态管理、故障转移、分片执行等特性。
环境配置
配置文件
env
# XXL-Job 配置
XXL_JOB_ENABLED=false
XXL_JOB_ADMIN_ADDRESSES=http://127.0.0.1:8686/xxl-job-admin
XXL_JOB_ACCESS_TOKEN=default_token
XXL_JOB_EXECUTOR_APP_NAME=xxl-job-executor-youlai-django
XXL_JOB_EXECUTOR_PORT=9999
XXL_JOB_LOG_PATH=/data/applogs/xxl-job/jobhandler
XXL_JOB_LOG_RETENTION_DAYS=30| 配置项 | 默认值 | 说明 |
|---|---|---|
XXL_JOB_ENABLED | false | 是否启用 |
XXL_JOB_ADMIN_ADDRESSES | - | 调度中心地址 |
XXL_JOB_ACCESS_TOKEN | default_token | 访问令牌 |
XXL_JOB_EXECUTOR_APP_NAME | - | 执行器名称 |
XXL_JOB_EXECUTOR_PORT | 9999 | 执行器端口 |
Django Settings
python
# config/settings/base.py
XXL_JOB = {
'ENABLED': os.environ.get('XXL_JOB_ENABLED', 'false').lower() == 'true',
'ADMIN_ADDRESSES': os.environ.get('XXL_JOB_ADMIN_ADDRESSES', ''),
'ACCESS_TOKEN': os.environ.get('XXL_JOB_ACCESS_TOKEN', 'default_token'),
'EXECUTOR_APP_NAME': os.environ.get('XXL_JOB_EXECUTOR_APP_NAME', 'xxl-job-executor-youlai-django'),
'EXECUTOR_PORT': int(os.environ.get('XXL_JOB_EXECUTOR_PORT', 9999)),
}调度中心部署
Docker 部署
bash
docker run -d --name xxl-job-admin \
-e PARAMS="--spring.datasource.url=jdbc:mysql://mysql:3306/xxl_job?useUnicode=true&characterEncoding=UTF-8 --spring.datasource.username=root --spring.datasource.password=123456" \
-p 8686:8080 \
xuxueli/xxl-job-admin:2.4.0访问调度中心
- 访问地址:http://localhost:8686/xxl-job-admin
- 默认账号:admin
- 默认密码:123456
任务开发
Python 任务处理器
python
from xxl_job import XxlJobHandler
@XxlJobHandler(job_name='demo_job')
def demo_job():
"""示例任务"""
print("执行定时任务...")
# 业务逻辑
return "success"带参数的任务
python
from xxl_job import XxlJobHandler, XxlJobHelper
@XxlJobHandler(job_name='param_job')
def param_job():
"""带参数的任务"""
# 获取任务参数
param = XxlJobHelper.get_job_param()
print(f"任务参数: {param}")
# 获取分片参数
shard_index = XxlJobHelper.get_shard_index()
shard_total = XxlJobHelper.get_shard_total()
print(f"分片: {shard_index}/{shard_total}")分片任务
python
@XxlJobHandler(job_name='sharding_job')
def sharding_job():
"""分片任务"""
shard_index = XxlJobHelper.get_shard_index()
shard_total = XxlJobHelper.get_shard_total()
# 根据分片处理数据
# SELECT * FROM orders WHERE MOD(id, shard_total) = shard_index
passCron 表达式
| 表达式 | 说明 |
|---|---|
0 2 * * * | 每天凌晨 2 点 |
0 */5 * * * * | 每 5 分钟 |
0 0 1 * * | 每月 1 号 0 点 |
0 9-17 * * 1-5 | 工作日 9-17 点 |
最佳实践
任务幂等性
python
@XxlJobHandler(job_name='order_timeout_job')
def order_timeout_job():
"""订单超时取消(幂等)"""
# 只查询待支付订单
orders = Order.objects.filter(status='pending', created_at__lt=timeout_time)
for order in orders:
# 使用乐观锁确保幂等
rows = Order.objects.filter(id=order.id, status='pending').update(status='cancelled')
if rows > 0:
print(f"订单 {order.id} 已取消")异常处理
python
@XxlJobHandler(job_name='sync_job')
def sync_job():
"""数据同步任务"""
try:
# 业务逻辑
sync_data()
XxlJobHelper.handle_success("同步成功")
except Exception as e:
XxlJobHelper.handle_fail(f"同步失败: {str(e)}")