Friday, July 5, 2013

MongoDB with Mule ESB Community Edition integration tips

Integrating MongoDB with Mule ESB Community Edition might sound trivial, and in some ways it is, thanks to the built-in cloud connector and transformer. However, the documentation is not flawless, the user interface is not very intuitive and, last but not least, the functionality is very limited compared to what you get using the MongoDB Java driver directly.

To demonstrate this I'm going to use a simple use case.

Let's say you wanted to populate your MongoDB with the data from a CSV, but not by mapping CSV row -> BSON document (as the mongoimport tool does); instead you want to map a group of CSV rows -> one BSON document, to get more document-like structures in your MongoDB collections. Remember that, performance-wise, it is better to operate inside a single MongoDB document than to perform many separate MongoDB operations. That said, you should not use very big documents either.
After the population is done, you want to run some aggregations on specific fields in the documents.

Getting the data in

The problem here is that you do not want the data inserted row by row, but grouped based on the incoming data. The second issue is that not all fields in the CSV should be treated as strings; some may be numbers, and that is crucial when it comes to aggregating the values.
The mongoimport tool handles the second issue well, but Mule doesn't.

There are really two ways to go, since we cannot use the fancy DataMapper from Mule ESB Enterprise Edition.

1) You could use Mule ESB File endpoints listening for CSVs, and then use Mule's flow controls such as splitters, choice routers, filters and the Object-to-JSON transformer.

2) The other way is to simply implement your own transformer extending AbstractMessageTransformer and, in the transformMessage method, use a JSON library like Jackson or org.json to divide the CSV into JSON documents yourself.
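For option 2, the grouping itself needs no Mule-specific code. Here is a minimal plain-Java sketch of what the transformer's transformMessage method could do: group CSV rows on their first column and emit one JSON document per group, leaving numeric values unquoted. All names here (CsvGrouper, the Unit/SpecificValue fields) are illustrative, not part of any Mule or MongoDB API:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class CsvGrouper {

    // Groups two-column CSV rows (skipping the header) by their first column
    // and emits one JSON document per group. Numeric values are left unquoted
    // so MongoDB will store them as numbers, not strings.
    public static List<String> group(String csv) {
        Map<String, List<String>> groups = new LinkedHashMap<String, List<String>>();
        String[] rows = csv.split("\n");
        for (int i = 1; i < rows.length; i++) {
            String[] cols = rows[i].split(",");
            List<String> bucket = groups.get(cols[0]);
            if (bucket == null) {
                bucket = new ArrayList<String>();
                groups.put(cols[0], bucket);
            }
            bucket.add(cols[1]);
        }
        List<String> docs = new ArrayList<String>();
        for (Map.Entry<String, List<String>> e : groups.entrySet()) {
            StringBuilder sb = new StringBuilder();
            sb.append("{ \"Unit\" : \"").append(e.getKey()).append("\", \"SpecificValue\" : [");
            for (int i = 0; i < e.getValue().size(); i++) {
                String v = e.getValue().get(i);
                if (i > 0) sb.append(", ");
                // Quote only values that are not plain numbers
                sb.append(v.matches("[0-9]+(\\.[0-9]+)?") ? v : "\"" + v + "\"");
            }
            sb.append("] }");
            docs.add(sb.toString());
        }
        return docs;
    }

    public static void main(String[] args) {
        System.out.println(group("unit,value\nA,93\nA,94.5\nB,hello"));
    }
}
```

In a real transformer you would return the list of documents as the new message payload, one insert per document.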

To use a non-built-in JSON library like org.json, simply add it to your Maven pom.


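The dependency block itself was lost from the original post; the org.json coordinates below are the standard Maven Central ones, though the version shown is only an example (pick whichever release is current):

```xml
<dependency>
    <groupId>org.json</groupId>
    <artifactId>json</artifactId>
    <!-- example version; use a current release -->
    <version>20090211</version>
</dependency>
```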
If you use Jackson with the CSV extension, you can also build a CSV schema from a POJO that describes each field's value type, and parse the CSV based on that. I found Jackson to be a bit overkill for simple operations, though. If you use org.json and its built-in CDL.toJSONArray method, you will end up with the same unknown-types issue as in the first Mule solution: everything gets treated as strings.

You do not want to end up with documents containing:

 { "This is really a value of numbers" : "214245","This is a value of String" : "Hi there 123" }  

If you do, there is a little hack you can use in both cases: in case 1 with Mule ESB's regex filter, or in case 2 with Java's String.replaceAll() method. I will show the Java version for clarity:

 String goodJsonData = badJsonData.replaceAll("(\"([0-9]+)(\\.[0-9]+)?\")+", "$2$3");  

This regular expression searches out all quoted values in your JSON and replaces those that are actually numbers with their numeric representation (without quotes).
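A quick self-contained check of that replacement (the sample JSON is made up). One caveat: a key consisting only of digits would also lose its quotes, so use this only when you know the keys are non-numeric:

```java
public class NumberUnquote {
    public static void main(String[] args) {
        String bad = "{ \"count\" : \"214245\", \"note\" : \"Hi there 123\", \"avg\" : \"93.5\" }";
        // Quoted integers and decimals lose their quotes; other strings are untouched
        String good = bad.replaceAll("(\"([0-9]+)(\\.[0-9]+)?\")+", "$2$3");
        System.out.println(good);
        // prints: { "count" : 214245, "note" : "Hi there 123", "avg" : 93.5 }
    }
}
```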

Now you can pass your JSON data as the payload to the next item in the Mule flow, either directly to a MongoDB connector element performing an insert, or to your own MongoDB Java component.

Your flow might look something like this, depending on which way you choose to attack the issue.

Using the latest Mongo Java driver

If your plan is (as in our use case) to use aggregation functionality on your BSON documents, and you are not very fond of writing tons of map/reduce function code into tiny text fields in the MongoDB connector's user interface, you will have to come up with something other than Mule's built-in MongoDB support. Also be aware that MongoDB's excellent aggregation framework is not even included in the version of the Java driver that Mule uses.

Again, you need to edit your Maven pom.


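This snippet was also lost from the original post; the coordinates below are the standard Maven Central ones for the 2.x mongo-java-driver, and the version shown is an example from around the time of writing (you need a 2.x release recent enough to include the aggregation framework):

```xml
<dependency>
    <groupId>org.mongodb</groupId>
    <artifactId>mongo-java-driver</artifactId>
    <!-- example version; any recent 2.x release with aggregation support works -->
    <version>2.11.1</version>
</dependency>
```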
Also, after this you need to remove any MongoDB references you might have in the namespace declarations of your XML configuration, or you will end up with very strange error messages caused by conflicting libraries.

Now we have access to the sweet com.mongodb.MongoClient object (which, by the way, handles connection pooling for you), letting us write our own Java components to connect to and query MongoDB.

For example, the MongoDB insert component from the flow image above simply takes the JSON payload from the preceding element and stores it straight into MongoDB. Sweet.

 package se.redpill.mulecomponents.mongo;

 import org.bson.types.ObjectId;
 import org.mule.api.MuleEventContext;
 import org.mule.api.lifecycle.Callable;

 import com.mongodb.DBObject;
 import com.mongodb.WriteConcern;
 import com.mongodb.util.JSON;

 public class InsertMongoComponent extends AbstractMongoComponent implements Callable {

     public Object onCall(MuleEventContext eventContext) throws Exception {
         // Parse the incoming JSON payload into a DBObject and insert it
         Object payload = eventContext.getMessage().getPayload();
         DBObject thedata = (DBObject) JSON.parse((String) payload);
         db.getCollection("mycollection").insert(thedata, WriteConcern.SAFE);

         // Return the generated ObjectId as a String, or null if none was set
         ObjectId id = (ObjectId) thedata.get("_id");
         if (id == null) return null;
         return id.toStringMongod();
     }
 }

The extended AbstractMongoComponent simply holds the database connection details; the components are instantiated from the XML configuration like this:

        <spring:bean id="mongoDb" class="com.mongodb.MongoClient" scope="singleton">
            <spring:constructor-arg index="0" type="java.lang.String" value="localhost"/>
            <spring:constructor-arg index="1" type="int" value="27017"/>
        </spring:bean>

        <spring:bean id="aggregateMongo" class="se.redpill.mulecomponents.mongo.AggregateMongoComponent" scope="singleton" init-method="init">
            <spring:property name="mongoDb" ref="mongoDb"/>
            <spring:property name="dbName" value="mydatabase"/>
        </spring:bean>

        <spring:bean id="insertMongo" class="se.redpill.mulecomponents.mongo.InsertMongoComponent" scope="singleton" init-method="init">
            <spring:property name="mongoDb" ref="mongoDb"/>
            <spring:property name="dbName" value="mydatabase"/>
        </spring:bean>

And when using the instance:

        <foreach doc:name="For Each">
            <component doc:name="MongoDB insert">
                <spring-object bean="insertMongo"/>
            </component>
        </foreach>


Getting aggregated data out

Now, how do you get data out? Simply enough, you write a QueryMongoComponent in the same style as the InsertMongoComponent above and reference it from the XML in the same way.

But in our case we wanted to use MongoDB's aggregation framework. Same thing: simply write a component that uses the latest Java driver's aggregation functionality, like in this example:
 package se.redpill.mulecomponents.mongo;

 import org.mule.api.MuleEventContext;
 import org.mule.api.lifecycle.Callable;

 import com.mongodb.AggregationOutput;
 import com.mongodb.BasicDBObject;
 import com.mongodb.DBObject;

 public class AggregateMongoComponent extends AbstractMongoComponent implements Callable {

     public String aggregateObject() {
         // Build the $project operation: keep the fields we aggregate on, drop _id
         DBObject fields = new BasicDBObject("OurCoolDocument.SpecificValue", 1);
         fields.put("OurCoolDocument.Unit", 1);
         fields.put("_id", 0);
         DBObject project = new BasicDBObject("$project", fields);

         // $unwind the embedded array so each element becomes its own document
         DBObject unwind = new BasicDBObject("$unwind", "$OurCoolDocument");

         // $group by unit and compute the average of SpecificValue
         DBObject groupFields = new BasicDBObject("_id", "$OurCoolDocument.Unit");
         groupFields.put("average", new BasicDBObject("$avg", "$OurCoolDocument.SpecificValue"));
         DBObject group = new BasicDBObject("$group", groupFields);

         // $match: keep only groups with an average of at least 93
         DBObject match = new BasicDBObject("$match", new BasicDBObject("average", new BasicDBObject("$gte", 93)));

         // Run the aggregation pipeline
         AggregationOutput output = db.getCollection("mycollection").aggregate(project, unwind, group, match);
         return output.toString();
     }

     public Object onCall(MuleEventContext eventContext) throws Exception {
         return aggregateObject();
     }
 }
You can of course pass all the aggregation parameters to the component on the Mule message from the preceding element.

The results might look like this:

 {
   "serverUsed" : "localhost/",
   "result" : [
     { "_id" : "My special", "average" : 93.55555555555556 },
     { "_id" : "Another special one", "average" : 96.66666666666667 },
     { "_id" : "Whats so special?", "average" : 93.77777777777777 },
     { "_id" : "Special for you my friend!", "average" : 96.88888888888889 }
   ],
   "ok" : 1
 }

Best of luck!
