Friday, December 14, 2007

Stream support in Drools - a glimpse

As some of you, our users, already know, we are working on a series of features for CEP (complex event processing) and ESP (event stream processing) support for the next major release of Drools. We haven't talked much about it (we were busy researching and writing code), but I think it is time to share a little of what we have developed on the stream support side.

The first thing to understand is the concept of an "Entry Point".

When we work with streams, where the volume of data is usually huge, we need a way of scoping where that data will be evaluated. We also need to work effectively with multiple streams in parallel, which means parallelizing parts of the network so that one stream does not become a bottleneck for another.

So an "Entry Point" is a channel through which you can assert data into the engine. Facts asserted through an entry point are visible only to patterns associated with that entry point. Entry points are independent of each other, thread-safe, and can run in parallel.

So, to illustrate the idea, nothing better than a simple rule:

import event org.drools.StockTradeEvent;

rule "Correlate Trade Order"
when
    Customer( $id : id )
    StockTradeEvent( customer == $id, $asset : asset ) from entry-point HomeBrokerStream
    StockTradeEvent( asset == $asset, status == Trade.CONFIRMED ) from entry-point StockExchangeStream
then
    // correlate events
end

So, in the above example, a single rule correlates facts from the working memory (Customer) with events from two different streams (HomeBrokerStream and StockExchangeStream), while each of the streams keeps being processed in parallel.
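To make the join in the rule above concrete, here is a rough plain-Java sketch of what it expresses. The classes and field types below are hypothetical stand-ins that only mirror the names used in the rule, and the nested loops show the meaning of the three patterns, not how the engine evaluates them (RETE matches incrementally and uses indexes rather than rescanning).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical fact and event types; field names mirror the rule, types are assumed.
final class Customer {
    final String id;
    Customer(String id) { this.id = id; }
}

final class StockTradeEvent {
    final String customer; // id of the customer who placed the order
    final String asset;
    final String status;
    StockTradeEvent(String customer, String asset, String status) {
        this.customer = customer;
        this.asset = asset;
        this.status = status;
    }
}

class CorrelateSketch {
    /** Returns {customerId, asset} pairs for which all three patterns match. */
    static List<String[]> correlate(List<Customer> workingMemory,
                                    List<StockTradeEvent> homeBrokerStream,
                                    List<StockTradeEvent> stockExchangeStream) {
        List<String[]> matches = new ArrayList<>();
        for (Customer c : workingMemory) {
            for (StockTradeEvent order : homeBrokerStream) {
                if (!order.customer.equals(c.id)) continue;          // customer == $id
                for (StockTradeEvent trade : stockExchangeStream) {
                    if (trade.asset.equals(order.asset)              // asset == $asset
                            && "CONFIRMED".equals(trade.status)) {   // status == Trade.CONFIRMED
                        matches.add(new String[] { c.id, order.asset });
                    }
                }
            }
        }
        return matches;
    }
}
```

Note that each pattern reads from exactly one source: the customer list plays the role of the working memory, and each event list plays the role of one entry point.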

The good thing about our implementation is that it leverages the characteristics of RETE, such as node sharing and indexing, to support the streams. So, if you have multiple rules with patterns associated with the same streams, those patterns will share nodes, index facts, and so on, transparently.

Another interesting characteristic is that we handle heterogeneous streams, i.e., each stream is not limited to a single type of event/fact, and this is all transparent to the user. The engine selects the events/facts from the stream based on the patterns the rules are looking for.
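As a minimal illustration of selecting from a heterogeneous stream, the sketch below keeps several event types in one list and picks out only those a given pattern type would match. The event classes and the select helper are hypothetical names made up for this example; the engine itself does this selection inside the network rather than by scanning a list.

```java
import java.util.ArrayList;
import java.util.List;

class HeterogeneousStreamSketch {
    // Hypothetical event types that share one stream.
    static final class TradeEvent {
        final String asset;
        TradeEvent(String asset) { this.asset = asset; }
    }

    static final class QuoteEvent {
        final String asset;
        QuoteEvent(String asset) { this.asset = asset; }
    }

    /** Returns the items of the requested type, in stream order; other types are skipped. */
    static <T> List<T> select(List<Object> stream, Class<T> type) {
        List<T> selected = new ArrayList<>();
        for (Object item : stream) {
            if (type.isInstance(item)) {
                selected.add(type.cast(item));
            }
        }
        return selected;
    }
}
```

A pattern on TradeEvent would correspond to select(stream, TradeEvent.class), and any QuoteEvent in the same stream is simply ignored by it.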

From a code perspective, all the user needs to do is get the entry point interface from the session for each of their streams and use it to insert the events/facts:

StatefulSession session = ruleBase.newStatefulSession();

// one thread may do this:
EntryPointInterface broker = session.getEntryPoint( "HomeBrokerStream" );
// insert each event as usual
broker.insert( event );

// another thread may do this:
EntryPointInterface stock = session.getEntryPoint( "StockExchangeStream" );
// insert each event as usual
stock.insert( event );
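
The "one thread per stream" pattern from the snippet above can be tried out without the engine. The following is only a toy stand-in, assuming a hypothetical ToySession class that backs each named entry point with its own concurrent queue. It is not the Drools API and says nothing about how the engine is implemented; it only shows two producers feeding separate entry points at the same time.

```java
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

// Toy stand-in for a session: NOT the Drools API, only the threading pattern.
class ToySession {
    private final Map<String, Queue<Object>> entryPoints = new ConcurrentHashMap<>();

    /** Returns the queue backing the named entry point, creating it on first use. */
    Queue<Object> getEntryPoint(String name) {
        return entryPoints.computeIfAbsent(name, n -> new ConcurrentLinkedQueue<>());
    }

    /** Runs one producer thread per stream and waits for both to finish. */
    static ToySession runProducers(int eventsPerStream) {
        ToySession session = new ToySession();
        Thread broker = new Thread(() -> {
            Queue<Object> ep = session.getEntryPoint("HomeBrokerStream");
            for (int i = 0; i < eventsPerStream; i++) ep.add("order-" + i);
        });
        Thread stock = new Thread(() -> {
            Queue<Object> ep = session.getEntryPoint("StockExchangeStream");
            for (int i = 0; i < eventsPerStream; i++) ep.add("trade-" + i);
        });
        broker.start();
        stock.start();
        try {
            broker.join();
            stock.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for producers", e);
        }
        return session;
    }
}
```

Because each producer only touches its own queue, neither thread waits on the other, which is the property the entry point design is after.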

There is still a lot to do, but we are getting there.

Happy drooling,

1 comment:

  1. can you show an example of an implementation of a stream?