Thursday, April 7, 2016

ActiveMQ, prefetch limits, the Dispatch Queue and transactions

The objective of this article is to describe the interaction of ActiveMQ and its consumers, and how message delivery is affected by the use of transactions, acknowledgement modes, and the prefetch setting.  This information is particularly useful if you have messages that could potentially take a long time to process.

The Dispatch Queue

The Dispatch Queue contains the set of messages that ActiveMQ has designated to be sent to a particular consumer.  These messages are not available to any other consumer, unless their target consumer runs into an error (such as being disconnected). These messages are streamed to the consumer to allow faster processing; this is also referred to as "pushing" messages to the consumer, in contrast to consumers polling or "pulling" messages whenever they're available to process a new one.
The prefetch limit is defined by the ActiveMQ documentation as "how many messages can be streamed to a consumer at any point in time. Once the prefetch limit is reached, no more messages are dispatched to the consumer until the consumer starts sending back acknowledgements of messages (to indicate that the message has been processed)".  Basically, the prefetch limit defines the maximum number of messages to assign to the dispatch queue of a consumer. This can be seen in the following diagram (the dispatch queue of each consumer is depicted by the dotted line):

Dispatch queue with a prefetch limit of 5 and transactions enabled in the consumer

Streaming multiple messages to a client is a very significant performance boost, especially when messages can be processed quickly.  Therefore the defaults are quite high:


  • persistent queues (default value: 1000)
  • non-persistent queues (default value: 1000)
  • persistent topics (default value: 100)
  • non-persistent topics (default value: Short.MAX_VALUE - 1)
The prefetch value can be configured at the connection level, in which case it applies to all consumers using that connection. It can also be overridden on a per-consumer basis. Configuration details can be found in the ActiveMQ documentation.
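For reference, these are two of the common knobs (the broker address and queue name here are illustrative): the jms.prefetchPolicy.* options on the connection URI apply to every consumer created from that connection, while the consumer.prefetchSize destination option overrides the value for a single consumer:

```
# Connection level: every queue consumer on this connection gets a prefetch of 10
tcp://localhost:61616?jms.prefetchPolicy.queuePrefetch=10

# Per consumer: the destination option overrides the connection-level value
queue://ORDERS.IN?consumer.prefetchSize=1
```

A value of 0 on consumer.prefetchSize disables the dispatch queue for that consumer entirely, which is discussed further below.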
Normally messages are distributed somewhat evenly, but by default ActiveMQ doesn't guarantee balanced loads across consumers (you can plug in your own dispatch policy, but in most cases that would be overkill). Messages may end up unevenly distributed when, for example:




  • A new consumer connects after all of the available messages have already been assigned to the Dispatch Queues of the consumers already connected.
  • Consumers that have different priorities.
  • If the number of messages is small, they might all be assigned to a single consumer.
Tuning these numbers is normally not necessary, but if messages take (or could potentially take) a significantly long time to process, it might be worth the effort.  For example, you might want to ensure a more even balance of message processing across multiple consumers, to allow processing in parallel.  While the competing consumers pattern is very common, ActiveMQ's Dispatch Queue can get in your way. In particular, one consumer can end up with all of the pending messages (up to the prefetch limit) assigned to its Dispatch Queue, leaving the other consumers idle.  Such a case can be seen below:
Dispatch queue with a prefetch limit of 5 and transactions enabled in the consumer


This is normally not a big issue if messages are processed quickly.  However, if the processing time of a message is significant, tweaking the prefetch limit is an option to get better performance.
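A toy model makes the imbalance easy to see. Assume the broker assigns each pending message to the first consumer whose dispatch queue still has room (the real dispatch policy is more nuanced, so treat this purely as an illustration):

```java
import java.util.Arrays;

// Toy model of dispatch: each pending message goes to the first consumer
// whose dispatch queue is below the prefetch limit. Messages that find no
// consumer with room stay on the broker. This is a simplification of
// ActiveMQ's real policy, but it shows how a high prefetch limit can leave
// one consumer with all the work while the others sit idle.
public class DispatchSimulation {

    static int[] dispatch(int messages, int consumers, int prefetch) {
        int[] queues = new int[consumers];
        for (int m = 0; m < messages; m++) {
            for (int c = 0; c < consumers; c++) {
                if (queues[c] < prefetch) {
                    queues[c]++;
                    break;
                }
            }
        }
        return queues;
    }

    public static void main(String[] args) {
        // 6 pending messages, 2 consumers, prefetch 5:
        // consumer 0 is handed 5 messages, consumer 1 only 1.
        System.out.println(Arrays.toString(dispatch(6, 2, 5))); // [5, 1]
        // Same load with prefetch 1: at most one in-flight message each,
        // the remaining messages wait on the broker for whoever frees up.
        System.out.println(Arrays.toString(dispatch(6, 2, 1))); // [1, 1]
    }
}
```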

Queues with low message volume and high processing time

While it's a best practice to ensure your consumer can process messages very quickly, that's not always possible. Sometimes you have to call a third party system that might be unreliable, or the business logic just keeps growing without much thought about the real world implications.
For consumers with very long, or highly variable, processing times, it is recommended to reduce the prefetch limit.  A low prefetch limit prevents messages from "backing up" in the dispatch queue, earmarked for a consumer that is busy:
Dispatch queue with a prefetch limit of 5 and transactions enabled in the consumer

This behavior can be seen in the ActiveMQ console, with a symptom most people describe as "stuck" messages, even though some of the consumers are idle.  If this is the case, it's worth examining the consumers. The "Active Consumers" view can help shed light on what is actually happening:
Screenshot showing a consumer with 5 messages in its Dispatch Queue. The prefetch limit is set at 5 for this consumer.


To address the negative effects of such cases, a prefetch limit of 1 will ensure maximum usage of all available consumers:
Dispatch queue with a prefetch limit of 1 and transactions enabled in the consumer

This will negate some of the efficiencies of streaming a large number of messages to a consumer, but this is negligible in cases where processing each message takes a long time.

The Dispatch Queue and transactions (or lack thereof)

When the consumer is set to use Session.AUTO_ACKNOWLEDGE, it will automatically acknowledge a message as soon as it receives it, and only then start actually processing it.  In this scenario, ActiveMQ has no idea whether the consumer is busy processing a message or not, and will therefore not take that message into account when dispatching.  It's therefore possible for a SECOND message to be queued for a busy consumer, even while another consumer sits idle:

Dispatch queue with a prefetch limit of 1 and auto acknowledge enabled
If Consumer 1 takes a long time processing its message, the second message could take a long time to even start being processed.  This can have a significant impact on performance, and normal troubleshooting will show a few puzzling discrepancies:

  • We have messages waiting to be picked up in ActiveMQ
  • We have idle consumers
How can this situation be prevented? For such cases, one option is to disable the dispatch queue altogether by setting the prefetch limit to zero.  This forces consumers to fetch a message every time they're idle, instead of waiting for messages to be pushed to them.  This will further degrade the performance of JMS delivery, so it should be used with care.  However, it ensures that all available consumers are kept busy:

Consumer with no dispatch queue (prefetch limit set to zero)

Final thoughts and considerations

While the prefetch limit defaults are good enough for most applications, a good understanding of what is happening under the covers can go a long way in tuning a system.

Sunday, October 25, 2015

Using Apache Camel and CDI for creating scalable distributed services backed by JMS (part 2)

Continuing the example shown in part 1, we now add a new feature, the Claim Check. This is the Apache Camel / CDI equivalent of another Spring Integration example shown before.  This pattern, along with most of the groundwork used in enterprise integration, was first codified in "Enterprise Integration Patterns" by Gregor Hohpe and Bobby Woolf. This book is a must read for anyone working in this area, regardless of the tool.

In this example we add the necessary pieces to store data in an external data store while our original message is routed through our system. The main motivation is to avoid sending and receiving large amounts of data through JMS or other similar systems that are designed for low latency communication. Contrary to Spring Integration, Apache Camel does not offer an out of the box implementation of the Claim Check pattern. The pattern and how to implement it are described here. There is talk of introducing an implementation in Apache Camel 3.0, but reading the discussions it becomes evident that providing a flexible general-case implementation is complicated. Spring Integration provides an implementation, but many times it's insufficient, since it's often necessary to store only parts of the message (most likely binary portions), while keeping the rest of the message to be processed along the pipeline. The general flow of this pattern can be seen below:

The code

To implement the Claim Check pattern, five new classes were created. The first two provide the actual implementation of the two pieces that make up the pattern: in essence, a Content Filter that removes part of the content (and stores it in a separate data store), and a Content Enricher that pulls the data from the data store back into the message. The code is seen below:

@Named("claimCheckIn")
public class ClaimCheckIn {

    @Inject
    private DataStore dataStore;

    public void checkIn(Exchange exchange, @Body Object body) {
        String id = UUID.randomUUID().toString();
        // store the message in the data store
        dataStore.put(id, body);
        // add the claim check as a header
        exchange.getIn().setHeader("claimCheck", id);
        // remove the body from the message
        exchange.getIn().setBody(null);
    }
}

@Named("claimCheckOut")
public class ClaimCheckOut {

    @Inject
    private DataStore dataStore;

    public void checkOut(Exchange exchange, @Header("claimCheck") String claimCheck) {
        exchange.getIn().setBody(dataStore.get(claimCheck));
        // remove the message data from the data store
        dataStore.remove(claimCheck);
        // remove the claim check header
        exchange.getIn().removeHeader("claimCheck");
    }
}


The data store provides a very simple abstraction to put, get, and delete objects from the database. The current implementation uses JDBC, but it shows the basics of how a data store can be implemented:
public class DataStore {

    @Inject
    DataSource dataSource;

    public void put(String id, Object body) {
        try (Connection connection = dataSource.getConnection();
             PreparedStatement stmt = connection.prepareCall("INSERT INTO DATA (id,data) VALUES (?,?)")) {
            stmt.setString(1, id);
            stmt.setObject(2, body);
            stmt.executeUpdate();
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }

    public Object get(String id) {
        Object ret = null;
        try (Connection connection = dataSource.getConnection();
             PreparedStatement stmt = connection.prepareCall("SELECT * FROM  DATA WHERE id=?")) {
            stmt.setString(1, id);
            try (ResultSet rs = stmt.executeQuery()) {
                if (rs.next()) {
                    ret = rs.getObject("data");
                }
            }

        } catch (SQLException e) {
            e.printStackTrace();
        }
        return ret;
    }

    public void remove(String id) {
        try (Connection connection = dataSource.getConnection();
             PreparedStatement stmt = connection.prepareCall("DELETE FROM  DATA WHERE id=?")) {
            stmt.setString(1, id);
            stmt.executeUpdate();
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }
}
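For unit tests, or to try the routes without a database, the same put/get/remove contract can be backed by a plain map. A minimal sketch (the class name is ours, not part of the example's repository):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// In-memory stand-in for the JDBC-backed DataStore: same put/get/remove
// contract, no SQL. Handy when unit testing the claim check routes.
public class InMemoryDataStore {

    private final Map<String, Object> store = new ConcurrentHashMap<>();

    public void put(String id, Object body) {
        store.put(id, body);
    }

    public Object get(String id) {
        return store.get(id);
    }

    public void remove(String id) {
        store.remove(id);
    }

    public static void main(String[] args) {
        InMemoryDataStore store = new InMemoryDataStore();
        store.put("id-1", "large payload");
        System.out.println(store.get("id-1")); // large payload
        store.remove("id-1");
        System.out.println(store.get("id-1")); // null
    }
}
```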



The last two new classes, H2ServerWrapper and DataSourceProvider, provide the connections to the database, as well as starting the in-memory H2 database. These classes are normally not needed, since connections are handled by the application server in most Java EE applications.  They were added here to provide similar functionality without an application server. You're welcome to look at the code, but in a future post I'll explain in more detail how to provide these services from inside CDI.

Wiring it all together

Once we have our classes implementing the functionality, the wiring is very simple:
    RouteBuilder jmsClientCamelRoute = new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            from("direct:order").to("jms:queue:order", "bean:claimCheckOut").setExchangePattern(ExchangePattern.InOut);
        }
    };

    RouteBuilder jmsServerCamelRoute = new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            from("jms:queue:order?concurrentConsumers=5").to("bean:orderServiceHandler", "bean:claimCheckIn");
        }
    };

All it takes is adding one destination before and one after.  In Camel, declaring multiple destinations will create a pipeline. This means that the message will be sent to the first destination, then the result of the first destination will be sent to the second destination, and so on. This in contrast to multicast, which will send the same message to all of the destinations.
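The pipeline semantics can be sketched with plain functions (this illustrates the routing behavior only, not Camel's API; the names are ours):

```java
import java.util.List;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;

// Pipeline vs. multicast semantics, sketched with plain functions:
// a pipeline feeds each destination the previous destination's output,
// while a multicast feeds every destination the same original input.
public class PipelineVsMulticast {

    static String pipeline(String msg, List<UnaryOperator<String>> destinations) {
        for (UnaryOperator<String> d : destinations) {
            msg = d.apply(msg); // the output of one step is the input to the next
        }
        return msg;
    }

    static List<String> multicast(String msg, List<UnaryOperator<String>> destinations) {
        return destinations.stream()
                .map(d -> d.apply(msg)) // every destination sees the original message
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<UnaryOperator<String>> steps = List.of(s -> s + "->A", s -> s + "->B");
        System.out.println(pipeline("msg", steps));  // msg->A->B
        System.out.println(multicast("msg", steps)); // [msg->A, msg->B]
    }
}
```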

The first change is the addition of "bean:claimCheckOut" after "jms:queue:order". This checks the data out of the store and puts it back on the message once the response arrives.  Something similar is done on the server side: first we process the request using the orderServiceHandler bean, and then put the response in the data store.  This particular configuration uses the check-in only for the response.

It's fairly easy to change it to use the claim check for both the request and the response. It's worth noting that such cases are not common in practice, as normally either the response or the request are large.
    RouteBuilder jmsClientCamelRoute = new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            from("direct:order").to("bean:claimCheckIn", "jms:queue:order", "bean:claimCheckOut").setExchangePattern(ExchangePattern.InOut);
        }
    };

    RouteBuilder jmsServerCamelRoute = new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            from("jms:queue:order?concurrentConsumers=5").to("bean:claimCheckOut", "bean:orderServiceHandler", "bean:claimCheckIn");
        }
    };
In this case, the request is also stored.  Since our check-in method stores the whole object, this will be an instance of BeanInvocation, which contains the parameters and other information needed to invoke the service method.  Based on your needs, you could of course extract only part of this object, for example one or more of the parameters, while keeping the rest of the message intact.

Side by side comparison with Spring Integration

The following sections compare mostly equivalent Spring Integration XML to their Apache Camel DSL. I have not included the implementation of the extra infrastructure pieces which Apache Camel requires.

Routing to a JMS queue and then retrieving the data from the data store
<int:chain input-channel="requestChannel">
    <int-jms:outbound-gateway request-destination="amq.outbound" extract-request-payload="true"/>
    <int:claim-check-out message-store="clientMessageStore"/>
</int:chain>
vs
from("direct:order").to( "jms:queue:order", "bean:claimCheckOut").setExchangePattern(ExchangePattern.InOut);

Listening to a JMS queue and routing messages to a bean and storing the result

<int:chain  input-channel="inChannel"> 
    <int:service-activator ref="orderServiceHandler" method="processOrder"/>
    <int:claim-check-in message-store="serverMessageStore"/> 
</int:chain>
vs
from("jms:queue:order?concurrentConsumers=5").to( "bean:orderServiceHandler", "bean:claimCheckIn");

Conclusion

From this short tutorial, it's evident that Apache Camel requires quite a bit of extra code to accomplish the same tasks we did with Spring Integration. However, most of it becomes irrelevant when running inside an application server, which is the main target for CDI.  The biggest piece that I feel should be provided by Apache Camel is a set of data stores backed by common technologies such as JDBC, JPA, and some of the more popular NoSQL databases.  Once that hurdle is overcome, Apache Camel shines in the ease with which complex problems can be solved in clear, concise, and easy-to-maintain code. So which one is better? A lot depends on which framework you're already using (Spring vs. CDI), and on whether either integration platform provides a particular esoteric feature you need. But in the end, both tools are very capable and easy to use.

Source code

You can find the source code for this example on GitHub.
To check out the code, clone the following repository: https://github.com/aolarte/camel-integration-samples.git.
git clone https://github.com/aolarte/camel-integration-samples.git
git checkout branches/part2
This example can be run directly from Maven. For example:
mvn exec:java -Dexec.mainClass="com.javaprocess.examples.integration.main.Main" -Dexec.args="server client"

Monday, October 19, 2015

Using Apache Camel and CDI for creating scalable distributed services backed by JMS (part 1)

As an exercise and follow-up to my Spring Integration post, and after a discussion with a coworker, I decided to reimplement the test application using Apache Camel and CDI (Contexts and Dependency Injection). I will show some of the basic pieces that make the solution work, as well as a side by side comparison with the XML from the Spring Integration solution.

The main business logic of the application is an almost exact copy of the Spring Integration example (Application.java, OrderServiceHandler.class, etc...). The only difference is in the annotations, since we're using JSR-299 CDI annotations like @Inject and @Produces.

The application has a "client" portion that generates messages that are serviced by a bean. This bean is either located in the same JVM, or remotely accessible through a JMS queue. The objective of this example is to show how easy it is to use this pattern to create distributed services, spreading particularly resource-intensive operations across backend nodes. This is the same operation as in the Spring Integration post mentioned previously, so the flow can be shown using the exact same diagrams as in the previous example:

The main routing logic is found in CamelContext.java.
For anyone used to Spring Integration, one of the most obvious things that jumps out is the abundance of code to wire the Camel and CDI components together. On the other hand, there is basically no XML; the only XML file (beans.xml) is basically empty.
CDI favors using code to wire components. This is done mostly through annotations and simple producer methods (annotated with @Produces), which is philosophically very different from Spring. Camel does provide a way to configure its routes using XML, but it requires Spring, so it's not really viable with CDI.
The general format of a Camel route is to define a source (using RouteBuilder.from()) and a destination (using RouteBuilder.to()). RouteBuilder provides a fluent API, and extra parameters can be set, for example the Exchange Pattern, which is needed to get a response back.
The URI passed to the from and to methods is always prefixed with the component name.

For example the following definition will route requests from a direct endpoint named "order", to a bean named "orderServiceHandler":
from("direct:order").to("bean:orderServiceHandler");
In this case we use the "direct" component, which defines an internal connection, and the "jms" component. The "bean" component provides a container-agnostic way to access beans in a Dependency Injection container.
Here we include the "camel-cdi" artifact, which allows Camel to locate beans in a JSR-299 (CDI) container such as Weld. Similar modules exist for other containers such as Spring, or a Registry may be maintained manually.
The URI also includes the destination name, as well as extra parameters. For example, we can define that 5 concurrent listeners (threads) will consume from a single queue:
jms:queue:order?concurrentConsumers=5

Camel can be hooked into CDI to provide beans that can be consumed elsewhere, using @Produces. The method below produces objects of type IOrderService; the resulting object is a proxy backed by the "direct:order" endpoint.

    @Produces
    public IOrderService createService() throws Exception {
        IOrderService service = new ProxyBuilder(camelCtx).endpoint("direct:order").build(IOrderService.class);
        return service;
    }
A bit of extra logic was added to determine which routes to create, based on the functionality desired by the user and passed as command line parameters.  In the Spring example this was controlled by choosing which files to include.
This does limit the flexibility of the solution, since there are no XML files to tweak after building the application.

It's worth noting that the proxy maps the whole interface to a single endpoint. In practice the interface can have more than one method, but invoking any of them will send the invocation to the same endpoint.
More sophisticated manipulation can be done, but in my experience it's not worth the effort unless absolutely necessary.
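This single-endpoint behavior is easy to picture with a plain JDK dynamic proxy, which works along the same general lines as such bean proxies: one InvocationHandler receives every method call on the interface. A Camel-free sketch (the handler and its output format are ours):

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;

// Every method of the interface funnels into a single InvocationHandler,
// just as every method of a Camel-produced proxy sends its invocation to
// the same endpoint.
public class ProxySketch {

    public interface IOrderService {
        String processOrder(String order);
        String cancelOrder(String order);
    }

    public static void main(String[] args) {
        // One handler for the whole interface: the "endpoint" in this sketch.
        InvocationHandler toEndpoint = (proxy, method, methodArgs) ->
                "endpoint got " + method.getName() + "(" + methodArgs[0] + ")";

        IOrderService service = (IOrderService) Proxy.newProxyInstance(
                IOrderService.class.getClassLoader(),
                new Class<?>[]{IOrderService.class},
                toEndpoint);

        // Different methods, same handler behind them.
        System.out.println(service.processOrder("100")); // endpoint got processOrder(100)
        System.out.println(service.cancelOrder("100"));  // endpoint got cancelOrder(100)
    }
}
```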

Other functions that were defined in XML in the Spring example are now done in code, for example starting an embedded ActiveMQ JMS Broker and JMS connection factory.  These are handled inside JMSConnectionFactoryProvider, but in most cases will be handled by a Java EE container.

Running the example

This example can be run in the same manner as the Spring Integration one. The code must first be compiled:
mvn compile
Then it can be run from Maven, passing one (or two) of three possible parameters:
  • direct: Uses a direct connection between the client and the bean handling the requests
  • server: Starts a server that will monitor an ActiveMQ queue, and process any requests sent to it. This option will also start an embedded ActiveMQ server, which prevents more than one server from running at once. If using an external ActiveMQ server, no such restriction exists.
  • client: Starts the distributed client, which will send and receive requests through ActiveMQ. This can be used in combination with the server parameter, as in the example below.
For example:

mvn exec:java -Dexec.mainClass="com.javaprocess.examples.integration.main.Main" -Dexec.args="server client"

Will result in the following output:

Running in client mode
Requesting order processing on thread: 38
Requesting order processing on thread: 41
Requesting order processing on thread: 39
Requesting order processing on thread: 40
Requesting order processing on thread: 42
Got order with id 100 on thread: 33
Got order with id 100 on thread: 32
Got order with id 100 on thread: 30
Got order with id 100 on thread: 29
Got order with id 100 on thread: 31
Order was requested by 40  and by processed by thread: 33
Order was requested by 42  and by processed by thread: 32
Order was requested by 41  and by processed by thread: 29
Order was requested by 39  and by processed by thread: 31
Order was requested by 38  and by processed by thread: 30
Stop Camel

Side by side comparison with Spring Integration

The following sections compare mostly equivalent Spring Integration XML to their Apache Camel DSL.
Directly routing to a bean
<int:channel id="requestChannel"/>
<int:service-activator input-channel="requestChannel"
 ref="orderServiceHandler" method="processOrder" />
vs.
from("direct:order").to("bean:orderServiceHandler");
Routing to a JMS queue
<int:channel id="requestChannel"/>
<int-jms:outbound-gateway request-channel="requestChannel" 
 request-destination="amq.outbound" extract-request-payload="true"/>
vs.
from("direct:order").to("jms:queue:order").setExchangePattern(ExchangePattern.InOut);
Listening to a JMS queue and routing messages to a bean for service
<int:channel id="inChannel" />
<int-jms:inbound-gateway request-channel="inChannel"
 request-destination="amq.outbound"
 concurrent-consumers="10"/>
<int:service-activator  input-channel="inChannel"
 ref="orderServiceHandler" method="processOrder"/>
vs.
from("jms:queue:order?concurrentConsumers=10").to("bean:orderServiceHandler");
Creating a bean that is a proxy connected to a route
<int:gateway id="orderService"
 service-interface="com.javaprocess.examples.integration.interfaces.IOrderService"
 default-request-channel="requestChannel" />
vs.
@Produces
public IOrderService createService() throws Exception {
 IOrderService service = new ProxyBuilder(camelCtx).endpoint("direct:order").build(IOrderService.class);
 return service;
}

Notes about CDI

I'm a big fan of CDI for wiring Java EE applications. However, for wiring standalone applications, CDI is lacking.  For a small application I would rather go with something simpler, like Google Guice.
That said, to show a comparable application to the standalone Spring Integration example, I have used Weld, the CDI reference implementation and one of the most popular implementations out there.

One particular challenge was starting a bean eagerly. If running inside a Java EE container, this could have been achieved with a single annotation, @Startup. It is in cases like this that it becomes obvious that CDI is meant to complement Java EE. For my standalone example I had to implement an extension that achieves this behavior. While CDI provides a way to do it, it's still not ideal. More information on how this is achieved can be seen in this detailed post.

Conclusion

I hope that this short post has shown the value of the Enterprise Integration Patterns, regardless of the implementation. Both Apache Camel and Spring Integration provide a rich set of the Enterprise Integration Patterns, which can be leveraged to solve complex real world problems.

Source code

You can find the source code for this example on GitHub.
To check out the code, clone the following repository: https://github.com/aolarte/camel-integration-samples.git.
git clone https://github.com/aolarte/camel-integration-samples.git

Monday, August 31, 2015

A simple CSV MessageBodyWriter for JAX-RS using Jackson

This is a very simple MessageBodyWriter that will allow you to output a List of objects as CSV from a JAX-RS web service. Such services can be useful with frameworks such as D3.js. Jackson provides MessageBodyWriters for several formats, but it does not provide an out of the box solution for CSV. It does, however, offer several useful classes to serialize objects into CSV, provided in the jackson-dataformat-csv artifact.
We can use those classes to create our own CSV MessageBodyWriter, as shown below:
package csv;


import com.fasterxml.jackson.dataformat.csv.CsvMapper;
import com.fasterxml.jackson.dataformat.csv.CsvSchema;

import javax.ws.rs.Produces;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.MultivaluedMap;
import javax.ws.rs.ext.MessageBodyWriter;
import javax.ws.rs.ext.Provider;
import java.io.IOException;
import java.io.OutputStream;
import java.lang.annotation.Annotation;
import java.lang.reflect.Type;
import java.util.List;

@Provider
@Produces("text/csv")
public class CSVMessageBodyWritter implements MessageBodyWriter<List> {

    @Override
    public boolean isWriteable(Class<?> type, Type genericType, Annotation[] annotations, MediaType mediaType) {
        boolean ret=List.class.isAssignableFrom(type);
        return ret;
    }

    @Override
    public long getSize(List data, Class<?> aClass, Type type, Annotation[] annotations, MediaType mediaType) {
        return 0;
    }

    @Override
    public void writeTo(List data, Class<?> aClass, Type type, Annotation[] annotations, MediaType mediaType, MultivaluedMap<String, Object> multivaluedMap, OutputStream outputStream) throws IOException, WebApplicationException {
        if (data!=null && data.size()>0) {
            CsvMapper mapper = new CsvMapper();
            Object o=data.get(0);
            CsvSchema schema = mapper.schemaFor(o.getClass()).withHeader();
            mapper.writer(schema).writeValue(outputStream,data);
        }


    }

}
To use our MessageBodyWriter, it must be registered. This can be achieved in several ways, depending on your JAX-RS implementation. Normally Jersey and other JAX-RS implementations are configured to scan packages for resources; in such cases, classes marked with @Provider will be registered automatically. Otherwise, the registration has to be done manually. For example, in Dropwizard you have to register the writer at startup:
 @Override
    public void run(MyConfiguration configuration,
                    Environment environment) {
      
        environment.jersey().register(new CSVMessageBodyWritter());
    }
Once registered, it becomes trivial to have a web service that outputs CSV. It's just a matter of annotating your web service method with the same media type as the MessageBodyWriter: @Produces("text/csv").
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import java.util.List;

@Path("/status")
public class StatusResource {
    @GET
    @Produces("text/csv")
    public List<Data> getData() {
        // "service" is assumed to be an injected collaborator that supplies the data
        List<Data> data = service.getStatus();
        return data;
    }
}
Our data class is just a normal POJO:
public class Data {
    private String date;
    private Integer minimum;
    private Integer maximum;
    private Integer average;

    public Data() {

    }

    //Getters and setters as needed
}
The output will look like this:
average,date,maximum,minimum
90,3/1856,125,0
60,2/1856,115,16
60,4/1856,115,16

This is a very simple implementation, but it should be a good starting point. It's worth noting that CSV is inherently limited and can't easily represent hierarchical object graphs, so you might need to flatten your data before exporting to CSV. If you need to ingest CSV in a web service, you can follow a similar approach to create a MessageBodyReader that builds objects from a CSV stream.
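One way to flatten is to map nested fields to dotted column names before handing the rows to the writer. A hand-rolled sketch, independent of Jackson (the names here are ours):

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Flattens a small object graph (modeled as nested maps) into a single CSV
// row: nested fields become dotted column names, e.g. "range.minimum".
public class CsvFlattener {

    static Map<String, Object> flatten(String prefix, Map<String, Object> node) {
        Map<String, Object> flat = new LinkedHashMap<>();
        for (Map.Entry<String, Object> e : node.entrySet()) {
            String key = prefix.isEmpty() ? e.getKey() : prefix + "." + e.getKey();
            if (e.getValue() instanceof Map) {
                @SuppressWarnings("unchecked")
                Map<String, Object> child = (Map<String, Object>) e.getValue();
                flat.putAll(flatten(key, child)); // recurse into the nested object
            } else {
                flat.put(key, e.getValue());
            }
        }
        return flat;
    }

    public static void main(String[] args) {
        Map<String, Object> range = new LinkedHashMap<>();
        range.put("minimum", 0);
        range.put("maximum", 125);
        Map<String, Object> row = new LinkedHashMap<>();
        row.put("date", "3/1856");
        row.put("range", range);
        // {date=3/1856, range.minimum=0, range.maximum=125}
        System.out.println(flatten("", row));
    }
}
```

The flattened map's keys become the CSV header and its values become one row, ready for any row-oriented writer.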

Tuesday, August 25, 2015

Simple Scala dependency injection with Scaldi (part 1)


Dependency Injection is a very useful pattern in most medium to large projects, and it's sometimes useful in smaller projects too. In smaller projects, I normally prefer appropriately smaller frameworks; in the case of Scala, Scaldi provides a very lightweight dependency injection mechanism.
Whenever I try to learn a new framework, I always like to learn my way up from a very simple example. The following is close to the absolute minimum example I could write to get Scaldi working. From this very simple example I will continue to add other useful features, such as testing and integration with other frameworks.

The build.sbt is very simple; we just add a single dependency:
name := "scaldi-test"

version := "1.0"

scalaVersion := "2.11.6"

libraryDependencies += "org.scaldi" %% "scaldi" % "0.5.6"
The main Scala file is shown next, and contains all of the necessary parts to get Scaldi working:
import scaldi.{Injector, Injectable, Module}

object HelloScaldi {
  def main(args: Array[String]) {

    val test=new Test;
    test.run
  }
}

class Test( )  extends Injectable {
  def run: Unit = {
    implicit  val injector:Injector = new UserModule //1
    val output:IService=inject[IService] //2
    println(output.execute("Scaldi"));
  }
}

class UserModule extends Module { //3
  bind [ITransport] to new MessageTransport //4
  bind [IService] to  new MessageService(inject[ITransport]) //5

}

trait ITransport {
  def send(s: String)
}

trait IService {
  def execute(x: String): String
}

class MessageService(transport:ITransport) extends IService {

  override def execute(x: String): String = {
    val ret="Hello " + x
    transport.send(ret)
    return  ret
  }
}

class MessageTransport() extends ITransport {
  override def send(s: String) = println("Sending message: " + s)
}




  1. In this line, we create an Injector using a new UserModule. The injector is the entry point into the DI container, and the Module defines the bindings that will be used for injection.
  2. The inject method uses the implicit Injector to provide us with the object bound to trait IService.
  3. The Module provides explicit bindings.
  4. In this line, we bind ITransport to a new instance of MessageTransport
  5. The trait IService is bound to a new instance of MessageService, in which we're injecting ITransport. This allows us to keep MessageService and MessageTransport decoupled.
This simple example can be executed using sbt:
sbt run
The output will look something like this:
[info] Running HelloScaldi 
Sending message: Hello Scaldi
Hello Scaldi
[success] Total time: 2 s, completed Aug 25, 2015 7:40:23 PM

As you can see, the example is very simple, but if you want to start from the ground up, hopefully this will give you a good starting point. If you have any questions, don't hesitate to leave a comment.
You can check out the full source code here.

Wednesday, June 17, 2015

Using Spring Integration for creating scalable distributed services backed by JMS (part 2)

In part 1 of this series, we explored how Spring Integration allows us to easily leverage some of the Enterprise Integration Patterns to deploy scalable services. In this part we'll take a look at one more pattern that is very helpful: the Claim Check pattern. This pattern allows us to store parts of a message, to be retrieved at a later stage.  This is illustrated in the following diagram:

In practice, this pattern allows us to avoid sending very large messages across the JMS broker. Message brokers are designed to handle many small messages, and performance tends to degrade with larger messages. While there are ways of tweaking the different JMS brokers to handle these kinds of workloads better, the Claim Check pattern allows us to sidestep the issue entirely, by using a separate storage mechanism to hold parts of our messages. So how big is too big for a message? That's a very subjective matter, but common sense tells us that, for example, most large binary data should not be transferred through a JMS broker.
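Conceptually the pattern is tiny. The following plain Java sketch (illustrative names and an in-memory Map standing in for the JDBC message store, so not Spring Integration's implementation) shows the check-in/check-out round trip:

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

// Minimal hand-rolled sketch of the Claim Check pattern: the large payload is
// checked into a store, and only a small UUID "claim ticket" crosses the wire.
public class ClaimCheckDemo {

    static final Map<UUID, byte[]> store = new ConcurrentHashMap<>();

    // Check-in: store the payload and return the ticket that will be sent over JMS.
    static UUID checkIn(byte[] largePayload) {
        UUID ticket = UUID.randomUUID();
        store.put(ticket, largePayload);
        return ticket;
    }

    // Check-out: redeem the ticket for the original payload.
    static byte[] checkOut(UUID ticket) {
        return store.remove(ticket);
    }

    public static void main(String[] args) {
        byte[] payload = new byte[10 * 1024 * 1024]; // 10 MB of binary data
        UUID ticket = checkIn(payload);   // only a ~36 character ticket crosses the broker
        byte[] retrieved = checkOut(ticket);
        System.out.println(retrieved.length); // prints 10485760
    }
}
```

Spring Integration's claim-check-in and claim-check-out elements do essentially this, with a pluggable message store instead of a Map.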
Luckily Spring Integration provides out of the box support for this pattern.  In fact, we can use the pattern without making any changes to our code. We do require some extra libraries, which we have added to the pom.xml file:
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-jdbc</artifactId>
    <version>${spring.version}</version>
</dependency>

<dependency>
    <groupId>com.h2database</groupId>
    <artifactId>h2</artifactId>
    <version>1.4.187</version>
</dependency>

<dependency>
    <groupId>commons-dbcp</groupId>
    <artifactId>commons-dbcp</artifactId>
    <version>1.4</version>
</dependency>
We have added:

  1. The Spring Integration JDBC jar. This provides, among other functionality, a JDBC backed message store.
  2. H2, a Java embedded database. In this database we will store our messages.
  3. Apache Commons DBCP, a connection pool to manage our JDBC connections.

Now let's look at the spring-int-server.xml file:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:jdbc="http://www.springframework.org/schema/jdbc"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:int="http://www.springframework.org/schema/integration"
       xmlns:int-jms="http://www.springframework.org/schema/integration/jms"
       xmlns:int-jdbc="http://www.springframework.org/schema/integration/jdbc"
       xmlns:amq="http://activemq.apache.org/schema/core"
       xsi:schemaLocation="
   http://www.springframework.org/schema/beans
   http://www.springframework.org/schema/beans/spring-beans.xsd
   http://www.springframework.org/schema/jdbc
   http://www.springframework.org/schema/jdbc/spring-jdbc.xsd
   http://www.springframework.org/schema/integration
   http://www.springframework.org/schema/integration/spring-integration.xsd
   http://www.springframework.org/schema/integration/jms
   http://www.springframework.org/schema/integration/jms/spring-integration-jms.xsd
   http://www.springframework.org/schema/integration/jdbc
   http://www.springframework.org/schema/integration/jdbc/spring-integration-jdbc.xsd
   http://activemq.apache.org/schema/core
   http://activemq.apache.org/schema/core/activemq-core.xsd
  ">


    <amq:broker id="activeMQBroker" useJmx="false" persistent="false">
        <amq:transportConnectors>
        <amq:transportConnector uri="tcp://localhost:61616" />
        </amq:transportConnectors>
    </amq:broker>

    <bean id="h2DBServer" class="org.h2.tools.Server"
          factory-method="createTcpServer" init-method="start" destroy-method="stop"> <!-- 1 -->
        <constructor-arg value="-tcp,-tcpAllowOthers,-tcpPort,8043" />
    </bean>


    <bean id="serverDataSource" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close"> <!-- 2 -->
        <property name="driverClassName" value="org.h2.Driver"/>
        <property name="url" value="jdbc:h2:tcp://localhost:8043/mem:test"/>
        <property name="username" value="sa"/>
        <property name="password" value=""/>
    </bean>

    <jdbc:initialize-database data-source="serverDataSource"> <!-- 3 -->
        <jdbc:script location="classpath:org/springframework/integration/jdbc/schema-h2.sql"/>
    </jdbc:initialize-database>

    <int-jdbc:message-store id="serverMessageStore" data-source="serverDataSource"/> <!-- 4 -->

    <bean id="orderServiceHandler" class="com.javaprocess.examples.integration.impl.OrderServiceHandler"/>

    <int:channel id="inChannel" />

    <int-jms:inbound-gateway request-channel="inChannel"
                             request-destination="amq.outbound"
                             concurrent-consumers="10"/>
    <int:chain  input-channel="inChannel"> <!-- 5 -->
        <int:service-activator ref="orderServiceHandler" method="processOrder"/>
        <int:claim-check-in message-store="serverMessageStore"/> <!-- 6 -->
    </int:chain>


</beans>
Now let's look at the changes:
  1. We have an in-memory instance of an H2 database. H2 is a very useful embedded database, which means that we can run it inside our JVM process. There's no need to have a separate database server for this example. Both the server and the client will connect to this instance. Obviously, in production you would choose something more robust, but for this example this setup works well. It's worth noting that H2 is started with the configuration options needed to accept outside connections over TCP.
  2. A datasource is created. This datasource will point to the database server created in point #1.
  3. A database initialization element. Spring Integration requires some tables to be created in the database that will be used as a message store. Since our database resides in memory, this will run every time the server is started up.
  4. The message store we're using is a JDBC store, using the data source defined in #2.
  5. We now introduce the chain element. The chain element allows multiple filters or transformers to be chained together. In this case we're still delegating to the orderServiceHandler bean for processing as the first step in the chain.
  6. In the second step of the chain, we check in the message. This transformer will take the message (which must be serializable), store it in the message store, and return a UUID (Universally Unique Identifier). This UUID is sent to the client.
Now let's look at the client side. The client side is very similar to the server (actually a bit simpler).
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:int="http://www.springframework.org/schema/integration"
       xmlns:int-jms="http://www.springframework.org/schema/integration/jms"
       xmlns:int-jdbc="http://www.springframework.org/schema/integration/jdbc"
       xsi:schemaLocation="
   http://www.springframework.org/schema/beans
   http://www.springframework.org/schema/beans/spring-beans.xsd
   http://www.springframework.org/schema/integration
   http://www.springframework.org/schema/integration/spring-integration.xsd
   http://www.springframework.org/schema/integration/jms
   http://www.springframework.org/schema/integration/jms/spring-integration-jms.xsd
   http://www.springframework.org/schema/integration/jdbc
   http://www.springframework.org/schema/integration/jdbc/spring-integration-jdbc.xsd
  ">

    <int:channel id="requestChannel"/>

    <bean id="clientDataSource" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close"> <!-- 1 -->
        <property name="driverClassName" value="org.h2.Driver"/>
        <property name="url" value="jdbc:h2:tcp://localhost:8043/mem:test"/>
        <property name="username" value="sa"/>
        <property name="password" value=""/>
    </bean>

    <int-jdbc:message-store id="clientMessageStore" data-source="clientDataSource" /> <!-- 2 -->

    <int:gateway id="orderService"
                 service-interface="com.javaprocess.examples.integration.interfaces.IOrderService"
                 default-request-channel="requestChannel"/>
    <int:chain input-channel="requestChannel"> <!-- 3 -->
        <int-jms:outbound-gateway request-destination="amq.outbound" extract-request-payload="true"/>
        <int:claim-check-out message-store="clientMessageStore"/>
    </int:chain>



    <bean id="app" class="com.javaprocess.examples.integration.impl.App">
        <property name="orderService" ref="orderService"/>
    </bean>

</beans>
Let's look at the changes in this file:

  1. A data source is created, using the same URL as on the server side. There's no need to start up the H2 server here, since we're using the H2 instance running inside the server process.
  2. A message store is created using the data source created in #1.
  3. A chain is created to handle messages in the "requestChannel". This chain will receive a UUID, and retrieve the corresponding object from the message store.
To check out the code clone the following repository: https://github.com/aolarte/spring-integration-samples.git.
git clone https://github.com/aolarte/spring-integration-samples.git
git checkout branches/part2
Once you checkout the code, you can run the server and client process using Maven (in different terminals):
mvn exec:java -Dexec.mainClass="com.javaprocess.examples.integration.main.Main" -Dexec.args="server"

mvn exec:java -Dexec.mainClass="com.javaprocess.examples.integration.main.Main" -Dexec.args="client"

You will see the exact same output as in part 1. While that might seem anticlimactic, the important aspect is what is happening behind the scenes: the message is being stored in a database, and only a UUID is sent across the wire.  What we have implemented is a very simple Enterprise Service Bus.  There are plenty of opportunities to use patterns like this, and in the next article we will elaborate on some of them.

Tuesday, May 19, 2015

Using Spring Integration for creating scalable distributed services backed by JMS (part 1)

Spring Integration provides an implementation of the Enterprise Integration Patterns (as described in the book "Enterprise Integration Patterns: Designing, Building, and Deploying Messaging Solutions" by Gregor Hohpe and Bobby Woolf). While these patterns are mostly designed to integrate separate heterogeneous systems, Spring Integration allows you to use these patterns inside of your application. One useful mechanism provided by Spring Integration is an implementation of the "Request-Reply" pattern. In this pattern a command message is sent with a "ReplyTo" address. Using the "ReplyTo" address, the replier entity knows where to send the response.  This pattern is illustrated in the following diagram:


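The essence of Request-Reply can be sketched in a few lines of plain Java (hypothetical names, with in-memory queues standing in for JMS destinations): the request carries its own reply queue, which plays the role of the "ReplyTo" header.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Bare-bones Request-Reply sketch (not the Spring Integration API): each
// request names its own "ReplyTo" queue, so the replier knows where to answer.
public class RequestReplyDemo {

    static class Request {
        final String command;
        final BlockingQueue<String> replyTo; // the "ReplyTo" address
        Request(String command, BlockingQueue<String> replyTo) {
            this.command = command;
            this.replyTo = replyTo;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<Request> requestQueue = new ArrayBlockingQueue<>(10);

        // Replier: reads a request and replies on the channel the request names.
        Thread replier = new Thread(() -> {
            try {
                Request req = requestQueue.take();
                req.replyTo.put("processed: " + req.command);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        replier.start();

        // Requestor: sends a command plus a private reply queue, then blocks on it.
        BlockingQueue<String> replyQueue = new ArrayBlockingQueue<>(1);
        requestQueue.put(new Request("placeOrder", replyQueue));
        System.out.println(replyQueue.take()); // prints "processed: placeOrder"
        replier.join();
    }
}
```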
The plumbing to achieve this is somewhat complicated and tedious; however, Spring Integration takes care of all of the boilerplate code with just a few lines of XML. This provides a very powerful mechanism: you can define your interface and use it without any knowledge of the mechanism used to communicate with the actual implementation. Furthermore, you can change this mechanism as needed, just by changing your configuration.
In this post we're going to wire a service using a direct connection. The direct connection occurs inside the JVM, in the same thread, resulting in very little overhead. We're also going to configure that same service using a JMS queue. With this transport mechanism, the Requestor and the Replier are decoupled, and can live in different JVMs, potentially hosted on different machines. This also allows you to scale by adding more Replier machines to increase capacity.

Using Spring Integration to wire a service

In this first part, we're going to create an interface for a service. The implementation of this interface is not wired directly, but rather provided by Spring Integration, using the configuration provided in the Spring XML. The interface is very basic, as you can see below. When using Spring Integration it's best to design simple interfaces, with few, coarse-grained methods.

package com.javaprocess.examples.integration.interfaces;

import com.javaprocess.examples.integration.pojos.Order;
import com.javaprocess.examples.integration.pojos.OrderConfirmation;

public interface IOrderService {
    OrderConfirmation placeOrder(Order order);
}
This interface is going to be backed by the following class:
package com.javaprocess.examples.integration.impl;

import com.javaprocess.examples.integration.pojos.Order;
import com.javaprocess.examples.integration.pojos.OrderConfirmation;

public class OrderServiceHandler {

    public OrderConfirmation processOrder(Order order) {
        long threadId = Thread.currentThread().getId();
        System.out.println("Got order with id " + order.getId() + " on thread: " + threadId);
        OrderConfirmation ret = new OrderConfirmation();
        ret.setThreadHandler(threadId);
        return ret;
    }
}
You might notice that OrderServiceHandler does not implement IOrderService, and in fact, the method names do not even match. So how can we have such a level of decoupling? This is where Spring Integration comes into the picture:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:int="http://www.springframework.org/schema/integration"
       xsi:schemaLocation="
   http://www.springframework.org/schema/beans
   http://www.springframework.org/schema/beans/spring-beans.xsd
   http://www.springframework.org/schema/integration
   http://www.springframework.org/schema/integration/spring-integration.xsd
  ">
    
    <bean id="orderServiceHandler" class="com.javaprocess.examples.integration.impl.OrderServiceHandler"/> <!-- 1 -->

    <int:channel id="requestChannel"/> <!-- 2 -->

    <int:service-activator input-channel="requestChannel"
    ref="orderServiceHandler" method="processOrder" /> <!-- 3 -->

    <!-- 4 -->
    <int:gateway id="orderService"
                 service-interface="com.javaprocess.examples.integration.interfaces.IOrderService"
                 default-request-channel="requestChannel"/> <!-- 5 -->

    <bean id="app" class="com.javaprocess.examples.integration.impl.App">
        <property name="orderService" ref="orderService"/> <!-- 6 -->
    </bean>
    
</beans>
Let's look at the relevant part of this Spring xml configuration (spring-int.xml):

  1. We instantiate an OrderServiceHandler bean. This will execute our business logic. How you instantiate it is your choice; we declare it manually in this example for clarity.
  2. We declare a Spring Integration channel, "requestChannel". This channel will carry requests from the gateway to the service activator.
  3. The Service Activator is an endpoint which connects a channel to a Spring bean. In this case we're connecting the requestChannel channel to our business logic bean.  In this XML block we also declare that the "processOrder" method will be invoked for this Service Activator.
  4. The Gateway is another endpoint, designed to expose a simple interface while hiding all of the Spring Integration complexity. In this case we're exposing the gateway as "orderService", implementing the IOrderService interface. This is the basic syntax for an interface with a single method; the XML gets more complicated with additional methods, as every method must be wired independently.
  5. We're using "requestChannel" to send out requests.
  6. The "app" bean is our test application. 
We explicitly specified a channel on which to send the request ("requestChannel"). So how does the service activator know where to send the reply? By default, it sends the reply back to the requestor using a temporary anonymous channel, so Spring Integration gives us that functionality for free. If you have special needs, you can specify the reply channel explicitly. We'll explore this in future postings. 
The code for the App class is shown below. It creates 5 threads, processing an order in each thread, using the IOrderService bean provided by Spring Integration (remember, we have no implementation of IOrderService anywhere in our source code):
package com.javaprocess.examples.integration.impl;

import com.javaprocess.examples.integration.pojos.Order;
import com.javaprocess.examples.integration.pojos.OrderConfirmation;
import com.javaprocess.examples.integration.interfaces.IOrderService;

import java.util.ArrayList;
import java.util.List;

public class App {

    public class AppWorker implements Runnable {
        public void run() {
            long threadId=Thread.currentThread().getId();
            System.out.println("Requesting order processing on thread: " + threadId);
            Order order=new Order();
            order.setId(100);
            OrderConfirmation ret= orderService.placeOrder(order);
            System.out.println("Order was requested by " + threadId + "  and processed by thread: " + ret.getThreadHandler());
        }
    }

    private IOrderService orderService;

    public void setOrderService(IOrderService orderService) {
        this.orderService = orderService;
    }

    public void run() {
        List<Thread> list = new ArrayList<Thread>();
        for (int i = 0; i < 5; i++) {
            Thread thread = new Thread(new AppWorker());
            list.add(thread);
            thread.start();
        }
        for (Thread thread : list) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
The example is provided as a Maven application, and can be compiled with the following command:
mvn compile
Once compiled, you can see it in action:
mvn exec:java -Dexec.mainClass="com.javaprocess.examples.integration.main.Main" -Dexec.args="direct"
The argument "direct" will cause the main method to use the Spring configuration we were examining before. The output will look something like this:
Running in client mode
Requesting order processing on thread: 13
Requesting order processing on thread: 14
Requesting order processing on thread: 15
Requesting order processing on thread: 16
Requesting order processing on thread: 17
Got order with id 100 on thread: 16
Got order with id 100 on thread: 15
Got order with id 100 on thread: 17
Got order with id 100 on thread: 13
Got order with id 100 on thread: 14
Order was requested by 16  and processed by thread: 16
Order was requested by 15  and processed by thread: 15
Order was requested by 14  and processed by thread: 14
Order was requested by 17  and processed by thread: 17
Order was requested by 13  and processed by thread: 13
So what is happening in this example?

  • Spring Integration has created a dynamic proxy that implements IOrderService.  This proxy is exposed as a Spring bean with the name "orderService".  This is the bean used inside App.
  • The "placeOrder" method in "orderService" is connected using two channels to a method ("processOrder") in the "orderServiceHandler" bean.
  • The processing of the order is occurring in the same thread in which it is requested.
This can be seen in the following diagram:
Service connected directly using Spring Integration
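The dynamic proxy mentioned above can be sketched with the JDK's own java.lang.reflect.Proxy. This is a simplified illustration with hypothetical types, not Spring Integration's actual code, but it shows how a proxy can implement IOrderService while forwarding to a differently-named method on the handler:

```java
import java.lang.reflect.Proxy;

// Conceptual sketch of what the gateway does under the hood: a JDK dynamic
// proxy implements the service interface and routes every call to the handler.
public class ProxyDemo {

    interface IOrderService {
        String placeOrder(String order);
    }

    static class OrderServiceHandler {
        String processOrder(String order) {
            return "confirmed: " + order;
        }
    }

    static IOrderService gatewayFor(OrderServiceHandler handler) {
        return (IOrderService) Proxy.newProxyInstance(
                ProxyDemo.class.getClassLoader(),
                new Class<?>[]{IOrderService.class},
                // Every call on the proxy is forwarded to handler.processOrder
                (proxy, method, args) -> handler.processOrder((String) args[0]));
    }

    public static void main(String[] args) {
        IOrderService service = gatewayFor(new OrderServiceHandler());
        System.out.println(service.placeOrder("order-100")); // prints "confirmed: order-100"
    }
}
```

Spring Integration's gateway does the same kind of forwarding, but through a channel, which is what lets us later swap the transport without touching the interface.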


Up to this point, Spring Integration seems to add very little value, and a lot of extra XML configuration. The same result could have been achieved by having OrderServiceHandler implement IOrderService and wiring the bean directly (even easier if using annotations). However the real power of Spring Integration will become evident on the next section.

Using JMS with Spring Integration

In this next section we're going to have two separate Spring configurations: one for the Requestor, and one for the Replier. They exist in two different files, and we can run them in the same JVM or in separate JVMs.
First we will examine the file for the requestor (spring-int-client.xml):
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:int="http://www.springframework.org/schema/integration"
       xmlns:int-jms="http://www.springframework.org/schema/integration/jms"
       xsi:schemaLocation="
   http://www.springframework.org/schema/beans
   http://www.springframework.org/schema/beans/spring-beans.xsd
   http://www.springframework.org/schema/integration
   http://www.springframework.org/schema/integration/spring-integration.xsd
   http://www.springframework.org/schema/integration/jms
   http://www.springframework.org/schema/integration/jms/spring-integration-jms.xsd
  ">

    <int:channel id="outChannel"></int:channel> <!-- 1 -->

    <int:gateway id="orderService"
                 service-interface="com.javaprocess.examples.integration.interfaces.IOrderService"
                 default-request-channel="outChannel"/> <!-- 2 -->

    <int-jms:outbound-gateway request-channel="outChannel"
                              request-destination="amq.outbound" extract-request-payload="true"/> <!-- 3 -->

    <bean id="app" class="com.javaprocess.examples.integration.impl.App">
        <property name="orderService" ref="orderService"/> <!-- 4 -->
    </bean>

</beans>

  1. The outgoing channel is declared. This channel is used by the gateway to send its requests.
  2. The gateway is set up in a similar fashion as in the first example. You might have noticed that it lacks a setting for "default-reply-channel". This is on purpose, and will be explained shortly.
  3. The JMS outbound gateway (notice we're using the int-jms namespace). This is the interface with the JMS broker; it will read messages from "outChannel" and send them to the "amq.outbound" JMS queue.
  4. The app is the same as used in the first example. This shows how you can wire services differently, without rewriting any code.
The counterpart to the requestor is the replier (spring-int-server.xml). The file is shown below:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:int="http://www.springframework.org/schema/integration"
       xmlns:int-jms="http://www.springframework.org/schema/integration/jms"
       xmlns:amq="http://activemq.apache.org/schema/core"
       xsi:schemaLocation="
   http://www.springframework.org/schema/beans
   http://www.springframework.org/schema/beans/spring-beans.xsd
   http://www.springframework.org/schema/integration
   http://www.springframework.org/schema/integration/spring-integration.xsd
   http://www.springframework.org/schema/integration/jms
   http://www.springframework.org/schema/integration/jms/spring-integration-jms.xsd
   http://activemq.apache.org/schema/core
   http://activemq.apache.org/schema/core/activemq-core.xsd
  ">

    <amq:broker id="activeMQBroker" useJmx="false" persistent="false">
        <amq:transportConnectors>
        <amq:transportConnector uri="tcp://localhost:61616" />
        </amq:transportConnectors>
    </amq:broker> <!-- 1 -->

    <bean id="orderServiceHandler" class="com.javaprocess.examples.integration.impl.OrderServiceHandler"/> <!-- 2 -->

    <int:channel id="inChannel" /> <!-- 3 -->

    <int-jms:inbound-gateway request-channel="inChannel" request-destination="amq.outbound"
                             concurrent-consumers="10"/> <!-- 4 -->

    <int:service-activator input-channel="inChannel"
                           ref="orderServiceHandler" method="processOrder"/> <!-- 5 -->

</beans>
  1. A broker is configured to run in-process, on the JVM hosting the replier or server component. This is suitable for a proof of concept such as this one. In production you will want to run a standalone, properly tuned broker.
  2. The same OrderServiceHandler bean used in the first example; our business logic does not change.
  3. This channel will be used to send messages received from JMS to the service activator.
  4. A JMS inbound gateway will read messages from the queue. Of special interest here is the number of concurrent consumers. This can be tweaked to set the number of threads that will process incoming messages.
  5. The service activator is configured to listen on "inChannel". Similar to the way we defined the gateway, we have omitted the output-channel parameter.
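As a rough mental model for what concurrent-consumers="10" gives us (a simplified sketch, not Spring's actual listener container), think of a pool of threads all taking work from the same queue:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Simplified model of concurrent consumers: ten threads draining one queue,
// so several requests are processed at the same time.
public class ConcurrentConsumersDemo {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(100);
        ExecutorService consumers = Executors.newFixedThreadPool(10);
        CountDownLatch done = new CountDownLatch(5);

        // Each pool thread loops taking messages, like one JMS consumer thread.
        for (int i = 0; i < 10; i++) {
            consumers.submit(() -> {
                try {
                    while (true) {
                        String msg = queue.take();
                        System.out.println("Got " + msg + " on thread: "
                                + Thread.currentThread().getId());
                        done.countDown();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // pool is shutting down
                }
            });
        }

        for (int i = 0; i < 5; i++) {
            queue.put("order-" + i);
        }
        done.await(); // all five messages have been handled
        System.out.println("processed 5 messages");
        consumers.shutdownNow();
    }
}
```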

You can run both the requestor and the replier in the same JVM, by using this command:
mvn exec:java -Dexec.mainClass="com.javaprocess.examples.integration.main.Main" -Dexec.args="client server"
Running in this fashion will result in output similar to this:
Running in client mode
Requesting order processing on thread: 33
Requesting order processing on thread: 34
Requesting order processing on thread: 35
Requesting order processing on thread: 36
Requesting order processing on thread: 37
Got order with id 100 on thread: 19
Got order with id 100 on thread: 24
Got order with id 100 on thread: 26
Got order with id 100 on thread: 18
Got order with id 100 on thread: 25
Order was requested by 37  and processed by thread: 26
Order was requested by 36  and processed by thread: 18
Order was requested by 34  and processed by thread: 25
Order was requested by 33  and processed by thread: 19
Order was requested by 35  and processed by thread: 24
Closing context
Notice how the thread that made the request is different from the thread that replied to the request.
We can take this a step further and run it in two different JVMs. First start the server. This is necessary since the server also starts the JMS broker that the client will connect to. Invoked in this fashion, the server will run until manually terminated, fulfilling any requests put in the queue.
mvn exec:java -Dexec.mainClass="com.javaprocess.examples.integration.main.Main" -Dexec.args="server"
Once the server has started, open a new terminal or command prompt window, and start the client:
mvn exec:java -Dexec.mainClass="com.javaprocess.examples.integration.main.Main" -Dexec.args="client"
You will see the client making requests:
Running in client mode
Requesting order processing on thread: 13
Requesting order processing on thread: 14
Requesting order processing on thread: 15
Requesting order processing on thread: 16
Requesting order processing on thread: 17
Order was requested by 15  and processed by thread: 21
Order was requested by 16  and processed by thread: 24
Order was requested by 14  and processed by thread: 20
Order was requested by 13  and processed by thread: 17
Order was requested by 17  and processed by thread: 19
Closing context
You will also see the server fulfilling these requests:
Running in server mode
Got order with id 100 on thread: 19
Got order with id 100 on thread: 17
Got order with id 100 on thread: 20
Got order with id 100 on thread: 24
Got order with id 100 on thread: 21
If you start more than one client working at a time, you will see the server replying to all the incoming requests.

So how is this working?

We define a JMS queue as the destination for the request, but how does the replier know which channel to use to send the response? The outbound gateway creates a temporary reply destination, and sets the "ReplyTo" JMS header to point to it. The replier sends its response to that temporary destination, where the requestor is waiting for it. This can be conceptualized in the following diagram:
JMS backed service

Conclusion

So why should you try to use something like this? We'll point out some general advantages, but as usual, remember that the best solution depends on your particular needs.

  • This approach provides deep decoupling, allowing you to have backend servers servicing requests. These servers can even be added on demand.
  • The scalability of messaging-based service buses tends to be better than that of other approaches such as load balancers, especially under heavy load. With a load balancer, a single request that takes too long could cause a server to drop off the load balancer. With messaging, the available servers will continue to service requests as fast as possible.
  • Spring Integration provides a great deal of extra functionality out of the box, such as the "Claim Check" pattern. We'll examine some of these possibilities in future posts.  Other remoting frameworks (such as plain Spring remoting) do not provide this wealth of extra functionality.
  • This approach provides a great deal of flexibility. You can even wire different methods of a single interface using different transports (JMS, direct connection, REST web services).
  • The configuration can be easily changed by modifying the XML files. These files can even be stored outside of the application. This allows precise tuning without having to change the application code, recompile, or repackage. In this respect, I favor using XML configuration as opposed to Java configuration.

Source code

You can find the source code for this example on GitHub.
To check out the code, clone the following repository: https://github.com/aolarte/spring-integration-samples.git.
git clone https://github.com/aolarte/spring-integration-samples.git
git checkout branches/part1