Query Processing for NOSQL DB

The recent rise of NoSQL provides an alternative model for building extremely large-scale storage systems. Nevertheless, compared to the more mature RDBMS, NoSQL has some fundamental limitations that we need to be aware of.
  1. It calls for a more relaxed data consistency model
  2. It provides only primitive querying and searching capabilities
There are techniques we can employ to mitigate each of these issues. Regarding the data consistency concern, I have discussed a number of design patterns in my previous blog for implementing systems with different strengths of consistency guarantees.

Here I'd like to take a crack at the second issue.

So what is the problem?

Many of the NoSQL DBs today are based on the DHT (Distributed Hash Table) model, which provides hashtable access semantics. To access or modify any object, the client is required to supply the primary key of the object, and the DB will then look up the object using an equality match on the supplied key.

For example, if we use a DHT to implement a customer DB, we can choose the customer id as the key. We can then get/set/operate on any customer object if we know its id:
  • cust_data = DHT.get(cust_id)
  • DHT.set(cust_id, modified_cust_data)
  • DHT.execute(cust_id, func(cust) {cust.add_credit(200)})
In the real world, we may want to search data based on attributes other than the primary key; we may also want to search attributes based on "greater/less than" relationships, or combine multiple search criteria using a boolean expression.

Using our customer DB example, we may do ...
  • Lookup customers within a zip code
  • Lookup customers whose income is above 200K
  • Lookup customers using the keywords "chief executives"
Although query processing and indexing techniques are pretty common in the RDBMS world, they are seriously lacking in the NoSQL world because of the very nature of the "distributed architecture" underlying most NoSQL DBs.

It seems to me that the responsibility of building an indexing and query mechanism lands on the NoSQL user. Therefore, I want to explore some possible techniques to handle this.


Companion SQL-DB

A very straightforward approach to providing querying capability is to augment the NoSQL DB with an RDBMS or TextDB (for keyword search), e.g. we add the metadata of each object into an RDBMS so we can query the metadata using standard SQL queries.

Of course, this requires the RDBMS to be large enough to store the searchable attributes of each object. Since we only store the attributes required for search, rather than the whole object, in the RDBMS, this turns out to be a very practical and common approach.
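To make this concrete, here is a minimal sketch of the pattern, assuming a hypothetical DHT client object and using a plain SQLite table (via the sqlite3 gem) to hold only the searchable attributes; the table name and helper methods are made up for illustration.

require 'sqlite3'
require 'json'

# Companion SQL table holding only the searchable attributes of each customer.
sql = SQLite3::Database.new('customer_index.db')
sql.execute <<~SQL
  CREATE TABLE IF NOT EXISTS customer_index (
    cust_id  TEXT PRIMARY KEY,
    zip_code TEXT,
    income   INTEGER
  )
SQL

# Write path: the full object goes to the DHT, the searchable attributes
# go to the companion SQL table.
def save_customer(dht, sql, cust)
  dht.set(cust[:id], JSON.generate(cust))
  sql.execute('INSERT OR REPLACE INTO customer_index VALUES (?, ?, ?)',
              [cust[:id], cust[:zip_code], cust[:income]])
end

# Read path: SQL answers the query, the DHT returns the full objects.
def customers_with_income_above(dht, sql, threshold)
  ids = sql.execute('SELECT cust_id FROM customer_index WHERE income > ?',
                    [threshold]).flatten
  ids.map { |id| JSON.parse(dht.get(id)) }
end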


Scatter/Gather Local Search

Some of the NoSQL DBs provide an indexing and query processing mechanism within the local DB. In this case, we can have the query processor broadcast the query to every node in the DHT, where a local search is conducted, and have the results sent back to the query processor, which aggregates them into a single response.

Notice that the search is happening in parallel across all nodes in the DHT.
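As a rough sketch, assuming each node exposes a hypothetical local_search(query) call (e.g. over RPC) that runs against its local index, the query processor could fan out and aggregate like this:

# Scatter/gather sketch: broadcast the query to every node in parallel,
# then merge the partial results into a single response.
def scatter_gather(nodes, query)
  threads = nodes.map do |node|
    Thread.new { node.local_search(query) }   # scatter
  end
  threads.flat_map(&:value)                   # gather
end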


Distributed B-Tree

B+Tree is a common indexing structure used in RDBMS. A distributed version of the B+Tree can also be used in a DHT environment. The basic idea is to hash the searchable attribute to locate the root node of the B+Tree. The "value" of the root node contains the ids of its child nodes, so the client can then issue another DHT lookup call to find the child nodes. Continuing this process, the client eventually navigates down to the leaf node, where the object ids matching the search criteria are found. The client then issues another DHT lookup to extract the actual objects.
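A minimal sketch of this lookup path, assuming each B+Tree node is stored in the DHT under a node id, and that internal nodes expose a child_id_for(value) helper while leaf nodes expose an object_ids_for(value) helper (both made up for illustration):

require 'digest'

# Walk a distributed B+Tree stored in the DHT, one DHT lookup per level.
def btree_lookup(dht, attribute, value)
  # Hash the searchable attribute name to locate the root node of its B+Tree.
  node_id = Digest::SHA1.hexdigest("btree:#{attribute}")
  node = dht.get(node_id)

  # Each internal node tells us which child node id to fetch next.
  until node.leaf?
    node = dht.get(node.child_id_for(value))
  end

  # The leaf holds the ids of matching objects; fetch the objects themselves.
  node.object_ids_for(value).map { |oid| dht.get(oid) }
end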

Caution is needed when a B+Tree node is updated due to a split/merge caused by object creation or deletion. This should ideally be done in an atomic fashion. This paper from Microsoft, HP and Toronto U describes a distributed transaction protocol that provides the required atomicity. Distributed transactions are expensive operations, but their use here is justified because B+Tree updates rarely involve more than a single machine.


Prefix Hash Table (distributed Trie)

A Trie is an alternative data structure, where every path from the root spells out a prefix of the key. Basically, every node in the Trie contains all the data whose keys it prefixes. Berkeley and Intel Research have a paper describing this mechanism.

1. Lookup a key
To locate a particular key, we start with its one-digit prefix and do a DHT lookup to see if we get a leaf node. If so, we search within this leaf node, as we know the key must be contained inside. If it is not a leaf node, we extend the prefix with an extra digit and repeat the whole process.
# Locate the key next to the input key
def locate(key)
  leaf = locate_leaf_node(key)
  return leaf.locate(key)
end

# Locate the leaf node containing the input key
def locate_leaf_node(key)
  for i in 1..key.length
    node = DHT.lookup(key.prefix(i))
    return node if node.is_leaf?
  end
  raise "no leaf node found for key #{key}"
end

2. Range Query
A range query can be performed by first locating the leaf node that contains the start key and then walking in ascending order until we exceed the end key. Note that we can walk across leaf nodes by following the leaf node chain.
def range(startkey, endkey)
  result = Array.new
  leaf = locate_leaf_node(startkey)
  while leaf != nil
    result.concat(leaf.range(startkey, endkey))
    if leaf.largestkey < endkey
      leaf = leaf.nextleaf    # follow the leaf node chain
    else
      leaf = nil              # passed the end key, stop walking
    end
  end
  return result
end
To speed up the search, we can use a parallel search mechanism. Instead of walking from the start key in a sequential manner, we can find the common prefix of the start key and end key (as we know all the results lie under its subtree) and perform a parallel search of the leaf nodes under this subtree, as sketched below.
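A rough sketch of this parallel walk, assuming non-leaf Trie nodes expose a child_prefixes list (an assumption for illustration) and reusing the DHT.lookup call from the snippets above:

# Parallel range search: descend to the subtree rooted at the common prefix,
# then fan out to its children in parallel threads.
def parallel_range(startkey, endkey)
  prefix = common_prefix(startkey, endkey)
  subtree_root = DHT.lookup(prefix)
  collect_leaves(subtree_root).flat_map { |leaf| leaf.range(startkey, endkey) }
end

def collect_leaves(node)
  return [node] if node.is_leaf?
  threads = node.child_prefixes.map do |p|
    Thread.new { collect_leaves(DHT.lookup(p)) }
  end
  threads.flat_map(&:value)
end

def common_prefix(a, b)
  i = 0
  i += 1 while i < a.length && i < b.length && a[i] == b[i]
  a[0, i]
end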

3. Insert and Delete keys
To insert a new key, we first identify the leaf node that should contain it. If the leaf node has available capacity (fewer than B keys), we simply add the key there. Otherwise, we need to split the leaf node into two branches and redistribute its existing keys to the newly created child nodes.

To delete a key, we similarly identify the leaf node that contains it and remove it there. This may cause some parent nodes to end up with fewer than B + 1 keys, in which case some child nodes may need to be merged.
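A minimal sketch of the insert and delete paths, assuming leaf nodes expose a keys array and a hypothetical split_into_children helper that creates the two child nodes in the DHT (B is an arbitrary capacity picked for illustration):

B = 4   # maximum number of keys per leaf node (illustrative)

def insert(key)
  leaf = locate_leaf_node(key)
  if leaf.keys.length < B
    leaf.keys << key                              # room left: just add it here
  else
    leaf.split_into_children(leaf.keys + [key])   # split and redistribute keys
  end
end

def delete(key)
  leaf = locate_leaf_node(key)
  leaf.keys.delete(key)
  # An underflowed parent would trigger a merge of child nodes; omitted here.
end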


Combining Multiple Search Criteria

When we have multiple criteria in a search, each criterion may use a different index residing on a different set of machines in the DHT. Multiple criteria can be combined using boolean operators such as OR / AND. Performing an OR operation is very straightforward because we just need to union the results of each individual index search, which can be performed separately. On the other hand, performing an AND operation is trickier because we need to deal with the situation where each individual criterion has a large number of matches but their intersection is small. The challenge is: how can we efficiently perform an intersection between two potentially very large sets?

One naive implementation is to send all matched object ids of each criterion to a server that performs the set intersection. If each data set is large, this approach may consume a large amount of bandwidth just to ship all the candidate object ids across the network.

A number of techniques to address this are described in this paper.

1. Bloom Filter
Instead of sending the whole set of matched object ids, we can send a much more compact representation called a "Bloom filter", which can be used for testing set membership. The membership test produces zero false negatives, but has a chance of false positives p, which is controllable.


To minimize bandwidth, we typically pick the machine holding the larger set as the sender of the Bloom filter and perform the intersection on the receiving machine, which holds the smaller set.

Notice that the false positives can actually be completely eliminated by sending the matched results from the set2 machine back to the set1 machine, which double-checks membership against set1. In most cases, 100% precision is not needed, and a small probability of false positives is often acceptable.
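To make this concrete, here is a minimal sketch of the handoff, with a toy Bloom filter built from scratch for illustration (the bit-array size and the number of hash functions are arbitrary, not tuned):

require 'digest'

# A simple bit-array Bloom filter: no false negatives, a controllable
# (here untuned) probability of false positives.
class BloomFilter
  def initialize(bits = 8192, hashes = 4)
    @bits = Array.new(bits, false)
    @hashes = hashes
  end

  def add(item)
    positions(item).each { |p| @bits[p] = true }
  end

  def include?(item)
    positions(item).all? { |p| @bits[p] }
  end

  private

  def positions(item)
    (1..@hashes).map do |i|
      Digest::SHA1.hexdigest("#{i}:#{item}").to_i(16) % @bits.length
    end
  end
end

# The machine holding the larger set (set1) builds and sends the filter ...
def build_filter(set1_ids)
  filter = BloomFilter.new
  set1_ids.each { |id| filter.add(id) }
  filter
end

# ... and the machine holding the smaller set (set2) intersects locally.
# The result may contain a few false positives; sending it back to the
# set1 machine for a membership re-check removes them if needed.
def intersect(set2_ids, filter)
  set2_ids.select { |id| filter.include?(id) }
end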

2. Caching
It is possible that certain search criteria are very popular and will be issued over and over again. The corresponding Bloom filters of these hot spots can be cached on the receiving machine. Since a Bloom filter has a small footprint, we can cache the Bloom filters of many popular search criteria.
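A tiny sketch of this caching, assuming a hypothetical fetch_filter_from_owner call that asks the machine owning the index for the Bloom filter of a given criteria:

# Cache Bloom filters of popular search criteria on the receiving machine,
# keyed by a canonical string form of the criteria.
FILTER_CACHE = {}

def filter_for(criteria)
  FILTER_CACHE[criteria.to_s] ||= fetch_filter_from_owner(criteria)
end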

3. Incremental fetch
In case the client doesn't need the full set of matched results, we can stream the data back to the client using a cursor mode. Basically, at the sending side, set1 is sorted and broken into smaller chunks, with a Bloom filter computed and attached to each chunk. At the receiving side, every element of set2 is checked against each chunk's Bloom filter.

Notice that we save computation at the sending side (computing the Bloom filter for a chunk rather than the whole set1) at the cost of doing more work at the receiving side (since we need to repeat the check of the whole set2 for each chunk of set1). The assumption is that the client only needs a small subset of all the matched data.

An optimization we can make is to mark the key range of each chunk of set1 and have the receiving side skip the set2 objects that fall outside that range.
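A minimal sketch of the incremental fetch, reusing the toy BloomFilter class above; the chunk size and helper names are assumptions for illustration:

CHUNK_SIZE = 1000   # illustrative chunk size

# Sending side: sort set1, break it into chunks, and attach a key range
# and a Bloom filter to each chunk.
def chunks_of(set1_ids)
  set1_ids.sort.each_slice(CHUNK_SIZE).map do |chunk|
    filter = BloomFilter.new
    chunk.each { |id| filter.add(id) }
    { range: (chunk.first..chunk.last), filter: filter }
  end
end

# Receiving side: for each incoming chunk, check only the set2 elements that
# fall inside the chunk's key range, and stream matches out as a cursor.
def incremental_intersect(set2_ids, chunks)
  sorted2 = set2_ids.sort
  Enumerator.new do |cursor|
    chunks.each do |chunk|
      candidates = sorted2.select { |id| chunk[:range].cover?(id) }
      candidates.each { |id| cursor << id if chunk[:filter].include?(id) }
    end
  end
end

Since the result is an Enumerator, a client that only needs the first few matches can call, say, incremental_intersect(set2, chunks_of(set1)).first(20) and stop early.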

Check out this stream