Flow based programming and ETL post

For quite some time I've been searching for a resonable approch on Extract, Transform, Load (ETL) in php where I can define a workflow, based on e.g. a UML diagram and just "run" it asynchronously. A solution with a fully fledged ETL tool like MS SSIS or talend were out of the question, due to their high complexity and hardware requirements. Also the possible solution has to integrate into our existing php environment.


If you have already read my other posts, you know me to already use RabbitMQ and php-amqp for asynchronously handlinge import processes. This goes one step further and introduces the flow based programming "design pattern".

In computer science, flow-based programming (FBP) is a programming paradigm that defines applications as networks of "black box" processes, which exchange data across predefined connections by message passing, where the connections are specified externally to the processes. These black box processes can be reconnected endlessly to form different applications without having to be changed internally. FBP is thus naturally component-oriented. (Wikipedia)

Developers used to the Unix philosophy should be immediately familiar with FBP:

This is the Unix philosophy: Write programs that do one thing and do it well. Write programs to work together. Write programs to handle text streams, because that is a universal interface.

It also fits well in Alan Kay's original idea of object-oriented programming:

I thought of objects being like biological cells and/or individual computers on a network, only able to communicate with messages (so messaging came at the very beginning -- it took a while to see how to do messaging in a programming language efficiently enough to be useful).

Sounds good, doesn't it?

improvements and status quo

Initially I worked with phpflo, adapted it for symfony to use dependency injection instead of a factory-like process and was kind of happy. After a short while, the first problems arose:

Having serveral long running processes introduced the problem of "state" within components and also the network. So, already initialized networks could not be reused and had to be destroyed. Using a compiler-pass approach with a registry of components, also introduced port states within the process.

Several ideas came to my mind: Just restart the processes after every message from the queue or even fork the single ETL processes per message - but everything just lead into more problems:

  • Restarting processes means framework initialization overhead
  • Forking processes needs some kind of lowlevel process management

Overall, the best approach was to integrate some stage-management into phpflo, split the library into several components and implement a parser for the (more convinient) FBP domain specific language (DSL). You can find the implementation here. The split into several libraries was necessary due to separation of concerns, maintenance and possible future contributions of generic components.


Added to our technology stack, phpflo integrated fine with symfony and all components are loaded via DIC. This allows for easy configuration of processes:

CategoryCreator() out -> in MdbPersister()
CategoryCreator() route -> in RouteCreator()
CategoryCreator() media -> in MediaIterator()
MediaIterator(MediaIterator) out -> in MediaHandler()
CategoryCreator() bannerset -> in BannersetHandler()
BannersetHandler() out -> bannerset CategoryCreator()
CategoryCreator() tag -> tags TagCreator(TagCreator)
TagCreator() tags -> tag CategoryCreator()
CategoryCreator() hierarchy -> hierarchy TagCreator()
TagCreator(TagCreator) hierarchy -> hierarchy CategoryCreator()
CategoryCreator() sidebar -> in SidebarHandler()
SidebarHandler() out -> sidebar CategoryCreator()
SidebarHandler() build -> in JsonFieldFetcher()
JsonFieldFetcher() sidebar -> in SidebarCreator()
RouteCreator() out -> in MdbPersister()

This replaces a 450+ lines JSON-file!

So, given all processes are defined as symfony (private) services, they can use all dependencies they need and are even easier to test.

Thanks to the datatype checks I've introduced into phpflo, connections are checked for compatibility. For us this means: Every component with compatible ports could be stitched together and worked with. That removed a lot of inheritance, type-checks and so on.

If you need a similar solutions, I suggest you continue reading here: phpflo on GitHub

And last, but not least: Big thanks to James (@aretecode) for his code reviews and support concerning architectural descisions!

Categories: php, symfony2

Tags: php, symfony2