Wednesday, November 12, 2014

Messaging Infrastructure using ActiveMQ


What is rhq-msg?

Before I get into the news about the new rhq-msg repository, let me first take a step back and summarize what rhq-msg is and what I'm trying to accomplish with it.

rhq-msg is a simple messaging API built on top of ActiveMQ and JMS. However, I wanted to isolate the user of the API from as much ActiveMQ and JMS specific classes and code as possible. I wanted a simpler API that provides basic messaging functionality without requiring the user from having to create and manage lots of little specific classes (like JMS Connections, Sessions, Destinations, Consumers, Producers, etc) and without having to know very many specific ActiveMQ details.

I also wanted to ensure that non-Java clients and servers can interact with rhq-msg clients and servers. So the messages that are sent and received from the rhq-msg API are JSON-encoded and thus can be sent to and received by other non-Java endpoints so long as they can handle JSON-encoded messages (ActiveMQ supports non-Java clients, I just wanted to make sure the messages rhq-msg handles can easily flow to/from those non-Java clients as well).

So the point is I have small, simple, easier-to-use messaging API that can talk to non-Java endpoints, but yet still retain all the nice functionality (like guaranteed delivery and things like that) that JMS and ActiveMQ provides.

New Repository for rhq-msg

My past few blog posts were about the prototyping work I'm doing with respect to rhq-msg and rhq-audit.

I just split out rhq-msg and put it in its own rhq-msg repository since it really does belong as a separate project.

I also added a nifty feature to it - you can now use it for request-response workflows. Before, the rhq-msg API really only supported fire-and-forget async messaging. You had a message and you sent it to an endpoint asynchronously and that was that. The problem is sometimes you want a response back, and many times you want to wait for that response to come back in an RPC-like fashion (as opposed to accepting the response asynchrously). I call this a request-response workflow.

Well, the API now supports this. In fact, it supports receiving the response both synchronously (via a Java Future implementation) and asynchronously (via a message listener implementation). More on this below.

A Quick Overview of the API

The main purpose of rhq-msg is to provide a simpler API to do messaging (simpler than, say, JMS). If I can't describe how to send and receive messages via rhq-msg in a couple paragraphs of a blog, then I think I failed :) So let me give a quick overview of what the API calls would look like if you want to send and receive messages. I will cover sending fire-and-forget messages, request-response messaging, and listening for incoming messages.

The main rhq-msg API is found in the rhq-msg-common module.

* Common Code For Both Producers and Consumers

Before I talk about how to send and receive messages, let me introduce the few classes both senders (aka producers) and receivers (aka consumers) use.

Each message that flows through rhq-msg is a BasicMessage. You can subclass that to create your own message types, but all messages must derive from BasicMessage. BasicMessage provides the JSON functionality needed to serialize the messages over the wire.

ConnectionContextFactory connects to your rhq-msg broker and creates contexts for both producers and consumers that are then passed to the MessageProcessor so it knows where to send/receive its messages from.

MessageProcessor provides the API to send and listen for messages.

OK, with that out of the way, let's talk about how to send and listen for messages. For these examples, let's assume you have a broker running on the local machine (127.0.0.1) listening to port 17173, you have a BasicMessage you want to send (or you want to listen for a BasicMessage) and the messages are to be found on a queue named "Foo".

* Sending Fire-and-Forget Messages

Fire-and-forget means you send a message off to an endpoint and forget about it - you don't need or expect a response back. You assume the message broker will deliver the message and the recipient will process the message properly.

   Endpoint endpoint = new Endpoint(Endpoint.Type.QUEUE, "Foo");
   ConnectionContextFactory factory = new ConnectionContextFactory("tcp://localhost:17173");
   ProducerConnectionContext context = factory.createProducerConnectionContext(endpoint);
   MessageProcessor processor = new MessageProcessor();
   processor.send(context, basicMessage);
   factory.close();

Here you create your factory so it connects to your broker. Using the factory, create a producer context. With that new producer context and your basic message, just tell a message processor to send it. That's it. Message is away. You can keep your factory around to create additional contexts which share the connection. But once you are done sending messages, close the factory so it cleans up all resources and closes its connection to the broker. Note that every class you see is a rhq-msg object. No ActiveMQ or JMS classes are used to code this up (though, obviously, under the covers, ActiveMQ and JMS is used heavily).

* Sending Request-Response Messages

