Friday, January 30, 2015

Elasticsearch: Get distinct\unique values using search instead of aggregation - Part 2

The previous post described how a custom script filter can be used to filter out duplicate documents in the result set. However, that approach does not work with multiple shards. To handle this case we need a plugin that hooks into Elasticsearch at the point where the documents fetched from multiple shards can be de-duplicated. But before that, let's have a quick look at how distributed search works.

Distributed search

The Elasticsearch documentation covers distributed search very well. The following is a summary:
  • Distributed search is executed in two phases: the Query phase and the Fetch phase
  • Query phase
    • The coordinating node sends the search request to all the shards.
    • Each shard executes the search and sends a list of document IDs and their sort values back to the coordinating node.
      • For example, get documents where file.folder = 'c:\windows' sorted by
      • In the above case, say shard 1 matches three documents with the file names notepad.exe, textpad.exe and wordpad.exe. The returned values would then be
        • DocID: some_doc_id_1, Sort values: [notepad.exe]
        • DocID: some_doc_id_2, Sort values: [textpad.exe]
        • DocID: some_doc_id_3, Sort values: [wordpad.exe]
    • All shards return similar data to the coordinating node.
    • The coordinating node now has the list of fields on which sorting needs to be done and also the actual values of these fields for each document.
    • This node then merges all the results into a single list.
    • The list is sorted using the sort values returned by each shard for each document.
    • The final list is prepared by applying pagination (from & size) parameters.
  • Fetch phase
    • Once the final list is prepared, the coordinating node requests the required documents from each shard by document ID, i.e. it sends a multi-get request.
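
The two phases above can be sketched in plain Python. This is only an illustration of the flow, not Elasticsearch code: each shard is modelled as a dict of documents, the field names "name" and "folder" are hypothetical, and sorting by file name is an assumption made for the example.

```python
def query_phase(shard_docs, folder, limit):
    """Runs on one shard: return (sort_values, doc_id) pairs for its top hits."""
    hits = [([doc["name"]], doc_id)
            for doc_id, doc in shard_docs.items()
            if doc["folder"] == folder]
    hits.sort()
    # A shard cannot know which of its hits make the final page,
    # so it returns its best (from + size) candidates.
    return hits[:limit]


def search(shards, folder, from_, size):
    """Runs on the coordinating node."""
    # Query phase: collect IDs and sort values (no document bodies) per shard.
    per_shard = [query_phase(docs, folder, from_ + size) for docs in shards]
    # Merge into a single list, sorted by the returned sort values.
    merged = sorted(hit for hits in per_shard for hit in hits)
    # Apply pagination.
    page = merged[from_:from_ + size]
    # Fetch phase: only now are the full documents requested, by ID.
    by_id = {doc_id: doc for docs in shards for doc_id, doc in docs.items()}
    return [by_id[doc_id] for _, doc_id in page]
```

Note that the coordinating node only ever sees IDs and sort values before the fetch phase, which is what the rest of this post relies on.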

Remove duplicate documents in distributed search

As seen above, to filter out duplicate documents in a distributed search we need to hook into the QUERY phase. The duplicates have to be removed on the coordinating node, once the document IDs & sort values have been retrieved from all the shards. The steps to achieve this are as follows:
  • Apart from document IDs & sort values, we also need the values of the primary keys that identify a document as unique in the result set.
  • Once we have this information, we can remove duplicate documents using the primary keys.

Passing back the primary key values in query phase

In the QUERY phase only a document's ID and sort values are passed back. The _source value is not passed. So the only option for passing back the primary key values is the sort values. For this we can use a custom script for sorting. This custom script will be similar to the custom filter described in the previous post.
  • The script will take primary key field names as parameter.
  • The script will form a key by concatenating the values of primary key fields.
  • In the search request we add this script-based sort criterion at the end of the existing sort fields, if any.
  • This way, for each document, we will get the key value during the QUERY phase itself on the coordinating node.
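
A minimal sketch of what the sort script computes, in plain Python rather than as an Elasticsearch script. The function names and the "|" separator are assumptions for illustration; the post does not say how the values are joined.

```python
def distinct_key(doc, primary_keys, sep="|"):
    """Concatenate the primary key field values into a single key."""
    return sep.join(str(doc[field]) for field in primary_keys)


def shard_sort_values(doc, sort_fields, primary_keys):
    """Sort values a shard would send back for one document.

    The key goes last, so it only breaks ties and does not change the
    ordering requested by the existing sort fields.
    """
    return [doc[field] for field in sort_fields] + [distinct_key(doc, primary_keys)]
```

For a document with file_name notepad.exe and file_folder c:\windows, sorted by file_size, the shard would return the file size followed by the key "notepad.exe|c:\windows".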

Removing duplicate documents

Now we need to hook into the QUERY phase using a plugin. Thanks to the reference plugin implementation here. By looking at this sample plugin I was able to deduce that the final sorting of docs from all shards happens in SearchPhaseController's sortDocs() method. This class is used deep inside Elasticsearch and there is no direct way to hook into it. So I had to follow the same approach as the reference plugin implementation, i.e. implement a custom action. In the plugin, the required classes and methods were extended/overridden. Finally, the class extending SearchPhaseController overrides the sortDocs() method. In it we remove the duplicate documents from across the shards and then call the original sortDocs() method.
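
The idea behind the overridden sortDocs() can be sketched as follows. This is a plain-Python stand-in, not the plugin code: sort_docs() here is a simplified substitute for the original method, and it assumes that the last sort value of every hit is the concatenated primary key.

```python
def remove_duplicates(shard_results):
    """Drop hits whose key (the last sort value) was already seen on any shard."""
    seen = set()
    deduped = []
    for hits in shard_results:
        kept = []
        for doc_id, sort_values in hits:
            key = sort_values[-1]
            if key not in seen:  # the first occurrence wins
                seen.add(key)
                kept.append((doc_id, sort_values))
        deduped.append(kept)
    return deduped


def sort_docs(shard_results, from_, size):
    """Simplified stand-in for the original merge, sort and paginate step."""
    merged = sorted((sort_values, doc_id)
                    for hits in shard_results
                    for doc_id, sort_values in hits)
    return [doc_id for _, doc_id in merged[from_:from_ + size]]


def sort_docs_distinct(shard_results, from_, size):
    """The override: de-duplicate first, then delegate to the original."""
    return sort_docs(remove_duplicates(shard_results), from_, size)
```

Because duplicates are removed before pagination is applied, a page of the requested size contains only distinct keys.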

Source code

You can start with the JUnit tests.


  • This approach extends ES classes and methods which may change in future versions.
  • One major limitation is that the totalHits returned by the search request will not be accurate when pagination is used. Each shard reports totalHits as the total number of documents matching the filter, but the number of documents it actually returns can be smaller because of pagination. For example, there could be 100 matches while the pagination criteria ask for only 10 results. In this case totalHits will be 100, and we do not know whether the remaining 90 documents are duplicates or not. This should not be an issue if totalHits can be treated as an approximate count.
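
The arithmetic behind the totalHits limitation can be made concrete with a small sketch. The function name is hypothetical and this is not part of the plugin; it only shows what the coordinating node can and cannot know from the data it receives.

```python
def total_hits_bounds(shard_totals, returned_keys):
    """Bounds on the number of distinct matches.

    shard_totals  : totalHits reported by each shard
    returned_keys : for each shard, the keys of the hits it actually sent back
    """
    total = sum(shard_totals)
    returned = sum(len(keys) for keys in returned_keys)
    distinct_returned = len({key for keys in returned_keys for key in keys})
    # Hits that never reached the coordinating node may or may not be duplicates.
    unseen = total - returned
    return distinct_returned, distinct_returned + unseen
```

With 100 matches and 10 distinct hits returned, the true distinct count lies anywhere between 10 and 100, while the reported totalHits is 100.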

Saturday, January 10, 2015

Elasticsearch: Get distinct\unique values using search instead of aggregation - Part 1

Problem statement

While dealing with NoSQL datastores, the key aspect of schema design is de-normalization or, in other words, defining your schema as per the query requirements. But you cannot keep on de-normalizing for each and every use case. One such example is fetching unique records in a query. Relational databases handle this use case by providing DISTINCT or an equivalent keyword. Unfortunately, Elasticsearch does not have such built-in support.

Use case

For example, say you have a files type which stores file information as well as information about the machine on which the file is seen, i.e. file_hash, file_name, file_folder, file_size, machine_name, machine_ip etc. With the above type defined in ES we can have the following queries around it:
  1. Given a file_hash, get all the machines on which it is seen.
  2. Given a file_hash, get all the unique combinations of (file_name & file_folder) across all machines.
One solution is to de-normalize, i.e. create additional types in ES to store the data such that we get unique records for a given file_hash using a normal search request.
Another common approach is to use aggregation. An aggregation returns the unique terms with their counts, and we can also use nested aggregations to get unique values on multiple fields as required in #2.
Another option is to use the Script Filter support in ES, described below.

Using Script Filter

Elasticsearch supports using custom scripts as filters. To handle the above use case we can use the script support as follows:
  • Define a custom script filter. For this discussion assume it is called AcceptDistinctDocumentScriptFilter.
  • This custom filter takes a list of primary keys as input.
  • These primary keys are the fields whose values are used to determine the uniqueness of records.
  • Now, instead of using aggregation, we use a normal search request and pass the custom script filter to the request.
  • If the search already has a filter/query criteria defined, then append the custom filter using the logical AND operator.
  • The following is an example using pseudo syntax:
    • if the request is: 
      • select * from myindex where file_hash = 'hash_value'
    • then append the custom filter as:   
      • select * from myindex where file_hash = 'hash_value' AND AcceptDistinctDocumentScriptFilter(params= ['file_name', 'file_folder'])

Custom filter implementation

  • The filter derives from org.elasticsearch.script.AbstractSearchScript.
  • It overrides the run() method.
  • Also define the factory for creating the instance of the custom script.
  • From the factory, pass the list of primary keys to the custom filter.
  • The run() method is called for every record matching the original filter/query criteria.
  • We get the values of the primary keys from the current document.
  • We concatenate the values to form a key that is used to determine whether the document was already seen. For example, say we use a Java Set<String>.
  • If set.add() returns true, the document is seen for the first time and we return true from the run() method.
  • However, if set.add() returns false, the key was already in the set and hence we return false. Returning false causes the filter/query criteria to fail and the record is rejected.
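
The logic of run() can be sketched in plain Python. This is a stand-in for illustration only, not the AbstractSearchScript subclass itself; the "|" separator is an assumption, and since Python's set.add() does not return a boolean the membership check is written out explicitly.

```python
class AcceptDistinctDocumentScriptFilter:
    """Accepts a document only the first time its key is seen."""

    def __init__(self, primary_keys):
        # In the real filter the factory passes these in as script params.
        self.primary_keys = primary_keys
        self.seen = set()

    def run(self, doc):
        # Concatenate the primary key values of the current document.
        key = "|".join(str(doc[field]) for field in self.primary_keys)
        if key in self.seen:
            return False  # duplicate: reject the record
        self.seen.add(key)
        return True  # first occurrence: accept the record
```

A usage example: build the filter with ['file_name', 'file_folder'] and keep only the documents for which run() returns True. The same file seen on two machines is then returned once.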

Pros & Cons

  • Pros
    • Works with the existing filter and hence on filtered data.
    • Is fast compared to using an aggregation.
    • Takes care of pagination i.e. pagination is applied on filtered unique results.
  • Cons
    • Since we are storing key values in the script, memory usage will be higher. But this should not be an issue when we use the filter along with pagination.


The above approach works only with a single shard and not with multiple shards. For example, say there are 3 shards and each shard returns the following unique values with a page size of 3:
  • Shard 1: [A, B, C]
  • Shard 2: [P, B, R]
  • Shard 3: [X, C, Z]
Now when the results from all the shards are merged and sorted, the result set would be [A, B, B, C, C, P, R, X, Z], and if we request only 3 results in the page then we get [A, B, B], i.e. duplicate results.
This can be handled by writing an ES plugin which hooks into the phase where results from different shards are merged. The next POC will be around this.
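
The shard example above can be reproduced with a few lines of Python. The function name is hypothetical; it only mimics the merge, sort and paginate step on the coordinating node, where each shard's page is already unique on its own.

```python
def merge_pages(shard_pages, size):
    """Merge per-shard pages, sort them, and cut the first page."""
    merged = sorted(value for page in shard_pages for value in page)
    return merged, merged[:size]


merged, first_page = merge_pages(
    [["A", "B", "C"], ["P", "B", "R"], ["X", "C", "Z"]], size=3)
# merged     -> ['A', 'B', 'B', 'C', 'C', 'P', 'R', 'X', 'Z']
# first_page -> ['A', 'B', 'B']
```

Each shard filtered its own duplicates, but no shard knows what the others returned, so B and C reappear after the merge.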

Source code


Even though the above approach currently has the limitation around multiple shards, it is worth exploring because of the performance gain seen when the number of documents is in the millions. Another observation was around the field data (or the cached data). In the case of aggregation, it seems that ES tries to load into memory almost all the fields specified in the aggregation, while the field data memory used with the filter is much lower.

Part 2

Part 2 covers the approach to handle distributed search requests.