Most developers have to write programs that read and process all kinds of items: products, feed items, images, you name it. I’ve certainly had to. Apprentice programmers would just write it all down sequentially, but, as many fellow programmers can testify, this approach leads to maintenance problems because it mixes many responsibilities in one place. ((Wikipedia: Separation of concerns))
Pipes-and-filters architectures are a natural fit for this problem. They allow you to separate responsibilities into distinct processing units. While this may increase the perceived complexity, it actually reduces the complexity you have to handle at any one time by confining each concern to its own level.
One especially brilliant example of this type of architecture is the UNIX pipe system.
Disclaimer: In this post I will show you the outline of a simple yet extensible feed-parsing system (for RSS or other formats). The implementation of the concrete parts is left as an exercise to the reader. 🙂
A naive solution with lots of boilerplate
We want to separate the processing steps (to stay maintainable), so we need to agree on an interface for passing information from one step to the next. We start with a simple class with some hook methods.
[code language="python"]
# WARNING, Anti-Example!
# Do not use. Better code will come up below!!1!
class ProcessingStep(object):
    def __init__(self):
        self.results = []

    def add(self, items):
        for item in items:
            # This is built that way so that self.process()
            # can either drop or add items to the results.
            # (self.process() has to return a list containing
            # the item if it should be preserved, or an empty
            # list if it should be dropped.)
            for result in self.process(item):
                self.results.append(result)

    def get(self):
        return self.results

    def process(self, item):
        raise NotImplementedError
[/code]
Creating new steps is trivially easy now. We just have to subclass ProcessingStep, override process(), and we’re done.
[code language="python"]
class PassThrough(ProcessingStep):
    # Hypothetical example step that keeps every item unchanged.
    def process(self, item):
        return [item]

# We need some management class which will control our pipeline
# and feed the items through it.
class Pipeline(object):
    def __init__(self, steps):
        self.steps = steps

    def feed(self, items):
        # Feed the items into one step, get the results, feed the
        # results to the next step and so on.
        for step in self.steps:
            step.add(items)
            items = step.get()
        return items
[/code]
Nice! This solution looks very clean and simple. It has a reasonably small line count and no magic. It still has problems, though.
Reality catches up
When implementing the feed parser we identified our basic steps: parsing the feed, processing each article, downloading images, storing the articles in our database.
Sometimes we want code to run only once in the process, though, either before the items get processed or afterwards. Examples are caching, database access, etc. To achieve that, we’ll have to change either the ProcessingStep or the Pipeline class: we would add a before_loop() and probably an after_loop() hook method to every step.
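To make the growth concrete, here is a rough sketch of what the anti-example’s base class turns into once the two hooks are bolted on. The hook names come from the text; CountingStep is a hypothetical subclass that only exists to show the hooks in use.

```python
class ProcessingStep(object):
    """Anti-example continued: the base class grows a hook per requirement."""

    def __init__(self):
        self.results = []

    def before_loop(self):
        """Hook: runs once before the first item. No-op by default."""

    def after_loop(self):
        """Hook: runs once after the last item. No-op by default."""

    def add(self, items):
        self.before_loop()
        for item in items:
            # process() returns a list: [item] to keep it, [] to drop it.
            self.results.extend(self.process(item))
        self.after_loop()

    def get(self):
        return self.results

    def process(self, item):
        raise NotImplementedError


class CountingStep(ProcessingStep):
    """Hypothetical step that uses both hooks to count the items it saw."""

    def before_loop(self):
        self.count = 0

    def process(self, item):
        self.count += 1
        return [item]

    def after_loop(self):
        self.summary = "processed %d items" % self.count
```

Every further requirement, such as buffering or grouping, would add yet another hook or attribute to this one base class.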
Then the next requirement comes along: buffer 500 entries per step to enable a grouped database flush. Grouping by some arbitrary key would also be nice, wouldn’t it? Although it’s actually quite simple to extend this solution, the base class will just grow and grow with every feature, and testing it thoroughly will take time as well. Another objection I raised to myself was that these patterns look so common that they ought to be solved already. It just doesn’t feel right to code at this level in Python.
This was the point where I decided to look at the itertools ((Python 2 documentation: itertools — Functions creating iterators for efficient looping)) module again. To my delight many of my desired patterns were already implemented, present in the recipe section of the docs, or could be coded up quickly without much boilerplate.
More flexible solution with less boilerplate
Fortunately, Python has a very nice language feature called generators. Generators are like a Swiss Army knife of iterables: you can combine them, wrap them, and even send values into them. And the nice part is that you can write them as if you were writing regular sequential Python code. This makes them a perfect fit for writing pipeline-processing software.
[code language="python"]
def processing_step(items):
    """Example of a processing step."""
    # This section of code is run before the current step
    # starts processing.
    for item in items:
        # Your processing goes here.
        yield item
    # This section of code is run after all items have
    # been processed by this step.
[/code]
If we look at this example, we can really see the power of generators. Every processing step already supports pre-run and post-run code without any additional boilerplate: you just put the code in your generator.
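One subtlety is worth knowing: the “pre-run” code does not run when the step function is called, but only when the first item is requested, and the “post-run” code only runs once the step’s input is exhausted. The following sketch uses a hypothetical events list as a trace to show the order:

```python
events = []


def processing_step(items):
    """Pass-through step that records when each section of its code runs."""
    events.append("pre-run")  # runs on the first next(), not when called
    for item in items:
        events.append("item %r" % (item,))
        yield item
    events.append("post-run")  # runs only after the input is exhausted


gen = processing_step(["a", "b"])
assert events == []  # calling the step function has not run any of its code yet
items = list(gen)
# events is now ["pre-run", "item 'a'", "item 'b'", "post-run"]
```

If a consumer stops iterating early, the post-run section is skipped. Code that must always run (closing a connection, say) belongs in a try/finally block around the loop, because the finally clause also runs when the generator is closed.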
Another nice thing about using generators is that we can use the itertools module to help with commonly occurring iterator patterns, such as grouping and buffering.
[code language="python"]
import itertools


def chunk(chunk_size=32):
    """Group chunk_size elements into lists and emit them.

    >>> chunker = chunk(32)
    >>> gen = chunker(range(64))
    >>> assert len(list(gen)) == 2
    """
    def chunker(gen):
        gen = iter(gen)
        steps = range(chunk_size)
        chunk = []
        try:
            while True:
                for _ in steps:
                    chunk.append(next(gen))
                yield chunk
                chunk = []
        except StopIteration:
            # Input exhausted: emit the last, possibly shorter chunk.
            if chunk:
                yield chunk
    return chunker


def unchunk(gen):
    """Flatten a sequence, but only one level deep."""
    return itertools.chain.from_iterable(gen)
[/code]
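Grouping by an arbitrary key, one of the requirements that made the class-based version grow, can be written in the same factory style as chunk on top of itertools.groupby. A minimal sketch; group_by and the "feed" field of the example entries are hypothetical:

```python
import itertools
import operator


def group_by(key):
    """Hypothetical step factory: emit (key, [items]) for runs of equal keys.

    Like itertools.groupby, this only groups *consecutive* items, so the
    input has to be sorted (or at least clustered) by key already.
    """
    def grouper(gen):
        for k, group in itertools.groupby(gen, key=key):
            # Materialise each group: groupby's sub-iterators become
            # invalid as soon as the outer iterator advances.
            yield k, list(group)
    return grouper


by_feed = group_by(operator.itemgetter("feed"))
entries = [{"feed": "a", "id": 1}, {"feed": "a", "id": 2}, {"feed": "b", "id": 3}]
grouped = list(by_feed(entries))
```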
A simple filter step would look something like this:
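[code language="python"]
def filter_test_entries(gen):
    # Passes on only the entries whose title starts with "test";
    # negate the condition to drop them instead.
    for entry in gen:
        if entry.title.startswith("test"):
            yield entry
[/code]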
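Because each step is an ordinary function, it can also keep state between items in plain local variables, without needing a class. Here is a sketch of a hypothetical step that drops entries whose link has been seen before; the Entry tuple and its link field are assumptions standing in for real parsed feed entries:

```python
from collections import namedtuple

# Hypothetical stand-in for a parsed feed entry.
Entry = namedtuple("Entry", ["title", "link"])


def drop_duplicate_links(gen):
    """Pass on each entry only the first time its link shows up."""
    seen = set()  # local state that lives as long as this step's generator
    for entry in gen:
        if entry.link in seen:
            continue
        seen.add(entry.link)
        yield entry


entries = [
    Entry("first", "https://example.com/1"),
    Entry("second", "https://example.com/2"),
    Entry("first, again", "https://example.com/1"),
]
unique = list(drop_duplicate_links(entries))
```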
Putting the band back together
We have now seen some individual components of the pipeline system, but to actually use it we need two more (very simple) functions.
The first one constructs our processing pipeline. It’s implemented in a very simple way; in fact, this was my first implementation of it. I may elaborate on the subsequent evolution of this particular function in later blog posts.
[code language="python"]
from functools import reduce  # a builtin in Python 2, lives in functools in Python 3


def combine_pipeline(source, pipeline):
    """Combine source and pipeline and return a generator.

    This is basically the same as the following function:

    >>> def combine_pipeline_explicit(source, pipeline):
    ...     gen = source
    ...     for step in pipeline:
    ...         gen = step(gen)
    ...     return gen
    """
    return reduce(lambda x, y: y(x), pipeline, source)
[/code]
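Building the pipeline this way does not run any step yet: combine_pipeline only wires the generators together, and items are pulled through one at a time once somebody iterates. The sketch below makes that visible with a hypothetical traced step factory that logs every item it sees:

```python
from functools import reduce


def combine_pipeline(source, pipeline):
    # Same one-liner as in the post: each step wraps the previous generator.
    return reduce(lambda x, y: y(x), pipeline, source)


log = []


def traced(name):
    """Hypothetical helper: build a pass-through step that logs each item."""
    def step(items):
        for item in items:
            log.append((name, item))
            yield item
    return step


pipeline = combine_pipeline([1, 2], [traced("a"), traced("b")])
assert log == []  # building the pipeline has not run any step yet
first = next(pipeline)  # pulling one item drives it through every step
# log is now [("a", 1), ("b", 1)]
```

Each item travels through all steps before the next one is read from the source, unless a step such as chunk deliberately buffers items.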
We use the second one to kick-start the combined pipeline. Please note that, since we use generators here, the pipeline can only be run once. To run it again, call combine_pipeline again with a fresh source.
[code language="python"]
def consume(iterator):
    """Consume an iterator, throwing away the results.

    You may use this if you want to trigger some code in a
    generator but don't want to actually know the result because
    everything happens as a "side-effect" of the generator running.

    There are better ones, but this one is simple to explain.
    Read the itertools recipe section for a more performant
    implementation of this.
    """
    for _ in iterator:
        pass
[/code]
Then you can use it like this:
[code language="python"]
pipeline_steps = [parse_xml,
                  # ... other processing steps ...
                  chunk(32),  # This one combines images into
                              # chunks of 32 and passes these
                              # on to `save_database`. This is
                              # useful to reduce the accumulation
                              # of latency-costs of the db-access.
                  save_database,
                  ]

with open("feed.xml", "r") as source:
    pipeline = combine_pipeline(source, pipeline_steps)
    consume(pipeline)
    # You may also iterate over the pipeline yourself, e.g.:
    # for entry in pipeline:
    #     print(entry)
[/code]
And that’s it. Every entry coming out of the pipeline will have gone through every processing step along the way. Processing steps can be added, removed, or replaced simply by editing the pipeline_steps list. Neat.
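To see all the parts work together without a real feed, here is a self-contained toy run. parse_lines and save_batches are hypothetical stand-ins for parse_xml and save_database, and chunk is a condensed variant built on itertools.islice rather than the try/except loop from the post. The run also shows that the steps list can be reused, while the pipeline generator itself is spent after one run:

```python
import itertools
from functools import reduce


def combine_pipeline(source, pipeline):
    return reduce(lambda x, y: y(x), pipeline, source)


def chunk(chunk_size=32):
    # Condensed variant: pull up to chunk_size items at a time with islice.
    def chunker(gen):
        gen = iter(gen)
        while True:
            block = list(itertools.islice(gen, chunk_size))
            if not block:
                return
            yield block
    return chunker


def consume(iterator):
    for _ in iterator:
        pass


def parse_lines(lines):
    # Hypothetical stand-in for parse_xml: one "entry" per input line.
    for line in lines:
        yield line.strip()


saved_batches = []


def save_batches(batches):
    # Hypothetical stand-in for save_database: record each batch it sees.
    for batch in batches:
        saved_batches.append(batch)
        yield batch


steps = [parse_lines, chunk(2), save_batches]

pipeline = combine_pipeline(["a\n", "b\n", "c\n"], steps)
consume(pipeline)
# saved_batches is now [["a", "b"], ["c"]]
leftover = list(pipeline)  # empty: the generator chain is exhausted

# The steps list is reusable; just build a new pipeline around a new source.
consume(combine_pipeline(["d\n"], steps))
```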
I’m very happy with this system. Although it has changed a bit and accumulated many processing steps, its core is exactly as presented here. Our pipeline now has nearly 30 distinct steps and is thoroughly tested (~95% coverage). The longest processing step has around 70 SLOC.
At first I was wary, because I knew that generators can hurt performance due to the call overhead incurred on every iteration. Yet I found that the overhead of the generators was negligible compared to the run time of the steps themselves.
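If you want to check that claim for your own pipeline, a rough measurement is easy to set up. The sketch below (generator_overhead and passthrough are hypothetical helpers) times a stack of no-op generator steps against iterating the bare source, so what remains is roughly the per-item cost of the layering itself:

```python
import timeit
from functools import reduce


def passthrough(items):
    # No-op step: only the generator machinery costs anything here.
    for item in items:
        yield item


def generator_overhead(n_steps=30, n_items=100000):
    """Rough per-item cost, in seconds, of n_steps no-op generator steps."""
    def run_pipeline():
        gen = reduce(lambda g, step: step(g), [passthrough] * n_steps, range(n_items))
        for _ in gen:
            pass

    def run_plain():
        for _ in range(n_items):
            pass

    pipeline_time = timeit.timeit(run_pipeline, number=1)
    plain_time = timeit.timeit(run_plain, number=1)
    # Clamp at zero: with tiny inputs, timer noise can exceed the difference.
    return max(pipeline_time - plain_time, 0.0) / n_items
```

Comparing the returned per-item figure with the per-item run time of your real steps tells you whether the layering matters; single timings are noisy, so repeat the measurement before drawing conclusions.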
Aside: While writing this blog post I found it very hard to (re-)create the initial (anti-)example in the first section. Knowing a solution to a problem seems to blind you to other ones.
Ah, by the way: we’re hiring
Please have a look at to see if there’s any position that might appeal to you! It’s great to work for this company and Munich is a great city. So, come join us! 🙂
Interesting post.
Might want to check out my pipe_controller tool, which is very roughly for the same area – doing “pipes” in Python.
– Vasudev
Thanks for your feedback. That’s similar to how I wanted to do it initially, but I found using generators is just more flexible as processing-steps can hold local state without requiring the use of classes as pipeline-steps. It’s just so much less code that way. You should definitely try it. An adapter-generator for already existing call-every-step functions would look something like this:
[code language="python"]
def callable_step(func):
    assert callable(func)

    def pipeline_step(iterator):
        for entry in iterator:
            yield func(entry)
    return pipeline_step
[/code]
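A short usage sketch of that adapter: plain per-item functions (here str.strip and str.upper, chosen only for illustration) become pipeline steps and can be mixed freely with generator steps. combine_pipeline is repeated so the snippet runs on its own:

```python
from functools import reduce


def combine_pipeline(source, pipeline):
    return reduce(lambda x, y: y(x), pipeline, source)


def callable_step(func):
    """Adapt a plain per-item function into a generator-based pipeline step."""
    assert callable(func)

    def pipeline_step(iterator):
        for entry in iterator:
            yield func(entry)
    return pipeline_step


steps = [callable_step(str.strip), callable_step(str.upper)]
result = list(combine_pipeline([" feed ", " item "], steps))
```

In Python 3 the built-in map is lazy too, so a step that simply calls map(func, iterator) behaves the same way; in Python 2, map builds a complete list, which defeats streaming.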
Also you might want to make a so other people can use it easily if they see fit.
Some years ago I coded an internal bulk framework like yours. We got better readability (for-if-for constructs) and great test coverage (by testing each step).
But a main feature was speeding up steps like your “download_image”. Instead of using direct iterators, I built it on Queue.Queue() (one between each pair of processing steps). This makes it possible to spawn many threads for IO-bound steps.
Ah, by the way: why are there no Python job offers for Stuttgart?
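A minimal sketch of the queue-between-steps idea from this comment, written as a step for the generator pipeline: a feeder thread pushes upstream items into a bounded queue, worker threads apply a function (for example a download), and the step yields results as they arrive. threaded_step and its parameters are hypothetical, output order is not preserved, and error handling is minimal: a worker whose function raises simply stops, and that item is lost.

```python
import threading

try:
    import queue  # Python 3
except ImportError:
    import Queue as queue  # Python 2, as in the comment

_DONE = object()  # sentinel marking the end of a stream


def threaded_step(func, n_workers=4, maxsize=100):
    """Hypothetical step factory: apply func to each item on worker threads."""
    def step(items):
        in_q = queue.Queue(maxsize)  # bounded: provides backpressure upstream
        out_q = queue.Queue(maxsize)

        def feeder():
            try:
                for item in items:
                    in_q.put(item)
            finally:
                # One sentinel per worker, even if the upstream step failed.
                for _ in range(n_workers):
                    in_q.put(_DONE)

        def worker():
            try:
                while True:
                    item = in_q.get()
                    if item is _DONE:
                        return
                    out_q.put(func(item))
            finally:
                out_q.put(_DONE)  # always tell the consumer this worker is done

        threads = [threading.Thread(target=worker) for _ in range(n_workers)]
        threads.append(threading.Thread(target=feeder))
        for t in threads:
            t.daemon = True
            t.start()

        finished = 0
        while finished < n_workers:
            result = out_q.get()
            if result is _DONE:
                finished += 1
            else:
                yield result
    return step
```

In CPython, threads only pay off for IO-bound work such as downloads, because the global interpreter lock keeps CPU-bound Python code from running in parallel.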
Right now our pipeline doesn’t contain any IO-related steps (it’s basically CPU bound), so I can’t really use threads right now to speed things up. On top of that, because of the nature of the problem, some of the steps need to hold state and resist being split up (neatly).
I once successfully used multiprocessing.Queue to fan out CPU-bound jobs to a worker pool, but I’m still searching for the right abstraction level. Also, the jobs currently run in Celery, which brings in some new problems. 😐
As for Python in Stuttgart: my observation is that Python seeps into Germany starting with the bigger cities. The first cities with major Python job opportunities were Hamburg and Berlin. Munich is now catching up. I expect some smaller cities to follow this pattern over the next few years.
Nice Article.
Neat solution, excellent article. I love how clean and powerful the consume_pipeline function is. I think it deserves a few lines of explanation in the article. 🙂
I think consume_pipeline is pretty much self-explanatory. 🙂 I chose this version because I didn’t want to have to explain this one:
[code language="python"]
import collections
from itertools import islice


def consume(iterator, n=None):
    "Advance the iterator n-steps ahead. If n is None, consume entirely."
    # Use functions that consume iterators at C speed.
    if n is None:
        # feed the entire iterator into a zero-length deque
        collections.deque(iterator, maxlen=0)
    else:
        # advance to the empty slice starting at position n
        next(islice(iterator, n, n), None)
[/code]
Perhaps in a later post.
Oops, I meant combine_pipeline, not consume:
[code language="python"]
from functools import reduce  # a builtin in Python 2, lives in functools in Python 3


def combine_pipeline(source, pipeline):
    return reduce(lambda x, y: y(x), pipeline, source)
[/code]
Yes, true. I’ve updated the post with a bit of explanation. 🙂
I don’t get why you write:
[code language="python"]
steps = list(range(chunk_size))
[/code]
instead of simply:
[code language="python"]
steps = range(chunk_size)
[/code]
in the internal chunker function.
You are right. I thought that, since in Python 3 almost everything became lazy by default (.items() instead of .iteritems(), etc.), range would be lazy as well and thus less performant. But I haven’t measured it, so I shouldn’t complicate the code that way.
I’ll update the code and drop the list call. Good catch.
Hi Christoph,
Very interesting approach and ideas here.
I’ve been working myself on a similar idea/framework using coroutines and queues focused towards IO pipeline servers:
Thanks for sharing,
Thanks for the very informative post!
However, I needed the pipeline to return the final value. To do so, I modified your code a little.
[code language="python"]
def consume(self, return_result=False):
    """Consume an iterator, throwing away the results.

    You may use this if you want to trigger some code in a
    generator but don't want to actually know the result because
    everything happens as a "side-effect" of the generator running.

    There are better ones, but this one is simple to explain.
    Read the itertools recipe section for a more performant
    implementation of this.

    Note: if the return_result argument is True, the result will be
    returned instead of being thrown away.
    """
    result = None
    for result in self.combine_pipeline():
        pass
    if self.return_result:
        return result
[/code]
Fair enough. Although in your case I would drop the boolean parameter (which is a code smell) altogether and always return the last result. It’s easier and doesn’t hurt at all.
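Following that suggestion, a consume variant that always returns the last item can stay as fast as the itertools recipe by keeping only the newest element in a one-slot deque. consume_last and its default parameter are hypothetical names for this sketch:

```python
import collections


def consume_last(iterator, default=None):
    """Run iterator to exhaustion and return its last item (or default if empty)."""
    # A deque with maxlen=1 discards everything but the most recent item,
    # while still consuming the iterator at C speed.
    tail = collections.deque(iterator, maxlen=1)
    return tail[0] if tail else default


last = consume_last(x * x for x in range(5))
```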
Sorry, there was a small typo in my previous comment. The “return_result” argument must be passed to the “__init__” method, not to “consume”.
Hi Christoph,
I loved your solution so much that I modified it slightly and turned it into a small python library, named tubo. Check it out:
Oops… correct link:
Thanks for the notice. 🙂 Glad you like it.