Many times when you send a message request, you want a response back. In this example I will show how you can have an RPC-like request-response workflow. I will assume that the remote endpoint will process my BasicMessage and send back to me its own BasicMessage response. Note that, as I mentioned earlier, I could use custom message types (I don't have to use BasicMessage) but those custom message types must always derive from BasicMessage.

   Endpoint endpoint = new Endpoint(Endpoint.Type.QUEUE, "Foo");
   ConnectionContextFactory factory = new ConnectionContextFactory("tcp://localhost:17173");
   ProducerConnectionContext context = factory.createProducerConnectionContext(endpoint);
   MessageProcessor processor = new MessageProcessor();
   Future<BasicMessage> future = processor.sendRPC(context, basicMessage, BasicMessage.class);
   BasicMessage response = future.get(30, TimeUnit.SECONDS);

The first several lines are identical with the fire-and-forget example above. The difference starts with the API call made on the processor - here you call sendRPC() passing in the same context and your outgoing basicMessage request object as before, but you also now pass in the message type of the expected response message. In return, you get a Future object which you can use to retrieve the response when it is received. In this example, it blocks for 30 seconds waiting for the response.

There is another API I won't talk about in detail here, but suffice it to say there is another request-response API you can call on the MessageProcessor (as opposed to sendRPC()) and that is "sendAndListen()". sendAndListen() also allows you to send a request and listen for a response, but this API allows you to give your own message listener so it can wait for and receive the response, rather than go through a Future object. It seems more intuitive and easier to use Future, but in case you want to write your own listener object and listen for the response that way, this is doable. I'll explain the listener API below - it would be the same thing you would need to pass to sendAndListen().

* Listening for Incoming Messages

This last example shows how you implement consumers via rhq-msg API. You implement these via listeners. These listeners are your server-side code - they listen for and accept incoming messages from producers and process those messages. There are two general types: those that do not send responses back, and those that do. I'll cover both in that order.

To listen for "fire-and-forget" messages (that is, messages that are sent that do not require a response sent back to the sender) you implement a BasicMessageListener and hand that listener off to the message processor:

   Endpoint endpoint = new Endpoint(Endpoint.Type.QUEUE, "Foo");
   ConnectionContextFactory factory = new ConnectionContextFactory("tcp://localhost:17173");
   ConsumerConnectionContext context = factory.createConsumerConnectionContext(endpoint);
   MessageProcessor processor = new MessageProcessor();
   processor.listen(context, listener);

where "listener" is a subclass implementation of BasicMessageListener. An example is:

   public class MyCustomListener extends BasicMessageListener<BasicMessage> {
       @Override
       protected void onBasicMessage(BasicMessage receivedMessage) {
          // Process the received message.
       }
   }

Notice that we still create a ConnectionContextFactory (because we still need a connection to the message broker) but we ask that factory to create for us a consumer context this time. We call "listen()" on the message processor, passing to it that consumer context and our custom listener. That listener is now listening for messages to arrive on the "Foo" queue and will process them.

What about request-response processing? If your listener needs to send data back to the sender, it needs to implement a RPCBasicMessageListener. Other than that difference, the main code is still the same:

   Endpoint endpoint = new Endpoint(Endpoint.Type.QUEUE, "Foo");
   ConnectionContextFactory factory = new ConnectionContextFactory("tcp://localhost:17173");
   ConsumerConnectionContext context = factory.createConsumerConnectionContext(endpoint);
   MessageProcessor processor = new MessageProcessor();
   processor.listen(context, listener);

but this time "listener" is a subclass implementation of RPCBasicMessageListener. An example is:

   public class MyCustomRPCListener extends RPCBasicMessageListener<BasicMessage, AnotherMessage> {
       @Override
       protected AnotherMessage onBasicMessage(BasicMessage receivedMessage) {
          // Process the received message.
          AnotherMessage response = ...create your response message object...;
          return response;
       }
   }

Note the onBasicMessage() method now can return a non-void type - the response message type. This return type (the specific response message type) is declared in the generic type definition found in your class definition. The first generic type is that of the expected incoming message type, the second generic type is that of the response message type.

How to Build rhq-msg?

rhq-msg is composed of a set of Maven modules. From the parent root module directory, just run "mvn install" and everything will build, tests will run, and artifacts will be packaged.

There are also Eclipse project files that allow you to work with rhq-msg via M2E (the Eclipse Maven plugin).

How to Run a rhq-msg Broker

If you want to run your own code that utilizes the rhq-msg API, you will need to run a rhq-msg broker (which is really just an ActiveMQ broker). You can run a rhq-msg broker easily in a couple ways. You can run the EmbeddedBroker from the Maven command line or you can install the EmbeddedBroker in a WildFly 8 installation. This EmbeddedBroker code is found in the rhq-msg-broker Maven module.

* Running EmbeddedBroker from the command line

Starting a broker as a standalone process is relatively painless. You can use Maven to do it:

   mvn -Prun-test-broker install

Right now the rhq-msg broker is not packaged with all its dependencies, but if you do it yourself (that is, get all the dependency jars and start Java with the appropriate classpath) you can run straight from the Java command line:

   java -cp <the-appropriate-classpath> -jar rhq-msg-broker-0.1.jar -c broker-config.xml

where "broker-config.xml" is an actual ActiveMQ XML configuration file (you can also pass in a simpler ActiveMQ .properties configuration file if you wish). Examples are test-broker.xml and test-broker.properties.

* Installing EmbeddedBroker in WildFly 8

One of the artifacts within rhq-msg is a WildFly Extension Module that provides a broker. Once installed in WildFly, you will have an rhq-msg broker subsystem running within WildFly itself. It is a handy way to have your own broker running in your own WildFly instance. So this means you can have, for example, an rhq-msg client within a web application running in WildFly and provide your own broker for that client running in the same WildFly instance. Technically, this is useful for any ActiveMQ or JMS client - since this broker is nothing more than an ActiveMQ broker and can serve any client from anywhere, not just those using the rhq-msg API.

First, download and unzip the WildFly 8 app server (I have not tested on WildFly 9+). I'll use <wildfly-install-dir> to indicate where you installed WildFly.

Now install the custom rhq-msg broker WildFly Extension Module into WildFly. You can use the new Maven plugin "wildfly-extension-maven-plugin" to do this - the rhq-msg-broker-wf-extension Maven module has integrated that Maven plugin. I talk about this in a previous blog post. So, simply running this Maven command will install your rhq-msg broker WildFly Extension Module for you:

   mvn -Dorg.rhq.msg.broker.wildfly.home=<wildfly-install-dir> wildfly-extension:deploy

Since this is nothing more than a normal subsystem like all the other subsystems in WildFly, you can use the JBoss CLI to look at its configuration. To poke around, run the JBoss CLI in GUI mode ("jboss-cli.sh --gui") and look for the "org.rhq.msg.broker" subsystem.

In Closing

This is a prototype and was developed just to see how a messaging API around JMS and ActiveMQ can be made simpler and easier to use. I am sure I am missing some pieces of functionality (one such missing piece is security - this currently doesn't handle logging into a secured broker). If you see anything missing, let me know. Feel free to suggest corrections or enhancements. All the code is in github, so play around with it and see how useful it is.

Wednesday, November 5, 2014

Using a Message Broker to Send Audit Messages

One thing I've been working on on the side is a little project that uses a message broker to send and receive "auditing" events. Just briefly, the idea is that any piece of software might want to emit audit messages to log certain events that have occurred so that those events are captured for future reporting purposes. This project (call it rhq-audit for now) can provide a message broker if you need one, along with a high-level API that can be used to both emit and process these audit messages. For example, maybe you want your sales application to record each attempted login by a remote client, or maybe you want to record each sales order that has been submitted. Using the rhq-audit API, audit messages can be fired off on the message bus for later consumption by an rhq-audit consumer whose job it will be to store the audit record in a persistence store that can be used by reporting tools when audit reports need to be generated.

RHQ-Audit doesn't provide any reporting tools, but does provide a compact API that allows you to emit and store audit messages. You can persist your audit messages to a simple log file (see LoggerConsumer) or to a relationship database (see DataSourceConsumer), or to any custom backend (you would just need to implement your own BasicMessageListener and have the AuditRecordProcessor use it as its listener).

I talk a bit more detail on this type of thing in my previous blog on starting out with ActiveMQ. The code now in the rhq-audit repo is a bit different, but the concepts are still the same.

I separated out the message broker stuff (along with a generic and extensible consumer/producer API) into rhq-msg. rhq-audit extends rhq-msg to handle audit events. But the idea is rhq-msg can be used to provide a message bus along with an API for any type of messaging app you want to build.

In addition to the API, an embedded broker that you can use to embed in your VM or run standalone, and an extensible test API, there is also a custom Wildfly module extension that can be used to easily deploy a message broker inside your application server via Maven. See my previous blog on how to install that Wildfly extension via a mvn plugin.

To build and run the tests for rhq-audit and rhq-msg, all you need to do is run "mvn install" from the top level root directory.

Using the new Maven Wildfly Extension plugin to deploy module extensions

Libor is putting together a nice maven plugin to help you deploy your custom Wildfly module extensions to an existing Wildfly or EAP installation.

I decided to integrate it with my rhq-msg-broker-wf-extension project (I talked about my prototype of the rhq-msg work here). What this now allows us to do is deploy a message broker directly in an existing Wildfly or EAP installation via a simple mvn command line.

I won't repeat all of Libor's instructions - just go to his blog I linked above for the full details of how to integrate his mvn plugin in your mvn project - but in short all I had to do was create a snippet of .xml which includes the configuration I want to deploy in the Wildfly or EAP instance's standalone.xml (which includes the subsystem and the socket-binding) and then in the pom just define some settings for the plugin to let it know where to put my extension code and that XML configuration.

In the pom.xml, I have this to define the mvn plugin configuration - notice I allow the user to tell us where the app server is installed via a system property "org.rhq.msg.broker.wildfly.home":

        <plugin>
            <groupId>org.jboss.plugins</groupId>
            <artifactId>wildfly-extension-maven-plugin</artifactId>
            <configuration>
                <moduleZip>${project.build.directory}/${project.build.finalName}-module.zip</moduleZip>
                <jbossHome>${org.rhq.msg.broker.wildfly.home}</jbossHome>
                <modulesHome>modules/system/layers/base</modulesHome>
                <serverConfig>standalone/configuration/standalone.xml</serverConfig>
                <subsystem>${basedir}/src/main/scripts/standalone-subsystem.xml</subsystem>
                <profiles>
                    <profile>default</profile>
                </profiles>
                <socketBinding>${basedir}/src/main/scripts/socket-binding.xml</socketBinding>
                <socketBindingGroups>
                    <socketBindingGroup>standard-sockets</socketBindingGroup>
                </socketBindingGroups>
            </configuration>
        </plugin>

To publish the rhq-msg-broker-wf-extension custom module extension to an already-installed EAP instance, simply run this command (this sets the system property to point to a EAP install directory, and it uses the goal "wildfly-extension:deploy" which tells the mvn plugin to deploy the extension to that EAP installation):

mvn -Dorg.rhq.msg.broker.wildfly.home=/opt/jboss-eap-6.3 wildfly-extension:deploy

At this point, the standalone.xml will have been modified to include the custom module extension's subsystem configuration and the module code will have been copied to the EAP modules directory. At this point, once the app server is started, the module extension will be deployed and running.

Tuesday, July 29, 2014

Starting an ActiveMQ Project with Maven and Eclipse

I'm currently researching and prototyping a new subproject under the RHQ umbrella that will be a subsystem that can perform the emission and storage of audit messages (we're tentatively calling it "rhq-audit").

I decided to start the prototype with ActiveMQ. But one problem I had was I could not find a "starter" project that used ActiveMQ. I was looking for something with basic, skeleton Maven poms and Eclipse project files and some stub code that I could take and begin fleshing out to build a prototype. So I decided to publish my basic prototype to fill that void. If you are looking to start an ActiveMQ project, or just want to play with ActiveMQ and want a simple project to experiment with, then this might be a good starting point for you. This is specifically using ActiveMQ 5.10.

The code is in Github located at https://github.com/jmazzitelli/activemq-start

Once you clone it, you can run "mvn install" to compile everything and run the unit tests. Each maven module has an associated Eclipse project and can be directly imported into Eclipse as-is. If you have the Eclipse M2E plugin, these can be imported using that Eclipse Maven integration.

Here's a quick overview of the Maven modules and a quick description of some of the major parts of the code:

  • /pom.xml
    • This is the root Maven module's pom. The name of this parent module is rhq-audit-parent and is the container for all child modules. This root pom.xml file contains the dependency information for the project (e.g. dependency versions and the repositories where they can be found) and identifies the child modules that are built for the entire project.
  • rhq-audit-common
    • This Maven module contains some core code that is to be shared across all other modules in the project. The main purpose of this module is to provide code that is shared between consumer and producer (specifically, the message types that will flow from sender to receiver).
      • AuditRecord.java is the main message type the prototype project plans to have its producers emit and its consumers listen for. It provides JSON encoding and decoding so it can be sent and received as JSON strings.
      • AuditRecordProcessor.java is an abstract superclass that will wrap producers and consumers. This provides basic functionality such as connecting to an ActiveMQ broker and creating JMS sessions and destinations.
  • rhq-audit-broker
    • This provides the ability to start an ActiveMQ broker. It has a main() method to allow you to run it on the command line, as well as the ability to instantiate it in your own Java code or unit tests.
      • EmbeddedBroker.java is the class that provides the functionality to embed an ActiveMQ broker in your JVM. It can be configured using either an ActiveMQ .properties configuration file or an ActiveMQ .xml configuration file.
  • rhq-audit-test-common
    • The thinking with this module is that there is probably going to be common test code that is going to be needed between producer and consumer. This module is to support this. The intent is for other Maven modules in this project to list this module as a dependency with a scope of "test". For example, some common code will be needed to start a broker in unit tests - including this module as a test dependency will give unit tests that common code.
  • rhq-audit-producer
    • This provides the producer-side functionality of the project. The intent here is to flesh out the API further. This will become rhq-audit's producer API.
      • AuditRecordProducer.java provides a simple API that allows a caller to connect the producer to the broker and send messages. The caller need not worry about working with the JMS API as that is taken care of under the covers.
  • rhq-audit-consumer
    • This provides the consumer-side functionality of the project. The intent here is to flesh out the client-side API further. This will become the rhq-audit's consumer API.
      • AuditRecordConsumer.java provides a simple API that allows a caller to connect the consumer to the broker and attach listeners so they can process incoming messages.
      • AuditRecordListener.java provides the abstract listener class that is to be extended in order to process received audit records. The idea here is that subclasses can process audit records in different ways - perhaps one can store the audit records in a backend data store, and another can log the audit messages in rsyslog.
      • AuditRecordConsumerTest.java provides a simple end-to-end unit test that uses the embedded broker to pass messages between a producer and consumer.
Taking a look at AuditRecordConsumerTest shows how this initial prototype can be tested and shows how audit records can be sent and received through an ActiveMQ broker:

1. Create and start the embedded broker:
VMEmbeddedBrokerWrapper broker = new VMEmbeddedBrokerWrapper();
broker.start();
String brokerURL = broker.getBrokerURL();
3. Connect the producer and consumer to the test broker:
producer = new AuditRecordProducer(brokerURL);
consumer = new AuditRecordConsumer(brokerURL);
2. Prepare to listen for audit record messages:
consumer.listen(Subsystem.MISCELLANEOUS, listener);
3. Produce audit record messages:
producer.sendAuditRecord(auditRecord);
At this point, the messages are flowing and the test code will ensure that all the messages were received successfully and had the data expected.

A lot of the code in this prototype is generic enough to provide functionality for most messaging projects; but of course there are rhq-audit specific types such as AuditRecord involved. The idea is to now flesh out this generic prototype to further provide the requirements of the rhq-audit project. More on that will be discussed in the future. But for now, perhaps this could help others come up to speed quickly with an AcitveMQ project without having to start from scratch.

Tuesday, April 15, 2014

Completed Remote Agent Install

My previous blog post talked about work being done on implementing an enhancement request which asked for the ability to remotely install an RHQ Agent. That feature has been finished and checked into the master branch and will be in the next release.

I created a quick 11-minute demo showing the UI (which is slightly differently than what the prototype looked like) and demonstrates the install, start, stop, and uninstall capabilities of this new feature.

I can already think of at least two more enhancements that can be added to this in the future. One would be to support SSH keys rather than passwords (so you don't have to require passwords to make the remote SSH connection) and the other would be to allow the user to upload a custom rhq-agent-env.sh file so that file can be used to override the default agent environment (in other words, it would be used instead of the default rhq-agent-env.sh that comes with the agent distribution).

Thursday, March 20, 2014

Remote Install of JON Agent

A new feature request has been added to JBoss Operations Network. JON users will now be able to install agents on remote boxes from the UI as long as the remote box is accessible via SSH.

All you need is the hostname of the machine you want to install the agent on, its SSH port that it's listening to (default is 22) and the credentials of the user who will install (and run) the JON agent.

You can install, start, and stop the JON agent from this UI mechanism. You can also use it to get the status of any JON agent that might be installed as well and even attempt to find if and where an agent may be installed on the remote box.

Here's a snapshot of the UI page after I just successfully remote installed a JON agent: