Tuesday, February 16, 2010

ActiveMQ 5.4: JMS Timer and CRON Scheduler

I've added a persistent scheduler engine to ActiveMQ, so that messages scheduled for delivery will always survive restarts. Scheduling a message for some time in the future can be useful - but deciding on a scheduling syntax that is flexible, yet can also fit into Message header properties, is a little tricky. I've decided the best approach is to use a combination of different concepts - simple delay, repeats and CRON.

There are a few choices for scheduling messages to be delivered in the future (inspiration from java.util.Timer). The first is a simple delay - this one will deliver the message in a minute:

long time = 60 * 1000;
message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, time); 

Delivery at a fixed rate - this example will deliver 10 messages, starting after an initial delay of 30 seconds and then waiting 10 seconds between each message - note the repeat count is 9 (9 + first one = 10).

long delay = 30 * 1000;
long period = 10 * 1000;
int repeat = 9;
message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delay);
message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, period);
message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, repeat);
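
To make the arithmetic behind the fixed-rate settings concrete, here is a minimal sketch in plain Java (no ActiveMQ dependency). It assumes the first delivery happens after the delay and each repeat follows one period later; the class and method names are hypothetical and are not part of the ActiveMQ API.

```java
import java.util.ArrayList;
import java.util.List;

class ScheduleSketch {

    // Returns delivery offsets in milliseconds, measured from the time the message is sent.
    // ASSUMPTION: first delivery at delayMs, then `repeat` further deliveries, one per periodMs.
    static List<Long> deliveryOffsets(long delayMs, long periodMs, int repeat) {
        List<Long> offsets = new ArrayList<>();
        for (int i = 0; i <= repeat; i++) {
            offsets.add(delayMs + i * periodMs);
        }
        return offsets;
    }

    public static void main(String[] args) {
        // delay = 30s, period = 10s, repeat = 9 gives 10 deliveries in total
        System.out.println(deliveryOffsets(30_000L, 10_000L, 9));
        // prints [30000, 40000, 50000, 60000, 70000, 80000, 90000, 100000, 110000, 120000]
    }
}
```

Under these assumptions the last of the 10 messages would arrive two minutes after the send.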

For more advanced scheduling you can use CRON - this one will deliver a message at 2AM on the 12th day of every month:

message.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_CRON, "0 2 12 * *");
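
If the field order is unfamiliar, the sketch below splits a five-field CRON string into named parts using the conventional crontab order (minute, hour, day of month, month, day of week). It is only an illustration in plain Java; the class and method names are hypothetical, and it does not validate ranges or evaluate the expression.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class CronFieldsSketch {

    // Conventional crontab field order, left to right
    private static final String[] FIELD_NAMES =
            {"minute", "hour", "dayOfMonth", "month", "dayOfWeek"};

    // Splits the expression on whitespace and labels each field by position
    static Map<String, String> describe(String cron) {
        String[] parts = cron.trim().split("\\s+");
        if (parts.length != FIELD_NAMES.length) {
            throw new IllegalArgumentException("expected 5 fields but got " + parts.length);
        }
        Map<String, String> fields = new LinkedHashMap<>();
        for (int i = 0; i < parts.length; i++) {
            fields.put(FIELD_NAMES[i], parts[i]);
        }
        return fields;
    }

    public static void main(String[] args) {
        System.out.println(describe("0 2 12 * *"));
        // prints {minute=0, hour=2, dayOfMonth=12, month=*, dayOfWeek=*}
    }
}
```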

You can combine delays and repeats with CRON. For example, you may wish to send 10 messages every hour, at 30 minutes past the hour, with a delay of 10 seconds between each one.

message.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_CRON, "30 * * * *");
message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, 10*1000);
message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, 10*1000);
message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, 9);

As I quickly realised whilst testing - a persistent message scheduler that survives restarts really needs management support, so that you can delete those pesky jobs that you accidentally scheduled ;). So there's JMX support - and the ActiveMQ Web Console has been updated too.

Sunday, February 14, 2010

A funny thing happened to my ActiveMQ Performance

In ActiveMQ we have a message cursoring system to ensure that there is no memory restriction on the size of the queues we use. However, we do cache messages to improve performance - as depicted below:

The cursor cache is limited by memory constraints that are defined in the ActiveMQ configuration. Usually the cache is valid, but if memory constraints are hit - or consumers lag too far behind producers - the cache is cleared down. However, for every message dispatched through the cursor after the cache is cleared, the cursor will check if the Queue is empty in the store to determine if it can start using the cache again.

This all sounds eminently sensible. However, the code to check if the Queue was empty in the store called for a count of the outstanding messages. In order to determine the size of the Queue, the BTree index used for KahaDB would iterate through every BTree node and, er, count them. ActiveMQ would do this for every message dispatched after the cache became invalid.

Just to labour the point, for every message dispatched, every BTree node for the BTree index would be paged into memory in the same transaction  and counted ....
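
The cost difference is easy to see in a toy model. The sketch below is not KahaDB's BTree and does not reproduce the actual fix; it is a hypothetical paged index in plain Java that counts simulated page loads, comparing an emptiness check done by counting everything with one that stops at the first entry found.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

class EmptyCheckSketch {

    // Hypothetical stand-in for an on-disk index: a list of pages, each holding entries
    static class PagedIndex {
        private final List<List<String>> pages = new ArrayList<>();
        int pagesLoaded = 0; // number of simulated page reads

        void addPage(List<String> entries) {
            pages.add(entries);
        }

        private List<String> load(int i) {
            pagesLoaded++;
            return pages.get(i);
        }

        // Emptiness derived from a full count: every page is loaded
        boolean isEmptyByCounting() {
            int size = 0;
            for (int i = 0; i < pages.size(); i++) {
                size += load(i).size();
            }
            return size == 0;
        }

        // Emptiness by short-circuit: stop at the first page that holds an entry
        boolean isEmptyShortCircuit() {
            for (int i = 0; i < pages.size(); i++) {
                if (!load(i).isEmpty()) {
                    return false;
                }
            }
            return true;
        }
    }

    // Builds an index with the given number of pages, one entry per page
    static PagedIndex build(int pageCount) {
        PagedIndex index = new PagedIndex();
        for (int i = 0; i < pageCount; i++) {
            index.addPage(Collections.singletonList("msg-" + i));
        }
        return index;
    }

    public static void main(String[] args) {
        PagedIndex index = build(1000);
        index.isEmptyByCounting();
        System.out.println("counting loaded " + index.pagesLoaded + " pages");
        index.pagesLoaded = 0;
        index.isEmptyShortCircuit();
        System.out.println("short-circuit loaded " + index.pagesLoaded + " pages");
    }
}
```

In this model the counting check loads all 1000 pages per call while the short-circuit check loads one, which is the kind of per-message overhead described above.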

Although this is an issue with some serious side effects - like being a memory hog when using KahaDB and seriously affecting performance - it does highlight that for distributed systems there is always going to be behaviour that you can't anticipate.

The reason I found this issue so interesting is that it wasn't obvious - it's an edge case that happens only under certain load conditions with certain configurations - but it was very straightforward to fix - which always makes me smile :)

This issue only affects KahaDB and is fixed in trunk, the ActiveMQ 5.3.1 branch and Fuse Message Broker versions - and for those interested - the code change is here.