Published: 2025-12-11 03:01:03
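Architecture: one task-publishing node (workgroup0, 172.16.77.175), whose address is also used in the broker URL, and two worker nodes, docker1 (workgroup1) and docker2 (workgroup2), each listening on its own queue.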
The Celery app used as the example here is myapp:
root@workgroup0:~/celeryapp# ls myapp
agent.py  celery.py  config.py  __init__.py
root@workgroup0:~/celeryapp#
Shared code:
celery.py: (note: 172.16.77.175 is the IP address of the task-publishing node)
from __future__ import absolute_import

from celery import Celery

app = Celery('myapp',
             broker='amqp://guest@172.16.77.175//',
             backend='amqp://guest@172.16.77.175//',
             include=['myapp.agent'])

app.config_from_object('myapp.config')

if __name__ == '__main__':
    app.start()

config.py:
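from __future__ import absolute_import

from kombu import Queue, Exchange
from datetime import timedelta

# Results expire after one hour; tasks and results are serialized as JSON only.
CELERY_TASK_RESULT_EXPIRES = 3600
CELERY_TASK_SERIALIZER = 'json'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_RESULT_SERIALIZER = 'json'

# One direct exchange, with one queue per worker machine.
CELERY_DEFAULT_EXCHANGE = 'agent'
CELERY_DEFAULT_EXCHANGE_TYPE = 'direct'

# Spell this name exactly CELERY_QUEUES. A misspelled name such as
# CELERT_QUEUES is ignored without an error, and the worker then auto-creates
# each -Q queue with an exchange named after the queue. The worker banners
# later in this post show exchange=machine1 and exchange=machine2, which is
# what that fallback produces.
CELERY_QUEUES = (
    Queue('machine1', exchange='agent', routing_key='machine1'),
    Queue('machine2', exchange='agent', routing_key='machine2'),
)

__init__.py: (empty)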
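As far as I know, Celery 3.1 does not complain about setting names it does not recognise, so a one-letter slip in config.py is easy to miss. The following is a minimal sketch of a check that could be run against the names defined in a config module. The KNOWN_SETTINGS list and the find_suspect_settings helper are hypothetical names introduced for illustration; they are not part of Celery.

```python
import difflib

# Setting names used in this post's config.py (Celery 3.1 uppercase style).
# This list is an assumption for the sketch, not a complete list of settings.
KNOWN_SETTINGS = [
    'CELERY_TASK_RESULT_EXPIRES',
    'CELERY_TASK_SERIALIZER',
    'CELERY_ACCEPT_CONTENT',
    'CELERY_RESULT_SERIALIZER',
    'CELERY_DEFAULT_EXCHANGE',
    'CELERY_DEFAULT_EXCHANGE_TYPE',
    'CELERY_QUEUES',
]


def find_suspect_settings(names, known=KNOWN_SETTINGS):
    """Map each unknown uppercase name to the known setting it resembles."""
    suspects = {}
    for name in names:
        # Only uppercase names are treated as settings; skip exact matches.
        if not name.isupper() or name in known:
            continue
        close = difflib.get_close_matches(name, known, n=1, cutoff=0.8)
        if close:
            suspects[name] = close[0]
    return suspects


# Example input: names as they might appear in a config module.
print(find_suspect_settings(['CELERT_QUEUES', 'CELERY_TASK_SERIALIZER', 'Queue']))
```

In a real project the names could come from dir() on the imported config module; the similarity cutoff of 0.8 is an arbitrary choice for this sketch.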
agent.py on the task-publishing node:
from __future__ import absolute_import

from myapp.celery import app


@app.task
def add(x, y):
    return {'the value is': str(x + y)}


@app.task
def writefile():
    # Write a single line to a fixed path on the node that runs the task.
    with open('/tmp/data.txt', 'w') as out:
        out.write('hello' + '\n')


@app.task
def mul(x, y):
    return x * y


@app.task
def xsum(numbers):
    return sum(numbers)


@app.task
def getl(stri):
    return getlength(stri)


def getlength(stri):
    return len(stri)

agent.py on docker1:
from __future__ import absolute_import

from myapp.celery import app


@app.task
def add(x, y):
    # node_name is added so the caller can tell which node ran the task.
    return {'value': str(x + y), 'node_name': 'docker1'}


@app.task
def writefile():
    with open('/tmp/data.txt', 'w') as out:
        out.write('hello' + '\n')


@app.task
def mul(x, y):
    return x * y


@app.task
def xsum(numbers):
    return sum(numbers)


@app.task
def getl(stri):
    return getlength(stri)


def getlength(stri):
    return len(stri)

agent.py on docker2:
from __future__ import absolute_import

from myapp.celery import app


@app.task
def add(x, y):
    return {'value': str(x + y), 'node_name': 'docker2'}


@app.task
def writefile():
    with open('/tmp/data.txt', 'w') as out:
        out.write('hello' + '\n')


@app.task
def mul(x, y):
    return x * y


@app.task
def xsum(numbers):
    return sum(numbers)


@app.task
def getl(stri):
    return getlength(stri)


def getlength(stri):
    return len(stri)

In this example I only test the add() function.
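The agent.py files on docker1 and docker2 differ only in the hard-coded node_name. One possible alternative, sketched here under my own assumptions, is to derive the name at runtime so that a single file can be deployed everywhere. NODE_NAME is a hypothetical environment variable, and current_node_name and add_result are illustrative helpers, not code from the original setup.

```python
import os
import socket


def current_node_name():
    """Return NODE_NAME (hypothetical env var) or fall back to the hostname."""
    return os.environ.get('NODE_NAME') or socket.gethostname()


def add_result(x, y, node_name=None):
    """Build the same payload shape that add() returns on the worker nodes."""
    return {'value': str(x + y), 'node_name': node_name or current_node_name()}


# Inside agent.py this could be wired up as (requires Celery and the app):
#   @app.task
#   def add(x, y):
#       return add_result(x, y)
print(add_result(122, 34, node_name='docker1'))
```

With the hostname fallback the result would carry names such as workgroup1.hzg.com rather than docker1, unless NODE_NAME is set on each node.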
Start the worker on the docker1 node (-Q specifies the queue to listen on):
root@workgroup1:~/celeryapp# celery -A myapp worker -l info -Q machine1
/usr/local/lib/python2.7/dist-packages/celery/platforms.py:766: RuntimeWarning: You are running the worker with superuser privileges, which is
absolutely not recommended!

Please specify a different user using the -u option.

User information: uid=0 euid=0 gid=0 egid=0

  uid=uid, euid=euid, gid=gid, egid=egid,

 -------------- celery@workgroup1.hzg.com v3.1.17 (Cipater)
---- **** -----
--- * ***  * -- Linux-3.13.0-24-generic-x86_64-with-Ubuntu-14.04-trusty
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app:         myapp:0x7f472d73f190
- ** ---------- .> transport:   amqp://guest:**@172.16.77.175:5672//
- ** ---------- .> results:     amqp://guest@172.16.77.175//
- *** --- * --- .> concurrency: 1 (prefork)
-- ******* ----
--- ***** ----- [queues]
 -------------- .> machine1         exchange=machine1(direct) key=machine1

[tasks]
  . myapp.agent.add
  . myapp.agent.getl
  . myapp.agent.mul
  . myapp.agent.writefile
  . myapp.agent.xsum

[2015-10-18 15:07:51,313: INFO/MainProcess] Connected to amqp://guest:**@172.16.77.175:5672//
[2015-10-18 15:07:51,340: INFO/MainProcess] mingle: searching for neighbors
[2015-10-18 15:07:52,372: INFO/MainProcess] mingle: sync with 1 nodes
[2015-10-18 15:07:52,374: INFO/MainProcess] mingle: sync complete
[2015-10-18 15:07:52,423: WARNING/MainProcess] celery@workgroup1.hzg.com ready.
Start the worker on docker2:
root@workgroup2:~/celeryapp# celery -A myapp worker -l info -Q machine2
/usr/local/lib/python2.7/dist-packages/celery/platforms.py:766: RuntimeWarning: You are running the worker with superuser privileges, which is
absolutely not recommended!

Please specify a different user using the -u option.

User information: uid=0 euid=0 gid=0 egid=0

  uid=uid, euid=euid, gid=gid, egid=egid,

 -------------- celery@workgroup2.hzg.com v3.1.18 (Cipater)
---- **** -----
--- * ***  * -- Linux-3.13.0-24-generic-x86_64-with-Ubuntu-14.04-trusty
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app:         myapp:0x7f708cb8ec10
- ** ---------- .> transport:   amqp://guest:**@172.16.77.175:5672//
- ** ---------- .> results:     amqp://guest@172.16.77.175//
- *** --- * --- .> concurrency: 1 (prefork)
-- ******* ----
--- ***** ----- [queues]
 -------------- .> machine2         exchange=machine2(direct) key=machine2

[tasks]
  . myapp.agent.add
  . myapp.agent.getl
  . myapp.agent.mul
  . myapp.agent.writefile
  . myapp.agent.xsum

[2015-10-18 15:08:52,114: INFO/MainProcess] Connected to amqp://guest:**@172.16.77.175:5672//
[2015-10-18 15:08:52,144: INFO/MainProcess] mingle: searching for neighbors
[2015-10-18 15:08:53,174: INFO/MainProcess] mingle: sync with 1 nodes
[2015-10-18 15:08:53,176: INFO/MainProcess] mingle: sync complete
[2015-10-18 15:08:53,227: WARNING/MainProcess] celery@workgroup2.hzg.com ready.
On the task-publishing node, publish a computation task to docker1:
root@workgroup0:~/celeryapp# ls
default.etcd  hots.sh  hotswap.py  myapp  myapp1  tmppeople.db  respsoratest.py
root@workgroup0:~/celeryapp# python
Python 2.7.6 (default, Mar 22 2014, 22:59:56)
[GCC 4.8.2] on linux2
Type "help", "copyright", "credits" or "license" for more information.
>>> from myapp.agent import add
>>> res = add.apply_async(args=[122, 34], queue='machine1', routing_key='machine1')
>>> res.get()
{u'value': u'156', u'node_name': u'docker1'}

get() shows the result returned by docker1. Now look at the worker output on docker1:
[2015-10-18 15:11:51,217: INFO/MainProcess] Task myapp.agent.add[c487a9a2-e5cc-462b-a131-784b363a1952] succeeded in 0.03602907s: {'value': '156', 'node_name': 'docker1'}

As for docker2, nothing happened there at all:
[2015-10-18 15:08:53,176: INFO/MainProcess] mingle: sync complete
[2015-10-18 15:08:53,227: WARNING/MainProcess] celery@workgroup2.hzg.com ready.
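Why docker2 stays idle can be illustrated with a toy model of direct-exchange routing: a message is delivered only to queues whose binding key equals the message's routing key. The sketch below is a plain in-memory simulation written for this explanation. DirectExchange is a hypothetical class, not Celery, kombu, or RabbitMQ code, and the queues are ordinary Python lists.

```python
from collections import defaultdict


class DirectExchange(object):
    """Toy in-memory model of an AMQP direct exchange."""

    def __init__(self):
        # Maps a binding key to the list of queues bound with that key.
        self.bindings = defaultdict(list)

    def bind(self, queue, routing_key):
        self.bindings[routing_key].append(queue)

    def publish(self, message, routing_key):
        # Deliver only to queues whose binding key matches exactly.
        for queue in self.bindings.get(routing_key, []):
            queue.append(message)


machine1, machine2 = [], []
exchange = DirectExchange()
exchange.bind(machine1, 'machine1')
exchange.bind(machine2, 'machine2')

# Mirrors the call above: the message is addressed to machine1 only.
exchange.publish({'task': 'myapp.agent.add', 'args': [122, 34]},
                 routing_key='machine1')

print(len(machine1), len(machine2))
```

Since each worker consumes from exactly one of the two queues, the worker on docker2 never sees a message published with routing_key='machine1'.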
Publish a task to docker2:
>>> res = add.apply_async(args=[1440, 900], queue='machine2', routing_key='machine2')
>>> res.get()
{u'value': u'2340', u'node_name': u'docker2'}
>>>
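Both calls pass the same string as queue and routing_key, so the pair can be built in one place. This is a small convenience sketch; route_to is a hypothetical helper of my own, and it assumes the queue name and routing key are always identical, as they are in this setup.

```python
def route_to(machine):
    """Build the routing keyword arguments for apply_async."""
    return {'queue': machine, 'routing_key': machine}


# Usage with the task from this post (needs the broker and a worker running):
#   res = add.apply_async(args=[122, 34], **route_to('machine1'))
#   res = add.apply_async(args=[1440, 900], **route_to('machine2'))
print(route_to('machine1'))
```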