Spring Integration is a quite useful framework which provides asynchronous messaging capabilities to a system. Unlike traditional frameworks like JMS, this framework can be plugged into any application. I will not be going into the details of messaging and 'How messaging works!' but an overview should be a good start.
Messaging is a mechanism to invoke some actions asynchronously. Message passing helps to de-couple the logic from infrastructure and can be applied to almost every system. Sending across a communication to a system or more appropriately a message handler is called messaging.
There are two types of message handling implementations in Spring Integration framework - Queue channel based and Direct channel. In case of Queue based message handling, a pollable queue is placed after each message generator and one or more pollers are attached to that queue for handling and processing messages whereas in case of direct message handling, our message handler is attached to the direct channel which gets called as soon as the message is published to the channel.
Spring Integration framework can also be used to send data across the network. Think of a scenario where a file needs to be sent to a receiver and there is no limit to the size of that file. The only possible solution to reliably and quickly transmit that file over to the other end could be splitting it into number of messages and sending them across. Spring integration framework facilitates message transmission functionality by providing the necessary API to transmit Messages over the network in a reliable manner,
There are two base classes provided in spring integration framework to achieve this task - AbstractInternetProtocolSendingMessageHandler and AbstractInternetProtocolReceivingChannelAdapter. For the details of API, one must refer to the package summary provided.
How to use it??
To add a simple TCP message handler/ TCP message receiver, the following snippets can be added to the applicationContext.xml file:
<ip:inbound-channel-adapter protocol="tcp/udp" channel="outputChannel" close="false" custom-socket-reader-class-name="CustomClass" id="inbound" local-address="localIpAddressForMultiHomeServers" message-format="length-header" pool-size="noOfThreads" port="portNumber" receive-buffer-size="bufferSize" using-direct-buffers="true/false" using-nio="true/false" />
<ip:outbound-channel-adapter protocol="tcp/udp" channel="inputChannel" custom-socket-writer-class-name="CustomClass" host="hostname" id="outbound" local-address="localIpAddressForMultiHomeServers" message-format="length-header" port="portnumber" using-direct-buffers="true/false" using-nio="true/false"/>
Description of tags
An inbound channel adapter is something that is used to receive the data from some sender and convert it to a Message object which would then be sent to the OutputChannel for further processing. It is an instance of AbstractInternetProtocolReceivingChannelAdapter's implementation depending upon the protocol specified.
On the contrary, an outbound channel adapter is supposed to send whatever is present on the InputChannel to some listener identified by hostname:portNumber. This is an instance of AbstractInternetProtocolSendingMessageHandler's implementation.
Besides, each handler-adapter pair implementation also uses an underlying SocketWriter-SocketReader pair as well.
There are different message_formats via which, one can convert the Message object to byte data, namely :-
- FORMAT_LENGTH_HEADER - Sends message length in header first and then the byte message.
- FORMAT_STX_ETX - Sends a special bit pattern STX, then data followed by ETX bit pattern.
- FORMAT_CRLF - Starts sending a message and when its complete, sends a CR_LF bit pattern.
- FORMAT_JAVA_SERIALIZED - Sends data after serializing the object using ObjectOutputStream.
- FORMAT_CUSTOM - Left as an exercise for the reader .
Hint: You can add your own implementation of NetSocketWriter/NetSocketReader class in case you want to use FORMAT_CUSTOM.
In case someone doesn't know the destination beforehand, or maybe the port to listen to on the local machine, objects of above mentioned API can be created and parameters can be set dynamically. Finally, these objects can be registered to a ConfigurableApplicationContext for making them running.
These classes have four implementation classes each, two for TCP and two for UDP protocols. For TCP the two classes are segregated as:
1. TcpNetSendingMessageHandler and TcpNetReceivingChannelAdapter pair - This pair works over TCP simple Socket based implementation and does the usual socket stuff like blocking reads over streams, handling EOF when stream is closed. Both the classes use a SocketFactory class to create an instance of NetSocketWriter and a NetSocketReader respectively which follow the same protocol. You can choose to implement custom logic for sending data by adding a custom implementation to each of the NetSocketWriter and NetSocketReader classes and implement the methods writeCustomFormat and assembleDataCustomFormat respectively.
2. TcpNioSendingMessageHandler and TcpNioReceivingChannelAdapter pair - This pair works almost the same way as the above mentioned apart from the fact that this pair uses NioSocketWriter and NioSocketReader pair. It does have an additional capability to use direct buffers for channels which enhances the speed many folds. Although the NIO based IO is a little slow, its never blocking. So there might be some performance concerns on the system as well as it will always be executing code to read from the streams even though there is nothing present. Please note that FORMAT_JAVA_SERIALIZED is not supported by this kind of pair. Rest all formats work fine. Also, the hint in this implementation changes to NioSocketWriter/NioSocketReader pair.
For UDP, the pairs are not used much because most of the network firewalls have security issues using UDP protocol. Moreover its not a reliable protocol anyways. The pairs for sending messages over UDP are UnicastSendingMessageHandler-UnicastReceivingChannelAdapter and MulticastSendingMessageHandler-MulticastReceivingChannelAdapter. This system is an ACK driven setup and relies on the successful reception of ACK message from the receiver. Reliability can be added through code by enforcing a check for successful reception of ACK message else the message goes to error handler in case the TTL(Time To Live) is expired.
The UDP attributes like multicast and multicast-address have not been added to the tags. One more attribute is there in case UDP is used which is check-length. It enforces a check for length of the message. Also, in case of outbound-adapter, ack message related attributes like acknowledge, ack-host, ack-port, ack-timeout can also be added to the tag.
Tips and tricks:
There were some scenarios where certain tweaks were really helpful. I would like to list down some which could be encountered most of the times when someone uses this framework.
- You can build your own mechanism to perform cleanup like closing of the socket after all the messages have been sent. This helps to transfer messages faster as each time making/breaking a connection takes CPU and time. To do this, a custom implementation of SocketWriter needs to be created and put into the object of your AbstractInternetProtocolSendingMessageHandler's implementation which will hold the logic of closing the connection after sending all the data. By default the connection always remains open.
- While serializing an object through FORMAT_JAVA_SERIALIZED, please make sure that you don't reuse the existing object by just changing the state of that object. Java will not serialize new bytes for that object as there is a separate API called ObjectOutputStream.writeUnshared for performing that task. Making your classes immutable should help a lot here.
- The framework will not send the MessageHeaders set on the message to the other end. So, some custom logic needs to be built in case headers are needed on the receiver. Simple trick to do that will be just make a wrapper class which contains both, message and header objects and send that wrapper to the other side.
- While using ConfigurableApplicationContext for building the logic, you will have to add a TaskScheduler object to the adapter. That object is actually never used. Instead, the Adapter builds its own ThreadPoolTaskScheduler and its pool size can be set using TcpNetReceivingChannelAdapter.setPoolSize. That means in case you set a ThreadPoolTaskScheduler as the TaskScheduler in TcpNetReceivingChannelAdapter, don't expect its pool size to work.
- Sending data across network is very common in distributed systems. Having a framework which takes care of almost all of the sender/receiver things helps a lot.
- Spring is always keen to help the developers reduce boiler plate code so using it in any project is always a fair deal.
- Spring Integration is a lightweight and standalone implementation of messaging just like JMS on most of the enterprise servers. It can be used with or without a server as it only needs a couple of jars.