WSO2 CEP uses output adapters to send event from the CEP. We can write our own adapters to support our own output formats. That process is described in https://docs.wso2.org/display/CEP310/Writing+Custom+Event+Adaptors.
I am going to share my own implementation of a Output Event Adapter which uses to store event in a MongoDB database. The code is,
I am going to share my own implementation of a Output Event Adapter which uses to store event in a MongoDB database. The code is,
package org.emojot.cep.output.adaptor.mongodb; import com.mongodb.*; import com.mongodb.util.JSON; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.emojot.cep.output.adaptor.mongodb.internal.util.EmojotMongoDBEventAdaptorConstants; import org.json.JSONException; import org.json.JSONObject; import org.wso2.carbon.event.output.adaptor.core.AbstractOutputEventAdaptor; import org.wso2.carbon.event.output.adaptor.core.MessageType; import org.wso2.carbon.event.output.adaptor.core.Property; import org.wso2.carbon.event.output.adaptor.core.config.OutputEventAdaptorConfiguration; import org.wso2.carbon.event.output.adaptor.core.message.config.OutputEventAdaptorMessageConfiguration; import java.net.UnknownHostException; import java.util.ArrayList; import java.util.List; import java.util.Locale; import java.util.ResourceBundle; public final class EmojotMongoDBEventAdaptorType extends AbstractOutputEventAdaptor { private static final Log log = LogFactory.getLog(EmojotMongoDBEventAdaptorType.class); private ResourceBundle resourceBundle; private List<Property> adaptorPropertyList; private List<Property> outputMessagePropertyList; private List<String> messageTypes; @Override protected String getName() { return EmojotMongoDBEventAdaptorConstants.EVENT_ADAPTOR_TYPE_EmojotMongoDB; } @Override protected List<String> getSupportedOutputMessageTypes() { return messageTypes; } @Override protected void init() { this.resourceBundle = ResourceBundle.getBundle("org.emojot.cep.output.adaptor.mongodb.i18n.Resources", Locale.getDefault()); adaptorPropertyList = new ArrayList<Property>(); outputMessagePropertyList = new ArrayList<Property>(); messageTypes = new ArrayList<String>(); Property property1 = new Property(EmojotMongoDBEventAdaptorConstants.EVENT_ADAPTOR_CONF_FIELD1); property1.setDisplayName( resourceBundle.getString(EmojotMongoDBEventAdaptorConstants.EVENT_ADAPTOR_CONF_FIELD1)); property1.setRequired(true); property1.setHint(resourceBundle.getString(EmojotMongoDBEventAdaptorConstants.EVENT_ADAPTOR_CONF_FIELD1_HINT)); this.adaptorPropertyList.add(property1); Property property2 = new Property(EmojotMongoDBEventAdaptorConstants.EVENT_ADAPTOR_CONF_FIELD2); property2.setDisplayName( resourceBundle.getString(EmojotMongoDBEventAdaptorConstants.EVENT_ADAPTOR_CONF_FIELD2)); property2.setRequired(true); property2.setHint(resourceBundle.getString(EmojotMongoDBEventAdaptorConstants.EVENT_ADAPTOR_CONF_FIELD2_HINT)); this.adaptorPropertyList.add(property2); Property property3 = new Property(EmojotMongoDBEventAdaptorConstants.EVENT_ADAPTOR_CONF_FIELD3); property3.setDisplayName( resourceBundle.getString(EmojotMongoDBEventAdaptorConstants.EVENT_ADAPTOR_CONF_FIELD3)); property3.setRequired(true); property3.setHint(resourceBundle.getString(EmojotMongoDBEventAdaptorConstants.EVENT_ADAPTOR_CONF_FIELD3_HINT)); this.adaptorPropertyList.add(property3); Property message_property1 = new Property(EmojotMongoDBEventAdaptorConstants.EVENT_ADAPTOR_Message_CONF_FIELD1); message_property1.setDisplayName( resourceBundle.getString(EmojotMongoDBEventAdaptorConstants.EVENT_ADAPTOR_Message_CONF_FIELD1)); message_property1.setRequired(true); message_property1.setHint(resourceBundle.getString(EmojotMongoDBEventAdaptorConstants.EVENT_ADAPTOR_Message_CONF_FIELD1_HINT)); this.outputMessagePropertyList.add(message_property1); this.messageTypes.add(MessageType.JSON); } @Override protected List<Property> getOutputAdaptorProperties() { return this.adaptorPropertyList; } @Override protected List<Property> getOutputMessageProperties() { return this.outputMessagePropertyList; } @Override public void publish( OutputEventAdaptorMessageConfiguration outputEventAdaptorMessageConfiguration, Object o, OutputEventAdaptorConfiguration outputEventAdaptorConfiguration, int tenantId) { String mongodbIPAddress=outputEventAdaptorConfiguration.getOutputProperties().get(EmojotMongoDBEventAdaptorConstants.EVENT_ADAPTOR_CONF_FIELD1); String mongodbPort=outputEventAdaptorConfiguration.getOutputProperties().get(EmojotMongoDBEventAdaptorConstants.EVENT_ADAPTOR_CONF_FIELD2); String mongodbDatabase=outputEventAdaptorConfiguration.getOutputProperties().get(EmojotMongoDBEventAdaptorConstants.EVENT_ADAPTOR_CONF_FIELD3); String mongodbCollection=outputEventAdaptorMessageConfiguration.getOutputMessageProperties().get(EmojotMongoDBEventAdaptorConstants.EVENT_ADAPTOR_Message_CONF_FIELD1); log.info("Configurations -"+mongodbIPAddress+":"+mongodbPort+":"+mongodbDatabase+":"+mongodbCollection); log.info("Output -"+o.toString()); log.info("Output Type-"+o.getClass().getName()); try { MongoClient mongoClient = new MongoClient( mongodbIPAddress, Integer.parseInt(mongodbPort) ); DB db = mongoClient.getDB( mongodbDatabase ); DBCollection collection = db.getCollection(mongodbCollection); if("java.lang.String".equals(o.getClass().getName())) { JSONObject result= new JSONObject(o.toString()); JSONObject payload=result.getJSONObject("event").getJSONObject("payloadData"); log.info("Output JSON-"+payload.toString()); DBObject dbObject = (DBObject)JSON.parse(payload.toString()); collection.insert(dbObject); } else{ log.error("Output Event is not in Correct Type"); } } catch (UnknownHostException e) { log.error("Error Opening Connection to Mongodb Database" + e.toString()); } catch (JSONException e) { log.error("Error Parsing JSON Result" + e.toString()); } } @Override public void testConnection( OutputEventAdaptorConfiguration outputEventAdaptorConfiguration, int tenantId) { //To change body of implemented methods use File | Settings | File Templates. } }
No comments:
Post a Comment