python - Celery: access all previous results in a chain


So I have a quite complex workflow, which looks similar to this:

>>> res = (add.si(2, 2) | add.s(4) | add.s(8))()
>>> res.get()
16

Afterwards it's rather trivial for me to walk the result chain and collect the individual results:

>>> res.parent.get()
8
>>> res.parent.parent.get()
4

My problem is: what if the third task depends on knowing the result of the first one, while in this example it only receives the result of the second?

Also, the chains are quite long and the results aren't small, so passing the input through as part of each result would unnecessarily pollute the result store. I'm using Redis as the result backend, so the limitations that apply when using RabbitMQ, ZeroMQ, ... don't apply here.

I assign every chain a job id and track the job by saving its data in a database.

Launching the queue:

import uuid

# import the add and clean tasks from wherever they are defined

if __name__ == "__main__":
    # generate a unique id for the job
    job_id = uuid.uuid4().hex
    # root parent
    parent_level = 1
    # pack the data; the last value is the value to add
    parameters = job_id, parent_level, 2
    # build the chain. I added a clean task that removes the data
    # created during the process (if you want that)
    add_chain = add.s(parameters, 2) | add.s(4) | add.s(8) | clean.s()
    add_chain.apply_async()

Now the tasks:

# function to store a result. I used SQLAlchemy (MySQL) but you can
# change it to whatever you want (a distributed file system, for example)
@inject.params(entity_manager=EntityManager)
def save_result(job_id, level, result, entity_manager):
    r = Result()
    r.job_id = job_id
    r.level = level
    r.result = result
    entity_manager.add(r)
    entity_manager.commit()

# restore the result of one parent
@inject.params(entity_manager=EntityManager)
def get_result(job_id, level, entity_manager):
    result = entity_manager.query(Result).filter_by(job_id=job_id, level=level).one()
    return result.result

# clear the data, or the final result
@inject.params(entity_manager=EntityManager)
def clear(job_id, entity_manager):
    entity_manager.query(Result).filter_by(job_id=job_id).delete()

@app.task()
def add(parameters, number):
    # extract the data from the parameters list
    job_id, level, other_number = parameters

    # load the result of the second parent (level - 2);
    # for the third parent use level - 3, and so on
    #second_parent_result = get_result(job_id, level - 2)

    # do stuff; I guess you want to add the numbers
    result = number + other_number
    save_result(job_id, level, result)

    # return the result of the sum, or whatever you want, but you have to send
    # (job_id, level, result) because the "add" function expects 3 values.
    # Of course you should return the actual job id and increment the parent level
    return job_id, level + 1, result

@app.task()
def clean(parameters):
    job_id, level, result = parameters
    # do something with the final result, or not
    # clear the data
    clear(job_id)

I use an entity_manager that handles the database operations. The entity manager uses SQLAlchemy and MySQL, and a table "result" to store the partial results. This is the part you should swap for whatever storage system suits you best (or keep as is if MySQL is OK for you).

import os

import yaml
import inject
from sqlalchemy import create_engine, Column, Integer, String
from sqlalchemy.orm import sessionmaker, declarative_base

Base = declarative_base()


class Configuration:

    def __init__(self, params):
        f = open(os.environ.get('PYTHONPATH') + '/conf/config.yml')
        self.config_map = yaml.safe_load(f)
        f.close()

    def __getitem__(self, key: str):
        return self.config_map[key]


class EntityManager():

    session = None

    @inject.params(config=Configuration)
    def __init__(self, config):
        conf = config['persistence']
        uri = conf['driver'] + "://" + conf['username'] + ":@" + conf['host'] + "/" + conf['database']

        engine = create_engine(uri, echo=conf['debug'])

        Session = sessionmaker(bind=engine)
        self.session = Session()

    def query(self, entity_type):
        return self.session.query(entity_type)

    def add(self, entity):
        return self.session.add(entity)

    def flush(self):
        return self.session.flush()

    def commit(self):
        return self.session.commit()


class Result(Base):
    __tablename__ = 'result'

    id = Column(Integer, primary_key=True)
    job_id = Column(String(255))
    level = Column(Integer)
    result = Column(Integer)

    def __repr__(self):
        return "<Result (job='%s', level='%s', result='%s')>" % (self.job_id, str(self.level), str(self.result))
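If the "result" table does not exist yet, a minimal sketch like the following can create it once before the first run. This is not part of the original answer; it just reuses the Configuration class and the same connection URI construction as EntityManager above:

if __name__ == "__main__":
    # build the same connection URI EntityManager uses and create the table
    conf = Configuration({})['persistence']
    uri = conf['driver'] + "://" + conf['username'] + ":@" + conf['host'] + "/" + conf['database']
    engine = create_engine(uri)
    Base.metadata.create_all(engine)  # creates the "result" table if missing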

I used the package inject as a dependency injector. The inject package reuses the object, so you can inject access to the database whenever you want and not worry about the connection.
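For completeness, here is a minimal sketch of how the injector could be wired up, assuming the python-inject API; the binding choices are my own and not part of the original answer. It should run once at startup, for example when the Celery worker boots:

import inject

def configure_injector(binder):
    # bind both classes as lazy singletons so every @inject.params(...)
    # decorated function reuses the same Configuration and EntityManager
    binder.bind_to_constructor(Configuration, lambda: Configuration({}))
    binder.bind_to_constructor(EntityManager, EntityManager)

inject.configure(configure_injector)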

The Configuration class loads the database access data from a config file. You can replace it with static data (a hardcoded map) for testing.
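As a rough illustration, such a hardcoded stand-in could look like the sketch below; the connection values are placeholders of my own, and the keys simply mirror the ones EntityManager reads:

class StaticConfiguration:
    # hardcoded map used instead of the YAML file, e.g. for testing
    def __getitem__(self, key):
        return {
            'persistence': {
                'driver': 'mysql',
                'username': 'root',
                'host': 'localhost',
                'database': 'celery_results',
                'debug': False,
            }
        }[key]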

Change the dependency injection to anything else that suits you; it's just my solution, and I only gave it a quick test.

The key here is to save the partial results somewhere outside our queue system, and to have the tasks return only the data needed to access those results (job_id and parent level). So we still send data through the chain, but it's small: an address (job_id + parent level) that points to the real data (the big stuff).
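To illustrate the point, here is a hypothetical extra task (combine_with_first is my own name, not part of the original chain) that pulls the root task's stored value by its level, while only the small (job_id, level, value) tuple travels through the queue:

@app.task()
def combine_with_first(parameters, number):
    job_id, level, other_number = parameters
    # fetch the value the root task (level 1) stored in the database
    first_result = get_result(job_id, 1)
    result = number + other_number + first_result
    save_result(job_id, level, result)
    return job_id, level + 1, result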

This is the solution I'm using in my software.

