
Using RabbitMQ for ETL of cloud storage APIs

Message queues are used for communication between pieces of software; on a web site, for example, they are often used to request that a job be run in the background.

Compared to a database, they are optimized for message throughput, and they provide mechanisms for sending messages to many consumers, which could be useful if you were replicating data to many locations.

RabbitMQ is a popular message queuing product, and is fairly easy to use. In this article, I’ll show how to use it in a piece of software that pulls documents from DropBox, OneDrive, or Google Docs and indexes them for a search engine in Solr.

A RabbitMQ message body is just a string of bytes, so if you serialize to JSON you can send any class. For this example we'll define a message that carries an account ID for our application, the type of document store we're indexing, and the OAuth credentials for DropBox.

public class IndexQueueMessage
{
  public String type { get; set; }          // message type, used for routing (see routeType below)
  public String accountId { get; set; }     // which account/tenant the job belongs to
  public SearchableApplication requestedIndex { get; set; } // which document store to index
  public String user_key { get; set; }      // OAuth credentials for the store
  public String user_token { get; set; }
}

Connecting to the broker and receiving messages is very simple.

Queues collect messages and pass them on to consumers. Naming them is one of the more important decisions, and the closest thing I've seen to a database design problem in message queueing. For instance, if you made one queue per account/tenant in your application, you could expose per-tenant usage statistics, or charge people based on the quality of service you provide them (say, more indexers for their account).
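As a rough sketch of that idea (the tenantId variable is an assumption for illustration, not something from the original code), a per-tenant queue name could be built like this:

// Hypothetical per-tenant naming: one indexer queue per account/tenant.
// tenantId is an illustrative variable, not part of the original application.
var queue = tenantId + "_" + indexer.GetType().ToString().ToLower() + "_indexer";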

var factory = new ConnectionFactory() 
{ 
  HostName = "tasks.cloudapp.net", 
  UserName = "user", 
  Password = "pwd"
};

factory.AutomaticRecoveryEnabled = true; // reconnect automatically if the connection to the broker drops

using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
  var queue = indexer.GetType().ToString().ToLower() + "_indexer";

  channel.QueueDeclare(queue: queue,
    durable: true,     // queue definition survives a broker restart
    exclusive: false,  // other connections may consume from it
    autoDelete: false, // keep the queue around when consumers disconnect
    arguments: null);

  var consumer = new QueueingBasicConsumer(channel);
		
  channel.BasicConsume(
    queue: queue,
    noAck: false, // false = we send an explicit acknowledgement after processing each job
    consumer: consumer
  );

  while (true)
  {
     // dequeue, decode, and route one message at a time (shown below)
     ...
  }
}

To retrieve a message, we dequeue it, decode the body bytes as UTF-8, and hand the JSON string off for processing:

var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue(); // blocks until a message is available

try
{
  var body = ea.Body;
  var messageJson = Encoding.UTF8.GetString(body);

  executeMessage(messageJson, indexer);
}
finally
{
  // Ack whether or not processing succeeded, so a bad message is not redelivered indefinitely
  debug("Acking job: " + ea.DeliveryTag);
  channel.BasicAck(
    deliveryTag: ea.DeliveryTag,
    multiple: false
  );
}

The sending side does something similar; the properties and body variables are built beforehand (a sketch of that step follows the snippet):

channel.BasicPublish(
  exchange: "", // default exchange: routes directly to the queue named by routingKey
  routingKey: app.ToString().ToLower() + "_indexer",
  basicProperties: properties,
  body: body);
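A minimal sketch of building those two values, assuming the same JavaScriptSerializer used on the receiving side and a persistent delivery mode to match the durable queue (the original application may do this differently):

// Sketch only: serialize the message to JSON and mark it persistent so it
// survives a broker restart along with the durable queue.
var jss = new System.Web.Script.Serialization.JavaScriptSerializer();
var body = Encoding.UTF8.GetBytes(jss.Serialize(message));

var properties = channel.CreateBasicProperties();
properties.DeliveryMode = 2; // 2 = persistent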

When we receive messages, we can deserialize them to our message type:

try
{
  var jss = new System.Web.Script.Serialization.JavaScriptSerializer();
  var message = jss.Deserialize<IndexQueueMessage>(messageJson);

  routeType(message.type, indexer, message);
}
catch (Exception e)
{
  error(e);
}

Finally, we need to route these messages to application code (the MESSAGE_* constants are sketched after the snippet):

if (MESSAGE_START.Equals(type))
{
  indexer.startIndex(message);
} 
else if (MESSAGE_FOLDER.Equals(type)) 
{
  indexer.indexFolder(message);
} 
else if (MESSAGE_CONTENT.Equals(type)) 
{
  indexer.indexContent(message);
} 
else if (MESSAGE_RECENT.Equals(type)) 
{
  indexer.indexRecent(message);
}
else if (MESSAGE_NEXT.Equals(type)) 
{
  indexer.indexNext(message);
}

There are several neat things about this setup that weren't immediately apparent to me until I had it running.

One really nice feature (compared to cron) is that jobs start running nearly instantly after they are requested.

If you use a common message queue, a single web application can hand work off to backends written in many languages. For example, you could use a pre-built image resizing utility in Go, a Java-based natural language processing tool, and computer vision libraries in Python, with JSON as the interop format.

This design also allows for instant parallelism: you can simply connect more processing nodes to the cluster. I've found that many long-running ETL scripts tend to accumulate resources (memory, disk, file handles, etc.) if not carefully written. A nice side benefit of this design is that you can subdivide those problems by having each worker run a set number of jobs and then shut down (sketched below), which also makes failing and retrying much simpler, especially in the out-of-memory case.
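As a sketch of that last idea (maxJobs and processOne are illustrative names, not from the original code), the consume loop shown earlier could be bounded instead of infinite:

// Process a fixed number of jobs and exit; a supervisor (systemd, a shell loop,
// a container restart policy) brings the worker back up with a clean slate,
// releasing any memory, disk, or file handles it accumulated.
for (var i = 0; i < maxJobs; i++)
{
  var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
  processOne(ea); // decode, route, and ack as shown earlier
}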
