Showing posts with label messaging. Show all posts

Sunday, October 25, 2015

Using Apache Camel and CDI for creating scalable distributed services backed by JMS (part 2)

Continuing the example shown in part 1, we now add a new feature: the Claim Check. This is the Apache Camel / CDI equivalent of another Spring Integration example shown before. This pattern, along with most of the groundwork used in enterprise integration, was first codified in "Enterprise Integration Patterns" by Gregor Hohpe and Bobby Woolf. This book is a must-read for anyone working in this area, regardless of the tool.

In this example we add the necessary pieces to store data in an external data store while our original message is routed through our system. The main motivation is to avoid sending and receiving large amounts of data through JMS or similar systems that are designed for low-latency communication. Unlike Spring Integration, Apache Camel does not offer an out-of-the-box implementation of the Claim Check pattern. The pattern and how to implement it are described here. There is talk of introducing one in Apache Camel 3.0, but reading the discussions it becomes evident that providing a flexible general-case implementation is complicated. Spring Integration provides an implementation, but it is often insufficient, because in practice you frequently need to store only parts of the message (most likely its binary portions) while the rest of the message continues along the pipeline. The general flow of this pattern can be seen below:

The code

To implement the Claim Check pattern, five new classes were created. The first two provide the actual implementation of the two pieces that make up the pattern: in essence, a Content Filter that removes part of the content (and stores it in a separate database), and a Content Enricher that pulls the data from the database back into the message. The code is seen below:

@Named("claimCheckIn")
public class ClaimCheckIn {

    @Inject
    private DataStore dataStore;

    public void checkIn(Exchange exchange, @Body Object body) {
        String id = UUID.randomUUID().toString();
        // store the message in the data store
        dataStore.put(id, body);
        // add the claim check as a header
        exchange.getIn().setHeader("claimCheck", id);
        // remove the body from the message
        exchange.getIn().setBody(null);
    }
}

@Named("claimCheckOut")
public class ClaimCheckOut {

    @Inject
    private DataStore dataStore;

    public void checkOut(Exchange exchange, @Header("claimCheck") String claimCheck) {
        exchange.getIn().setBody(dataStore.get(claimCheck));
        // remove the message data from the data store
        dataStore.remove(claimCheck);
        // remove the claim check header
        exchange.getIn().removeHeader("claimCheck");
    }
}


The data store provides a very simple abstraction to put, get, and remove objects from the database. The current implementation uses JDBC, and shows the basics of how a data store can be implemented:
public class DataStore {

    @Inject
    DataSource dataSource;

    public void put(String id, Object body) {
        try (Connection connection = dataSource.getConnection();
             PreparedStatement stmt = connection.prepareStatement("INSERT INTO DATA (id,data) VALUES (?,?)")) {
            stmt.setString(1, id);
            stmt.setObject(2, body);
            stmt.executeUpdate();
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }

    public Object get(String id) {
        Object ret = null;
        try (Connection connection = dataSource.getConnection();
             PreparedStatement stmt = connection.prepareStatement("SELECT * FROM DATA WHERE id=?")) {
            stmt.setString(1, id);
            try (ResultSet rs = stmt.executeQuery()) {
                if (rs.next()) {
                    ret = rs.getObject("data");
                }
            }

        } catch (SQLException e) {
            e.printStackTrace();
        }
        return ret;
    }

    public void remove(String id) {
        try (Connection connection = dataSource.getConnection();
             PreparedStatement stmt = connection.prepareStatement("DELETE FROM DATA WHERE id=?")) {
            stmt.setString(1, id);
            stmt.executeUpdate();
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }
}
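
For unit tests, or to try the routes without a database, the same three operations can be backed by a map. The following is a minimal sketch and not part of the original project: the class name InMemoryDataStore is hypothetical, and it assumes that check-in and check-out happen in the same JVM, so it is no substitute for a shared database when the client and the server run separately.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory stand-in for the JDBC-backed DataStore above.
// It only works when check-in and check-out happen in the same JVM.
class InMemoryDataStore {

    private final Map<String, Object> entries = new ConcurrentHashMap<>();

    public void put(String id, Object body) {
        // ConcurrentHashMap rejects null values, so an empty body is simply not stored
        if (body != null) {
            entries.put(id, body);
        }
    }

    public Object get(String id) {
        // returns null when the id is unknown, like the JDBC version
        return entries.get(id);
    }

    public void remove(String id) {
        entries.remove(id);
    }

    public static void main(String[] args) {
        InMemoryDataStore store = new InMemoryDataStore();
        store.put("abc", "large payload");
        System.out.println(store.get("abc"));
        store.remove("abc");
        System.out.println(store.get("abc"));
    }
}
```

Because the three method signatures match the JDBC version, such a class could be swapped in behind the same injection point, for example through a CDI alternative.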



The last two new classes, H2ServerWrapper and DataSourceProvider, provide the connections to the database, as well as starting the in-memory H2 database. These classes are normally not needed, since connections are handled by the application server in most Java EE applications. They were added here to provide similar functionality without an application server. You're welcome to look at the code, but in a future post I'll explain in more detail how to provide these services from inside CDI.

Wiring it all together

Once we have our classes implementing the functionality, the wiring is very simple:
    RouteBuilder jmsClientCamelRoute = new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            from("direct:order").to("jms:queue:order", "bean:claimCheckOut").setExchangePattern(ExchangePattern.InOut);
        }
    };

    RouteBuilder jmsServerCamelRoute = new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            from("jms:queue:order?concurrentConsumers=5").to("bean:orderServiceHandler", "bean:claimCheckIn");
        }
    };

All it takes is adding one destination before and one after. In Camel, declaring multiple destinations will create a pipeline. This means that the message will be sent to the first destination, then the result of the first destination will be sent to the second destination, and so on. This is in contrast to multicast, which will send the same message to all of the destinations.
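
The difference between the two behaviours can be modelled in a few lines of plain Java. This is only a conceptual sketch: ordinary functions stand in for Camel endpoints, no Camel API is used, and the class and method names are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.UnaryOperator;

// Conceptual model only: plain Java functions stand in for Camel endpoints.
class PipelineVsMulticast {

    // Pipeline: the output of each step becomes the input of the next one
    static String pipeline(String message, List<UnaryOperator<String>> steps) {
        String current = message;
        for (UnaryOperator<String> step : steps) {
            current = step.apply(current);
        }
        return current;
    }

    // Multicast: every step receives the same original message
    static List<String> multicast(String message, List<UnaryOperator<String>> steps) {
        List<String> results = new ArrayList<>();
        for (UnaryOperator<String> step : steps) {
            results.add(step.apply(message));
        }
        return results;
    }

    public static void main(String[] args) {
        List<UnaryOperator<String>> steps = Arrays.asList(
                m -> m + ">jms",
                m -> m + ">checkOut");
        System.out.println(pipeline("order", steps));   // order>jms>checkOut
        System.out.println(multicast("order", steps));  // [order>jms, order>checkOut]
    }
}
```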

The first change is the addition of "bean:claimCheckOut" after "jms:queue:order". This checks out the data and puts it back on the message once the response arrives. Something similar is done on the server side: first we process the request using the orderServiceHandler bean, and then we put the response in the data store. This particular configuration uses the check-in only for the response.

It's fairly easy to change it to use the claim check for both the request and the response. It's worth noting that such cases are not common in practice, as normally only one of the two, the request or the response, is large.
    RouteBuilder jmsClientCamelRoute = new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            from("direct:order").to("bean:claimCheckIn", "jms:queue:order", "bean:claimCheckOut").setExchangePattern(ExchangePattern.InOut);
        }
    };

    RouteBuilder jmsServerCamelRoute = new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            from("jms:queue:order?concurrentConsumers=5").to("bean:claimCheckOut", "bean:orderServiceHandler", "bean:claimCheckIn");
        }
    };
In this case, the request is also stored. Since our check-in method stores the whole body, this will be an instance of BeanInvocation. This object contains the parameters and other information needed to invoke the service method. Based on your needs, you could of course extract only part of this object, for example one or more of the parameters, while keeping the rest of the message intact.
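
A partial check-in of that kind could look roughly like the sketch below. Everything in it is hypothetical: the Order class with an attachment field is invented for illustration (the real example carries a BeanInvocation), and a plain HashMap stands in for the data store.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Hypothetical sketch: only the bulky "attachment" field goes to the store,
// while the rest of the order keeps travelling with the message.
class PartialClaimCheck {

    // Invented payload type, used only for this illustration
    static class Order {
        int id;
        byte[] attachment;   // the large part we do not want on the queue
        String claimCheck;   // set while the attachment is parked in the store
    }

    private final Map<String, byte[]> store = new HashMap<>();

    void checkIn(Order order) {
        if (order.attachment == null) {
            return; // nothing to park
        }
        String id = UUID.randomUUID().toString();
        store.put(id, order.attachment);
        order.claimCheck = id;
        order.attachment = null;
    }

    void checkOut(Order order) {
        if (order.claimCheck == null) {
            return; // nothing was parked
        }
        // remove() returns the stored bytes and deletes the entry in one step
        order.attachment = store.remove(order.claimCheck);
        order.claimCheck = null;
    }

    int storedCount() {
        return store.size();
    }

    public static void main(String[] args) {
        PartialClaimCheck check = new PartialClaimCheck();
        Order order = new Order();
        order.id = 100;
        order.attachment = new byte[1024];
        check.checkIn(order);
        System.out.println("id=" + order.id + ", parked=" + check.storedCount());
        check.checkOut(order);
        System.out.println("restored bytes=" + order.attachment.length);
    }
}
```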

Side by side comparison with Spring Integration

The following sections compare mostly equivalent Spring Integration XML with the corresponding Apache Camel DSL. I have not included the implementation of the extra infrastructure pieces which Apache Camel requires.

Routing to a JMS queue and then retrieving the data from the data store
<int:chain input-channel="requestChannel">
    <int-jms:outbound-gateway request-destination="amq.outbound" extract-request-payload="true"/>
    <int:claim-check-out message-store="clientMessageStore"/>
</int:chain>
vs
from("direct:order").to( "jms:queue:order", "bean:claimCheckOut").setExchangePattern(ExchangePattern.InOut);

Listening to a JMS queue and routing messages to a bean and storing the result

<int:chain  input-channel="inChannel"> 
    <int:service-activator ref="orderServiceHandler" method="processOrder"/>
    <int:claim-check-in message-store="serverMessageStore"/> 
</int:chain>
vs
from("jms:queue:order?concurrentConsumers=5").to( "bean:orderServiceHandler", "bean:claimCheckIn");

Conclusion

From this short tutorial, it's evident that Apache Camel requires quite a bit of extra code to accomplish the same tasks we did with Spring Integration. However, most of it becomes irrelevant when running inside an application server, which is the main target for CDI. The biggest piece that I feel should be provided by Apache Camel is a set of data stores using common data sources such as JDBC, JPA, and some of the more popular NoSQL databases. Once that hurdle is overcome, Apache Camel shines in the ease with which complex problems can be solved in clear, concise, and easy-to-maintain code. So which one is better? A lot depends on which framework you're already using (Spring vs CDI), and on whether either of the integration platforms provides a particular esoteric feature you need. But in the end, both tools are very capable and easy to use.

Source code

You can find the source code for this example on GitHub.
To check out the code, clone the following repository: https://github.com/aolarte/camel-integration-samples.git.
git clone https://github.com/aolarte/camel-integration-samples.git
git checkout branches/part2
This example can be run directly from Maven. For example:
mvn exec:java -Dexec.mainClass="com.javaprocess.examples.integration.main.Main" -Dexec.args="server client"

Tuesday, May 19, 2015

Using Spring Integration for creating scalable distributed services backed by JMS (part 1)

Spring Integration provides an implementation of the Enterprise Integration Patterns (as described in the book "Enterprise Integration Patterns: Designing, Building, and Deploying Messaging Solutions" by Gregor Hohpe and Bobby Woolf). While these patterns are mostly designed to integrate separate heterogeneous systems, Spring Integration allows you to use these patterns inside of your application. One useful mechanism provided by Spring Integration is an implementation of the "Request-Reply" pattern. In this pattern a command message is sent with a "ReplyTo" address. Using the "ReplyTo" address, the replier entity knows where to send the response.  This pattern is illustrated in the following diagram:


The plumbing to achieve this is somewhat complicated and tedious; however, Spring Integration takes care of all of the boilerplate code with just a few lines of XML. This provides a very powerful mechanism, where you can define your interface and use it without any knowledge of the mechanism used to communicate with the actual implementation. Furthermore, you can change this mechanism as needed, just by changing your configuration.
In this post we're going to wire a service using a direct connection. The direct connection occurs inside the JVM, in the same thread, resulting in very little overhead. We're also going to configure that same service using a JMS queue. Using this transport mechanism, the Requestor and the Replier are decoupled, and can live in different JVMs, potentially hosted on different machines. This also allows you to scale by adding more Replier machines to increase capacity.

Using Spring Integration to wire a service

In this first part, we're going to create an interface for a service. The implementation of this interface is not wired directly, but rather provided by Spring Integration, using the configuration in the Spring XML. The interface is very basic, as you can see below. When using Spring Integration it's best to design simple interfaces, with few, coarse-grained methods.

package com.javaprocess.examples.integration.interfaces;

import com.javaprocess.examples.integration.pojos.Order;
import com.javaprocess.examples.integration.pojos.OrderConfirmation;

public interface IOrderService {
    OrderConfirmation placeOrder(Order order);
}
This interface is going to be backed by the following class:
package com.javaprocess.examples.integration.impl;

import com.javaprocess.examples.integration.pojos.Order;
import com.javaprocess.examples.integration.pojos.OrderConfirmation;

public class OrderServiceHandler {


        public OrderConfirmation processOrder(Order order) {
            long threadId = Thread.currentThread().getId();
            System.out.println("Got order with id " + order.getId() + " on thread: " + threadId);
            OrderConfirmation ret = new OrderConfirmation();

            ret.setThreadHandler(threadId);
            return ret;
        }
}
You might notice that OrderServiceHandler does not implement IOrderService, and in fact, the method names do not even match. So how can we have such a level of decoupling? This is where Spring Integration comes into the picture:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:int="http://www.springframework.org/schema/integration"
       xsi:schemaLocation="
   http://www.springframework.org/schema/beans
   http://www.springframework.org/schema/beans/spring-beans.xsd
   http://www.springframework.org/schema/integration
   http://www.springframework.org/schema/integration/spring-integration.xsd
  ">
    
    <bean id="orderServiceHandler" class="com.javaprocess.examples.integration.impl.OrderServiceHandler"/> <!-- 1 -->

    <int:channel id="requestChannel"/> <!-- 2 -->

    <int:service-activator input-channel="requestChannel"
    ref="orderServiceHandler" method="processOrder" /> <!-- 3 -->

    <!-- 4 -->
    <int:gateway id="orderService"
                 service-interface="com.javaprocess.examples.integration.interfaces.IOrderService"
                 default-request-channel="requestChannel"/> <!-- 5 -->

    <bean id="app" class="com.javaprocess.examples.integration.impl.App">
        <property name="orderService" ref="orderService"/> <!-- 6 -->
    </bean>
    
</beans>
Let's look at the relevant part of this Spring xml configuration (spring-int.xml):

  1. We instantiate an OrderServiceHandler bean. This will execute our business logic. How you instantiate it is your choice; we declare it manually in this example for clarity.
  2. We declare a Spring Integration channel, "requestChannel". This channel carries requests from the gateway to the service activator.
  3. The Service Activator is an endpoint that connects a channel to a Spring bean. In this case we're connecting the requestChannel channel to our business logic bean. In this XML block we also declare that the "processOrder" method will be invoked for this Service Activator.
  4. The Gateway is another endpoint, designed to expose a simple interface while hiding all of the Spring Integration complexity. In this case we're exposing the gateway as "orderService", implementing the IOrderService interface. This is the basic syntax for an interface with a single method; the XML gets more complicated with additional methods, as every method must be wired independently.
  5. We're using "requestChannel" to send out requests.
  6. The "app" bean is our test application. 
We explicitly specified a channel to send the request ("requestChannel"). So how does the service activator know where to send the reply? By default the reply goes back to the requester over a temporary anonymous channel, so Spring Integration gives us that functionality for free. If you have special needs, you can specify the reply channel. We'll explore this in future postings.
The code for the App is shown below, and it creates 5 threads, processing an order in each thread, using the IOrderService bean provided by Spring Integration (Remember, we have no implementation of IOrderService anywhere in our source code):
package com.javaprocess.examples.integration.impl;

import com.javaprocess.examples.integration.pojos.Order;
import com.javaprocess.examples.integration.pojos.OrderConfirmation;
import com.javaprocess.examples.integration.interfaces.IOrderService;

import java.util.ArrayList;
import java.util.List;

public class App {

    public class AppWorker implements Runnable {
        public void run() {
            long threadId=Thread.currentThread().getId();
            System.out.println("Requesting order processing on thread: " + threadId);
            Order order=new Order();
            order.setId(100);
            OrderConfirmation ret= orderService.placeOrder(order);
            System.out.println("Order was requested by " + threadId+"  and by processed by thread: " + ret.getThreadHandler());
        }
    }

    private IOrderService orderService;

    public void setOrderService(IOrderService orderService) {
        this.orderService = orderService;
    }

    public void run() {
        List<Thread> list = new ArrayList<Thread>();
        for (int i = 0; i < 5; i++) {
            Thread thread = new Thread(new AppWorker());
            list.add(thread);
            thread.start();
        }
        for (Thread thread : list) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
The example is provided as a Maven application, and can be compiled with the command:
mvn compile
Once compiled, you can see it in action:
mvn exec:java -Dexec.mainClass="com.javaprocess.examples.integration.main.Main" -Dexec.args="direct"
The argument "direct" will cause the main method to use the Spring configuration we were examining before. The output will look something like this:
Running in client mode
Requesting order processing on thread: 13
Requesting order processing on thread: 14
Requesting order processing on thread: 15
Requesting order processing on thread: 16
Requesting order processing on thread: 17
Got order with id 100 on thread: 16
Got order with id 100 on thread: 15
Got order with id 100 on thread: 17
Got order with id 100 on thread: 13
Got order with id 100 on thread: 14
Order was requested by 16  and by processed by thread: 16
Order was requested by 15  and by processed by thread: 15
Order was requested by 14  and by processed by thread: 14
Order was requested by 17  and by processed by thread: 17
Order was requested by 13  and by processed by thread: 13
So what is happening in this example?

  • Spring Integration has created a dynamic proxy that implements IOrderService. This proxy is exposed as a Spring bean with the name "orderService". This is the bean that is used from inside App.
  • The "placeOrder" method in "orderService" is connected using two channels to a method ("processOrder") in the "orderServiceHandler" bean.
  • The processing of the order is occurring in the same thread in which it is requested.
This can be seen in the following diagram:
Service connected directly using Spring Integration
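
To make the idea of a proxy-backed interface more concrete, here is a simplified illustration built on the JDK's java.lang.reflect.Proxy. It is not how Spring Integration is implemented internally; the types OrderService and OrderHandler and the gateway method are hypothetical, and a plain Function stands in for the channel plus service activator.

```java
import java.lang.reflect.Proxy;
import java.util.function.Function;

// Simplified illustration of the gateway idea using a JDK dynamic proxy.
// All names here are hypothetical; this is not Spring Integration code.
class GatewayProxySketch {

    // The interface the caller sees; nothing in this file implements it by hand
    interface OrderService {
        String placeOrder(String order);
    }

    // The business logic, with an unrelated method name
    static class OrderHandler {
        String processOrder(String order) {
            return "confirmed:" + order;
        }
    }

    // Builds an OrderService whose method forwards its argument to "channel"
    static OrderService gateway(Function<String, String> channel) {
        return (OrderService) Proxy.newProxyInstance(
                OrderService.class.getClassLoader(),
                new Class<?>[] {OrderService.class},
                (proxy, method, args) -> {
                    if (method.getDeclaringClass() == Object.class) {
                        // toString/equals/hashCode: delegate to the channel function
                        return method.invoke(channel, args);
                    }
                    return channel.apply((String) args[0]);
                });
    }

    public static void main(String[] args) {
        OrderHandler handler = new OrderHandler();
        OrderService service = gateway(handler::processOrder);
        System.out.println(service.placeOrder("100")); // confirmed:100
    }
}
```

The caller only ever sees OrderService, so the function passed to gateway could just as well hand the request to a queue, which is the step taken in the next section.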


Up to this point, Spring Integration seems to add very little value, and a lot of extra XML configuration. The same result could have been achieved by having OrderServiceHandler implement IOrderService and wiring the bean directly (even easier if using annotations). However, the real power of Spring Integration will become evident in the next section.

Using JMS with Spring Integration

In this next section we're going to have two separate Spring configurations, one for the Requestor, and one for the Replier. They exist in two different files, and we can run them in the same JVM, or in separate JVMs.
First we will examine the file for requestor (spring-int-client.xml):
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:int="http://www.springframework.org/schema/integration"
       xmlns:int-jms="http://www.springframework.org/schema/integration/jms"
       xsi:schemaLocation="
   http://www.springframework.org/schema/beans
   http://www.springframework.org/schema/beans/spring-beans.xsd
   http://www.springframework.org/schema/integration
   http://www.springframework.org/schema/integration/spring-integration.xsd
   http://www.springframework.org/schema/integration/jms
   http://www.springframework.org/schema/integration/jms/spring-integration-jms.xsd
  ">

    <int:channel id="outChannel"></int:channel> <!-- 1 -->

    <int:gateway id="orderService"
                 service-interface="com.javaprocess.examples.integration.interfaces.IOrderService"
                 default-request-channel="outChannel"/> <!-- 2 -->

    <int-jms:outbound-gateway request-channel="outChannel"
                              request-destination="amq.outbound" extract-request-payload="true"/> <!-- 3 -->

    <bean id="app" class="com.javaprocess.examples.integration.impl.App">
        <property name="orderService" ref="orderService"/> <!-- 4 -->
    </bean>

</beans>

  1. The outgoing channel is declared. This channel is used by the gateway to send its requests.
  2. The gateway is set up in a similar fashion as in the first example. You might have noticed that it lacks a setting for "default-reply-channel"; this is on purpose, and will be explained shortly.
  3. The JMS outbound gateway (notice we're using the int-jms namespace). This is the interface with the JMS broker: it reads messages from "outChannel" and sends them to the "amq.outbound" JMS queue.
  4. The app is the same as used in the first example. This way we show how you wire services differently, without rewriting any code.
The counterpart to the requestor is the replier (spring-int-server.xml). The file is shown below:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:int="http://www.springframework.org/schema/integration"
       xmlns:int-jms="http://www.springframework.org/schema/integration/jms"
       xmlns:amq="http://activemq.apache.org/schema/core"
       xsi:schemaLocation="
   http://www.springframework.org/schema/beans
   http://www.springframework.org/schema/beans/spring-beans.xsd
   http://www.springframework.org/schema/integration
   http://www.springframework.org/schema/integration/spring-integration.xsd
   http://www.springframework.org/schema/integration/jms
   http://www.springframework.org/schema/integration/jms/spring-integration-jms.xsd
   http://activemq.apache.org/schema/core
   http://activemq.apache.org/schema/core/activemq-core.xsd
  ">

    <amq:broker id="activeMQBroker" useJmx="false" persistent="false">
        <amq:transportConnectors>
        <amq:transportConnector uri="tcp://localhost:61616" />
        </amq:transportConnectors>
    </amq:broker> <!-- 1 -->

    <bean id="orderServiceHandler" class="com.javaprocess.examples.integration.impl.OrderServiceHandler"/> <!-- 2 -->

    <int:channel id="inChannel" /> <!-- 3 -->

    <int-jms:inbound-gateway request-channel="inChannel" request-destination="amq.outbound"
                             concurrent-consumers="10"/> <!-- 4 -->

    <int:service-activator input-channel="inChannel"
                           ref="orderServiceHandler" method="processOrder"/> <!-- 5 -->

</beans>
  1. A broker is configured to run in-process, in the JVM hosting the replier or server component. This is suitable for a proof of concept such as this. In production you will want to run a standalone, properly tuned broker.
  2. The OrderServiceHandler bean is instantiated, exactly as in the first example.
  3. This channel will be used to send messages received from JMS to the service activator.
  4. A JMS inbound gateway will read messages from the queue. Of special interest here is the number of concurrent consumers. This can be tweaked to set the number of threads that will process incoming messages.
  5. The service activator is configured to listen on "inChannel". Similar to the way we defined the gateway, we have omitted the output-channel parameter.

You can run both the requestor and the replier in the same JVM, by using this command:
mvn exec:java -Dexec.mainClass="com.javaprocess.examples.integration.main.Main" -Dexec.args="client server"
Running in this fashion will result in output similar to this:
Running in client mode
Requesting order processing on thread: 33
Requesting order processing on thread: 34
Requesting order processing on thread: 35
Requesting order processing on thread: 36
Requesting order processing on thread: 37
Got order with id 100 on thread: 19
Got order with id 100 on thread: 24
Got order with id 100 on thread: 26
Got order with id 100 on thread: 18
Got order with id 100 on thread: 25
Order was requested by 37  and by processed by thread: 26
Order was requested by 36  and by processed by thread: 18
Order was requested by 34  and by processed by thread: 25
Order was requested by 33  and by processed by thread: 19
Order was requested by 35  and by processed by thread: 24
Closing context
Notice how the thread that made the request is different from the thread that replied to the request.
We can take this a step further and see it run in two different JVMs. First start the server. This is necessary since it also starts the JMS broker that the client will connect to. Invoked in this fashion, the server will run until manually terminated, fulfilling any requests put in the queue.
mvn exec:java -Dexec.mainClass="com.javaprocess.examples.integration.main.Main" -Dexec.args="server"
Once the server has started, open a new terminal or command prompt window, and start the client:
mvn exec:java -Dexec.mainClass="com.javaprocess.examples.integration.main.Main" -Dexec.args="client"
You will see the client making requests:
Running in client mode
Requesting order processing on thread: 13
Requesting order processing on thread: 14
Requesting order processing on thread: 15
Requesting order processing on thread: 16
Requesting order processing on thread: 17
Order was requested by 15  and by processed by thread: 21
Order was requested by 16  and by processed by thread: 24
Order was requested by 14  and by processed by thread: 20
Order was requested by 13  and by processed by thread: 17
Order was requested by 17  and by processed by thread: 19
Closing context
You will also see the server fulfilling these requests:
Running in server mode
Got order with id 100 on thread: 19
Got order with id 100 on thread: 17
Got order with id 100 on thread: 20
Got order with id 100 on thread: 24
Got order with id 100 on thread: 21
If you start more than one client working at a time, you will see the server replying to all the incoming requests.

So how is this working?

We define a JMS queue as the destination for the request, but how does the replier know where to send the response? The outbound gateway creates a temporary reply queue and sets the "ReplyTo" JMS header to it; the replier then sends its response to that address. This can be conceptualized in the following diagram:
JMS backed service
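
The same flow can be modelled in memory, with BlockingQueues standing in for JMS queues. This is a rough sketch of the Request-Reply idea only: it uses no JMS or Spring Integration API, all names are hypothetical, and the five-second timeout is an arbitrary choice.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// In-memory model of request-reply; BlockingQueues stand in for JMS queues.
class RequestReplySketch {

    static class Request {
        final String body;
        final BlockingQueue<String> replyTo; // plays the role of the ReplyTo header

        Request(String body, BlockingQueue<String> replyTo) {
            this.body = body;
            this.replyTo = replyTo;
        }
    }

    // Shared request queue, comparable to "amq.outbound" in the configuration
    static final BlockingQueue<Request> requestQueue = new LinkedBlockingQueue<>();

    // Requestor: create a temporary reply queue, send the request, wait for the answer
    static String request(String body) throws InterruptedException {
        BlockingQueue<String> temporaryReplyQueue = new ArrayBlockingQueue<>(1);
        requestQueue.put(new Request(body, temporaryReplyQueue));
        return temporaryReplyQueue.poll(5, TimeUnit.SECONDS); // null on timeout
    }

    // Replier: take one request and answer on whatever queue the request names
    static void serveOne() throws InterruptedException {
        Request request = requestQueue.take();
        request.replyTo.put("confirmed:" + request.body);
    }

    public static void main(String[] args) throws InterruptedException {
        Thread replier = new Thread(() -> {
            try {
                serveOne();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        replier.start();
        System.out.println(request("order-100")); // confirmed:order-100
        replier.join();
    }
}
```

Since the replier only needs the address carried by the request, any number of replier threads or processes can consume from the same request queue without knowing who the requestors are.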

Conclusion

So why should you try to use something like this? We'll point out some general advantages, but as usual, remember that the best solution depends on your particular needs.

  • This approach provides deep decoupling, allowing you to have servers in the back servicing requests. These servers can even be added on demand.
  • The scalability of messaging-based service buses tends to be better than that of other approaches like load balancers, especially under heavy load. If using a load balancer, a single request that takes too long could cause a server to drop off the load balancer. With messaging, the available servers will continue to service requests as fast as possible.
  • Spring Integration provides a great deal of extra functionality out of the box, such as the "Claim Check" pattern. We'll examine some of these possibilities in future posts. Other remoting frameworks (such as plain Spring remoting) do not provide this wealth of extra functionality.
  • This approach provides a great deal of flexibility. You can even wire different methods in a single interface using different transports (JMS, direct connection, REST web services).
  • The configuration can be easily changed by modifying the XML files. These files can even live outside of the application. This allows precise tuning, without having to change the application code, recompile, or repackage. In this respect, I favor using the XML configuration as opposed to Java configuration.

Source code

You can find the source code for this example on GitHub.
To check out the code, clone the following repository: https://github.com/aolarte/spring-integration-samples.git.
git clone https://github.com/aolarte/spring-integration-samples.git
git checkout branches/part1