Showing posts with label CEP. Show all posts
Showing posts with label CEP. Show all posts

Sunday, June 1, 2014

WSO2 CEP Custom Output Event Adapter for MongoDB

WSO2 CEP uses output adapters to send events out of the CEP. We can write our own adapters to support our own output formats. That process is described at https://docs.wso2.org/display/CEP310/Writing+Custom+Event+Adaptors.

I am going to share my own implementation of an Output Event Adapter which is used to store events in a MongoDB database. The code is:

package org.emojot.cep.output.adaptor.mongodb;

import com.mongodb.*;
import com.mongodb.util.JSON;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.emojot.cep.output.adaptor.mongodb.internal.util.EmojotMongoDBEventAdaptorConstants;
import org.json.JSONException;
import org.json.JSONObject;
import org.wso2.carbon.event.output.adaptor.core.AbstractOutputEventAdaptor;
import org.wso2.carbon.event.output.adaptor.core.MessageType;
import org.wso2.carbon.event.output.adaptor.core.Property;
import org.wso2.carbon.event.output.adaptor.core.config.OutputEventAdaptorConfiguration;
import org.wso2.carbon.event.output.adaptor.core.message.config.OutputEventAdaptorMessageConfiguration;

import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.ResourceBundle;

public final class EmojotMongoDBEventAdaptorType extends AbstractOutputEventAdaptor {


    private static final Log log = LogFactory.getLog(EmojotMongoDBEventAdaptorType.class);
    private ResourceBundle resourceBundle;
    private List<Property> adaptorPropertyList;
    private List<Property> outputMessagePropertyList;
    private List<String> messageTypes;


    @Override
    protected String getName() {
        return EmojotMongoDBEventAdaptorConstants.EVENT_ADAPTOR_TYPE_EmojotMongoDB;
    }

    @Override
    protected List<String> getSupportedOutputMessageTypes() {
        return messageTypes;
    }

    @Override
    protected void init() {
        this.resourceBundle = ResourceBundle.getBundle("org.emojot.cep.output.adaptor.mongodb.i18n.Resources", Locale.getDefault());

        adaptorPropertyList = new ArrayList<Property>();
        outputMessagePropertyList = new ArrayList<Property>();
        messageTypes = new ArrayList<String>();


        Property property1 = new Property(EmojotMongoDBEventAdaptorConstants.EVENT_ADAPTOR_CONF_FIELD1);
        property1.setDisplayName(
                resourceBundle.getString(EmojotMongoDBEventAdaptorConstants.EVENT_ADAPTOR_CONF_FIELD1));
        property1.setRequired(true);
        property1.setHint(resourceBundle.getString(EmojotMongoDBEventAdaptorConstants.EVENT_ADAPTOR_CONF_FIELD1_HINT));
        this.adaptorPropertyList.add(property1);

        Property property2 = new Property(EmojotMongoDBEventAdaptorConstants.EVENT_ADAPTOR_CONF_FIELD2);
        property2.setDisplayName(
                resourceBundle.getString(EmojotMongoDBEventAdaptorConstants.EVENT_ADAPTOR_CONF_FIELD2));
        property2.setRequired(true);
        property2.setHint(resourceBundle.getString(EmojotMongoDBEventAdaptorConstants.EVENT_ADAPTOR_CONF_FIELD2_HINT));
        this.adaptorPropertyList.add(property2);

        Property property3 = new Property(EmojotMongoDBEventAdaptorConstants.EVENT_ADAPTOR_CONF_FIELD3);
        property3.setDisplayName(
                resourceBundle.getString(EmojotMongoDBEventAdaptorConstants.EVENT_ADAPTOR_CONF_FIELD3));
        property3.setRequired(true);
        property3.setHint(resourceBundle.getString(EmojotMongoDBEventAdaptorConstants.EVENT_ADAPTOR_CONF_FIELD3_HINT));
        this.adaptorPropertyList.add(property3);


        Property message_property1 = new Property(EmojotMongoDBEventAdaptorConstants.EVENT_ADAPTOR_Message_CONF_FIELD1);
        message_property1.setDisplayName(
                resourceBundle.getString(EmojotMongoDBEventAdaptorConstants.EVENT_ADAPTOR_Message_CONF_FIELD1));
        message_property1.setRequired(true);
        message_property1.setHint(resourceBundle.getString(EmojotMongoDBEventAdaptorConstants.EVENT_ADAPTOR_Message_CONF_FIELD1_HINT));
        this.outputMessagePropertyList.add(message_property1);

        this.messageTypes.add(MessageType.JSON);

    }

