Sunday, June 1, 2014

Pin It


Get Gadget

WSO2 CEP Custom Output Event Adapter for MongoDB

WSO2 CEP uses output adapters to send event from the CEP. We can write our own adapters to support our own output formats. That process is described in https://docs.wso2.org/display/CEP310/Writing+Custom+Event+Adaptors.

I am going to share my own implementation of a Output Event Adapter which uses to store event in a MongoDB database. The code is,

package org.emojot.cep.output.adaptor.mongodb;

import com.mongodb.*;
import com.mongodb.util.JSON;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.emojot.cep.output.adaptor.mongodb.internal.util.EmojotMongoDBEventAdaptorConstants;
import org.json.JSONException;
import org.json.JSONObject;
import org.wso2.carbon.event.output.adaptor.core.AbstractOutputEventAdaptor;
import org.wso2.carbon.event.output.adaptor.core.MessageType;
import org.wso2.carbon.event.output.adaptor.core.Property;
import org.wso2.carbon.event.output.adaptor.core.config.OutputEventAdaptorConfiguration;
import org.wso2.carbon.event.output.adaptor.core.message.config.OutputEventAdaptorMessageConfiguration;

import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.ResourceBundle;

public final class EmojotMongoDBEventAdaptorType extends AbstractOutputEventAdaptor {


    private static final Log log = LogFactory.getLog(EmojotMongoDBEventAdaptorType.class);
    private ResourceBundle resourceBundle;
    private List<Property> adaptorPropertyList;
    private List<Property> outputMessagePropertyList;
    private List<String> messageTypes;


    @Override
    protected String getName() {
        return EmojotMongoDBEventAdaptorConstants.EVENT_ADAPTOR_TYPE_EmojotMongoDB;
    }

    @Override
    protected List<String> getSupportedOutputMessageTypes() {
        return messageTypes;
    }

    @Override
    protected void init() {
        this.resourceBundle = ResourceBundle.getBundle("org.emojot.cep.output.adaptor.mongodb.i18n.Resources", Locale.getDefault());

        adaptorPropertyList = new ArrayList<Property>();
        outputMessagePropertyList = new ArrayList<Property>();
        messageTypes = new ArrayList<String>();


        Property property1 = new Property(EmojotMongoDBEventAdaptorConstants.EVENT_ADAPTOR_CONF_FIELD1);
        property1.setDisplayName(
                resourceBundle.getString(EmojotMongoDBEventAdaptorConstants.EVENT_ADAPTOR_CONF_FIELD1));
        property1.setRequired(true);
        property1.setHint(resourceBundle.getString(EmojotMongoDBEventAdaptorConstants.EVENT_ADAPTOR_CONF_FIELD1_HINT));
        this.adaptorPropertyList.add(property1);

        Property property2 = new Property(EmojotMongoDBEventAdaptorConstants.EVENT_ADAPTOR_CONF_FIELD2);
        property2.setDisplayName(
                resourceBundle.getString(EmojotMongoDBEventAdaptorConstants.EVENT_ADAPTOR_CONF_FIELD2));
        property2.setRequired(true);
        property2.setHint(resourceBundle.getString(EmojotMongoDBEventAdaptorConstants.EVENT_ADAPTOR_CONF_FIELD2_HINT));
        this.adaptorPropertyList.add(property2);

        Property property3 = new Property(EmojotMongoDBEventAdaptorConstants.EVENT_ADAPTOR_CONF_FIELD3);
        property3.setDisplayName(
                resourceBundle.getString(EmojotMongoDBEventAdaptorConstants.EVENT_ADAPTOR_CONF_FIELD3));
        property3.setRequired(true);
        property3.setHint(resourceBundle.getString(EmojotMongoDBEventAdaptorConstants.EVENT_ADAPTOR_CONF_FIELD3_HINT));
        this.adaptorPropertyList.add(property3);


        Property message_property1 = new Property(EmojotMongoDBEventAdaptorConstants.EVENT_ADAPTOR_Message_CONF_FIELD1);
        message_property1.setDisplayName(
                resourceBundle.getString(EmojotMongoDBEventAdaptorConstants.EVENT_ADAPTOR_Message_CONF_FIELD1));
        message_property1.setRequired(true);
        message_property1.setHint(resourceBundle.getString(EmojotMongoDBEventAdaptorConstants.EVENT_ADAPTOR_Message_CONF_FIELD1_HINT));
        this.outputMessagePropertyList.add(message_property1);

        this.messageTypes.add(MessageType.JSON);

    }

    @Override
    protected List<Property> getOutputAdaptorProperties() {
        return  this.adaptorPropertyList;
    }

    @Override
    protected List<Property> getOutputMessageProperties() {
        return  this.outputMessagePropertyList;
    }

    @Override
    public void publish(
            OutputEventAdaptorMessageConfiguration outputEventAdaptorMessageConfiguration,
            Object o, OutputEventAdaptorConfiguration outputEventAdaptorConfiguration,
            int tenantId) {


        String mongodbIPAddress=outputEventAdaptorConfiguration.getOutputProperties().get(EmojotMongoDBEventAdaptorConstants.EVENT_ADAPTOR_CONF_FIELD1);
        String mongodbPort=outputEventAdaptorConfiguration.getOutputProperties().get(EmojotMongoDBEventAdaptorConstants.EVENT_ADAPTOR_CONF_FIELD2);
        String mongodbDatabase=outputEventAdaptorConfiguration.getOutputProperties().get(EmojotMongoDBEventAdaptorConstants.EVENT_ADAPTOR_CONF_FIELD3);

        String mongodbCollection=outputEventAdaptorMessageConfiguration.getOutputMessageProperties().get(EmojotMongoDBEventAdaptorConstants.EVENT_ADAPTOR_Message_CONF_FIELD1);

        log.info("Configurations -"+mongodbIPAddress+":"+mongodbPort+":"+mongodbDatabase+":"+mongodbCollection);
        log.info("Output -"+o.toString());
        log.info("Output Type-"+o.getClass().getName());

        try {
            MongoClient mongoClient = new MongoClient( mongodbIPAddress, Integer.parseInt(mongodbPort) );
            DB db = mongoClient.getDB( mongodbDatabase );
            DBCollection collection = db.getCollection(mongodbCollection);

            if("java.lang.String".equals(o.getClass().getName())) {
                JSONObject result= new JSONObject(o.toString());
                JSONObject payload=result.getJSONObject("event").getJSONObject("payloadData");

                log.info("Output JSON-"+payload.toString());

                DBObject dbObject = (DBObject)JSON.parse(payload.toString());
                collection.insert(dbObject);
            }
            else{
                log.error("Output Event is not in Correct Type");
            }

        } catch (UnknownHostException e) {
            log.error("Error Opening Connection to Mongodb Database" + e.toString());
        } catch (JSONException e) {
            log.error("Error Parsing JSON Result" + e.toString());
        }
    }

    @Override
    public void testConnection(
            OutputEventAdaptorConfiguration outputEventAdaptorConfiguration, int tenantId) {
        //To change body of implemented methods use File | Settings | File Templates.
    }
}

No comments:

Post a Comment