Skip to content

定时任务

youlai-django 支持 XXL-JOB 分布式定时任务调度。

XXL-JOB 简介

XXL-JOB 是一款轻量级分布式任务调度平台,支持任务的动态管理、故障转移、分片执行等特性。

环境配置

配置文件

env
# XXL-Job 配置
XXL_JOB_ENABLED=false
XXL_JOB_ADMIN_ADDRESSES=http://127.0.0.1:8686/xxl-job-admin
XXL_JOB_ACCESS_TOKEN=default_token
XXL_JOB_EXECUTOR_APP_NAME=xxl-job-executor-youlai-django
XXL_JOB_EXECUTOR_PORT=9999
XXL_JOB_LOG_PATH=/data/applogs/xxl-job/jobhandler
XXL_JOB_LOG_RETENTION_DAYS=30
配置项默认值说明
XXL_JOB_ENABLEDfalse是否启用
XXL_JOB_ADMIN_ADDRESSES-调度中心地址
XXL_JOB_ACCESS_TOKENdefault_token访问令牌
XXL_JOB_EXECUTOR_APP_NAME-执行器名称
XXL_JOB_EXECUTOR_PORT9999执行器端口

Django Settings

python
# config/settings/base.py
XXL_JOB = {
    'ENABLED': os.environ.get('XXL_JOB_ENABLED', 'false').lower() == 'true',
    'ADMIN_ADDRESSES': os.environ.get('XXL_JOB_ADMIN_ADDRESSES', ''),
    'ACCESS_TOKEN': os.environ.get('XXL_JOB_ACCESS_TOKEN', 'default_token'),
    'EXECUTOR_APP_NAME': os.environ.get('XXL_JOB_EXECUTOR_APP_NAME', 'xxl-job-executor-youlai-django'),
    'EXECUTOR_PORT': int(os.environ.get('XXL_JOB_EXECUTOR_PORT', 9999)),
}

调度中心部署

Docker 部署

bash
docker run -d --name xxl-job-admin \
  -e PARAMS="--spring.datasource.url=jdbc:mysql://mysql:3306/xxl_job?useUnicode=true&characterEncoding=UTF-8 --spring.datasource.username=root --spring.datasource.password=123456" \
  -p 8686:8080 \
  xuxueli/xxl-job-admin:2.4.0

访问调度中心

任务开发

Python 任务处理器

python
from xxl_job import XxlJobHandler

@XxlJobHandler(job_name='demo_job')
def demo_job():
    """示例任务"""
    print("执行定时任务...")
    # 业务逻辑
    return "success"

带参数的任务

python
from xxl_job import XxlJobHandler, XxlJobHelper

@XxlJobHandler(job_name='param_job')
def param_job():
    """带参数的任务"""
    # 获取任务参数
    param = XxlJobHelper.get_job_param()
    print(f"任务参数: {param}")
    
    # 获取分片参数
    shard_index = XxlJobHelper.get_shard_index()
    shard_total = XxlJobHelper.get_shard_total()
    print(f"分片: {shard_index}/{shard_total}")

分片任务

python
@XxlJobHandler(job_name='sharding_job')
def sharding_job():
    """分片任务"""
    shard_index = XxlJobHelper.get_shard_index()
    shard_total = XxlJobHelper.get_shard_total()
    
    # 根据分片处理数据
    # SELECT * FROM orders WHERE MOD(id, shard_total) = shard_index
    pass

Cron 表达式

表达式说明
0 2 * * *每天凌晨 2 点
0 */5 * * * *每 5 分钟
0 0 1 * *每月 1 号 0 点
0 9-17 * * 1-5工作日 9-17 点

最佳实践

任务幂等性

python
@XxlJobHandler(job_name='order_timeout_job')
def order_timeout_job():
    """订单超时取消(幂等)"""
    # 只查询待支付订单
    orders = Order.objects.filter(status='pending', created_at__lt=timeout_time)
    
    for order in orders:
        # 使用乐观锁确保幂等
        rows = Order.objects.filter(id=order.id, status='pending').update(status='cancelled')
        if rows > 0:
            print(f"订单 {order.id} 已取消")

异常处理

python
@XxlJobHandler(job_name='sync_job')
def sync_job():
    """数据同步任务"""
    try:
        # 业务逻辑
        sync_data()
        XxlJobHelper.handle_success("同步成功")
    except Exception as e:
        XxlJobHelper.handle_fail(f"同步失败: {str(e)}")

相关资源

基于 MIT 许可发布