2澳门新蒲京平台.任务场景

一、简介

  Celery是由Python开辟、轻巧、灵活、可信赖的布满式职分队列,其本质是生产者消费者模型,生产者发送职分到音信队列,费用者承受管理任务。Celery侧重于实时操作,但对调节辅助也很好,其每一日能够拍卖数以百万计的职务。特点爬山涉水

  • 归纳跋山涉水的近义词领悟celery的做事流程后,配置利用简易
  • 高可用:当义务实践倒闭或施行进度中生出三番五次中断,celery会自动尝试再次实践职责
  • 迅猛爬山涉水三个单进程的celery每分钟可管理上百万个职分
  • 灵活爬山涉水差不多celery的逐一零部件都得以被扩张及自定制

利用场景举例爬山涉水

  1.web利用爬山涉水当客商在网站实行有个别操作供给不长日子成功时,大家得以将这种操作交给Celery实行,直接重返给顾客,等到Celery实施到位之后公告顾客,大大提好网址的出现以致客户的体验感。

  2.职分场景爬山涉水例如在运营场景下必要批量在几百台机械推行某个命令或然任务,此时Celery能够轻易解决。

  3.定期职责跋山涉水的近义词向准时导数据报表、依期发送文告相近情状,就算Linux的计划职务能够帮本身达成,不过足够不便于处理,而Celery能够提供管理接口和加多的API。

二、框架结构&工作原理

  Celery由以下三局地构成跋山涉水的近义词音信中间件(Broker)、职责试行单元Worker、结果存款和储蓄(Backend),如下图爬山涉水

  澳门新蒲京平台 1

行事规律跋山涉水的近义词

  1. 职责模块Task饱含异步职责和依期职责。此中,异步任务常常在事情逻辑中被触发并发往音信队列,而按期任务由Celery
    Beat进度周期性地将任务发往信息队列;
  2. 任务实行单元Worker实时监视音信队列获取队列中的职务施行;
  3. Woker实践完职务后将结果保存在Backend中;

新闻中间件Broker

  音讯中间件Broker官方提供了数不完预备方案,扶植RabbitMQ、Redis、AmazonSQS、MongoDB、Memcached 等,官方推荐RabbitMQ。

职分实行单元Worker

  Worker是职分试行单元,担负从音信队列中抽取义务实行,它能够运转一个照旧八个,也足以运行在区别的机械节点,这正是其达成布满式的基本。

结果存款和储蓄Backend

  Backend结果存款和储蓄官方也提供了不菲的蕴藏情势协理:RabbitMQ、 Redis、Memcached,SQLAlchemy,
Django ORM、Apache Cassandra、Elasticsearch。

三、安装使用 

  此处本身使用的redis作为音讯中间件,redis安装能够参照

Celery安装: 

pip3 install celery

轻巧易行利用

  目录结构爬山涉水

project/
├── __init__.py  
├── config.py
└── tasks.py

各目录文件表达跋山涉水的近义词

__init__.py:开始化Celery以至加载配置文件

#!/usr/bin/env python3
# -*- coding:utf-8 -*-
# Author:wd
from celery import Celery
app = Celery('project')                                # 创建 Celery 实例
app.config_from_object('project.config')               # 加载配置模块

config.py: 
Celery相关配置文件,更加多配备参照他事他说加以考察爬山涉水

#!/usr/bin/env python3
# -*- coding:utf-8 -*-
# Author:wd

BROKER_URL = 'redis://10.1.210.69:6379/0' # Broker配置,使用Redis作为消息中间件

CELERY_RESULT_BACKEND = 'redis://10.1.210.69:6379/0' # BACKEND配置,这里使用redis

CELERY_RESULT_SERIALIZER = 'json' # 结果序列化方案

CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24 # 任务过期时间

CELERY_TIMEZONE='Asia/Shanghai'   # 时区配置

CELERY_IMPORTS = (     # 指定导入的任务模块,可以指定多个
    'project.tasks',
)

tasks.py 爬山涉水职分定义文件

#!/usr/bin/env python3
# -*- coding:utf-8 -*-
# Author:wd

from project import app
@app.task
def show_name(name):
    return name

启动Worker:

celery worker -A project -l debug

逐个参数含义跋山涉水的近义词

  worker: 代表第运营的剧中人物是work当然还可能有beat等任何剧中人物;

  -A 跋山涉水的近义词项目路径,这里小编的目录是project

  -l爬山涉水运营的日记等第,越来越多参数使用celery –help查看

查看日志输出,会意识大家定义的职分,以致相关安排:

澳门新蒲京平台 2

 

  尽管起步了worker,不过大家还须要经过delay或apply_async来将义务增多到worker中,这里大家经过交互式方法增添职分,并回到AsyncResult对象,通过AsyncResult对象得到结果跋山涉水的近义词

澳门新蒲京平台 3

AsyncResult除了get方法用于常用获取结果方法外还提以下常用艺术或质量跋山涉水的近义词

  • state: 重返职责意况;
  • task_id: 再次回到职责id;
  • result: 再次回到任务结果,同get()方法;
  • ready(): 判别任务是或不是以致有结果,有结果为True,不然False;
  • info(): 获取职分消息,默感觉结果;
  • wait(t):
    等待t秒后获得结果,若职分执行完成,则不等待间接拿走结果,若职责在实践中,则wait时期一直不通,直到超时报错;
  • successfu(): 判别职分是不是中标,成功为True,不然为False;

四、进级使用

  对于日常的天职的话恐怕满意不断我们的职必得要,所以还亟需明白部分进级用法,Celery提供了众多调治措施,比如职务编排、依照义务状态实践分歧的操作、重试机制等,以下会对常用高阶用法举行描述。

准期义务&安插职分

  Celery的提供的准期义务重大靠schedules来成功,通过beat组件周期性将任务发送给woker推行。在演示中,新建文件period_task.py,并累计职务到布署文件中跋山涉水的近义词

period_task.py:

#!/usr/bin/env python3
# -*- coding:utf-8 -*-
# Author:wd
from project import app
from celery.schedules import crontab

@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    sender.add_periodic_task(10.0, add.s(1,3), name='1+3=') # 每10秒执行add
    sender.add_periodic_task(
        crontab(hour=16, minute=56, day_of_week=1),      #每周一下午四点五十六执行sayhai
        sayhi.s('wd'),name='say_hi'
    )



@app.task
def add(x,y):
    print(x+y)
    return x+y


@app.task
def sayhi(name):
    return 'hello %s' % name

config.py

#!/usr/bin/env python3
# -*- coding:utf-8 -*-
# Author:wd

BROKER_URL = 'redis://10.1.210.69:6379/0' # Broker配置,使用Redis作为消息中间件

CELERY_RESULT_BACKEND = 'redis://10.1.210.69:6379/0' # BACKEND配置,这里使用redis

CELERY_RESULT_SERIALIZER = 'json' # 结果序列化方案

CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24 # 任务过期时间

CELERY_TIMEZONE='Asia/Shanghai'   # 时区配置

CELERY_IMPORTS = (     # 指定导入的任务模块,可以指定多个
    'project.tasks',
    'project.period_task', #定时任务
)

启动worker和beat:

celery worker -A project -l debug #启动work
celery beat -A  project.period_task -l  debug #启动beat,注意此时对应的文件路径

咱俩得以洞察worker日志跋山涉水的近义词

澳门新蒲京平台 4

仍然是能够透过安插文件措施钦命准时和安排职务,此时的配置文件如下跋山涉水的近义词

#!/usr/bin/env python3
# -*- coding:utf-8 -*-
# Author:wd

from project import app
from celery.schedules import crontab

BROKER_URL = 'redis://10.1.210.69:6379/0' # Broker配置,使用Redis作为消息中间件

CELERY_RESULT_BACKEND = 'redis://10.1.210.69:6379/0' # BACKEND配置,这里使用redis

CELERY_RESULT_SERIALIZER = 'json' # 结果序列化方案

CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24 # 任务过期时间

CELERY_TIMEZONE='Asia/Shanghai'   # 时区配置

CELERY_IMPORTS = (     # 指定导入的任务模块,可以指定多个
    'project.tasks',
    'project.period_task',
)

app.conf.beat_schedule = {
    'period_add_task': {    # 计划任务
        'task': 'project.period_task.add',  #任务路径
        'schedule': crontab(hour=18, minute=16, day_of_week=1),
        'args': (3, 4),
    },
'add-every-30-seconds': {          # 每10秒执行
        'task': 'project.period_task.sayhi',  #任务路径
        'schedule': 10.0,
        'args': ('wd',)
    },
}

此时的period_task.py只需求登记到woker中就行了,如下爬山涉水

#!/usr/bin/env python3
# -*- coding:utf-8 -*-
# Author:wd
from project import app

@app.task
def add(x,y):
    print(x+y)
    return x+y


@app.task
def sayhi(name):
    return 'hello %s' % name

同等运维worker和beat结果和率先种艺术相近。越来越多详细的源委请参考爬山涉水

职务绑定

  Celery可通过职分绑定到实例获取到职务的上下文,那样我们得以在职分局转时候取获得职分的景况,记录相关日志等。

改正职责中的period_task.py,如下:

#!/usr/bin/env python3
# -*- coding:utf-8 -*-
# Author:wd
from project import app
from celery.utils.log import get_task_logger

logger = get_task_logger(__name__)
@app.task(bind=True)  # 绑定任务
def add(self,x,y):
    logger.info(self.request.__dict__)  #打印日志
    try:
        a=[]
        a[10]==1
    except Exception as e:
        raise self.retry(exc=e, countdown=5, max_retries=3) # 出错每5秒尝试一次,总共尝试3次
    return x+y

在以上代码中,通过bind参数将职分绑定,self支职责的上下文,通过self获取职务情形,同不经常间在职分出错开上下班时间举办职务重试,我们着重日志跋山涉水的近义词

澳门新蒲京平台 5

内置钩子函数

  Celery在实践任务时候,提供了钩子方法用于在任务实行到位时候实行对应的操作,在Task源码中提供了重重动静钩子函数如跋山涉水的近义词on_success(成功后进行)、on_failure(战败时候实施)、on_retry(任务重试时候试行)、after_return(职责再次来到时候实践),在开展利用是我们只须要重写这个点子,完毕相应的操作就可以。

在偏下示例中,大家后续修改period_task.py,分别定义多个职分来演示任务战败、重试、任务成功后施行的操作跋山涉水的近义词

#!/usr/bin/env python3
# -*- coding:utf-8 -*-
# Author:wd
from project import app
from celery.utils.log import get_task_logger
from celery import Task

logger = get_task_logger(__name__)

class demotask(Task):

    def on_success(self, retval, task_id, args, kwargs):   # 任务成功执行
        logger.info('task id:{} , arg:{} , successful !'.format(task_id,args))



    def on_failure(self, exc, task_id, args, kwargs, einfo):  #任务失败执行
        logger.info('task id:{} , arg:{} , failed ! erros : {}' .format(task_id,args,exc))


    def on_retry(self, exc, task_id, args, kwargs, einfo):    #任务重试执行
        logger.info('task id:{} , arg:{} , retry !  einfo: {}'.format(task_id, args, exc))

@app.task(base=demotask,bind=True)
def add(self,x,y):
    try:
        a=[]
        a[10]==1
    except Exception as e:
        raise self.retry(exc=e, countdown=5, max_retries=1) # 出错每5秒尝试一次,总共尝试1次
    return x+y

@app.task(base=demotask)
def sayhi(name):
    a=[]
    a[10]==1
    return 'hi {}'.format(name)

@app.task(base=demotask)
def sum(a,b):
    return 'a+b={} '.format(a+b)

那儿的配备文件config.py跋山涉水的近义词

#!/usr/bin/env python3
# -*- coding:utf-8 -*-
# Author:wd

from project import app
from celery.schedules import crontab

BROKER_URL = 'redis://10.1.210.69:6379/0' # Broker配置,使用Redis作为消息中间件

CELERY_RESULT_BACKEND = 'redis://10.1.210.69:6379/0' # BACKEND配置,这里使用redis

CELERY_RESULT_SERIALIZER = 'json' # 结果序列化方案

CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24 # 任务过期时间

CELERY_TIMEZONE='Asia/Shanghai'   # 时区配置

CELERY_IMPORTS = (     # 指定导入的任务模块,可以指定多个
    'project.tasks',
    'project.period_task',
)

app.conf.beat_schedule = {
'add': {          # 每10秒执行
        'task': 'project.period_task.add',  #任务路径
        'schedule': 10.0,
        'args': (10,12),
    },
'sayhi': {          # 每10秒执行
        'task': 'project.period_task.sayhi',  #任务路径
        'schedule': 10.0,
        'args': ('wd',),
    },
'sum': {          # 每10秒执行
        'task': 'project.period_task.sum',  #任务路径
        'schedule': 10.0,
        'args': (1,3),
    },
}

下一场重启worker和beat,查看日志爬山涉水

澳门新蒲京平台 6

 

职务编排

  在许多处境下,一个任务急需由多少个子义务依旧多少个职分需求多多步骤技能成就,Celery一样也能兑现如此的任务,完结那项目标天职通过以下模块形成跋山涉水的近义词

  • group: 并行调节职务

  • chain: 链式职务调节

  • chord:
    雷同group,但分header和body2个部分,header能够是三个group任务,实施到位后调用body的天职

  • map: 映射调解,通过输入五个入参来多次调整同贰个职分

  • starmap: 相似map,入参近似*args

  • chunks: 将职分遵照一定数量进行分组

 

修改tasks.py:

#!/usr/bin/env python3
# -*- coding:utf-8 -*-
# Author:wd
from project import app

@app.task
def add(x,y):
    return x+y


@app.task
def mul(x,y):
    return x*y


@app.task
def sum(data_list):
    res=0
    for i in data_list:
        res+=i
    return res

 

group: 组职分,组内每一个任务并行实践

和project同级目录新建consumer.py如下:

from celery import group
from project.tasks import add,mul,sum
res = group(add.s(1,2),add.s(1,2))()  # 任务 [1+2,1+2] 
while True:
    if res.ready():
        print('res:{}'.format(res.get()))
        break

结果:

澳门新蒲京平台 7

 

chain跋山涉水的近义词链式职务

链式职务中,私下认可上多少个职分的回到结果作为参数字传送递给子职责

from celery import chain
from project.tasks import add,mul,sum
res = chain(add.s(1,2),add.s(3),mul.s(3))()  # 任务((1+2)+3)*3
while True:
    if res.ready():
        print('res:{}'.format(res.get()))
        break
#结果
#res:18

还足以应用|表示链式职务,上边职责也能够表示为跋山涉水的近义词

res = (add.s(1,2) | add.s(3) | (mul.s(3)))()
res.get()

 

chord跋山涉水的近义词任务分割,分为header和body两有些,hearder职分实行完在执行body,此中hearder重返结果作为参数字传送递给body

from celery import chord
from project.tasks import add,mul,sum
res = chord(header=[add.s(1,2),mul.s(3,4)],body=sum.s())()  # 任务(1+2)+(3*4)
while True:
    if res.ready():
        print('res:{}'.format(res.get()))
        break

#结果:
#res:15

 

chunks跋山涉水的近义词职责分组,依照职分的个数分组

from project.tasks import add,mul,sum
res = add.chunks(zip(range(5),range(5)),4)()  # 4 代表每组的任务的个数
while True:
    if res.ready():
        print('res:{}'.format(res.get()))
        break

结果:

澳门新蒲京平台 8

 

delay &apply_async

  对于delay和apply_async都足以用来打开任务的调节,本质上是delay对apply_async举行了再贰遍封装(或许能够说是连忙情势),两个都回去AsyncResult对象,以下是八个方式源码。

澳门新蒲京平台 9澳门新蒲京平台 10

    def delay(self, *args, **kwargs):
        """Star argument version of :meth:`apply_async`.

        Does not support the extra options enabled by :meth:`apply_async`.

        Arguments:
            *args (Any): Positional arguments passed on to the task.
            **kwargs (Any): Keyword arguments passed on to the task.
        Returns:
            celery.result.AsyncResult: Future promise.
        """
        return self.apply_async(args, kwargs)

delay源码

