I have started to create a server Process that sends a status update message that is consumed by remote clients via an activeMQ VirtualTopic.
* note in the code examples, many comments and log statements have been removed for clarity here, but may bee seen in the log stack trace sections.
What I want to be able to do, is verify that I can get desired messages to multiple clients, and run various assertions. I thought to so this with a wiretap via AOP into a client Mock. This is what I was going toward
Server
The server was fairly simple, where I wanted to have a service that sends a Map message to a VirtualTopic:
@EndpointInject(uri="seda:pushStatusUpdate") ProducerTemplate producer;
public boolean sendStatusUpdate(String body) { log.info("sendStatusUpdate: " + body); Map<String, Object> gameState = new HashMap<String, Object>(); gameState.put("tableId", "123"); gameState.put("player1", "player-status"); producer.sendBodyAndHeader("seda:pushStatusUpdate", gameState, "tableId", "123"); return true; }
I then setup the following server side route to direct messages to my VirtualTopic
@Override public void configure() throws Exception { // add tracer as an interceptor so it will log the exchange executions at runtime // this can aid us to understand/see how the exchanges is routed etc. getContext().addInterceptStrategy(new Tracer()); from("seda:pushStatusUpdate") .process(new VirtualTopicAuditor()) .to("activemq:topic:VirtualTopic.Table.1"); }
Now I run the server via the Maven Camel Run Plugin in my pom.xml as follows:
<plugin> <groupId>org.apache.camel</groupId> <artifactId>camel-maven-plugin</artifactId> <version>${camel.version}</version> <configuration> <fileApplicationContextUri>src/test/resources/camel-server-test.xml</fileApplicationContextUri> </configuration> </plugin>
* NOTE: The default way to use the camel plugin is to us:
<strong></strong>META-INF/spring/*.xml;YOUR_FILE_NAME_IN_THE_CLASS_PATH.xml
However, I wanted to use a separate server for testing, and wanted to add it to src/test/resources, thus I have to use the <fileApplicationContextUri> tag instead.
Then by opening a separate command prompt, then running mvn clean compile camel:run -e as follows:
[cmd image here]…
JUnit Test
What I wanted to do was to create a unit test that would prove I could multiple consumers subscribing to my VirtualTopic, and validate that message that each client gets.
@ContextConfiguration(locations = { "classpath:applicationContext-test.xml" }) public class TableServiceTest extends AbstractJUnit4SpringContextTests { @Autowired protected SpringCamelContext camel; @EndpointInject(uri = "mock://resultClient1") protected MockEndpoint resultClient1; @EndpointInject(uri = "mock://resultClient2") ... *up to 11 clients here * @Before public void initTests(){ resultClient1.reset(); resultClient2.reset(); ... *up to 11 clients here * } @Test public void testSendBetMessage() throws Exception { resultClient1.expectedMessageCount(1); resultClient2.expectedMessageCount(1); ... *up to 11 clients here * // Send the test message to make Server Service create our Status Message producerTemplate.sendBody("jms:queue:bets", ExchangePattern.InOnly, 22); // now lets assert that the mock endpoint received messages resultClient1.assertIsSatisfied(); resultClient2.assertIsSatisfied(); ... *up to 11 clients here * }
So what I am trying to do, is determine if each client gets the appropriate number of messages.
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:broker="http://activemq.apache.org/schema/core" xsi:schemaLocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-2.5.xsd http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd "> <bean id="propertyConfigurer" class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"> <property name="locations"> <list> <value>classpath:test.properties</value> </list> </property> </bean> <bean id="clientRoutes" class="com.wiredducks.test.routes.ClientRoutes" /> <!-- Camel JMSProducer to be able to send messages to a remote Active MQ server --> <bean id="jms" class="org.apache.activemq.camel.component.ActiveMQComponent"> <property name="brokerURL" value="tcp://localhost:61616" /> </bean> <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL" value="tcp://localhost:61616" /> </bean> <bean id="transactionManager" class="org.springframework.jms.connection.JmsTransactionManager"> <property name="connectionFactory" ref="connectionFactory" /> </bean> <import resource="classpath:camel-client.xml" /> <!-- we need to create test clients to validate multiple recipients will get the status update message. --> <import resource="classpath:clients/test-client1.xml" /> <import resource="classpath:clients/test-client2.xml" /> ... <import resource="classpath:clients/test-changeDestination.xml" /> </beans>
This is the camel-client.xml for this test
<context:component-scan base-package="com.wiredducks.routes.test" /> <camel:camelContext id="camel"> <camel:package>com.wiredducks.routes.test</camel:package> <camel:consumerTemplate id="consumer" /> <camel:routeBuilder ref="clientRoutes" /> </camel:camelContext> <bean id="clientRoutes"/> <camel:template id="producer" /> <bean id="jms"> <property name="brokerURL" value="tcp://localhost:61616" /> </bean> <bean id="connectionFactory"> <property name="brokerURL" value="tcp://localhost:61616" /> </bean> <bean id="transactionManager"> <property name="connectionFactory" ref="connectionFactory" /> </bean> <import resource="clients/test-client1.xml" /> <import resource="clients/test-client2.xml" /> ... *up to 11 clients here *
Then I setup a client side route for my tests.
@Override public void configure() throws Exception { from("seda://resultClient1") .process(new ClientRouteAuditor()) .to("mock://resultClient1"); from("seda:resultClient2").to("mock:resultClient2"); ... *up to 11 clients here * } ... class ClientRouteAuditor implements Processor { public void process(Exchange exchange) throws Exception { DefaultMessage message = (DefaultMessage) exchange.getIn(); log.info("//--- start ClientRouteAuditor ---//"); log.info("tableId " + message.getHeader("tableId")); log.info(message.getBody()); log.info("//--- end ClientRouteAuditor ---//"); } }
Camel Client (s)
I wanted to simulate multiple clients
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:aop="http://www.springframework.org/schema/aop" xmlns:jms="http://www.springframework.org/schema/jms" xsi:schemaLocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-2.5.xsd http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-2.5.xsd "> <jms:listener-container client-id="jmsContainer1" transaction-manager="transactionManager"> <jms:listener id="jmsListener1" destination="Consumer.1.VirtualTopic.Table.1" ref="testClient1" method="onMessage" /> </jms:listener-container> <bean id="testClient1" class="com.wiredducks.client.StatusUpdateService" scope="prototype"> <property name="consumerId" value="1" /> <property name="tableId" value="1" /> </bean> <!-- Aspect that tracks all the invocations of the business service --> <bean id="messageDrivenMockWiretapClient1" class="com.baselogic.test.MessageDrivenMockWiretap"> <property name="destinationEndpoint" ref="resultClient1" /> </bean> <aop:config> <aop:pointcut id="onMessageCall" expression="execution(* com.wiredducks.client.StatusUpdateService.onMessage(..)) &amp;&amp; args(message)" /> <aop:aspect id="aspectMessageDrivenMockWiretap" ref="messageDrivenMockWiretapClient1"> <aop:before method="tap" pointcut-ref="onMessageCall" /> </aop:aspect> </aop:config> </beans>
…
Message Driven Pojo (MDP)
I wanted to create an MDP on each client so that each remote client could subscribe to my VirtualTopic.
public class StatusUpdateService implements MessageListener { ... public void onMessage(Message message) { log.info("//----- StatusUpdateService.onMessage -----------------------------//"); log.info("//----- " + consumerId + " -----------------------------//"); if(message instanceof ActiveMQMapMessage){ log.info("//----- ActiveMQMapMessage -------------------//"); ActiveMQMapMessage mapMessage = (ActiveMQMapMessage) message; log.info(mapMessage.toString()); // Set Map via Spring. } else if(message instanceof ActiveMQTextMessage){ log.info("//----- ActiveMQTextMessage -------------------//"); ActiveMQTextMessage mapMessage = (ActiveMQTextMessage) message; log.info(mapMessage.toString()); } }
here…
Message Driven Wiretap Audit (AOP)
Now that I have multiple MPD’s consuming from a Virtual topic, I wanted to be able to perform more unit test functions with the Camel Mock framework. In the above camel-client.xml, there was an AOP declaration for a before point cut:
<!-- Aspect that tracks all the invocations of the business service --> <bean id="messageDrivenMockWiretapClient1"> <property name="destinationEndpoint"> <camel:endpoint uri="seda:resultClient1"/> </property> </bean> <aop:config> <aop:pointcut id="onMessageCall" expression="<strong>execution(* com.wiredducks.service.impl.StatusUpdateService.onMessage(..)) &amp;&amp; args(message)</strong>"/> <aop:aspect id="aspectMessageDrivenMockWiretap" ref="messageDrivenMockWiretapClient1"> <aop:before method="tap" pointcut-ref="onMessageCall"/> </aop:aspect> </aop:config>
This is the Audit bean, the is only suppose to forward additional message to the set destination. In my case, to a mock endpoint.
@Aspect public class MessageDrivenMockWiretap { ... private Endpoint destinationEndpoint; @Required public void setDestinationEndpoint(Endpoint destinationEndpoint) { this.destinationEndpoint = destinationEndpoint; } public void tap(Object message) { if(message instanceof ActiveMQMapMessage){ log.info("//----- ActiveMQMapMessage -------------------//"); ActiveMQMapMessage mapMessage = (ActiveMQMapMessage) message; log.info(mapMessage.toString()); } else if(message instanceof ActiveMQTextMessage){ log.info("//----- ActiveMQTextMessage -------------------//"); ActiveMQTextMessage mapMessage = (ActiveMQTextMessage) message; log.info(mapMessage.toString()); } else{ log.info("//----- Other Message -------------------//"); } String msg = "MessageDrivenMockWiretap: " + message; // now send the message to the backup store using the Camel Message Endpoint pattern Exchange exchange = destinationEndpoint.createExchange(); exchange.getIn().setBody(msg); try{ destinationEndpoint.createProducer().process(exchange); }catch(Exception e){ e.printStackTrace(); } }// tap
[more…]
Running the Unit Tests
Now I open another command prompt to run the Unit Tests using mvn clean verify -e to run the unit tests. Now at this time, there should be a client MDP connecting to a VirtualTopic, and with AOP, a wiretap sends the message to a SEDA queue, then to a Mock for Testing.
On the command window with the server side Camel running. I see the Message getting created, and sent to the VirtualTopic:
INFO: //--- SendStatusUpdate.process() ---// Sep 6, 2009 11:39:55 AM org.apache.camel.processor.Logger process INFO: ID:mick-knutsons-macbook.local-63225-1252262382326-0:73:1:1:2 >>> com.wiredducks.processors.SendStatusUpdate@4064397c --> ref:tableService method: sendStatusUpdate, Pattern:InOnly, Headers:{JMSRedelivered=false, JMSXGroupID=null, JMSPriority=4, JMSExpiration=0, JMSDestination=queue://sendStatusUpdate, JMSCorrelationID=null, JMSDeliveryMode=2, JMSReplyTo=null, JMSMessageID=ID:mick-knutsons-macbook.local-63225-1252262382326-0:73:1:1:2, JMSType=null, JMSTimestamp=1252262395046}, BodyType:Integer, Body:1 Sep 6, 2009 11:39:55 AM com.wiredducks.service.impl.TableServiceImpl sendStatusUpdate INFO: //----------------------------------------------------------------// Sep 6, 2009 11:39:55 AM com.wiredducks.service.impl.TableServiceImpl sendStatusUpdate INFO: sendStatusUpdate Message: 1 Sep 6, 2009 11:39:55 AM com.wiredducks.service.impl.TableServiceImpl sendStatusUpdate INFO: //----- Messge Sent ----------------------------------------------// Sep 6, 2009 11:39:55 AM org.apache.camel.processor.Logger process INFO: ID-mick-knutsons-macbook-local-60727-1252260656775-0-39 >>> from(seda://pushStatusUpdate) --> log://com.wiredducks.processors.VirtualTopicAuditor?level=DEBUG, Pattern:InOnly, Headers:{tableId=1}, BodyType:java.util.HashMap, Body:{tableId=1, player1=raise} Sep 6, 2009 11:39:55 AM org.apache.camel.processor.Logger process INFO: ID-mick-knutsons-macbook-local-60727-1252260656775-0-39 >>> log://com.wiredducks.processors.VirtualTopicAuditor?level=DEBUG --> com.wiredducks.processors.VirtualTopicAuditor@603bb54b, Pattern:InOnly, Headers:{tableId=1}, BodyType:java.util.HashMap, Body:{tableId=1, player1=raise} Sep 6, 2009 11:39:55 AM com.wiredducks.processors.VirtualTopicAuditor process INFO: //--- VirtualTopicAuditor ---// Sep 6, 2009 11:39:55 AM com.wiredducks.processors.VirtualTopicAuditor process INFO: tableId 1 Sep 6, 2009 11:39:55 AM com.wiredducks.processors.VirtualTopicAuditor process INFO: {tableId=1, player1=raise} Sep 6, 2009 11:39:55 AM com.wiredducks.processors.VirtualTopicAuditor process INFO: //--- VirtualTopicAuditor ---//
I noticed that the MDP gets called with the correct Map Message, as well as the MessageDrivenWiretap which leads me to conclude that the wiretap works from the MDP to the Processor I created.
252260655336-2:12:1:1:3, originalDestination = null, originalTransactionId = null, producerId = ID:mick-knutsons-macbook.local-60717-1252260655336-2:12:1:1, destination = topic://VirtualTopic.Table.1, transactionId = null, expiration = 0, timestamp = 1252262710167, arrival = 0, brokerInTime = 1252262710169, brokerOutTime = 1252262710189, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = org.apache.activemq.util.ByteSequence@5c62f8df, marshalledProperties = org.apache.activemq.util.ByteSequence@65c127db, dataStructure = null, redeliveryCounter = 0, size = 0, properties = {tableId=1}, readOnlyProperties = true, readOnlyBody = true, droppable = false} ActiveMQMapMessage{ theTable = {} } MessageDriveMockWiretap Message ActiveMQMapMessage {commandId = 7, responseRequired = true, messageId = ID:mick-knutsons-macbook.local-60717-1252260655336-2:12:1:1:3, originalDestination = null, originalTransactionId = null, producerId = ID:mick-knutsons-macbook.local-60717-1252260655336-2:12:1:1, destination = topic://VirtualTopic.Table.1, transactionId = null, expiration = 0, timestamp = 1252262710167, arrival = 0, brokerInTime = 1252262710169, brokerOutTime = 1252262710189, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = org.apache.activemq.util.ByteSequence@5c62f8df, marshalledProperties = org.apache.activemq.util.ByteSequence@65c127db, dataStructure = null, redeliveryCounter = 0, size = 0, properties = {tableId=1}, readOnlyProperties = true, readOnlyBody = true, droppable = false} ActiveMQMapMessage{ theTable = {} } [ jmsListener1-1] StatusUpdateService INFO //----- StatusUpdateService.onMessage -----------------------------// [ jmsListener1-1] StatusUpdateService INFO //----- 1 -----------------------------// [ jmsListener1-1] StatusUpdateService INFO //----- ActiveMQMapMessage -------------------// [ jmsListener1-1] StatusUpdateService INFO ActiveMQMapMessage {commandId = 6, responseRequired = true, messageId = ID:mick-knutsons-macbook.local-60717-1252260655336-2:12:1:1:2, originalDestination = null, originalTransactionId = null, producerId = ID:mick-knutsons-macbook.local-60717-1252260655336-2:12:1:1, destination = topic://VirtualTopic.Table.1, transactionId = null, expiration = 0, timestamp = 1252262710157, arrival = 0, brokerInTime = 1252262710157, brokerOutTime = 1252262710167, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = org.apache.activemq.util.ByteSequence@69017455, marshalledProperties = org.apache.activemq.util.ByteSequence@5e7cae4e, dataStructure = null, redeliveryCounter = 0, size = 0, properties = {tableId=1}, readOnlyProperties = true, readOnlyBody = true, droppable = false} ActiveMQMapMessage{ theTable = {} } [cp://localhost/127.0.0.1:61616] WireFormatNegotiator DEBUG tcp://localhost/127.0.0.1:61616 after negotiation: OpenWireFormat{version=3, cacheEnabled=true, stackTraceEnabled=true, tightEncodingEnabled=true, sizePrefixDisabled=false} [ jmsContainer4-1] MessageDrivenMockWiretap INFO //>>>>> tap(Object message) >>>>>>>>>>>>>>>>>>>>>>>>>>>>// [ jmsContainer4-1] MessageDrivenMockWiretap INFO //----- ActiveMQMapMessage -------------------// MessageDriveMockWiretap Message ActiveMQMapMessage {commandId = 7, responseRequired = true, messageId = ID:mick-knutsons-macbook.local-60717-1252260655336-2:12:1:1:3, originalDestination = null, originalTransactionId = null, producerId = ID:mick-knutsons-macbook.local-60717-1252260655336-2:12:1:1, destination = <strong>topic://VirtualTopic.Table.1</strong>, transactionId = null, expiration = 0, timestamp = 1252262710167, arrival = 0, brokerInTime = 1252262710169, brokerOutTime = 1252262710178, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = org.apache.activemq.util.ByteSequence@15de4ebf, marshalledProperties = org.apache.activemq.util.ByteSequence@3603e8d0, dataStructure = null, redeliveryCounter = 0, size = 0, properties = {tableId=1}, readOnlyProperties = true, readOnlyBody = true, droppable = false} ActiveMQMapMessage{ theTable = {} }
Then the message seems like it would get to the mock://resultClient1 at this point. but in my unit test, this is not the case.
Current Issue with the above approach
The current issue is random at least to my eyes. It appears, that sometimes, I do not get any message sent to the mock endpoint.
testSendBetMessage(com.wiredducks.service.test.TableServiceTest) Time elapsed: 20.016 sec <<< FAILURE! java.lang.AssertionError: mock://resultClient1 Received message count. Expected: <strong><1></strong> but was: <strong><0></strong>
Thus, we have already gotten the message, we just need to forward the message to the Mock.
This does work sometimes, but there is no pattern to make it work
Unresolved issues
There are a few issues I still have left to solve in this solution.
- Changing MDP destination dynamically, and testing I can receive a message on the new Destination.
Conclusion
Conclusion to ensue…
[more…]<span class="code-tag"><applicationContextUri></span>META-INF/spring/*.xml;YOUR_FILE_NAME_IN_THE_CLASS_PATH.xml<span class="code-tag"></applicationContextUri></span>
Recent Comments