python - Celery: access all previous results in a chain
So I have a quite complex workflow that looks similar to this:
>>> res = (add.si(2, 2) | add.s(4) | add.s(8))()
>>> res.get()
16
Afterwards it's rather trivial for me to walk the result chain and collect the individual results:
>>> res.parent.get()
8
>>> res.parent.parent.get()
4
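The same walk can be written as a short loop (a sketch, assuming the same res object as above):

>>> node, results = res, []
>>> while node:
...     results.append(node.get())
...     node = node.parent
...
>>> results[::-1]
[4, 8, 16]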
My problem is: what if the third task depends on knowing the result of the first one, while in this example it only receives the result of the second?
Also, the chains are quite long and the results aren't small, so passing the input through as part of the result would unnecessarily pollute the result store. It's Redis, so the limitations that apply when using RabbitMQ, ZeroMQ, ... don't apply.
I assign every chain a job id and track the job by saving its data in a database.
Launching the queue:
import uuid

from tasks import add, clean  # assuming the tasks live in a tasks.py module

if __name__ == "__main__":
    # generate a unique id for the job
    job_id = uuid.uuid4().hex
    # root parent level
    parent_level = 1
    # pack the data; the last value is the value to add
    parameters = job_id, parent_level, 2
    # build the chain; a clean task is added at the end that removes
    # the data created during the process (if you want that)
    add_chain = add.s(parameters, 2) | add.s(4) | add.s(8) | clean.s()
    add_chain.apply_async()
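This assumes a Celery app defined somewhere. A minimal sketch of one, using Redis as broker and result backend as mentioned in the question (the module name and URLs are placeholders):

# celery_app.py (hypothetical module name)
from celery import Celery

# Redis as broker and result backend, as in the question; adjust the URLs
app = Celery('tasks',
             broker='redis://localhost:6379/0',
             backend='redis://localhost:6379/0')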
Now the tasks:
import inject

from celery_app import app  # assuming the app sketched above
from persistence import EntityManager, Result  # assuming the module shown below

# function to store a result; I used SQLAlchemy (MySQL) but you can
# change it to whatever you want (a distributed file system, for example)
@inject.params(entity_manager=EntityManager)
def save_result(job_id, level, result, entity_manager):
    r = Result()
    r.job_id = job_id
    r.level = level
    r.result = result
    entity_manager.add(r)
    entity_manager.commit()

# restore the result of a parent
@inject.params(entity_manager=EntityManager)
def get_result(job_id, level, entity_manager):
    result = entity_manager.query(Result).filter_by(job_id=job_id, level=level).one()
    return result.result

# clear the data, or do something with the final result
@inject.params(entity_manager=EntityManager)
def clear(job_id, entity_manager):
    entity_manager.query(Result).filter_by(job_id=job_id).delete()

@app.task()
def add(parameters, number):
    # extract the data from the parameters list
    job_id, level, other_number = parameters
    # load the result of the second parent (level - 2);
    # for the third parent it's level - 3, and so on
    # second_parent_result = get_result(job_id, level - 2)
    # do your stuff; I guess you want to add the numbers
    result = number + other_number
    save_result(job_id, level, result)
    # return the result of the sum, or whatever you want; you have to send
    # something because the next "add" in the chain expects 3 values.
    # of course you should return the actual job id and increment the parent level
    return job_id, level + 1, result

@app.task()
def clean(parameters):
    job_id, level, result = parameters
    # do something with the final result, or not
    # clear the data
    clear(job_id)
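With this in place, a task that needs the result of the first task (the case from the question) can simply ask the store for it. A hypothetical sketch; the task name and the hardcoded level 1 are my assumptions:

@app.task()
def add_with_first(parameters, number):
    job_id, level, other_number = parameters
    # fetch the result saved by the task at level 1, no matter how far
    # down the chain this task sits
    first_result = get_result(job_id, 1)
    result = number + other_number + first_result
    save_result(job_id, level, result)
    return job_id, level + 1, result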
I use an entity_manager that manages the database operations. The entity manager uses SQLAlchemy and MySQL, and I used a table "result" to store the partial results. This is the part you should change for the storage system that suits you best (or keep as-is if MySQL is OK for you):
import os

import inject
import yaml
from sqlalchemy import create_engine, Column, Integer, String
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

Base = declarative_base()

class Configuration:
    def __init__(self, params):
        f = open(os.environ.get('PYTHONPATH') + '/conf/config.yml')
        self.configMap = yaml.safe_load(f)
        f.close()

    def __getitem__(self, key: str):
        return self.configMap[key]

class EntityManager:
    session = None

    @inject.params(config=Configuration)
    def __init__(self, config):
        conf = config['persistence']
        uri = conf['driver'] + "://" + conf['username'] + ":@" + conf['host'] + "/" + conf['database']
        engine = create_engine(uri, echo=conf['debug'])
        Session = sessionmaker(bind=engine)
        self.session = Session()

    def query(self, entity_type):
        return self.session.query(entity_type)

    def add(self, entity):
        return self.session.add(entity)

    def flush(self):
        return self.session.flush()

    def commit(self):
        return self.session.commit()

class Result(Base):
    __tablename__ = 'result'

    id = Column(Integer, primary_key=True)
    job_id = Column(String(255))
    level = Column(Integer)
    result = Column(Integer)

    def __repr__(self):
        return "<Result (job='%s', level='%s', result='%s')>" % (self.job_id, str(self.level), str(self.result))
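The Configuration class above expects a config.yml with a 'persistence' section; a minimal example, with placeholder values, could look like this:

# conf/config.yml
persistence:
  driver: mysql
  username: user
  host: localhost
  database: celery_results
  debug: false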
I used the package inject as a dependency injector. The inject package reuses objects, so you can inject database access every time you want without worrying about the connection.
The class Configuration loads the database access data from a config file. You can replace it and use static data (a hardcoded map) for testing.
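For example, a hardcoded stand-in could look like this (a sketch; the class name and values are made up):

class StaticConfiguration:
    # same __getitem__ interface as Configuration, but with hardcoded data
    def __init__(self, params=None):
        self.configMap = {
            'persistence': {
                'driver': 'mysql',
                'username': 'test',
                'host': 'localhost',
                'database': 'test',
                'debug': False,
            }
        }

    def __getitem__(self, key: str):
        return self.configMap[key]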
Change the dependency injection to anything else that suits you better; this is just my solution, and I added it for a quick test.
The key here is to save the partial results somewhere outside our queue system, and have the tasks return the data needed to access those results (job_id and parent level). We still send some (but small) data through the queue: an address (job_id + parent level) that points to the real data (some big stuff).
This is the solution I'm using in my software.