澳门新蒲京平台 11澳门新蒲京平台 12

    def apply_async(self, args=None, kwargs=None, task_id=None, producer=None,
                    link=None, link_error=None, shadow=None, **options):
        """Apply tasks asynchronously by sending a message.

        Arguments:
            args (Tuple): The positional arguments to pass on to the task.

            kwargs (Dict): The keyword arguments to pass on to the task.

            countdown (float): Number of seconds into the future that the
                task should execute.  Defaults to immediate execution.

            eta (~datetime.datetime): Absolute time and date of when the task
                should be executed.  May not be specified if `countdown`
                is also supplied.

            expires (float, ~datetime.datetime): Datetime or
                seconds in the future for the task should expire.
                The task won't be executed after the expiration time.

            shadow (str): Override task name used in logs/monitoring.
                Default is retrieved from :meth:`shadow_name`.

            connection (kombu.Connection): Re-use existing broker connection
                instead of acquiring one from the connection pool.

            retry (bool): If enabled sending of the task message will be
                retried in the event of connection loss or failure.
                Default is taken from the :setting:`task_publish_retry`
                setting.  Note that you need to handle the
                producer/connection manually for this to work.

            retry_policy (Mapping): Override the retry policy used.
                See the :setting:`task_publish_retry_policy` setting.

            queue (str, kombu.Queue): The queue to route the task to.
                This must be a key present in :setting:`task_queues`, or
                :setting:`task_create_missing_queues` must be
                enabled.  See :ref:`guide-routing` for more
                information.

            exchange (str, kombu.Exchange): Named custom exchange to send the
                task to.  Usually not used in combination with the ``queue``
                argument.

            routing_key (str): Custom routing key used to route the task to a
                worker server.  If in combination with a ``queue`` argument
                only used to specify custom routing keys to topic exchanges.

            priority (int): The task priority, a number between 0 and 9.
                Defaults to the :attr:`priority` attribute.

            serializer (str): Serialization method to use.
                Can be `pickle`, `json`, `yaml`, `msgpack` or any custom
                serialization method that's been registered
                with :mod:`kombu.serialization.registry`.
                Defaults to the :attr:`serializer` attribute.

            compression (str): Optional compression method
                to use.  Can be one of ``zlib``, ``bzip2``,
                or any custom compression methods registered with
                :func:`kombu.compression.register`.
                Defaults to the :setting:`task_compression` setting.

            link (Signature): A single, or a list of tasks signatures
                to apply if the task returns successfully.

            link_error (Signature): A single, or a list of task signatures
                to apply if an error occurs while executing the task.

            producer (kombu.Producer): custom producer to use when publishing
                the task.

            add_to_parent (bool): If set to True (default) and the task
                is applied while executing another task, then the result
                will be appended to the parent tasks ``request.children``
                attribute.  Trailing can also be disabled by default using the
                :attr:`trail` attribute

            publisher (kombu.Producer): Deprecated alias to ``producer``.

            headers (Dict): Message headers to be included in the message.

        Returns:
            celery.result.AsyncResult: Promise of future evaluation.

        Raises:
            TypeError: If not enough arguments are passed, or too many
                arguments are passed.  Note that signature checks may
                be disabled by specifying ``@task(typing=False)``.
            kombu.exceptions.OperationalError: If a connection to the
               transport cannot be made, or if the connection is lost.

        Note:
            Also supports all keyword arguments supported by
            :meth:`kombu.Producer.publish`.
        """
        if self.typing:
            try:
                check_arguments = self.__header__
            except AttributeError:  # pragma: no cover
                pass
            else:
                check_arguments(*(args or ()), **(kwargs or {}))

        app = self._get_app()
        if app.conf.task_always_eager:
            with denied_join_result():
                return self.apply(args, kwargs, task_id=task_id or uuid(),
                                  link=link, link_error=link_error, **options)

        if self.__v2_compat__:
            shadow = shadow or self.shadow_name(self(), args, kwargs, options)
        else:
            shadow = shadow or self.shadow_name(args, kwargs, options)

        preopts = self._get_exec_options()
        options = dict(preopts, **options) if options else preopts

        options.setdefault('ignore_result', self.ignore_result)

        return app.send_task(
            self.name, args, kwargs, task_id=task_id, producer=producer,
            link=link, link_error=link_error, result_cls=self.AsyncResult,
            shadow=shadow, task_type=self,
            **options
        )

apply_async源码

对此其利用,apply_async帮助常用参数跋山涉水的近义词

  • eta爬山涉水钦赐任务执行时间,类型为datetime时间等级次序;
  • countdown爬山涉水倒计时,单位秒,浮点类型;
  • expires跋山涉水的近义词职分过期时间,如若职责在越过过期时刻还未有实践则回笼任务,浮点类型获取datetime类型;
  • retry爬山涉水任务实施倒闭时候是否尝试,布尔类型。;
  • serializer爬山涉水系列化方案,帮忙pickle、json、yaml、msgpack;
  • priority爬山涉水任务优先级,有0~9优先级可安装,int类型;
  • retry_policy跋山涉水的近义词职分重试机制,在那之中包括多少个重试参数,类型是dict如下跋山涉水的近义词

澳门新蒲京平台 13澳门新蒲京平台 14

max_retries:最大重试次数

interval_start:重试等待时间

interval_step:每次重试叠加时长,假设第一重试等待1s,第二次等待1+n秒

interval_max:最大等待时间

####示例
 add.apply_async((1, 3), retry=True, retry_policy={
        'max_retries': 1,
        'interval_start': 0,
        'interval_step': 0.8,
        'interval_max': 5,
    })

View Code

更加多参数参谋爬山涉水

 

  

 五、管理与监察和控制

  Celery管理和监察功用是由此flower组件完结的,flower组件不止提供监督功能,还提供HTTP
API可达成对woker和task的军管。

设置使用

pip3 install flower

启动

 flower -A project --port=5555   
# -A :项目目录
#--port 指定端口

访问http:ip:5555

澳门新蒲京平台 15

api使用,比如获取woker音信爬山涉水

curl http://127.0.0.1:5555/api/workers

结果:

澳门新蒲京平台 16

更多api参考:

 

相关文章

发表评论

电子邮件地址不会被公开。 必填项已用*标注

*
*
Website