Wednesday, November 18, 2015

Using a reactive stream as a data source for Drools

A few months ago we started redesigning the lowest-level executable model of Drools and making it accessible to end users through a Java 8 API. To demonstrate the flexibility of this approach I tried to integrate it with a reactive stream, and in particular to use this stream as a data source for Drools.

To show how this works I created a simple temperature server that provides an RxJava Observable that emits the temperature for a given town every second and terminates after 5 seconds. A second factory method merges several of these Observables into a single Observable that emits the temperatures for more than one town at the same time.

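import java.util.Arrays;
import java.util.concurrent.TimeUnit;

import rx.Observable; // RxJava 1.x, as implied by onCompleted()

import static java.util.stream.Collectors.toList;

public class TempServer {
    // Emits one reading per second for the given town, then completes
    public static Observable<TempInfo> getFeed(String town) {
        return Observable.create(subscriber ->
                                         Observable.interval(1, TimeUnit.SECONDS)
                                                   .subscribe(i -> {
                                                       if (i > 5) {
                                                           subscriber.onCompleted();
                                                           return; // emit nothing after completion
                                                       }
                                                       try {
                                                           subscriber.onNext(TempInfo.fetch(town));
                                                       } catch (Exception e) {
                                                           subscriber.onError(e);
                                                       }
                                                   }));
    }

    // Merges the per-town feeds into a single Observable
    public static Observable<TempInfo> getFeeds(String... towns) {
        return Observable.merge(Arrays.stream(towns)
                                      .map(TempServer::getFeed)
                                      .collect(toList()));
    }
}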

where the TempInfo.fetch method just returns a random temperature between -20 and 49 degrees:

public TempInfo(String town, int temp) {
    this.town = town;
    this.temp = temp;
}

public static TempInfo fetch(String town) {
    return new TempInfo(town, random.nextInt(70) - 20);
}
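
As a quick check of the range produced by that expression, here is a minimal standalone sketch. The class name TempRangeCheck and the helper randomTemp are hypothetical and only mirror the arithmetic in TempInfo.fetch; nextInt(70) returns values from 0 to 69, so the result lies between -20 and 49 inclusive.

```java
import java.util.Random;

public class TempRangeCheck {
    // Same arithmetic as TempInfo.fetch: nextInt(70) yields 0..69,
    // so subtracting 20 gives a value from -20 to 49 inclusive.
    static int randomTemp(Random random) {
        return random.nextInt(70) - 20;
    }

    public static void main(String[] args) {
        Random random = new Random(42); // fixed seed only to make runs repeatable
        int min = Integer.MAX_VALUE;
        int max = Integer.MIN_VALUE;
        for (int i = 0; i < 100_000; i++) {
            int t = randomTemp(random);
            min = Math.min(min, t);
            max = Math.max(max, t);
        }
        // Both bounds stay within [-20, 49]
        System.out.println("min=" + min + " max=" + max);
    }
}
```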

Using an improved version of the Java 8 DSL presented in the previous article, I defined the following 2 rules:

Variable<TempInfo> temp = any( TempInfo.class );
Variable<Person> person = any( Person.class );

Rule r1 = rule("low temp")
        .view(
                subscribe(temp, "tempFeed"),
                expr(temp, t -> t.getTemp() < 0),
                input(person, "persons"),
                expr(person, temp, (p, t) -> p.getTown().equals(t.getTown()))
             )
        .then(on(person, temp)
                      .execute((p, t) -> System.out.println(p.getName() + " is freezing in " + p.getTown() + " - temp is " + t.getTemp())));

Rule r2 = rule("high temp")
        .view(
                subscribe(temp, "tempFeed"),
                expr(temp, t -> t.getTemp() > 30),
                input(person, "persons"),
                expr(person, temp, (p, t) -> p.getTown().equals(t.getTown()))
             )
        .then(on(person, temp)
                      .execute((p, t) -> System.out.println(p.getName() + " is sweating in " + p.getTown() + " - temp is " + t.getTemp())));
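
To make the meaning of these two rules concrete, here is a rough plain-Java sketch, without Drools, of the join they express: a reading below 0 or above 30 is matched against every person living in the same town. The class RuleSemanticsSketch and its nested Person and TempInfo types are hypothetical stand-ins, not part of the Drools API, and the nested loop only illustrates the matching logic; it says nothing about how the engine actually evaluates the rules.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class RuleSemanticsSketch {
    // Minimal stand-ins for the article's Person and TempInfo classes
    static class Person {
        final String name;
        final String town;
        Person(String name, String town) { this.name = name; this.town = town; }
    }

    static class TempInfo {
        final String town;
        final int temp;
        TempInfo(String town, int temp) { this.town = town; this.temp = temp; }
    }

    // Returns the messages both rules would print for a single reading
    static List<String> evaluate(List<Person> persons, TempInfo t) {
        List<String> messages = new ArrayList<>();
        String state;
        if (t.temp < 0) {
            state = "freezing";   // constraint of the "low temp" rule
        } else if (t.temp > 30) {
            state = "sweating";   // constraint of the "high temp" rule
        } else {
            return messages;      // neither rule matches this reading
        }
        for (Person p : persons) {
            if (p.town.equals(t.town)) { // join between person and reading
                messages.add(p.name + " is " + state + " in " + p.town + " - temp is " + t.temp);
            }
        }
        return messages;
    }

    public static void main(String[] args) {
        List<Person> persons = Arrays.asList(new Person("Mark", "London"),
                                             new Person("Edson", "Toronto"),
                                             new Person("Mario", "Milano"));
        evaluate(persons, new TempInfo("London", -9)).forEach(System.out::println);
    }
}
```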

Here I'm using 2 different kinds of data sources. The first is a passive one that can be considered a mere store of facts:


DataStore persons = storeOf(new Person("Mark", 37, "London"),
                            new Person("Edson", 35, "Toronto"),
                            new Person("Mario", 40, "Milano"));

that can be bound to a specific Drools KieSession with

bindDataSource(ksession, "persons", persons);

and a reactive one taken from the TempServer implemented above

Observable<TempInfo> tempFeed = TempServer.getFeeds( "Milano", "London", "Toronto" );

that can also be bound to the same KieSession in a similar way

bindRxObservable( ksession, "tempFeed", tempFeed );

Having done this you can fire those 2 rules and obtain an output like the following:

Mark is freezing in London - temp is -9
Edson is sweating in Toronto - temp is 42
Mario is sweating in Milano - temp is 42
Mario is sweating in Milano - temp is 49
Mark is freezing in London - temp is -17
Edson is sweating in Toronto - temp is 40
Edson is sweating in Toronto - temp is 47
Mario is freezing in Milano - temp is -14
Mark is freezing in London - temp is -8
Mark is freezing in London - temp is -17

The complete test case to run this example is available here.

