Dynamic CEP - StreamAnalytix

Dynamic CEP

DynamicCEP allows registration of queries with pre-defined actions (“PUBLISH_TO_RABBITMQ”, “INVOKE_WEBSERVICE_CALL” and “CUSTOM_ACTION”) applied on the running pipeline. It also enables dynamic configuration of queries and actions such as Update and Delete.

Detailed information about the pre-defined actions is as mentioned below:

  • PUBLISH_TO_RABBITMQ: this action publishes data to the RabbitMQ for the defined exchange name and the routing key
  • INVOKE_WEBSERVICE_CALL: this action invokes a web service and publish the data to the same defined URL (supports two Methods: POST | PUT)
  • CUSTOM_ACTION: unlike the other two pre-defined actions, it can be used to edit the data, or publish the data to a custom location.

The procedure to register a query involves working on StreamAnalytix UI and REST UI simultaneously. Query Registration and Update is only possible through the REST UI, whereas Query Deletion is possible through both.

As a first step, create a Data Pipeline and Save it. Configure the Basic and Advanced configurations. To configure the DynamicCEP query, provide the Component ID in the REST UI. Now edit the pipeline and make note of the Component ID as it should now be renamed as per the pipeline’s name.

This section will let you specify the metadata of the data using the REST interface.

Let us take a sample JSON representing an Employee data that can be defined as follows:

Next is to configure an entity by defining the schema of the entity.

Prerequisites: For making any RESTClient call, Request Header must be set to “TokenName: TokenValue” otherwise it gives an error of “Unauthorized authentication” token.

To get the Token value, Go to the tab Manage Users, and Edit User details.

Create a header with the name ‘token’. Now go back to the S-Ax UI, and create a Data Pipeline. Go to theQuery Config tab of the DynamicCEP processor and copy the Sample REST Client URL: http://<<IP:PORT>>/StreamAnalytix/datafabric/dynacep/query/register

Go to the Rest Client and paste the URL as shown below. Provide the IP and the PORT numbers of the machine where StreamAnalytix is deployed.

The token used can be seen under Headers. Write the schema JSON in the Body.

How to Register a CEPConfiguration?

  • Method:  Method section should have the value: POST
  • URLhttp://<<IP:PORT>>/StreamAnalytix/datafabric/dynacep/query/register/componentId
  • BODY: It is a sample entity schema JSON. Make sure that the syntax remains the same and the keys, however with the requirements, you can change the values as shown in the highlighted section of Figure below “Sample Entity Schema JSON”. Use the keys below for creating the entity schema JSON:

As mentioned in the Figure above, the key cepAction can have multiple actions. However it is required to have atleast one action. Also regarding headerParams, requestParams and initParams keys, you can have multiple values as well or none.

Before clicking Send make sure that the user is logged into the application, else “context is not initialized” message will occur. Status “SUCCESS” notifies the successful completion of CEPConfiguration. Keep the cepQueryId handy for further operations.

How to update the CEPConfiguration?

  • URL: URL section should have the value:http://<<IP:PORT>>/StreamAnalytix/datafabric/dynacep/query/update/componentId/cepQueryId
  • BODY: Use the keys below for updating the entity schema JSON:

How to delete the CEPConfiguration?

  • URL to delete CEP configuration based on componentId and cepQueryId: http://<<IP:PORT>>/StreamAnalytix/datafabric/dynacep/query/delete/componentId/cepQueryId.
  • BODY: Leave it blank.

How to get CEP configuration?

  • URL to get all the CEP configuration:http://<<IP:PORT>>/StreamAnalytix/datafabric/dynacep/query/get.
  • BODY: Leave it blank.

Schedule a Demo