/usr/share/pyshared/celery/task/sets.py is in python-celery 2.4.6-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 | # -*- coding: utf-8 -*-
"""
celery.task.sets
~~~~~~~~~~~~~~~~
Creating and applying groups of tasks.
:copyright: (c) 2009 - 2011 by Ask Solem.
:license: BSD, see LICENSE for more details.
"""
from __future__ import absolute_import
from __future__ import with_statement
import warnings
from .. import registry
from ..app import app_or_default
from ..datastructures import AttributeDict
from ..exceptions import CDeprecationWarning
from ..utils import cached_property, reprcall, uuid
from ..utils.compat import UserList
TASKSET_DEPRECATION_TEXT = """\
Using this invocation of TaskSet is deprecated and will be removed
in Celery v2.4!
TaskSets now supports multiple types of tasks, the API has to reflect
this so the syntax has been changed to:
from celery.task import TaskSet
ts = TaskSet(tasks=[
%(cls)s.subtask(args1, kwargs1, options1),
%(cls)s.subtask(args2, kwargs2, options2),
...
%(cls)s.subtask(argsN, kwargsN, optionsN),
])
result = ts.apply_async()
Thank you for your patience!
"""
class subtask(AttributeDict):
"""Class that wraps the arguments and execution options
for a single task invocation.
Used as the parts in a :class:`TaskSet` or to safely
pass tasks around as callbacks.
:param task: Either a task class/instance, or the name of a task.
:keyword args: Positional arguments to apply.
:keyword kwargs: Keyword arguments to apply.
:keyword options: Additional options to
:func:`celery.execute.apply_async`.
Note that if the first argument is a :class:`dict`, the other
arguments will be ignored and the values in the dict will be used
instead.
>>> s = subtask("tasks.add", args=(2, 2))
>>> subtask(s)
{"task": "tasks.add", args=(2, 2), kwargs={}, options={}}
"""
def __init__(self, task=None, args=None, kwargs=None, options=None, **ex):
init = super(subtask, self).__init__
if isinstance(task, dict):
return init(task) # works like dict(d)
# Also supports using task class/instance instead of string name.
try:
task_name = task.name
except AttributeError:
task_name = task
init(task=task_name, args=tuple(args or ()),
kwargs=dict(kwargs or {}, **ex),
options=options or {})
def delay(self, *argmerge, **kwmerge):
"""Shortcut to `apply_async(argmerge, kwargs)`."""
return self.apply_async(args=argmerge, kwargs=kwmerge)
def apply(self, args=(), kwargs={}, **options):
"""Apply this task locally."""
# For callbacks: extra args are prepended to the stored args.
args = tuple(args) + tuple(self.args)
kwargs = dict(self.kwargs, **kwargs)
options = dict(self.options, **options)
return self.type.apply(args, kwargs, **options)
def apply_async(self, args=(), kwargs={}, **options):
"""Apply this task asynchronously."""
# For callbacks: extra args are prepended to the stored args.
args = tuple(args) + tuple(self.args)
kwargs = dict(self.kwargs, **kwargs)
options = dict(self.options, **options)
return self.type.apply_async(args, kwargs, **options)
def __reduce__(self):
# for serialization, the task type is lazily loaded,
# and not stored in the dict itself.
return (self.__class__, (dict(self), ), None)
def __repr__(self):
return reprcall(self["task"], self["args"], self["kwargs"])
@cached_property
def type(self):
return registry.tasks[self.task]
def maybe_subtask(t):
if not isinstance(t, subtask):
return subtask(t)
return t
class TaskSet(UserList):
"""A task containing several subtasks, making it possible
to track how many, or when all of the tasks have been completed.
:param tasks: A list of :class:`subtask` instances.
Example::
>>> urls = ("http://cnn.com/rss", "http://bbc.co.uk/rss")
>>> taskset = TaskSet(refresh_feed.subtask((url, )) for url in urls)
>>> taskset_result = taskset.apply_async()
>>> list_of_return_values = taskset_result.join() # *expensive*
"""
_task = None # compat
_task_name = None # compat
#: Total number of subtasks in this set.
total = None
def __init__(self, task=None, tasks=None, app=None, Publisher=None):
self.app = app_or_default(app)
if task is not None:
if hasattr(task, "__iter__"):
tasks = [maybe_subtask(t) for t in task]
else:
# Previously TaskSet only supported applying one kind of task.
# the signature then was TaskSet(task, arglist),
# so convert the arguments to subtasks'.
tasks = [subtask(task, *arglist) for arglist in tasks]
task = self._task = registry.tasks[task.name]
self._task_name = task.name
warnings.warn(TASKSET_DEPRECATION_TEXT % {
"cls": task.__class__.__name__},
CDeprecationWarning)
self.data = list(tasks or [])
self.total = len(self.tasks)
self.Publisher = Publisher or self.app.amqp.TaskPublisher
def apply_async(self, connection=None, connect_timeout=None,
publisher=None, taskset_id=None):
"""Apply taskset."""
app = self.app
if app.conf.CELERY_ALWAYS_EAGER:
return self.apply(taskset_id=taskset_id)
with app.default_connection(connection, connect_timeout) as conn:
setid = taskset_id or uuid()
pub = publisher or self.Publisher(connection=conn)
try:
results = self._async_results(setid, pub)
finally:
if not publisher: # created by us.
pub.close()
return app.TaskSetResult(setid, results)
def _async_results(self, taskset_id, publisher):
return [task.apply_async(taskset_id=taskset_id, publisher=publisher)
for task in self.tasks]
def apply(self, taskset_id=None):
"""Applies the taskset locally by blocking until all tasks return."""
setid = taskset_id or uuid()
return self.app.TaskSetResult(setid, self._sync_results(setid))
def _sync_results(self, taskset_id):
return [task.apply(taskset_id=taskset_id) for task in self.tasks]
@property
def tasks(self):
return self.data
@property
def task(self):
warnings.warn(
"TaskSet.task is deprecated and will be removed in 1.4",
CDeprecationWarning)
return self._task
@property
def task_name(self):
warnings.warn(
"TaskSet.task_name is deprecated and will be removed in 1.4",
CDeprecationWarning)
return self._task_name
|