RabbitMQ : Work Queues (Task Queues)
RabbitMQ & Celery Tutorials
Installing RabbitMQ & Celery
Hello World RabbitMQ
Work Queues (Task Queues) : RabbitMQ
Exchanges - Publish/Subscribe : RabbitMQ
Multiple bindings - Routing : RabbitMQ
Queueing Messages using Celery with RabbitMQ Message Broker Server
In this chapter, we'll create a Work Queues (Task Queues) that will be used to distribute time-consuming tasks among multiple workers. By creating the Work Queues, we can avoid starting a resource-intensive task immediately and having to wait for it to complete. We may want to schedule the heavy task to be done later.
We'll simulate the busy work loads using the sleep() function.
The code doing task schedule is task.py, and it looks like this:
#!/usr/bin/env python import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.queue_declare(queue='task_queue', durable=True) message = ' '.join(sys.argv[1:]) or "Hello World!" channel.basic_publish(exchange='', routing_key='task_queue', body=message, properties=pika.BasicProperties( delivery_mode = 2, # make message persistent )) print " [x] Sent %r" % (message,) connection.close()
Note that we can now send arbitrary messages from the command line.
This worker.py will display the work in the message body. We simulate the task load as doc('.'), each accounts for 5 seconds duration. It will pop messages from the queue and perform the task, so let's call it worker.py:It will pop messages from the queue and perform the task:
#!/usr/bin/env python import pika import time connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.queue_declare(queue='task_queue', durable=True) print ' [*] Waiting for messages. To exit press CTRL+C' def callback(ch, method, properties, body): print " [x] Received %r" % (body,) time.sleep( body.count('.') ) print " [x] Done" ch.basic_ack(delivery_tag = method.delivery_tag) channel.basic_qos(prefetch_count=1) channel.basic_consume(callback, queue='task_queue') channel.start_consuming()
Let's have three workers perform their tasks at the same time. We can simulate this with three console terminals each running worker.py and the 4th console, we run task.py to create works for our workers.
Let the three worker in waiting mode:
W1$ python worker.py [*] Waiting for messages. To exit press CTRL+C W2$ python worker.py [*] Waiting for messages. To exit press CTRL+C W3$ python worker.py [*] Waiting for messages. To exit press CTRL+C
Then, we run the task.py with several(6) tasks:
$ python task.py taks1.;python task.py taks2..;python task.py taks3...;python task.py taks4....;python task.py taks5.....;python task.py taks6......
We can see how the workers are doing on each console:
W1$ python worker.py [*] Waiting for messages. To exit press CTRL+C [x] Received 'taks1.' [x] Done [x] Received 'taks4...' [x] Done W2$ python worker.py [*] Waiting for messages. To exit press CTRL+C [x] Received 'taks2..' [x] Done [x] Received 'taks5.....' [x] Done W3$ python worker.py [*] Waiting for messages. To exit press CTRL+C [x] Received 'taks3...' [x] Done [x] Received 'taks6......' [x] Done
RabbitMQ supports message acknowledgments to make sure a message is never lost. An ack(nowledgement) is sent back from the consumer to tell RabbitMQ that a particular message had been received, processed and that RabbitMQ is free to delete it. The following lines of code is doing it in callback() function:
ch.basic_ack(delivery_tag = method.delivery_tag)
We compare the line with a line in send.py we used in previous chapter (Hello world : RabbitMQ) where we explicitly turned them off via the no_ack=True flag:
channel.basic_consume(callback, queue='hello', no_ack=True)
With acknowledgments turned on which is default setting, we can be sure that even if we kill a worker using CTRL+C while it was processing a message, nothing will be lost. Soon after the worker dies all unacknowledged messages will be redelivered.
In previous section, we can make sure that even if the consumer dies, the task isn't lost.
However, our tasks will still be lost if RabbitMQ server stops!
When RabbitMQ quits or crashes it won't remember the queues and messages unless we tell it not to. How?
We should mark both the queue and messages as durable:
For queue to be persistent:
channel.queue_declare(queue='task_queue', durable=True)
Then, the message as well:
channel.basic_publish(exchange='', routing_key="task_queue", body=message, properties=pika.BasicProperties( delivery_mode = 2, # make message persistent ))
By using the basic.qos() method with the prefetch_count=1 setting, we can tell RabbitMQ not to give more than one message to a worker at a time. Or, in other words, don't dispatch a new message to a worker until it has processed and acknowledged the previous one. Instead, it will dispatch it to the next worker that is not still busy.
channel.basic_qos(prefetch_count=1)
RabbitMQ & Celery Tutorials
Installing RabbitMQ & Celery
Hello World RabbitMQ
Work Queues (Task Queues) : RabbitMQ
Exchanges - Publish/Subscribe : RabbitMQ
Multiple bindings - Routing : RabbitMQ
Queueing Messages using Celery with RabbitMQ Message Broker Server
Python tutorial
Python Home
Introduction
Running Python Programs (os, sys, import)
Modules and IDLE (Import, Reload, exec)
Object Types - Numbers, Strings, and None
Strings - Escape Sequence, Raw String, and Slicing
Strings - Methods
Formatting Strings - expressions and method calls
Files and os.path
Traversing directories recursively
Subprocess Module
Regular Expressions with Python
Regular Expressions Cheat Sheet
Object Types - Lists
Object Types - Dictionaries and Tuples
Functions def, *args, **kargs
Functions lambda
Built-in Functions
map, filter, and reduce
Decorators
List Comprehension
Sets (union/intersection) and itertools - Jaccard coefficient and shingling to check plagiarism
Hashing (Hash tables and hashlib)
Dictionary Comprehension with zip
The yield keyword
Generator Functions and Expressions
generator.send() method
Iterators
Classes and Instances (__init__, __call__, etc.)
if__name__ == '__main__'
argparse
Exceptions
@static method vs class method
Private attributes and private methods
bits, bytes, bitstring, and constBitStream
json.dump(s) and json.load(s)
Python Object Serialization - pickle and json
Python Object Serialization - yaml and json
Priority queue and heap queue data structure
Graph data structure
Dijkstra's shortest path algorithm
Prim's spanning tree algorithm
Closure
Functional programming in Python
Remote running a local file using ssh
SQLite 3 - A. Connecting to DB, create/drop table, and insert data into a table
SQLite 3 - B. Selecting, updating and deleting data
MongoDB with PyMongo I - Installing MongoDB ...
Python HTTP Web Services - urllib, httplib2
Web scraping with Selenium for checking domain availability
REST API : Http Requests for Humans with Flask
Blog app with Tornado
Multithreading ...
Python Network Programming I - Basic Server / Client : A Basics
Python Network Programming I - Basic Server / Client : B File Transfer
Python Network Programming II - Chat Server / Client
Python Network Programming III - Echo Server using socketserver network framework
Python Network Programming IV - Asynchronous Request Handling : ThreadingMixIn and ForkingMixIn
Python Coding Questions I
Python Coding Questions II
Python Coding Questions III
Python Coding Questions IV
Python Coding Questions V
Python Coding Questions VI
Python Coding Questions VII
Python Coding Questions VIII
Python Coding Questions IX
Python Coding Questions X
Image processing with Python image library Pillow
Python and C++ with SIP
PyDev with Eclipse
Matplotlib
Redis with Python
NumPy array basics A
NumPy Matrix and Linear Algebra
Pandas with NumPy and Matplotlib
Celluar Automata
Batch gradient descent algorithm
Longest Common Substring Algorithm
Python Unit Test - TDD using unittest.TestCase class
Simple tool - Google page ranking by keywords
Google App Hello World
Google App webapp2 and WSGI
Uploading Google App Hello World
Python 2 vs Python 3
virtualenv and virtualenvwrapper
Uploading a big file to AWS S3 using boto module
Scheduled stopping and starting an AWS instance
Cloudera CDH5 - Scheduled stopping and starting services
Removing Cloud Files - Rackspace API with curl and subprocess
Checking if a process is running/hanging and stop/run a scheduled task on Windows
Apache Spark 1.3 with PySpark (Spark Python API) Shell
Apache Spark 1.2 Streaming
bottle 0.12.7 - Fast and simple WSGI-micro framework for small web-applications ...
Flask app with Apache WSGI on Ubuntu14/CentOS7 ...
Fabric - streamlining the use of SSH for application deployment
Ansible Quick Preview - Setting up web servers with Nginx, configure enviroments, and deploy an App
Neural Networks with backpropagation for XOR using one hidden layer
NLP - NLTK (Natural Language Toolkit) ...
RabbitMQ(Message broker server) and Celery(Task queue) ...
OpenCV3 and Matplotlib ...
Simple tool - Concatenating slides using FFmpeg ...
iPython - Signal Processing with NumPy
iPython and Jupyter - Install Jupyter, iPython Notebook, drawing with Matplotlib, and publishing it to Github
iPython and Jupyter Notebook with Embedded D3.js
Downloading YouTube videos using youtube-dl embedded with Python
Machine Learning : scikit-learn ...
Django 1.6/1.8 Web Framework ...
Ph.D. / Golden Gate Ave, San Francisco / Seoul National Univ / Carnegie Mellon / UC Berkeley / DevOps / Deep Learning / Visualization