Tuesday, October 10, 2017

Simple sequential workflow using Spring framework and Spring Expression Language

Requirement

Recently, in one of my projects, there was a requirement for a simple workflow around some background processing tasks. Most of the tasks were already implemented, so we only had to implement the workflow around them.

Available solutions

There are many workflow frameworks available for Java. Even though they met the requirements, they seemed too heavy for such a simple need. There were also blog posts on how to use the Spring framework to create simple workflows out of Spring beans. This was indeed what I was looking for, but something was missing. Each action in the workflow can be a Spring bean implementing a simple interface with an execute method.
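As a rough illustration of such an interface, here is a minimal sketch. The names WorkflowAction, SequentialWorkflowDemo and runAll, and the generic result type, are assumptions made for this example and are not taken from the original project.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical action contract: a single execute() method with a generic result. */
interface WorkflowAction<T> {
    T execute();
}

final class SequentialWorkflowDemo {

    /** Runs the actions in order and returns the last result (null for an empty list). */
    static Object runAll(List<WorkflowAction<?>> actions) {
        Object last = null;
        for (WorkflowAction<?> action : actions) {
            last = action.execute();
        }
        return last;
    }

    public static void main(String[] args) {
        List<WorkflowAction<?>> actions = new ArrayList<>();
        actions.add(() -> "first");
        actions.add(() -> 42);
        System.out.println(runAll(actions));
    }
}
```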

But how do we pass parameters to the actions? Most implementations pass a map between actions through the execute() method. This means an action adds its result to the map under a key and the next action reads the value from this map. This makes the actions aware of each other, or at least aware of the keys used for passing information. If the key or the data type of the value changes, the next action reading the value breaks.
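The coupling described here can be seen in a small sketch. The interface MapAction, the two action classes and the keys "input", "reversed" and "result" are all hypothetical and exist only to show the problem.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical map-based contract: every action reads from and writes to a shared map. */
interface MapAction {
    void execute(Map<String, Object> data);
}

final class ReverseAction implements MapAction {
    @Override
    public void execute(Map<String, Object> data) {
        String input = (String) data.get("input");
        data.put("reversed", new StringBuilder(input).reverse().toString());
    }
}

final class UpperCaseAction implements MapAction {
    @Override
    public void execute(Map<String, Object> data) {
        // Coupling: this action must know both the key and the value type used by ReverseAction.
        String reversed = (String) data.get("reversed");
        data.put("result", reversed.toUpperCase());
    }
}

final class MapWorkflowDemo {
    public static void main(String[] args) {
        Map<String, Object> data = new HashMap<>();
        data.put("input", "abc");
        new ReverseAction().execute(data);
        new UpperCaseAction().execute(data);
        System.out.println(data.get("result"));
    }
}
```

Renaming the "reversed" key in ReverseAction, or storing something other than a String under it, would compile fine and fail only at runtime inside UpperCaseAction.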

Expected behavior

It would have been far better to implement an action as a normal class with its dependencies being injected using, say, Spring dependency injection. Then during workflow execution the execute() method can be invoked to get the desired output. We should be able to pass the output from one action to the next action without the actions being aware of each other.

Solution

Using the Spring Expression Language

The Spring Expression Language (SpEL) lets you define simple expressions and evaluate them against a specified object instance. The bean XML allows using such expressions to inject property values. The idea was to use SpEL to define the values to be injected into a workflow action. Let's take the example of a simple action that takes a string as input and returns the reversed string as output.
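A minimal sketch of such an action could look like the following. The class name ReverseStringAction and the WorkflowAction interface are assumptions for illustration. In a real Spring project the class would typically be public and live in its own file.

```java
/** Hypothetical action contract, repeated here so the example is self-contained. */
interface WorkflowAction<T> {
    T execute();
}

/** Receives its input through the constructor and returns the reversed string. */
final class ReverseStringAction implements WorkflowAction<String> {

    private final String input;

    public ReverseStringAction(String input) {
        this.input = input;
    }

    @Override
    public String execute() {
        return new StringBuilder(input).reverse().toString();
    }
}

final class ReverseStringDemo {
    public static void main(String[] args) {
        System.out.println(new ReverseStringAction("hello").execute());
    }
}
```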

It takes a string value as input in the constructor and returns the reversed string from the execute() call. An instance of this action is created in code using the application context's getBean() method. But how do we inject the required value? For that, let's first look at how to define the bean for this action class.
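A bean definition along these lines might look as follows. This is a sketch only: the bean id, the package name and the prototype scope are assumptions (prototype scope is assumed so that each getBean() call creates a fresh instance and re-evaluates the expression).

```xml
<!-- Hypothetical bean id and package; %{source} is the custom expression discussed in this post -->
<bean id="reverseStringAction"
      class="com.example.workflow.ReverseStringAction"
      scope="prototype">
  <constructor-arg value="%{source}"/>
</bean>
```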

Above is a simple bean definition naming the implementation class and its parameters. In this case we specify the value to be passed to the constructor. Notice that the value is actually an expression starting with %{ and ending with }. We have defined our own expression prefix using % because Spring already uses # and $ to identify its expressions. The variable name source can be anything you want. In our example, source refers to the original input to the workflow. It can be anything from a simple string, integer or class instance to an array, set or map.

For our workflow, we will have a list of beans representing the actions. The workflow is responsible for creating instances of the actions and executing them, and while doing so it should be able to pass the parameters defined by our custom expression. Let's see how we can do this.

Using custom expression resolver

Spring allows a custom expression resolver to be plugged in by implementing the BeanExpressionResolver interface. Following is the snippet for our BeanInstantiator class, which implements this interface as well as the BeanFactoryPostProcessor interface.

The postProcessBeanFactory() method at Line 4 is for the BeanFactoryPostProcessor interface. Here, we are going to register our custom resolver which is the same class. Also, we are preserving the existing resolver.

The evaluate() method at Line 10 is for the BeanExpressionResolver interface. Spring calls this method whenever it encounters an expression. Here, we check whether the expression is our custom expression. The expression string can be enclosed in %{}, e.g. %{source}, or it can be part of another string value, e.g. This will have %{source} expression embedded inside. If we have a custom expression then:
  • we create an instance of SpelExpressionParser.
  • parse the expression.
  • invoke the parsed expression and pass in the required data.
In the above case, Line 16 delegates the evaluation call to the doEvaluate() method, which performs these steps. One thing abstracted away on Line 16 is EXPRESSION_EVALUATION_CONTEXT, which is an instance of EvaluationContext. We store this value in a thread local because the evaluate() method is called internally by Spring and there is no other way for us to pass the context.
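The flow of evaluate() can be illustrated without Spring on the classpath. The sketch below is not the BeanInstantiator from this post and does not use SpelExpressionParser. It is a dependency-free stand-in that uses a regular expression and reflection to resolve %{name} against a root object held in a thread local. All class and field names (PercentExpressionResolver, WorkflowRoot, ROOT) are hypothetical, and only simple property names are supported.

```java
import java.lang.reflect.Method;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical root object exposing the workflow input through a getter. */
final class WorkflowRoot {
    private final Object source;

    WorkflowRoot(Object source) {
        this.source = source;
    }

    public Object getSource() {
        return source;
    }
}

/** Simplified stand-in for a custom resolver: handles only %{property} expressions. */
final class PercentExpressionResolver {

    private static final Pattern EXPRESSION = Pattern.compile("%\\{(\\w+)\\}");

    /** Root object for the current thread, set by the caller before bean creation. */
    static final ThreadLocal<Object> ROOT = new ThreadLocal<>();

    static Object evaluate(String value) {
        Object root = ROOT.get();
        if (value == null || root == null || !value.contains("%{")) {
            return value; // a real resolver would delegate to the original Spring resolver here
        }
        Matcher whole = EXPRESSION.matcher(value);
        if (whole.matches()) {
            return readProperty(root, whole.group(1)); // keep the property's own type
        }
        // Expression embedded in a larger string: substitute each occurrence as text.
        Matcher matcher = EXPRESSION.matcher(value);
        StringBuilder result = new StringBuilder();
        while (matcher.find()) {
            String text = String.valueOf(readProperty(root, matcher.group(1)));
            matcher.appendReplacement(result, Matcher.quoteReplacement(text));
        }
        matcher.appendTail(result);
        return result.toString();
    }

    private static Object readProperty(Object root, String name) {
        String getter = "get" + Character.toUpperCase(name.charAt(0)) + name.substring(1);
        try {
            Method method = root.getClass().getMethod(getter);
            return method.invoke(root);
        } catch (ReflectiveOperationException e) {
            throw new IllegalArgumentException("Cannot resolve %{" + name + "}", e);
        }
    }
}
```

With SpEL, the parsing and property lookup would be done by the parser and the evaluation context instead of the regular expression and reflection used here.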

Passing the context to Spring expression resolver

In the above snippet we saw how to add our own expression resolver. But how do we pass the required data to the resolver? In our case we used the expression %{source}, meaning: pass the original source input to the action instance through its constructor. Following is the additional snippet for the same BeanInstantiator class (previous methods removed for readability).

The BeanInstantiator class also implements the ApplicationContextAware interface. So we get the application context passed into the setApplicationContext() method at Line 4.

The BeanInstantiator class exposes the getAction() method. This method takes the bean id of the workflow action to be instantiated. It then uses the applicationContext.getBean() method to create an instance of the action class. But how will our custom expression be resolved to inject the required value specified by the expression %{source}?

Line 11 in the getAction() method creates an instance, rootEvalObject, of an anonymous class with the getters getContext(), getSource() and getOutput() returning the original values.

Line 16 in getAction() creates an instance of StandardEvaluationContext, passing it the rootEvalObject.

But getAction() is our method and we are just calling the applicationContext.getBean() to get the instance. Spring will internally call the evaluate() method on our class since it also implements the BeanExpressionResolver interface.

Line 17 sets the StandardEvaluationContext instance in the thread local variable EXPRESSION_EVALUATION_CONTEXT and then calls applicationContext.getBean() to create the instance. In the evaluate() method called by Spring, we retrieve this context and then evaluate the expression.
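The thread-local handoff itself can be sketched independently of Spring. In the sketch below, EvaluationContextHolder, callWith() and current() are hypothetical names, and the Supplier stands in for the applicationContext.getBean() call. Clearing the thread local in a finally block is an addition of this sketch. The post does not say whether the original code does this, but it avoids leaking a stale context to later work on the same thread.

```java
import java.util.function.Supplier;

/** Hypothetical holder that exposes a context only for the duration of one call. */
final class EvaluationContextHolder {

    private static final ThreadLocal<Object> CURRENT = new ThreadLocal<>();

    /** Publishes rootObject for the current thread while beanCreation runs. */
    static <T> T callWith(Object rootObject, Supplier<T> beanCreation) {
        CURRENT.set(rootObject);
        try {
            return beanCreation.get();
        } finally {
            CURRENT.remove(); // never leave the context behind on a pooled thread
        }
    }

    /** Called from the resolver side; returns null when no call is in progress. */
    static Object current() {
        return CURRENT.get();
    }
}

final class EvaluationContextHolderDemo {
    public static void main(String[] args) {
        String seen = EvaluationContextHolder.callWith(
                "original input", () -> String.valueOf(EvaluationContextHolder.current()));
        System.out.println(seen);
    }
}
```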

Since the root object exposes the getters getContext(), getSource() and getOutput(), you can refer to them in expressions either as properties or as method calls, e.g. %{source} or %{getSource()}.

You can add as many properties as you want, like getContext(), getOutput() etc., returning whatever values you need. You can even add methods to be used in expressions, e.g. to transform data or to read some configuration.

Workflow-lite implementation

The implementation for the workflow is at: https://github.com/ajeydudhe/workflow-lite

It allows one to define and execute a workflow using a UML activity diagram. It is mostly useful for implementing predefined workflows in a product.

Thursday, June 22, 2017

Elasticsearch: Extending ES using AOP

Elasticsearch provides the ability to extend its basic functionality using scripts or plug-ins. At the same time, ES restricts changing or extending existing functionality such as search actions. For example, say I need to change the result set of every query or change the input search criteria. In such cases, we can use AOP to extend ES functionality.
AOP is a vast topic in itself so I will not cover it here. Basically, it lets you control what happens before and after a method executes, change the original input parameters, change the return value etc.
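To make the idea of wrapping a method call concrete, here is a small sketch that uses a plain JDK dynamic proxy rather than AspectJ. It only illustrates what an around advice does conceptually. The interface SearchOperation and the class AroundAdviceDemo are made up for this example and have nothing to do with the ES classes.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical target interface standing in for a method we want to intercept. */
interface SearchOperation {
    String search(String query);
}

final class AroundAdviceDemo {

    /** Collected log lines, kept in memory so the behaviour is easy to inspect. */
    static final List<String> LOG = new ArrayList<>();

    /** Wraps the target so that code runs before and after every call. */
    static SearchOperation withLogging(SearchOperation target) {
        InvocationHandler handler = (proxy, method, args) -> {
            String firstArg = (args == null || args.length == 0) ? "" : String.valueOf(args[0]);
            LOG.add("before " + method.getName() + " " + firstArg);
            // Equivalent of proceeding with the original call; arguments could be changed here.
            Object result = method.invoke(target, args);
            LOG.add("after " + method.getName());
            return result; // the return value could be replaced here as well
        };
        return (SearchOperation) Proxy.newProxyInstance(
                SearchOperation.class.getClassLoader(),
                new Class<?>[] {SearchOperation.class},
                handler);
    }

    public static void main(String[] args) {
        SearchOperation operation = withLogging(query -> "hits for " + query);
        System.out.println(operation.search("size:5678"));
        System.out.println(LOG);
    }
}
```

Unlike this proxy, AspectJ weaving changes the bytecode of the target class itself, which is why it can intercept calls inside ES without ES knowing about any wrapper.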
For this post we will try to monitor the ES search parameters using AOP. We will use AspectJ in a Maven project as follows:
  • Create the maven project
  • Find out the ES methods to be monitored
  • Define the Pointcuts & Advices
  • Compile the jar
  • Use Load Time Weaving
  • Start ES and monitor the queries

Source code
The source code for this example is located here.

Create the maven project
Create a simple Maven jar project with the following POM XML.
  • Add dependencies for aspectjrt (lines 13-17) and aspectjweaver (lines 18-22)
  • Add aspectj-maven-plugin (lines 31-48)
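The relevant parts of such a POM might look like the fragment below. It is a sketch and not the original file, so the line numbers mentioned above do not apply to it. The 1.8.9 version matches the weaver jar used later in this post. The plugin version and the Java 1.8 compliance settings are assumptions, and the Elasticsearch dependency needed to compile against SearchService is not shown.

```xml
<dependencies>
  <dependency>
    <groupId>org.aspectj</groupId>
    <artifactId>aspectjrt</artifactId>
    <version>1.8.9</version>
  </dependency>
  <dependency>
    <groupId>org.aspectj</groupId>
    <artifactId>aspectjweaver</artifactId>
    <version>1.8.9</version>
  </dependency>
</dependencies>

<build>
  <plugins>
    <plugin>
      <groupId>org.codehaus.mojo</groupId>
      <artifactId>aspectj-maven-plugin</artifactId>
      <!-- assumed version; use whichever release matches your AspectJ version -->
      <version>1.10</version>
      <configuration>
        <complianceLevel>1.8</complianceLevel>
        <source>1.8</source>
        <target>1.8</target>
      </configuration>
      <executions>
        <execution>
          <goals>
            <goal>compile</goal>
          </goals>
        </execution>
      </executions>
    </plugin>
  </plugins>
</build>
```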

ES methods to monitor
We will monitor the searches executed by ES. The actual search execution is done by the SearchService. We will monitor the executeQueryPhase() and executeFetchPhase() methods on this class.

Define Pointcuts & Advices
The SearchServiceAspect class defines the Pointcuts & Advices using annotations. Following is the definition to monitor the SearchService.executeQueryPhase() method:
  • Line 5 defines the Around advice.
  • The advice is for the ES method SearchService.executeQueryPhase(), which takes two parameters: ShardSearchTransportRequest & SearchTask
  • It also specifies that these two should be passed to our method, which will handle the execution.
  • Line 6 is the method in our class which will be executed when SearchService.executeQueryPhase() is about to be executed.
  • In this method we can decide what to do, i.e. do some processing, continue execution using joinPoint.proceed() etc.
  • For this example, we are just logging the search query information.

aop.xml
Next we define the aop.xml, which configures the AspectJ load-time weaver.
  • Line 4 declares that our aspect pointcuts & advices are in class my.elasticsearch.aspects.SearchServiceAspect
  • Line 6 tells the weaver to be more verbose to help troubleshoot any issues.
  • Line 7 tells the weaver which class in the target application is to be woven.
  • The aop.xml needs to be present at src/main/resources/META-INF in the project.
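An aop.xml matching the points above could look roughly like this. It is a sketch rather than the original file, so the line numbers in the list may not line up exactly. The aspect class name comes from the text, while the weaver options and the include pattern are assumptions.

```xml
<aspectj>
  <aspects>
    <!-- the class holding our pointcuts & advices -->
    <aspect name="my.elasticsearch.aspects.SearchServiceAspect"/>
  </aspects>
  <!-- assumed options: verbose output plus a message for every woven join point -->
  <weaver options="-verbose -showWeaveInfo">
    <!-- weave only the ES class we want to monitor -->
    <include within="org.elasticsearch.search.SearchService"/>
  </weaver>
</aspectj>
```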

Deploying the jars
  • Compile the project and generate the jar using command: mvn install
  • Copy the generated es5x-method-interceptor-0.0.1-SNAPSHOT.jar into the elasticsearch/lib folder as per the ES installation.

Configure ES startup
Now, we need to start ES using the aspect weaver as a Java agent.
  • Copy the aspectjweaver-1.8.9.jar into the elasticsearch/bin folder.
  • Open the bin/elasticsearch.bat or bin/elasticsearch.sh file (I have verified with the Windows batch file)
  • Add the following line before the java command that launches ES:
             SET ES_JAVA_OPTS=%ES_JAVA_OPTS% -Djava.security.policy=enable_aspectj_classes.policy -javaagent:aspectjweaver-1.8.9.jar
  • With the above change we are passing two additional command line parameters when starting ES:
    • javaagent: Uses the AspectJ weaver as a Java agent.
    • java.security.policy: ES 5.x uses the Java security manager, and hence our aspect weaver won't work by default. With this parameter we ask the Java security manager to grant the required permissions. The content of enable_aspectj_classes.policy is as follows:
             grant {
               permission java.security.AllPermission;
             };
      • We are granting all permissions to the application. It is recommended that you find out the granular permissions needed by enabling logging at the access level and update the above file accordingly.
      • The file is placed in the elasticsearch/bin folder.
The logs from the weaver won't be included in the ES logs. So you can redirect the ES command line output (if not running as a service/daemon) to a file as follows:
elasticsearch.bat > c:\es5x.log 2>&1
You should see the following logs:
Lines 18 & 19 indicate that our pointcuts & advices have been woven.

Populating test data
  • We will create two indices, aop_test_01 & aop_test_02, having a file type with the following data:
          {"name": "file_01", "size": 5678}
  • Populate as many entries as you want.
  • Make sure both indices have some data

Querying the data
Let's issue a simple search spanning both the indices:
http://localhost:9200/aop_test_*/_search
You should see from the following logs that our advice is getting executed:
  • Since we are querying multiple indices, which results in queries against multiple shards, we see both the executeQueryPhase() & executeFetchPhase() advice getting called.
  • In executeQueryPhase() we are just logging the indices. We can log additional info like the query to be executed etc.
  • In executeFetchPhase() we are also logging the docIds which the search returns.
  • You can read more on how search gets executed here.

Summary
With this simple POC we have hooked our custom code into ES, which will allow us to customize ES behavior as needed. Currently, we have just logged the input requests, but we can do more, like changing the input/output etc.