    @Override
    protected List<Property> getOutputAdaptorProperties() {
        return  this.adaptorPropertyList;
    }

    @Override
    protected List<Property> getOutputMessageProperties() {
        return  this.outputMessagePropertyList;
    }

    @Override
    public void publish(
            OutputEventAdaptorMessageConfiguration outputEventAdaptorMessageConfiguration,
            Object o, OutputEventAdaptorConfiguration outputEventAdaptorConfiguration,
            int tenantId) {


        String mongodbIPAddress=outputEventAdaptorConfiguration.getOutputProperties().get(EmojotMongoDBEventAdaptorConstants.EVENT_ADAPTOR_CONF_FIELD1);
        String mongodbPort=outputEventAdaptorConfiguration.getOutputProperties().get(EmojotMongoDBEventAdaptorConstants.EVENT_ADAPTOR_CONF_FIELD2);
        String mongodbDatabase=outputEventAdaptorConfiguration.getOutputProperties().get(EmojotMongoDBEventAdaptorConstants.EVENT_ADAPTOR_CONF_FIELD3);

        String mongodbCollection=outputEventAdaptorMessageConfiguration.getOutputMessageProperties().get(EmojotMongoDBEventAdaptorConstants.EVENT_ADAPTOR_Message_CONF_FIELD1);

        log.info("Configurations -"+mongodbIPAddress+":"+mongodbPort+":"+mongodbDatabase+":"+mongodbCollection);
        log.info("Output -"+o.toString());
        log.info("Output Type-"+o.getClass().getName());

        try {
            MongoClient mongoClient = new MongoClient( mongodbIPAddress, Integer.parseInt(mongodbPort) );
            DB db = mongoClient.getDB( mongodbDatabase );
            DBCollection collection = db.getCollection(mongodbCollection);

            if("java.lang.String".equals(o.getClass().getName())) {
                JSONObject result= new JSONObject(o.toString());
                JSONObject payload=result.getJSONObject("event").getJSONObject("payloadData");

                log.info("Output JSON-"+payload.toString());

                DBObject dbObject = (DBObject)JSON.parse(payload.toString());
                collection.insert(dbObject);
            }
            else{
                log.error("Output Event is not in Correct Type");
            }

        } catch (UnknownHostException e) {
            log.error("Error Opening Connection to Mongodb Database" + e.toString());
        } catch (JSONException e) {
            log.error("Error Parsing JSON Result" + e.toString());
        }
    }

    @Override
    public void testConnection(
            OutputEventAdaptorConfiguration outputEventAdaptorConfiguration, int tenantId) {
        //To change body of implemented methods use File | Settings | File Templates.
    }
}

Monday, July 15, 2013

Communicating with WSO2 CEP through Thrift in NodeJS + JAVA

In our final year research project we use WSO2 CEP to analyze complex events coming through a JSON stream. That JSON stream comes from a NodeJS application. In this article I describe how to send the JSON stream to WSO2 CEP using Thrift.

Initially I developed a Java Thrift client to send JSON events to WSO2 CEP. It can be found here: https://github.com/andunslg/Sith/tree/master/SithCEPPublisher.

SithCEPPublisher is the main communicator class,
package org.sith.cep.publisher;

import org.sith.cep.publisher.config.SithPerceptionPublisherStreamConfig;
import org.sith.cep.publisher.config.StreamConfig;
import org.wso2.carbon.databridge.agent.thrift.DataPublisher;
import org.wso2.carbon.databridge.agent.thrift.exception.AgentException;
import org.wso2.carbon.databridge.commons.exception.*;
import java.net.MalformedURLException;

public class SithCEPPublisher{

    private DataPublisher dataPublisher;
    private StreamConfig streamConfig;

    public SithCEPPublisher(String url, String userName, String password){

        //KeyStoreUtil.setTrustStoreParams();
        System.setProperty("javax.net.ssl.trustStore","wso2carbon.jks");
        System.setProperty("javax.net.ssl.trustStorePassword", "wso2carbon");

        System.out.println("Key Store is set..");
        //according to the convention the authentication port will be 7611+100= 7711 and its host will be the same

        try{
            dataPublisher = new DataPublisher(url, userName, password);
        }catch(MalformedURLException e){
            e.printStackTrace();
        }catch(AgentException e){
            e.printStackTrace();
        }catch(AuthenticationException e){
            e.printStackTrace();
        }catch(TransportException e){
            e.printStackTrace();
        }

        System.out.println("Logged in..");        
        this.streamConfig=new SithPerceptionPublisherStreamConfig();
        System.out.println("Default Stream Config Added..");
    }

    public SithCEPPublisher(String url, String userName, String password,String streamName, String streamVersion, String streamDefinition){

        //KeyStoreUtil.setTrustStoreParams();
        System.setProperty("javax.net.ssl.trustStore","wso2carbon.jks");
        System.setProperty("javax.net.ssl.trustStorePassword", "wso2carbon");

        System.out.println("Key Store is set..");
        //according to the convention the authentication port will be 7611+100= 7711 and its host will be the same

        try{
            dataPublisher = new DataPublisher(url, userName, password);
        }catch(MalformedURLException e){
            e.printStackTrace();
        }catch(AgentException e){
            e.printStackTrace();
        }catch(AuthenticationException e){
            e.printStackTrace();
        }catch(TransportException e){
            e.printStackTrace();
        }

        System.out.println("Logged in..");

        this.streamConfig=new StreamConfig(streamName,streamVersion,streamDefinition);
        System.out.println("Stream Config Added..");
    }

    public String publishToCEP(Object[] metaDataArray,Object[] payloadArray) {

        System.out.println("Starting publishing process..");
        String streamId;

        try {
            streamId = dataPublisher.findStream(streamConfig.getStreamName(), streamConfig.getStreamVersion());
        } catch (NoStreamDefinitionExistException e) {
            try{
                streamId = dataPublisher.defineStream(streamConfig.getStreamDefinition());
                System.out.println("Stream Definition created..");
            }catch(AgentException e1){
                e1.printStackTrace();
                return "Failed";
            }catch(MalformedStreamDefinitionException e1){
                e1.printStackTrace();
                return "Failed";
            }catch(StreamDefinitionException e1){
                e1.printStackTrace();
                return "Failed";
            }catch(DifferentStreamDefinitionAlreadyDefinedException e1){
                e1.printStackTrace();
                return "Failed";
            }

        }catch(StreamDefinitionException e){
            e.printStackTrace();
            return "Failed";
        }catch(AgentException e){
            e.printStackTrace();
            return "Failed";
        }

        System.out.println("Stream created..");

        try{
            //dataPublisher.publish(streamId, new Object[]{ipAddress}, null, new Object[]{eventID, userID, percetionValue});
            dataPublisher.publish(streamId, metaDataArray, null, payloadArray);
        }catch(AgentException e){
            e.printStackTrace();
            return "Failed";
        }
        System.out.println("Data published..");
        return "Done";
    }
}
Then I have two configuration classes,

StreamConfig class which has the configuration of JSON stream,
package org.sith.cep.publisher.config;

/**
 * Mutable holder for the three values that describe a stream:
 * its name, its version and its definition text.
 */
public class StreamConfig {

    private String streamName;
    private String streamVersion;
    private String streamDefinition;

    /**
     * Creates a configuration from the given values; they are stored as-is.
     *
     * @param streamName       name of the stream
     * @param streamVersion    version of the stream
     * @param streamDefinition definition text of the stream
     */
    public StreamConfig(String streamName, String streamVersion, String streamDefinition) {
        this.streamName = streamName;
        this.streamVersion = streamVersion;
        this.streamDefinition = streamDefinition;
    }

    /** @return the stream name */
    public String getStreamName() {
        return this.streamName;
    }

    /** @return the stream version */
    public String getStreamVersion() {
        return this.streamVersion;
    }

    /** @return the stream definition text */
    public String getStreamDefinition() {
        return this.streamDefinition;
    }

    /** @param streamName the new stream name */
    public void setStreamName(String streamName) {
        this.streamName = streamName;
    }

    /** @param streamVersion the new stream version */
    public void setStreamVersion(String streamVersion) {
        this.streamVersion = streamVersion;
    }

    /** @param streamDefinition the new stream definition text */
    public void setStreamDefinition(String streamDefinition) {
        this.streamDefinition = streamDefinition;
    }
}


SithPerceptionPublisherStreamConfig class which holds the stream definition,

package org.sith.cep.publisher.config;

public class SithPerceptionPublisherStreamConfig extends StreamConfig{
    private static String sithPerceptionStreamName="sith_Perception_Analytic";
    private static String sithPerceptionStreamVersion="1.0.0";
    private static String sithPerceptionStreamDefinition=

                    "{"+
                    "    'name':'sith_Perception_Analytics',"+
                    "    'version':'1.0.0',"+
                    "    'nickName': 'Sith Analytics',"+
                    "    'description': 'Sith_Perception_Analytics',"+
                    "    'metaData':["+
                    "        {"+
                    "            'name':'ipAdd','type':'STRING'\n"+
                    "        }"+
                    "    ],"+
                    "    'payloadData':["+
                    "        {"+
                    "            'name':'eventID','type':'STRING'"+
                    "        },"+
                    "        {"+
                    "            'name':'userID','type':'STRING'"+
                    "        },"+
                    "        {"+
                    "            'name':'perceptionValue','type':'STRING'"+
                    "        },"+
                    "        {"+
                    "            'name':'comment','type':'STRING'"+
                    "        }"+
                    "    ]"+
                    "}";

    public SithPerceptionPublisherStreamConfig(){
        super(sithPerceptionStreamName,sithPerceptionStreamVersion,sithPerceptionStreamDefinition);
    }

}

Then I invoked this Java code from a NodeJS application. To do that I used this NodeJS module, which provides cross-language integration: https://github.com/nearinfinity/node-java. The NodeJS application is given below,
exports.publishComment = function(req,res){
    percepManager.publishToCEP(req.body.userID , req.body.eventID , req.body.perceptionValue,req.body.text);
    res.writeHead(200, {'Content-Type': 'application/json'});
      var result = JSON.stringify({response: true });
    res.write(result);
    res.end();
}

var java = require("java");
java.classpath.push("cep-publisher-1.0.jar");

// Connection settings can be overridden through environment variables so the
// endpoint and credentials do not have to live in source control; the values
// after "||" are the original defaults.
var cepUrl = process.env.CEP_URL || "tcp://192.248.8.246:7611";
var cepUserName = process.env.CEP_USERNAME || "admin";
var cepPassword = process.env.CEP_PASSWORD || "apst@sith";

// Single publisher instance shared by every publishToCEP call.
var jClass = java.newInstanceSync("org.sith.cep.publisher.SithCEPPublisher", cepUrl, cepUserName, cepPassword);

exports.publishToCEP = function(userID,eventID,perceptionVal,comment) {
    var metaDataArray = java.newArray("java.lang.Object", ["192.248.8.246"]);
    var payloadArray = java.newArray("java.lang.Object", [eventID,userID,perceptionVal,comment]);
    var result=jClass.publishToCEPSync(metaDataArray,payloadArray);
    console.log("Returned data - "+result);
} 
To run this application, run `npm install java` and then run the `node app` command. After that you can call this method to send the defined JSON stream to the CEP.