Sunday, October 25, 2015

Using Apache Camel and CDI for creating scalable distributed services backed by JMS (part 2)

Continuing the example shown in part 1, we now add a new feature: the Claim Check. This is the Apache Camel / CDI equivalent of another Spring Integration example shown before. This pattern, along with most of the groundwork used in enterprise integration, was first codified in "Enterprise Integration Patterns" by Gregor Hohpe and Bobby Woolf. This book is a must-read for anyone working in this area, regardless of the tool.

In this example we add the pieces needed to store data in an external data store while the original message is routed through our system. The main motivation is to avoid sending and receiving large amounts of data through JMS or other similar systems that are designed for low-latency communication. Unlike Spring Integration, Apache Camel does not offer an out-of-the-box implementation of the Claim Check pattern. The pattern and how to implement it are described here. There has been talk of introducing one in Apache Camel 3.0, but reading the discussions it becomes evident that providing a flexible, general-purpose implementation is complicated. Spring Integration does provide an implementation, but it is often insufficient, since in practice it's frequently necessary to store only parts of the message (most likely the binary portions), while keeping most of the message flowing along the pipeline. The general flow of this pattern can be seen below:

The code

To implement the Claim Check pattern, five new classes were created. The first two provide the actual implementation of the two pieces that make up the Claim Check pattern. These two pieces are, in essence, a Content Filter that removes part of the content (and stores it in a separate database), and a Content Enricher that pulls the data from the database back into the message. The code is shown below:

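// ClaimCheckIn.java
import java.util.UUID;

import javax.inject.Inject;
import javax.inject.Named;

import org.apache.camel.Body;
import org.apache.camel.Exchange;

@Named("claimCheckIn")
public class ClaimCheckIn {

    @Inject
    private DataStore dataStore;

    public void checkIn(Exchange exchange, @Body Object body) {
        // generate a unique ticket (claim check) for this payload
        String id = UUID.randomUUID().toString();
        // store the message in the data store
        dataStore.put(id, body);
        // add the claim check as a header
        exchange.getIn().setHeader("claimCheck", id);
        // remove the body from the message
        exchange.getIn().setBody(null);
    }
}

// ClaimCheckOut.java
import javax.inject.Inject;
import javax.inject.Named;

import org.apache.camel.Exchange;
import org.apache.camel.Header;

@Named("claimCheckOut")
public class ClaimCheckOut {

    @Inject
    private DataStore dataStore;

    public void checkOut(Exchange exchange, @Header("claimCheck") String claimCheck) {
        // put the stored data back into the message body
        exchange.getIn().setBody(dataStore.get(claimCheck));
        // remove the message data from the data store
        dataStore.remove(claimCheck);
        // remove the claim check header
        exchange.getIn().removeHeader("claimCheck");
    }
}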
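To see what actually travels over the queue, the round trip can be simulated without Camel or a database. The sketch below is plain Java and rests on a few assumptions: SimpleMessage is a hypothetical stand-in for a Camel message (a body plus headers), and a HashMap replaces the JDBC-backed DataStore, so it only works inside one JVM. The two methods follow the same steps as checkIn and checkOut above.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Hypothetical stand-in for a Camel message: a body plus a header map.
class SimpleMessage {
    Object body;
    final Map<String, Object> headers = new HashMap<>();
}

// Plain-Java simulation of the ClaimCheckIn/ClaimCheckOut pair. The HashMap
// stands in for the JDBC DataStore; it is not thread-safe and only works
// inside one JVM, so it is for illustration only.
class ClaimCheckSimulation {
    static final String CLAIM_CHECK_HEADER = "claimCheck";
    final Map<String, Object> store = new HashMap<>();

    // Same steps as ClaimCheckIn.checkIn: park the body, keep only a ticket.
    void checkIn(SimpleMessage msg) {
        String id = UUID.randomUUID().toString();
        store.put(id, msg.body);
        msg.headers.put(CLAIM_CHECK_HEADER, id);
        msg.body = null;
    }

    // Same steps as ClaimCheckOut.checkOut: redeem the ticket, restore the
    // body, delete the stored copy and drop the header.
    void checkOut(SimpleMessage msg) {
        String id = (String) msg.headers.remove(CLAIM_CHECK_HEADER);
        msg.body = store.remove(id);
    }

    public static void main(String[] args) {
        ClaimCheckSimulation claimCheck = new ClaimCheckSimulation();
        SimpleMessage msg = new SimpleMessage();
        msg.body = "a very large order payload";

        claimCheck.checkIn(msg);   // what travels over JMS: no body, one header
        System.out.println("in flight: body=" + msg.body + ", headers=" + msg.headers.keySet());

        claimCheck.checkOut(msg);  // body restored on the receiving side
        System.out.println("delivered: body=" + msg.body + ", stored entries=" + claimCheck.store.size());
    }
}
```

Only the short UUID header crosses the broker; the payload stays in the store until the other side redeems it.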


The data store provides a very simple abstraction to put, get, and remove objects in the database. The current implementation uses JDBC, and shows the basics of how a data store can be implemented:
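import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

import javax.inject.Inject;
import javax.sql.DataSource;

public class DataStore {

    @Inject
    DataSource dataSource;

    // Stores the payload under its claim check id
    public void put(String id, Object body) {
        try (Connection connection = dataSource.getConnection();
             PreparedStatement stmt = connection.prepareStatement("INSERT INTO DATA (id, data) VALUES (?, ?)")) {
            stmt.setString(1, id);
            stmt.setObject(2, body);
            stmt.executeUpdate();
        } catch (SQLException e) {
            // fail loudly: silently losing a checked-in payload would break the round trip
            throw new IllegalStateException("Could not store claim check " + id, e);
        }
    }

    // Returns the payload for the claim check id, or null if none is stored
    public Object get(String id) {
        try (Connection connection = dataSource.getConnection();
             PreparedStatement stmt = connection.prepareStatement("SELECT data FROM DATA WHERE id = ?")) {
            stmt.setString(1, id);
            try (ResultSet rs = stmt.executeQuery()) {
                return rs.next() ? rs.getObject("data") : null;
            }
        } catch (SQLException e) {
            throw new IllegalStateException("Could not read claim check " + id, e);
        }
    }

    // Deletes the stored payload once it has been checked out
    public void remove(String id) {
        try (Connection connection = dataSource.getConnection();
             PreparedStatement stmt = connection.prepareStatement("DELETE FROM DATA WHERE id = ?")) {
            stmt.setString(1, id);
            stmt.executeUpdate();
        } catch (SQLException e) {
            throw new IllegalStateException("Could not remove claim check " + id, e);
        }
    }
}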
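If more back ends are wanted (JPA, NoSQL, or an in-memory store for tests), one option is to pull these three methods into an interface. The following is a sketch under that assumption; ClaimCheckStore, InMemoryClaimCheckStore and take() are hypothetical names, not part of Camel or the sample repository.

```java
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical interface extracted from the DataStore methods, so that JDBC,
// JPA or NoSQL implementations could be swapped in. Not a Camel API.
interface ClaimCheckStore {
    void put(String id, Object body);

    Object get(String id);

    void remove(String id);

    // Read-then-delete, which is what ClaimCheckOut does with two calls.
    default Object take(String id) {
        Object body = get(id);
        remove(id);
        return body;
    }
}

// In-memory implementation for unit tests or single-JVM runs. Optional wraps
// values because ConcurrentHashMap rejects null, while a null body is legal.
class InMemoryClaimCheckStore implements ClaimCheckStore {
    private final Map<String, Optional<Object>> data = new ConcurrentHashMap<>();

    @Override
    public void put(String id, Object body) {
        data.put(Objects.requireNonNull(id, "id"), Optional.ofNullable(body));
    }

    @Override
    public Object get(String id) {
        return data.getOrDefault(id, Optional.empty()).orElse(null);
    }

    @Override
    public void remove(String id) {
        data.remove(id);
    }

    // Overrides the default with a single atomic remove, so two consumers
    // can never redeem the same claim check.
    @Override
    public Object take(String id) {
        Optional<Object> body = data.remove(id);
        return body == null ? null : body.orElse(null);
    }
}
```

Reading and deleting in one step is a design choice worth considering: with the JDBC version above, ClaimCheckOut issues a separate SELECT and DELETE.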



The last two new classes, H2ServerWrapper and DataSourceProvider, provide the connections to the database and start the in-memory H2 database. These classes are normally not needed, since connections are handled by the application server in most Java EE applications. They were added here to provide similar functionality without an application server. You're welcome to look at the code, but in a future post I'll explain in more detail how to provide these services from inside CDI.

Wiring it all together

Once we have our classes implementing the functionality, the wiring is very simple:
    RouteBuilder jmsClientCamelRoute = new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            from("direct:order").to("jms:queue:order", "bean:claimCheckOut").setExchangePattern(ExchangePattern.InOut);
        }
    };

    RouteBuilder jmsServerCamelRoute = new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            from("jms:queue:order?concurrentConsumers=5").to("bean:orderServiceHandler", "bean:claimCheckIn");
        }
    };

All it takes is adding one extra destination to each route. In Camel, declaring multiple destinations creates a pipeline: the message is sent to the first destination, then the result of the first destination is sent to the second destination, and so on. This is in contrast to multicast, which sends the same message to all of the destinations.
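The difference between the two styles can be sketched in plain Java by treating each destination as a function from message to message. This is only an analogy for the data flow, not Camel code; RoutingStyles, pipeline and multicast are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

// Plain-Java analogy for the two routing styles; not Camel code.
class RoutingStyles {
    // Pipeline: each step receives the previous step's output.
    @SafeVarargs
    static <T> T pipeline(T input, UnaryOperator<T>... steps) {
        T current = input;
        for (UnaryOperator<T> step : steps) {
            current = step.apply(current);
        }
        return current;
    }

    // Multicast: every step receives the same original input.
    @SafeVarargs
    static <T> List<T> multicast(T input, UnaryOperator<T>... steps) {
        List<T> results = new ArrayList<>();
        for (UnaryOperator<T> step : steps) {
            results.add(step.apply(input));
        }
        return results;
    }

    public static void main(String[] args) {
        // Like .to("bean:orderServiceHandler", "bean:claimCheckIn"): the
        // claim check sees the handler's response, not the original request.
        System.out.println(pipeline("order", s -> s + " -> handled", s -> s + " -> checkedIn"));
        // With multicast, both steps see the untouched "order".
        System.out.println(multicast("order", s -> s + " -> handled", s -> s + " -> checkedIn"));
    }
}
```

This is why the order of destinations matters on the server side: claimCheckIn must come after orderServiceHandler so that it stores the response.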

The first change is the addition of "bean:claimCheckOut" after "jms:queue:order". This checks out the data and puts it back in the message once the response arrives. Something similar is done on the server side: first we process the request using the orderServiceHandler bean, and then put the response in the data store. This particular configuration uses the claim check only for the response.

It's fairly easy to change it to use the claim check for both the request and the response. It's worth noting that such cases are not common in practice, as normally either the response or the request is large, but not both.
    RouteBuilder jmsClientCamelRoute = new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            from("direct:order").to("bean:claimCheckIn", "jms:queue:order", "bean:claimCheckOut").setExchangePattern(ExchangePattern.InOut);
        }
    };

    RouteBuilder jmsServerCamelRoute = new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            from("jms:queue:order?concurrentConsumers=5").to("bean:claimCheckOut", "bean:orderServiceHandler", "bean:claimCheckIn");
        }
    };
In this case, the request is also stored. Since our check-in method stores the whole body, the stored object will be an instance of BeanInvocation. This object contains the parameters and other information needed to invoke the service method. Depending on your needs, you could of course extract only part of this object, for example one or more of the parameters, while keeping the rest of the message intact.
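As a sketch of that partial approach, the following plain Java code parks only a binary attachment and leaves the rest of the object in the message. OrderWithAttachment and PartialClaimCheck are hypothetical classes (not from the sample repository), and a HashMap stands in for the DataStore.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Hypothetical order type with a large binary attachment; it is not a class
// from the sample repository.
class OrderWithAttachment {
    final int id;
    byte[] attachment;            // the heavy part worth parking in a store
    String attachmentClaimCheck;  // ticket that travels in its place

    OrderWithAttachment(int id, byte[] attachment) {
        this.id = id;
        this.attachment = attachment;
    }
}

// Checks in only the attachment; the rest of the order keeps flowing through
// the pipeline untouched. The HashMap stands in for the DataStore.
class PartialClaimCheck {
    private final Map<String, byte[]> store = new HashMap<>();

    void checkInAttachment(OrderWithAttachment order) {
        String id = UUID.randomUUID().toString();
        store.put(id, order.attachment);
        order.attachmentClaimCheck = id;
        order.attachment = null;
    }

    void checkOutAttachment(OrderWithAttachment order) {
        order.attachment = store.remove(order.attachmentClaimCheck);
        order.attachmentClaimCheck = null;
    }

    int storedCount() {
        return store.size();
    }

    public static void main(String[] args) {
        PartialClaimCheck claimCheck = new PartialClaimCheck();
        OrderWithAttachment order = new OrderWithAttachment(100, new byte[1024 * 1024]);
        claimCheck.checkInAttachment(order);
        // the order id is still available to every step in the route
        System.out.println("order " + order.id + " in flight, attachment=" + order.attachment);
        claimCheck.checkOutAttachment(order);
        System.out.println("attachment restored: " + order.attachment.length + " bytes");
    }
}
```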

Side by side comparison with Spring Integration

The following sections compare mostly equivalent Spring Integration XML and Apache Camel DSL. I have not included the implementation of the extra infrastructure pieces that Apache Camel requires.

Routing to a JMS queue and then retrieving the data from the data store
<int:chain input-channel="requestChannel">
    <int-jms:outbound-gateway request-destination="amq.outbound" extract-request-payload="true"/>
    <int:claim-check-out message-store="clientMessageStore"/>
</int:chain>
vs
from("direct:order").to("jms:queue:order", "bean:claimCheckOut").setExchangePattern(ExchangePattern.InOut);

Listening to a JMS queue and routing messages to a bean and storing the result

<int:chain input-channel="inChannel">
    <int:service-activator ref="orderServiceHandler" method="processOrder"/>
    <int:claim-check-in message-store="serverMessageStore"/>
</int:chain>
vs
from("jms:queue:order?concurrentConsumers=5").to("bean:orderServiceHandler", "bean:claimCheckIn");

Conclusion

From this short tutorial, it's evident that Apache Camel requires quite a bit of extra code to accomplish the same tasks we did with Spring Integration. However, most of it becomes irrelevant when running inside an application server, which is the main target for CDI. The biggest piece that I feel Apache Camel should provide is a set of data stores for common data sources such as JDBC, JPA, and some of the more popular NoSQL databases. Once that hurdle is overcome, however, Apache Camel shines in the ease with which complex problems can be solved in clear, concise, and easy-to-maintain code. So which one is better? A lot depends on which framework you're already using (Spring vs. CDI), and whether either integration platform provides a particular esoteric feature you need. In the end, both tools are very capable and easy to use.

Source code

You can find the source code for this example on GitHub.
To check out the code, clone the following repository: https://github.com/aolarte/camel-integration-samples.git
git clone https://github.com/aolarte/camel-integration-samples.git
cd camel-integration-samples
git checkout branches/part2
This example can be run directly from Maven. For example:
mvn exec:java -Dexec.mainClass="com.javaprocess.examples.integration.main.Main" -Dexec.args="server client"

Monday, October 19, 2015

Using Apache Camel and CDI for creating scalable distributed services backed by JMS (part 1)

As an exercise and follow-up to my Spring Integration post, and after a discussion with a coworker, I decided to reimplement the test application using Apache Camel and CDI (Contexts and Dependency Injection). I will show some of the basic pieces that make the solution work, as well as a side-by-side comparison with the XML from the Spring Integration solution.

The main business logic of the application is an almost exact copy of the Spring Integration example (Application.java, OrderServiceHandler.java, etc.). The only difference is in the annotations, since we're using the standard annotations supported by CDI, such as @Inject (defined in JSR-330) and @Produces (defined in JSR-299).

The application has a "client" portion that generates messages that are serviced by a bean. This bean is either located in the same JVM or remotely accessible through a JMS queue. The objective of this example is to show how easy it is to use this pattern to create distributed services that spread particularly resource-intensive operations to backend nodes. This is the same application as in the Spring Integration post mentioned previously, so the flow can be shown using the exact same diagrams as in the previous example:

The main routing logic is found in CamelContext.java.
For anyone used to Spring Integration, one of the most obvious things that jumps out is the abundance of code to wire the Camel and CDI components together. On the other hand, there is basically no XML. The only XML file (beans.xml) is basically empty.
CDI favors using code to wire components. This is done mostly through annotations, and simple producer methods (which are annotated with @Produces). This is philosophically very different from Spring. Camel does provide a way to configure its routes using XML, but it requires Spring, so it's not really viable with CDI.
The general format of a Camel route is to define a source (using from() in RouteBuilder) and one or more destinations (using to() on the route returned by from()). RouteBuilder provides a fluent API, and extra parameters can be set, for example the Exchange Pattern, which is needed to get a response back.
The URI passed to the from() and to() methods is always prefixed by the component name.

For example the following definition will route requests from a direct endpoint named "order", to a bean named "orderServiceHandler":
from("direct:order").to("bean:orderServiceHandler");
Here we use the "direct" component, which defines an internal (in-JVM) connection; the distributed routes use the "jms" component instead. The "bean" component provides a container-agnostic way to access beans in a dependency injection container.
In this case we're including the "camel-cdi" artifact, which allows Camel to locate beans in a JSR-299 (CDI) container such as Weld. Similar modules exist for other containers such as Spring, or a registry can be maintained manually.
The uri also includes the destination name, as well as extra parameters. For example we can define that 5 concurrent listeners (threads) will consume from a single queue:
jms:queue:order?concurrentConsumers=5
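The effect of concurrentConsumers can be approximated in plain Java as several threads competing for messages on one queue. The sketch below is an analogy under that assumption, not how camel-jms implements its listeners; ConcurrentConsumersDemo and consume() are hypothetical names.

```java
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Plain-Java analogy for concurrentConsumers=5: several worker threads
// draining one shared queue. Not the camel-jms implementation.
class ConcurrentConsumersDemo {
    // Returns how many messages each consumer thread processed.
    static Map<String, Integer> consume(int consumers, int messages) {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
        for (int i = 0; i < messages; i++) {
            queue.add(i);
        }
        Map<String, Integer> perThread = new ConcurrentHashMap<>();
        ExecutorService pool = Executors.newFixedThreadPool(consumers);
        for (int c = 0; c < consumers; c++) {
            pool.submit(() -> {
                // poll() hands each message to exactly one consumer and returns
                // null once the queue is empty, which ends this consumer.
                while (queue.poll() != null) {
                    perThread.merge(Thread.currentThread().getName(), 1, Integer::sum);
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("consumers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return perThread;
    }

    public static void main(String[] args) {
        System.out.println(consume(5, 20));
    }
}
```

Each message is consumed exactly once, but which thread handles which message varies from run to run.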

Camel can be hooked into CDI to provide beans that can be injected elsewhere, using a producer method annotated with @Produces. The following method produces objects of type IOrderService. The resulting object is a proxy backed by the "direct:order" endpoint.

    @Produces
    public IOrderService createService() throws Exception {
        IOrderService service = new ProxyBuilder(camelCtx).endpoint("direct:order").build(IOrderService.class);
        return service;
    }
A bit of extra logic was added to determine which routes to register, based on the functionality requested by the user through command line parameters. In the Spring example this was controlled by which XML files were included.
This does limit the flexibility of the solution, since there are no XML files to tweak after building the application.
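As an illustration of that command-line logic, the sketch below maps the three modes described under "Running the example" to the routes they would register. RouteSelection and routesFor() are hypothetical names, the route strings are descriptive labels rather than actual Camel route definitions, and the sample's real Main may be structured differently.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: maps command-line modes to the routes that should be
// registered. The strings are labels only, not Camel route definitions.
class RouteSelection {
    static List<String> routesFor(String... modes) {
        List<String> routes = new ArrayList<>();
        for (String mode : modes) {
            switch (mode) {
                case "direct":
                    routes.add("direct:order -> bean:orderServiceHandler");
                    break;
                case "client":
                    routes.add("direct:order -> jms:queue:order (InOut)");
                    break;
                case "server":
                    routes.add("jms:queue:order?concurrentConsumers=5 -> bean:orderServiceHandler");
                    break;
                default:
                    // reject typos instead of silently starting nothing
                    throw new IllegalArgumentException("Unknown mode: " + mode);
            }
        }
        return routes;
    }

    public static void main(String[] args) {
        System.out.println(routesFor(args));
    }
}
```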

It's worth noting that the proxy only supports mapping to one endpoint. In practice the interface can have more than one method, but invoking any of those methods sends the message to the same endpoint.
More sophisticated manipulation can be done, but in my experience it's not worth the effort unless absolutely necessary.
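The single-endpoint behavior follows from how such proxies work: every method call lands in one invocation handler. The sketch below uses the JDK's java.lang.reflect.Proxy to mimic this; it is an analogy for what ProxyBuilder does, not its actual implementation, and TwoMethodService and SingleEndpointProxy are hypothetical names.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.function.BiFunction;

// Hypothetical service interface with two methods.
interface TwoMethodService {
    String placeOrder(String item);

    String cancelOrder(String item);
}

class SingleEndpointProxy {
    // Every method of the interface (including Object methods such as
    // toString) is forwarded to the same "endpoint" function, which receives
    // the method name and arguments.
    static <T> T create(Class<T> iface, BiFunction<String, Object[], Object> endpoint) {
        InvocationHandler handler = (proxy, method, args) -> endpoint.apply(method.getName(), args);
        return iface.cast(Proxy.newProxyInstance(iface.getClassLoader(), new Class<?>[] {iface}, handler));
    }

    public static void main(String[] args) {
        TwoMethodService service = create(TwoMethodService.class,
                (method, methodArgs) -> "direct:order received " + method + "(" + methodArgs[0] + ")");
        // Two different methods, one endpoint
        System.out.println(service.placeOrder("book"));
        System.out.println(service.cancelOrder("book"));
    }
}
```

If different methods need different handling, the endpoint has to inspect the method name itself, which is the kind of extra manipulation mentioned above.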

Other functions that were defined in XML in the Spring example are now done in code, for example starting an embedded ActiveMQ JMS broker and creating the JMS connection factory. These are handled inside JMSConnectionFactoryProvider, but in most cases would be handled by a Java EE container.

Running the example

This example can be run in the same manner as the Spring Integration one. The code must first be compiled:
mvn compile
Then it can be run from Maven, passing one (or two) of three parameters. The three possible parameters are:
  • direct: Uses a direct connection between the client and the bean handling the requests
  • server: Starts a server that will monitor an ActiveMQ queue, and process any requests sent to it. This option will also start an embedded ActiveMQ server, which prevents more than one server from running at once. If using an external ActiveMQ server, no such restriction exists.
  • client: Starts the distributed client, which sends and receives requests through ActiveMQ. This can be used 
For example:

mvn exec:java -Dexec.mainClass="com.javaprocess.examples.integration.main.Main" -Dexec.args="server client"

This will result in output similar to the following:

Running in client mode
Requesting order processing on thread: 38
Requesting order processing on thread: 41
Requesting order processing on thread: 39
Requesting order processing on thread: 40
Requesting order processing on thread: 42
Got order with id 100 on thread: 33
Got order with id 100 on thread: 32
Got order with id 100 on thread: 30
Got order with id 100 on thread: 29
Got order with id 100 on thread: 31
Order was requested by 40  and by processed by thread: 33
Order was requested by 42  and by processed by thread: 32
Order was requested by 41  and by processed by thread: 29
Order was requested by 39  and by processed by thread: 31
Order was requested by 38  and by processed by thread: 30
Stop Camel

Side by side comparison with Spring Integration

The following sections compare mostly equivalent Spring Integration XML and Apache Camel DSL.
Directly routing to a bean
<int:channel id="requestChannel"/>
<int:service-activator input-channel="requestChannel"
 ref="orderServiceHandler" method="processOrder" />
vs.
from("direct:order").to("bean:orderServiceHandler");
Routing to a JMS queue
<int:channel id="requestChannel"/>
<int-jms:outbound-gateway request-channel="requestChannel" 
 request-destination="amq.outbound" extract-request-payload="true"/>
vs.
from("direct:order").to("jms:queue:order").setExchangePattern(ExchangePattern.InOut);
Listening to a JMS queue and routing messages to a bean for service
<int:channel id="inChannel" />
<int-jms:inbound-gateway request-channel="inChannel"
 request-destination="amq.outbound"
 concurrent-consumers="10"/>
<int:service-activator  input-channel="inChannel"
 ref="orderServiceHandler" method="processOrder"/>
vs.
from("jms:queue:order?concurrentConsumers=10").to("bean:orderServiceHandler");
Creating a bean that is a proxy connected to a route
<int:gateway id="orderService"
 service-interface="com.javaprocess.examples.integration.interfaces.IOrderService"
 default-request-channel="requestChannel" />
vs.
@Produces
public IOrderService createService() throws Exception {
 IOrderService service = new ProxyBuilder(camelCtx).endpoint("direct:order").build(IOrderService.class);
 return service;
}

Notes about CDI

I'm a big fan of using CDI to wire Java EE applications. However, for wiring standalone applications, CDI is lacking. For a small application I would rather go with something simpler like Google Guice.
However, to show a comparable application to the standalone Spring Integration example, I have used Weld, the reference implementation for CDI and one of the most popular implementations out there.

One particular challenge was starting a bean eagerly. If running inside a Java EE container, this could have been achieved with a single annotation, @Startup. It is in cases like this that it becomes obvious that CDI is meant to complement Java EE. For my standalone example, however, I had to implement an extension that achieved this behavior. While CDI provides a way to do it, it is still not ideal. More information on how this is achieved can be seen in this detailed post.

Conclusion

I hope that this short post has shown the value of the Enterprise Integration Patterns, regardless of the implementation. Both Apache Camel and Spring Integration provide a rich set of Enterprise Integration Patterns, which can be leveraged to solve complex real-world problems.

Source code

You can find the source code for this example on GitHub.
To check out the code, clone the following repository: https://github.com/aolarte/camel-integration-samples.git
git clone https://github.com/aolarte/camel-integration-samples.git