WSO2 CEP uses output adapters to send event from the CEP. We can write our own adapters to support our own output formats. That process is described in https://docs.wso2.org/display/CEP310/Writing+Custom+Event+Adaptors.
I am going to share my own implementation of a Output Event Adapter which uses to store event in a MongoDB database. The code is,
I am going to share my own implementation of a Output Event Adapter which uses to store event in a MongoDB database. The code is,
package org.emojot.cep.output.adaptor.mongodb;
import com.mongodb.*;
import com.mongodb.util.JSON;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.emojot.cep.output.adaptor.mongodb.internal.util.EmojotMongoDBEventAdaptorConstants;
import org.json.JSONException;
import org.json.JSONObject;
import org.wso2.carbon.event.output.adaptor.core.AbstractOutputEventAdaptor;
import org.wso2.carbon.event.output.adaptor.core.MessageType;
import org.wso2.carbon.event.output.adaptor.core.Property;
import org.wso2.carbon.event.output.adaptor.core.config.OutputEventAdaptorConfiguration;
import org.wso2.carbon.event.output.adaptor.core.message.config.OutputEventAdaptorMessageConfiguration;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.ResourceBundle;
public final class EmojotMongoDBEventAdaptorType extends AbstractOutputEventAdaptor {
private static final Log log = LogFactory.getLog(EmojotMongoDBEventAdaptorType.class);
private ResourceBundle resourceBundle;
private List<Property> adaptorPropertyList;
private List<Property> outputMessagePropertyList;
private List<String> messageTypes;
@Override
protected String getName() {
return EmojotMongoDBEventAdaptorConstants.EVENT_ADAPTOR_TYPE_EmojotMongoDB;
}
@Override
protected List<String> getSupportedOutputMessageTypes() {
return messageTypes;
}
@Override
protected void init() {
this.resourceBundle = ResourceBundle.getBundle("org.emojot.cep.output.adaptor.mongodb.i18n.Resources", Locale.getDefault());
adaptorPropertyList = new ArrayList<Property>();
outputMessagePropertyList = new ArrayList<Property>();
messageTypes = new ArrayList<String>();
Property property1 = new Property(EmojotMongoDBEventAdaptorConstants.EVENT_ADAPTOR_CONF_FIELD1);
property1.setDisplayName(
resourceBundle.getString(EmojotMongoDBEventAdaptorConstants.EVENT_ADAPTOR_CONF_FIELD1));
property1.setRequired(true);
property1.setHint(resourceBundle.getString(EmojotMongoDBEventAdaptorConstants.EVENT_ADAPTOR_CONF_FIELD1_HINT));
this.adaptorPropertyList.add(property1);
Property property2 = new Property(EmojotMongoDBEventAdaptorConstants.EVENT_ADAPTOR_CONF_FIELD2);
property2.setDisplayName(
resourceBundle.getString(EmojotMongoDBEventAdaptorConstants.EVENT_ADAPTOR_CONF_FIELD2));
property2.setRequired(true);
property2.setHint(resourceBundle.getString(EmojotMongoDBEventAdaptorConstants.EVENT_ADAPTOR_CONF_FIELD2_HINT));
this.adaptorPropertyList.add(property2);
Property property3 = new Property(EmojotMongoDBEventAdaptorConstants.EVENT_ADAPTOR_CONF_FIELD3);
property3.setDisplayName(
resourceBundle.getString(EmojotMongoDBEventAdaptorConstants.EVENT_ADAPTOR_CONF_FIELD3));
property3.setRequired(true);
property3.setHint(resourceBundle.getString(EmojotMongoDBEventAdaptorConstants.EVENT_ADAPTOR_CONF_FIELD3_HINT));
this.adaptorPropertyList.add(property3);
Property message_property1 = new Property(EmojotMongoDBEventAdaptorConstants.EVENT_ADAPTOR_Message_CONF_FIELD1);
message_property1.setDisplayName(
resourceBundle.getString(EmojotMongoDBEventAdaptorConstants.EVENT_ADAPTOR_Message_CONF_FIELD1));
message_property1.setRequired(true);
message_property1.setHint(resourceBundle.getString(EmojotMongoDBEventAdaptorConstants.EVENT_ADAPTOR_Message_CONF_FIELD1_HINT));
this.outputMessagePropertyList.add(message_property1);
this.messageTypes.add(MessageType.JSON);
}
@Override
protected List<Property> getOutputAdaptorProperties() {
return this.adaptorPropertyList;
}
@Override
protected List<Property> getOutputMessageProperties() {
return this.outputMessagePropertyList;
}
@Override
public void publish(
OutputEventAdaptorMessageConfiguration outputEventAdaptorMessageConfiguration,
Object o, OutputEventAdaptorConfiguration outputEventAdaptorConfiguration,
int tenantId) {
String mongodbIPAddress=outputEventAdaptorConfiguration.getOutputProperties().get(EmojotMongoDBEventAdaptorConstants.EVENT_ADAPTOR_CONF_FIELD1);
String mongodbPort=outputEventAdaptorConfiguration.getOutputProperties().get(EmojotMongoDBEventAdaptorConstants.EVENT_ADAPTOR_CONF_FIELD2);
String mongodbDatabase=outputEventAdaptorConfiguration.getOutputProperties().get(EmojotMongoDBEventAdaptorConstants.EVENT_ADAPTOR_CONF_FIELD3);
String mongodbCollection=outputEventAdaptorMessageConfiguration.getOutputMessageProperties().get(EmojotMongoDBEventAdaptorConstants.EVENT_ADAPTOR_Message_CONF_FIELD1);
log.info("Configurations -"+mongodbIPAddress+":"+mongodbPort+":"+mongodbDatabase+":"+mongodbCollection);
log.info("Output -"+o.toString());
log.info("Output Type-"+o.getClass().getName());
try {
MongoClient mongoClient = new MongoClient( mongodbIPAddress, Integer.parseInt(mongodbPort) );
DB db = mongoClient.getDB( mongodbDatabase );
DBCollection collection = db.getCollection(mongodbCollection);
if("java.lang.String".equals(o.getClass().getName())) {
JSONObject result= new JSONObject(o.toString());
JSONObject payload=result.getJSONObject("event").getJSONObject("payloadData");
log.info("Output JSON-"+payload.toString());
DBObject dbObject = (DBObject)JSON.parse(payload.toString());
collection.insert(dbObject);
}
else{
log.error("Output Event is not in Correct Type");
}
} catch (UnknownHostException e) {
log.error("Error Opening Connection to Mongodb Database" + e.toString());
} catch (JSONException e) {
log.error("Error Parsing JSON Result" + e.toString());
}
}
@Override
public void testConnection(
OutputEventAdaptorConfiguration outputEventAdaptorConfiguration, int tenantId) {
//To change body of implemented methods use File | Settings | File Templates.
}
}

No comments:
Post a Comment