任务调度神器python celery分布式任务队列

2021-07-12 20:43 php7

Celery 是一个简单、灵活且可靠的,处理大量消息的分布式系统,并且提供维护这样一个系统的必需工具。
它是一个专注于实时处理的任务队列,同时也支持任务调度。运行模式是生产者消费者模式:


图片

Celery的架构


Celery的架构由三部分组成,消息中间件(message broker),任务执行单元(worker)和任务执行结果存储(task result store)组成。


消息中间件


Celery本身不提供消息服务,但是可以方便的和第三方提供的消息中间件集成,包括,RabbitMQ,Redis,MongoDB等,这里我先去了解RabbitMQ,Redis。


任务执行单元


Worker是Celery提供的任务执行的单元,worker并发的运行在分布式的系统节点中


任务结果存储


Task result store用来存储Worker执行的任务的结果,Celery支持以不同方式存储任务的结果,包括Redis,MongoDB,Django ORM,AMQP等,这里我先不去看它是如何存储的,就先选用Redis来存储任务执行结果。

图片

1、安装celery:


pip install celery

1

Celery 起初可能令人却步——但不要担心——此教程会快速带你入门。


2、选择消息中间件


在Celery帮助文档中称呼为中间人:broker

Celery 需要一个发送和接收消息的解决方案,其通常以独立服务形式出现, 称为 消息中间人 。俗称消息队列,

可行的选择包括:

RabbitMQ

Redis

mysql

oracle

mongodb

3、应用demo:


redis 安装使用这里就不作描述了,如果对这块有不清楚的同学,先了解下REDIS的基础使用。

这里有一点需要注意:

如果你使用的REDIS需要密码,broker链接需要使用下面这种方式

broker = 'redis://:%s@%s:6379/1' % (Config.REDIS_PASSWORD, Config.REDIS_HOST)

注意: 双斜线后面还有一个冒号,否则会报鉴权失败的问题


为了运行一个简单的任务,从中说明 celery 的使用方式。在项目文件夹内创建 app.py 和 tasks.py 。tasks.py 用来定义任务:

tasks.py


broker 指定任务队列的消息中间件,backend 指定了任务执行结果的存储。app 就是我们创建的 Celery 对象。通过 app.task 修饰器将 add 函数变成一个一部的任务。


tasks.py

import timefrom celery import Celery

broker = 'redis://127.0.0.1:6379/1'backend = 'redis://127.0.0.1:6379/2'app = Celery('my_tasks', broker=broker, backend=backend)

@app.taskdef add(x, y): print('enter task') time.sleep(3) return x + y



app.py

add.delay 函数将任务序列化发送到消息中间件

from tasks import add

if __name__ == '__main__': print('start task') result = add.delay(3, 18) print('end task') print(result)



执行app.py


生产任务:

python app.py


运行结果:

start taskend taskbcdbf6e7-7d5f-4dd9-a4ba-8809172bf150Process finished with exit code 0



消费任务:

可以看到输出一个任务的唯一识别,这个只是将任务推送到 redis,任务还没被消费,任务会在 celery 队列中。

开启 celery woker 可以将任务进行消费,下面开启消费:


celery worker -A tasks -l info   # -A 后是模块名

新版本的celery 命令有修改:

 celery -A compute_tasks worker -l info



如果一切顺利,woker 正常启动,就能在终端看到任务被消费了,成功看到了消费了任务。

本文章转载自公众号:php-is-best

首页 - php 相关的更多文章: