Routing Join in Siren Federate

The routing join is one of the join algorithms Federate has in its toolbox. It is useful when joining Elasticsearch documents by using their unique identifiers, the _id field. Such a use case is already achievable using our other join algorithms (hash join, broadcast join, and index join), but the routing join is specifically tailored for this scenario and potentially more efficient than the other implementations.

In this blog post we will illustrate the idea behind the routing join. You can use the new algorithm when joining Elasticsearch documents by their _id. It can boost join performance by reducing the amount of network transfers across an Elasticsearch cluster.

Using a practical example, we will first illustrate some of Elasticsearch’s join mechanisms and their limitations. Then, we will show how Federate’s joins overcome them and which other challenges they introduce. In particular, we will discuss the overhead of the network transfers that Federate needs in order to implement fully distributed join operations. Next, we will explain how the routing join algorithm can mitigate this overhead, thereby reducing join processing latency. Finally, we will discuss some of its limitations.

Example

Imagine we are monitoring the cellphones of some suspected criminals. We have a dataset of cellphones, each with its own phone number and the full name of the owner. Cellphones are also associated with the text messages that they received. Each message contains the phone number of the sender, the sending date, and the text content. Diagrammatically, a small portion of the dataset could look like this:

Police believe that a large amount of illegal drugs has been delivered at the city harbor during the last week. We want to investigate if any of our suspects is involved in this criminal activity. To this end, we might want to retrieve from our dataset all the phone numbers that, in the last week, received a message containing some sensitive keywords like “harbor”.

In Elasticsearch, we could try implementing this use case by using nested fields. First, we create a cellphones index as follows:

PUT /cellphones
{
  "mappings": {
    "properties": {
      "owner_fullname": {
        "type": "keyword"
      },
      "phonenumber": {
        "type": "keyword"
      },
      "messages": {
        "type": "nested",
        "properties": {
          "sender_phonenumber": {
            "type": "keyword"
          },
          "date": {
            "type": "date",
            "format": "dd/MM/yyyy HH:mm:ss"
          },
          "content": {
            "type": "text"
          }
        }
      }
    }
  }
}

Then we can index some documents (i.e., cell phones and messages):

POST cellphones/_doc/
{
  "owner_fullname": "Snake Jailbird",
  "phonenumber": "0010001122333",
  "messages": [
    {
      "sender_phonenumber": "0010002233444",
      "date": "20/10/2023 09:26:00",
      "content": "Did you have breakfast? Stay healthy. Love you, mom."
    },
    {
      "sender_phonenumber": "0010003311888",
      "date": "19/10/2023 23:10:00",
      "content": "The horse arrives at the harbor in 3hrs. Be there."
    }
  ]
}

POST cellphones/_doc/
{
  "owner_fullname": "Homer Simpson",
  "phonenumber": "0010001155666",
  "messages": [
    {
      "sender_phonenumber": "0010001234567",
      "date": "19/10/2023 20:01:00",
      "content": "Your pizza delivery just left the restaurant."
    }
  ]
}

Finally, we can search for cell phones that received a message mentioning ‘harbor’ in the last week:

GET /cellphones/_search
{
  "query": {
    "nested": {
      "path": "messages",
      "query": {
        "bool": {
          "must": [
            {
              "match": {
                "messages.content": "harbor"
              }
            },
            {
              "range": {
                "messages.date": {
                  "lte": "now/d",
                  "gte": "now-1w/d"
                }
              }
            }
          ]
        }
      }
    }
  }
}

Results will look like the following:

{
  "took": 1,
  "timed_out": false,
  "_shards": {
    "total": 1,
    "successful": 1,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": {
      "value": 1,
      "relation": "eq"
    },
    "max_score": 1.9227538,
    "hits": [
      {
        "_index": "cellphones",
        "_id": "BHTkYYsBCEVeAyBJs9gO",
        "_score": 1.9227538,
        "_source": {
          "owner_fullname": "Snake Jailbird",
          "phonenumber": "0010001122333",
          "messages": [
            {
              "sender_phonenumber": "0010002233444",
              "date": "20/10/2023 09:26:00",
              "content": "Did you have breakfast? Stay healthy. Love you, mom."
            },
            {
              "sender_phonenumber": "0010003311888",
              "date": "19/10/2023 23:10:00",
              "content": "The horse arrives at the harbor in 3hrs. Be there."
            }
          ]
        }
      }
    ]
  }
}

With nested fields, Elasticsearch implements the messages as nested documents that are part of the cellphones’ document they are associated with. This approach provides good data locality, leading to improved querying performance. At the same time, this can lead to scaling issues if a document is associated with too many nested ones, for instance if a cell phone receives too many messages in our example. In fact, nested documents must reside on the same computing node as the one they are part of. As a consequence, we cannot fully exploit Elasticsearch’s distributed nature when multiple computing nodes are available.

Additionally, this approach poses data duplication challenges. If a message is sent to multiple cell phones, it is not possible to store it only once, but it must be duplicated for each of the main documents that represent the receiving phones. If the message needs to be deleted or edited, like in modern instant messaging apps, it must be done for each main document. In the figure below, the message sent on 19/10/2023 at 23:10:00 would have to be updated in both Cellphone documents related to “Snake Jailbird” and “Sideshow Bob”.

Moreover, updating data is inefficient with nested fields: every time we want to add, delete, or modify a nested document (i.e., a message in our case), we have to reindex the whole main document, which clearly comes with a performance overhead.

As an alternative to nested fields, Elasticsearch can also model relationships using join fields and has_parent / has_child queries. Unlike nested fields, related entities can now be updated independently, as they are represented by separate documents. However, related documents must still reside on the same index shard, leading to scalability issues similar to those of nested fields. A “child” document can only be linked to a single “parent” document, so the data duplication problem we have already seen for nested fields remains unsolved. Finally, join fields and has_parent / has_child queries can only represent relationships between documents from the same index, i.e., documents that share the same mapping. It would be difficult to use them in our example, where we need to connect different kinds of entities, like cell phones and messages. Other limitations are discussed in the Siren Federate documentation.

Federate joins

Federate joins solve the aforementioned challenges, extending Elasticsearch’s capabilities by implementing fully distributed joins. Federate can perform joins on documents from the same index or from different indices. Joins can be performed on fields of common data types (e.g., integers, longs, keywords) without the need for nested or join fields, so ordinary documents can be joined directly. This means that, unlike with Elasticsearch’s nested documents, insertions, deletions, and updates can now happen without affecting the other joined documents. Moreover, documents that are related to multiple others need to be indexed only once, solving the data duplication problem. Finally, joined documents do not need to reside on the same computing node, potentially leading to better utilization of the hardware resources of an Elasticsearch cluster than nested or join fields allow.

Assuming it has been installed on our Elasticsearch instance, let’s implement our example using Federate. First we create the cellphones and messages as separate indices as follows:

PUT /cellphones
{
  "mappings": {
    "properties": {
      "owner_fullname": {
        "type": "keyword"
      },
      "phonenumber": {
        "type": "keyword"
      }
    }
  }
}

PUT /messages
{
  "mappings": {
    "properties": {
      "from_phonenumber": {
        "type": "keyword"
      },
      "to_phonenumber": {
        "type": "keyword"
      },
      "date": {
        "type": "date",
        "format": "dd/MM/yyyy HH:mm:ss"
      },
      "content": {
        "type": "text"
      }
    }
  }
}

Now, we index the cell phones:

POST cellphones/_doc/
{
  "owner_fullname": "Snake Jailbird",
  "phonenumber": "0010001122333"
}

POST cellphones/_doc/
{
  "owner_fullname": "Homer Simpson",
  "phonenumber": "0010001155666"
}

POST cellphones/_doc/
{
  "owner_fullname": "Sideshow Bob",
  "phonenumber": "0010007788999"
}

Then, we index the messages:

POST messages/_doc
{
  "from_phonenumber": "0010002233444",
  "to_phonenumber": "0010001122333",
  "date": "20/10/2023 09:26:00",
  "content": "Did you have lunch today? Stay healthy. Love, mom"
}

POST messages/_doc
{
  "from_phonenumber": "0010003311888",
  "to_phonenumber": [
    "0010001122333",
    "0010007788999"
  ],
  "date": "19/10/2023 23:10:00",
  "content": "The horse will be at the harbor in 3hrs. Be there."
}

POST messages/_doc
{
  "from_phonenumber": "0010001234567",
  "to_phonenumber": "0010001155666",
  "date": "19/10/2023 20:01:00",
  "content": "Your pizza delivery just left the restaurant."
}

Notice how the message “The horse will be at the harbor in 3hrs. Be there.” only needs to be indexed once despite being sent to multiple numbers.

Finally, we search for cell phones that have received a message mentioning ‘harbor’ in the last week, using the Federate join DSL:

GET /siren/cellphones/_search
{
  "query": {
    "join": {
      "indices": [
        "messages"
      ],
      "on": [
        "phonenumber",
        "to_phonenumber"
      ],
      "request": {
        "query": {
          "bool": {
            "must": [
              {
                "match": {
                  "content": "harbor"
                }
              },
              {
                "range": {
                  "date": {
                    "lte": "now/d",
                    "gte": "now-1w/d"
                  }
                }
              }
            ]
          }
        }
      }
    }
  }
}

Notice how the messages’ to_phonenumber field acts as a foreign key for this join. The results will look like this:

{
  "took": 1,
  "timed_out": false,
  "_shards": {
    "total": 1,
    "successful": 1,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": {
      "value": 2,
      "relation": "eq"
    },
    "max_score": 1,
    "hits": [
      {
        "_index": "cellphones",
        "_id": "DHQ2YosBCEVeAyBJ6NiM",
        "_score": 1,
        "_source": {
          "owner_fullname": "Snake Jailbird",
          "phonenumber": "0010001122333"
        }
      },
      {
        "_index": "cellphones",
        "_id": "DnQ2YosBCEVeAyBJ9Nh4",
        "_score": 1,
        "_source": {
          "owner_fullname": "Sideshow Bob",
          "phonenumber": "0010007788999"
        }
      }
    ]
  },
  "planner": {
    "node": "bDpXqRZ_QQ2Nh7I2aW5MfA",
    "took_in_millis": 16,
    "timestamp": {
      "start_in_millis": 1698159857371,
      "stop_in_millis": 1698159857387,
      "took_in_millis": 16
    },
    "is_truncated": false
  }
}

As we have seen, Federate joins provide greater flexibility than Elasticsearch’s nested or join fields. Part of this flexibility comes from the fact that related documents no longer need to be co-located. This avoids document duplication and allows independent document updates. However, this flexibility also comes at a cost. Since related documents are not necessarily co-located anymore, it may be necessary to exchange them over the Elasticsearch cluster’s network in order to carry out the join operations. Network transfers can be time consuming, and they take longer as more data needs to be exchanged.

In the next section, we briefly look at how Federate’s join algorithms exchange data and how the new routing join algorithm can reduce network transfers to improve join processing time.

How Federate’s join algorithms work

To achieve fast search, Elasticsearch is commonly deployed on a cluster of multiple computing nodes. An index’s documents can be partitioned into multiple shards, and each index shard is potentially hosted by a different computing node. Search can then be performed in a distributed manner, with each node independently processing its shards, hence achieving quick response times.

Federate leverages this distributed infrastructure to efficiently perform join operations, as illustrated in the following picture. Assume we want to join the documents of child set Index A with those of parent set Index B. Federate can carry out this same operation according to different join algorithms.

In the case of the broadcast join, all the values from the child set must be uploaded to the computing nodes hosting the parent set’s documents. There, an in-memory hash table is built over the child data. The doc values of the parent set are then scanned, and each one is probed against the hash table to find the join matches. The index join (not shown in the diagram) works in a similar fashion, but instead of building a hash table of the child values, it uses them to probe the parent set’s dictionary. In contrast, the hash join partitions and shuffles both the parent and the child set over the whole computing cluster. Each computing node then locally performs the join on the data it received, again building and probing a hash table.
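To make the difference in data movement concrete, here is a minimal Python sketch of the broadcast and hash joins as described above. It is only an illustration under simplifying assumptions, not Federate's implementation: join keys are plain strings, the node_for partitioning function is a hypothetical stand-in, and "transfers" simply counts values sent, ignoring values that happen to already be on the right node.

```python
import zlib
from collections import defaultdict

def node_for(value, num_nodes):
    # Hypothetical partitioning function, chosen only because it is deterministic.
    return zlib.crc32(value.encode("utf-8")) % num_nodes

def broadcast_join(child_values, parent_shards):
    """Send the whole child set to every parent shard, then build and probe."""
    matches, transfers = [], 0
    for shard_values in parent_shards:
        table = set(child_values)        # hash table built over the child data
        transfers += len(child_values)   # the full child set is sent to this shard
        matches += [v for v in shard_values if v in table]  # probe parent values
    return matches, transfers

def hash_join(child_values, parent_shards, num_nodes):
    """Partition and shuffle both sides by key, then join locally on each node."""
    child_parts, parent_parts = defaultdict(set), defaultdict(list)
    transfers = 0
    for v in child_values:
        child_parts[node_for(v, num_nodes)].add(v)
        transfers += 1
    for shard_values in parent_shards:
        for v in shard_values:
            parent_parts[node_for(v, num_nodes)].append(v)
            transfers += 1
    matches = []
    for node in range(num_nodes):
        table = child_parts[node]        # local hash table for this node
        matches += [v for v in parent_parts[node] if v in table]
    return matches, transfers

parent_shards = [["a", "b"], ["c", "d"]]   # join-key values held by each parent shard
child_values = ["a", "c", "x"]

b_matches, b_transfers = broadcast_join(child_values, parent_shards)
h_matches, h_transfers = hash_join(child_values, parent_shards, num_nodes=2)
print(sorted(b_matches), b_transfers)
print(sorted(h_matches), h_transfers)
```

Both functions find the same matches. What differs is which data travels: the broadcast join moves the child set once per parent shard, while the hash join moves each child and parent value once.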

Introducing the routing join algorithm

The routing join is a new algorithm that can be used in those situations where we want to join a parent and a child set on the _id of the parent set of documents. The _id is an Elasticsearch metafield that uniquely identifies a document within an index. For partitioned indices, Elasticsearch also uses the _id value to decide on which index shard the document must be stored.

Similarly to the broadcast join, to perform the join, the routing join algorithm has to upload the values from the child set (e.g., Index A in the previous picture) to the computing nodes hosting the parent set (e.g., Index B). There an in-memory hash table is built over the child data. The dictionary of _ids for the parent set of documents is scanned. Each value is probed against the hash table to find matches.

The difference between the broadcast and the routing join is that the broadcast join uploads all the data from the child set to all the nodes hosting the parent set. In contrast, the routing join uploads the child data only to the nodes hosting parent index shards that can contain a join match. This is possible because the join happens on the _id field of the parent set, which Elasticsearch uses to decide on which index shard a document must be stored. Using the same strategy, the routing join algorithm decides to which computing nodes a child value must be uploaded so that the results of the join operation can be computed locally. Since each value is uploaded to only some of the nodes, as opposed to all nodes, this potentially reduces time-consuming network transfers, hence speeding up the processing of join queries.

The routing join has some advantages over the other join algorithms. As we just illustrated, the routing join incurs a smaller network overhead as it has to upload less data. Moreover, the routing join has a smaller memory footprint than the broadcast join. Since each computing node receives only a portion of the child set data, it also builds smaller hash tables. With the broadcast join, on the other hand, each computing node has to create a hash table for the whole child set. You can learn more about the specific advantages and disadvantages of each join algorithm in the documentation.
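The routing idea described above can be sketched in a few lines of Python. This is a simplified model and not Federate's code: shard_for is a hypothetical stand-in for Elasticsearch's routing function (the real one uses a different hash), each shard is assumed to live on its own node, and the synthetic phone-N identifiers are made up for the illustration.

```python
import zlib

def shard_for(doc_id, num_shards):
    """Stand-in for Elasticsearch's routing hash (an assumption, not the real one)."""
    return zlib.crc32(doc_id.encode("utf-8")) % num_shards

def routing_join(child_keys, parent_ids, num_shards):
    # Parent documents are placed on shards according to their _id.
    parent_shards = [set() for _ in range(num_shards)]
    for _id in parent_ids:
        parent_shards[shard_for(_id, num_shards)].add(_id)

    # Each child key is sent only to the shard that could hold a matching _id.
    tables = [set() for _ in range(num_shards)]
    for key in child_keys:
        tables[shard_for(key, num_shards)].add(key)

    # Each shard probes its _ids against its own, smaller hash table.
    matches = set()
    for ids, table in zip(parent_shards, tables):
        matches |= ids & table
    return matches, [len(t) for t in tables]

parent_ids = [f"phone-{i}" for i in range(1000)]
child_keys = [f"phone-{i}" for i in range(0, 2000, 4)]   # 500 keys, 250 of which match
num_shards = 5

matches, table_sizes = routing_join(child_keys, parent_ids, num_shards)
routing_transfers = sum(table_sizes)                  # each key goes to one shard
broadcast_transfers = len(child_keys) * num_shards    # each key goes to every shard
print(len(matches), routing_transfers, broadcast_transfers, table_sizes)
```

In this model the routing join sends each child key once, whereas a broadcast join would send it once per shard. The per-shard table sizes also show the memory effect: each shard builds a hash table over only its share of the child keys rather than over all of them.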

Going back to our example, we can use the routing join algorithm to join cell phones with messages containing sensitive keywords. Unfortunately, the cell phones’ _ids are strings randomly generated by Elasticsearch. These are useful to uniquely identify a document in an index but convey no particular meaning. In contrast, a cell phone number is a meaningful piece of information, and it uniquely identifies the phone. Therefore, we can use the phone number as the _id of a cell phone document.

Let’s re-index our cell phones from scratch, first by deleting and re-creating the index as follows:

DELETE /cellphones

PUT /cellphones
{
  "mappings": {
    "properties": {
      "owner_fullname": {
        "type": "keyword"
      },
      "phonenumber": {
        "type": "keyword"
      }
    }
  }
}

Then we re-insert the cell phones using their numbers as _id, by including the phone numbers as part of the POST requests:

POST cellphones/_doc/0010001122333
{
  "owner_fullname": "Snake Jailbird",
  "phonenumber": "0010001122333"
}

POST cellphones/_doc/0010001155666
{
  "owner_fullname": "Homer Simpson",
  "phonenumber": "0010001155666"
}

POST cellphones/_doc/0010007788999
{
  "owner_fullname": "Sideshow Bob",
  "phonenumber": "0010007788999"
}

Now imagine that the documents corresponding to the cell phones of “Homer Simpson”, “Snake Jailbird”, and “Sideshow Bob” are each stored on different shards hosted by different computing nodes. Thanks to the routing join algorithm, when joining the data, the message “The horse will be at the harbor in 3hrs. Be there.” now only needs to be uploaded to the computing nodes hosting the documents for the cell phones of “Snake Jailbird” and “Sideshow Bob”, the message’s recipients. As already mentioned, this is possible because the routing join algorithm is aware of how documents have been stored on different shards using the _id value, which is now the phone number. Therefore, messages can be routed to shards in a similar manner using the messages’ to_phonenumber field, which acts as a foreign key.
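The same reasoning can be traced on the example data with a short Python sketch. The shard count of three and the shard_for function are assumptions made for the illustration. Elasticsearch's real routing hash differs, so the shard numbers printed here will not match those of an actual cluster, and with this stand-in hash two phones may well land on the same shard.

```python
import zlib

def shard_for(doc_id, num_shards):
    """Stand-in for Elasticsearch's routing hash (an assumption, not the real one)."""
    return zlib.crc32(doc_id.encode("utf-8")) % num_shards

NUM_SHARDS = 3  # hypothetical number of shards for the cellphones index

cellphones = {
    "0010001122333": "Snake Jailbird",
    "0010001155666": "Homer Simpson",
    "0010007788999": "Sideshow Bob",
}
harbor_message = {
    "to_phonenumber": ["0010001122333", "0010007788999"],
    "content": "The horse will be at the harbor in 3hrs. Be there.",
}

# Shard of each cell phone document, derived from its _id (the phone number).
phone_shard = {number: shard_for(number, NUM_SHARDS) for number in cellphones}

# Shards the message must be sent to, derived from its foreign key values.
target_shards = {shard_for(n, NUM_SHARDS) for n in harbor_message["to_phonenumber"]}

for number, owner in cellphones.items():
    print(owner, "is on shard", phone_shard[number])
print("harbor message is sent to shards", sorted(target_shards))
```

Whatever the hash function, the message is sent to at most two shards, namely those holding the documents of its two recipients, instead of to every shard of the index.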

Using the routing join algorithm, the join query will look like the following:

GET /siren/cellphones/_search
{
  "query": {
    "join": {
      "indices": [
        "messages"
      ],
      "on": [
        "_id",
        "to_phonenumber"
      ],
      "type": "ROUTING_JOIN",
      "request": {
        "query": {
          "bool": {
            "must": [
              {
                "match": {
                  "content": "harbor"
                }
              },
              {
                "range": {
                  "date": {
                    "lte": "now/d",
                    "gte": "now-1w/d"
                  }
                }
              }
            ]
          }
        }
      }
    }
  }
}

Notice how we are now joining on the cellphones’ _id fields instead of the phonenumber fields. Of course, the content of the results does not change so we are not repeating them here.

For the sake of this example, we explicitly selected the routing join algorithm using the type field from Federate’s query DSL. This ensures that the routing join will be used, but it is not strictly necessary. If you omit the type field, Federate’s planner will automatically select the most appropriate join algorithm. The planner will select the routing join if you are joining on the parent set’s _id and it deems the routing join to be more efficient than the alternatives.

Limitations

While the routing join can provide a performance boost over the broadcast and the hash join, it also has some limitations.

The routing join can only be used to join on the parent set’s _id, and this requires that the user carefully models their data. We recommend explicitly setting a meaningful _id value when indexing a document, rather than letting Elasticsearch automatically generate one. Such identifiers must then be used as a foreign key in the child set. We believe that modelling data in this way is not difficult. Identifiers are a popular and well understood concept in other database systems, and they also exist in the real world: in our example, this was the case with phone numbers. If possible, another recommendation is to use _id values that are numeric strings (like incremental integer identifiers or phone numbers). These are more efficiently encoded and decoded by Elasticsearch, hence leading to shorter join processing times.

Another limitation of the routing join concerns the index settings. For the routing join to be effective, the parent set index must be partitioned into more than one shard. With a single-shard index, the algorithm behaves exactly like the broadcast join, thus providing no benefit. Something similar happens when joining on multiple parent set indices, which is frequent with index patterns (e.g., cellphones-*). In such a case, the routing join may have to send the same data from the child set to multiple computing nodes, if they host shards from different parent indices. Conversely, the routing join shines when joining on a single parent index partitioned into many shards, each hosted by a different computing node.
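A back-of-the-envelope model in Python shows how the index layout affects the savings. It is a rough counting sketch under stated assumptions, not a measurement: every shard is assumed to sit on its own node, each child value is counted once per shard it is sent to, and both function names are made up for the illustration.

```python
def broadcast_transfers(num_child_values, shards_per_index):
    # Every child value is sent to every shard of every parent index.
    return num_child_values * sum(shards_per_index)

def routing_transfers(num_child_values, shards_per_index):
    # Every child value is sent to exactly one shard per parent index.
    return num_child_values * len(shards_per_index)

# Each layout lists the number of shards of each parent index in the join.
for layout in ([1], [8], [1, 1, 1], [4, 4]):
    print(layout, broadcast_transfers(1000, layout), routing_transfers(1000, layout))
```

With a single one-shard index, or with several one-shard indices behind an index pattern, the two counts coincide. The gap only opens up when the parent indices have many shards, and it is widest for a single index with many shards.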

The routing join must not be selected when the parent set’s indices use custom routing, as this can produce incorrect results. When using custom routing, the best practice is to declare the routing value as required in the index mapping, as per Elasticsearch’s documentation. Following this best practice will also prevent Federate from inadvertently generating incorrect join results. If it is followed, Federate will return a meaningful error message when users select the routing join algorithm despite the custom routing on the parent set’s indices. Additionally, Federate’s query planner will never select the routing join when custom routing is used on the parent set’s indices. You can find other limitations in the documentation.
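The following Python sketch illustrates why custom routing breaks the assumption the routing join relies on. It is a toy model: shard_for is a hypothetical stand-in for Elasticsearch's routing hash, and the identifiers id-0, id-1 and the routing value tenant-a are invented for the example.

```python
import zlib

def shard_for(value, num_shards):
    """Stand-in for Elasticsearch's routing hash (an assumption, not the real one)."""
    return zlib.crc32(value.encode("utf-8")) % num_shards

def index_docs(docs, num_shards, use_custom_routing):
    """Place each (_id, routing value) pair on a shard, as the index would."""
    shards = [set() for _ in range(num_shards)]
    for _id, routing in docs:
        key = routing if use_custom_routing else _id
        shards[shard_for(key, num_shards)].add(_id)
    return shards

def routing_join(child_keys, shards):
    """Send each child key only to the shard derived from the key itself."""
    num_shards = len(shards)
    return {k for k in child_keys if k in shards[shard_for(k, num_shards)]}

docs = [("id-0", "tenant-a"), ("id-1", "tenant-a")]
child_keys = ["id-0", "id-1"]

default_hits = routing_join(child_keys, index_docs(docs, 4, use_custom_routing=False))
custom_hits = routing_join(child_keys, index_docs(docs, 4, use_custom_routing=True))
print(sorted(default_hits), sorted(custom_hits))
```

With default routing, each child key reaches the shard that holds the matching document. With custom routing, both documents sit on the shard chosen by the routing value, so a child key routed by its own value can arrive at a shard where the match is not stored, and the match is silently lost.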

Conclusions

The routing join algorithm adds to the many joins already available in Federate. Compared to our other implementations, the routing join can reduce network transfers in order to speed up the join response time. This is made possible by exploiting Elasticsearch’s document routing.

The routing join is ideal when joining a parent set of documents which represent entities that naturally possess unique identifiers like phone numbers, car plates, tax ID codes, and so on. These must be used as values for the _id metafield of the documents. Child set documents must use the same values as foreign keys, by storing them in some of their fields.

The routing join algorithm is a useful new tool for analysts to speed up their investigations. Analysts do not have to worry about explicitly selecting it. Federate’s planner can automatically select the routing join for them when it is deemed the most efficient option to carry out a join operation, provided that the join is on the _id field of the parent set.
