Query Processing for NOSQL DB

The recent rise of NoSQL provides an alternative model for building extremely large-scale storage systems. Nevertheless, compared to the more mature RDBMS, NoSQL has some fundamental limitations that we need to be aware of.
  1. It calls for a more relaxed data consistency model
  2. It provides primitive querying and searching capability
There are techniques we can employ to mitigate each of these issues. Regarding the data consistency concern, I have discussed a number of design patterns in my previous blog for implementing systems with different strengths of consistency guarantee.

Here I would like to take a stab at the second issue.

So what is the problem?

Many of today's NoSQL DBs are based on the DHT (Distributed Hash Table) model, which provides hashtable access semantics. To access or modify any object, the client must supply the object's primary key, and the DB then looks up the object using an equality match on the supplied key.

For example, if we use a DHT to implement a customer DB, we can choose the customer id as the key. We can then get, set, or operate on any customer object if we know its id:
  • cust_data = DHT.get(cust_id)
  • DHT.set(cust_id, modified_cust_data)
  • DHT.execute(cust_id, func(cust) {cust.add_credit(200)})
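
To make the access pattern concrete, here is a minimal Python sketch of a hashtable-style store. The class name `ToyDHT`, the customer fields and the in-memory dict are assumptions for illustration only; a real DHT spreads its keys across many machines.

```python
class ToyDHT:
    """In-memory stand-in for a DHT: every operation needs the primary key."""

    def __init__(self):
        self._data = {}

    def get(self, key):
        return self._data.get(key)

    def set(self, key, value):
        self._data[key] = value

    def execute(self, key, func):
        # Run func against the stored object and save the result back.
        self._data[key] = func(self._data[key])
        return self._data[key]


dht = ToyDHT()
dht.set("cust-42", {"name": "Ann", "credit": 100})
dht.execute("cust-42", lambda cust: {**cust, "credit": cust["credit"] + 200})
```

Notice that nothing in this interface lets us ask for a customer by anything other than the id.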
In the real world, we may want to search data by attributes other than the primary key, search on "greater/less than" relationships, or combine multiple search criteria using a boolean expression.

Using our customer DB example, we may want to ...
  • Look up customers within a zip code
  • Look up customers whose income is above 200K
  • Look up customers using the keywords "chief executives"
Although query processing and indexing techniques are common in the RDBMS world, they are seriously lacking in the NoSQL world because of the "distributed architecture" underlying most NoSQL DBs.

It seems to me that the responsibility for building an indexing and query mechanism lands on the NoSQL user. Therefore, I want to explore some possible techniques for handling this.


Companion SQL-DB

A very straightforward way to provide querying capability is to augment NoSQL with an RDBMS or a TextDB (for keyword search). For example, we add the metadata of each object to an RDBMS so that we can query the metadata using standard SQL.

Of course, this requires the RDBMS to be large enough to store the searchable attributes of each object. Since we store only the attributes required for search in the RDBMS, rather than the whole object, this turns out to be a very practical and common approach.
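
As a rough sketch of this arrangement, the Python below uses a plain dict as a stand-in for the NoSQL store and an in-memory SQLite database as the companion RDBMS. The table name `cust_index`, the customer fields and the helper functions are all assumptions made for the example.

```python
import sqlite3

object_store = {}                        # stand-in for the NoSQL store
index_db = sqlite3.connect(":memory:")   # stand-in for the companion RDBMS
index_db.execute(
    "CREATE TABLE cust_index (cust_id TEXT PRIMARY KEY, zip TEXT, income INTEGER)"
)

def save_customer(cust_id, cust):
    """Write the full object to the store, and only its searchable attributes to SQL."""
    object_store[cust_id] = cust
    index_db.execute(
        "INSERT OR REPLACE INTO cust_index VALUES (?, ?, ?)",
        (cust_id, cust["zip"], cust["income"]),
    )

def find_by_income_above(threshold):
    """Query the metadata with SQL, then fetch the full objects by primary key."""
    rows = index_db.execute(
        "SELECT cust_id FROM cust_index WHERE income > ? ORDER BY cust_id",
        (threshold,),
    )
    return [object_store[cust_id] for (cust_id,) in rows]

save_customer("c1", {"name": "Ann", "zip": "94301", "income": 250000})
save_customer("c2", {"name": "Bob", "zip": "10001", "income": 90000})
save_customer("c3", {"name": "Cy", "zip": "94301", "income": 310000})
```

The two writes in `save_customer` are not atomic here, so a real deployment would need to decide how to handle a failure between them.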


Scatter/Gather Local Search

Some NoSQL DBs provide an indexing and query processing mechanism within the local DB. In this case, we can have the query processor broadcast the query to every node in the DHT. Each node conducts a local search and sends its results back to the query processor, which aggregates them into a single response.

Notice that the search happens in parallel across all nodes in the DHT.
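
The scatter/gather flow can be sketched in Python as follows. Each "node" is just a local dict and the threads stand in for network calls; the function names and sample data are hypothetical.

```python
from concurrent.futures import ThreadPoolExecutor

def local_search(node_data, predicate):
    """Search performed by one node against only the objects it holds."""
    return [obj for obj in node_data.values() if predicate(obj)]

def scatter_gather(nodes, predicate):
    """Broadcast the predicate to every node in parallel and merge the results."""
    with ThreadPoolExecutor(max_workers=len(nodes)) as pool:
        partials = pool.map(lambda node: local_search(node, predicate), nodes)
        return [obj for partial in partials for obj in partial]

nodes = [
    {"c1": {"id": "c1", "zip": "94301"}},
    {"c2": {"id": "c2", "zip": "10001"}, "c3": {"id": "c3", "zip": "94301"}},
    {},
]
matches = scatter_gather(nodes, lambda cust: cust["zip"] == "94301")
```

Every query touches every node, so the approach trades network fan-out for not having to maintain a global index.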


Distributed B-Tree

B+Tree is a common indexing structure used in RDBMS. A distributed version of the B+Tree can also be used in a DHT environment. The basic idea is to hash the searchable attribute to locate the root node of the B+Tree. The "value" of the root node contains the ids of its child nodes, so the client can issue another DHT lookup to find the relevant child node. Continuing this process, the client eventually navigates down to a leaf node, where the ids of the objects matching the search criteria are found. The client then issues another DHT lookup to extract the actual object.

Caution is needed when B+Tree nodes are updated due to splits and merges caused by object creation and deletion. Ideally, this should be done in an atomic fashion. This paper from Microsoft, HP and Toronto U describes a distributed transaction protocol that provides the required atomicity. A distributed transaction is an expensive operation, but its use here is justified because most B+Tree updates rarely involve more than a single machine.
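
Here is a small Python sketch of the read path only, assuming a dict as the DHT and a made-up node layout (`keys`, `children`, `object_ids`). It counts one DHT lookup per tree level plus one for the object itself; splits, merges and the transaction protocol are left out.

```python
import bisect

# Stand-in for the DHT: node id -> B+Tree node or object. All ids are made up.
dht = {
    "index:income": {"leaf": False, "keys": [100], "children": ["n1", "n2"]},
    "n1": {"leaf": True, "keys": [40, 75], "object_ids": ["c7", "c2"]},
    "n2": {"leaf": True, "keys": [100, 250], "object_ids": ["c5", "c1"]},
    "c1": {"name": "Ann", "income": 250},
}

def find_object(attribute, value):
    """Walk from the root to a leaf, one DHT lookup per level, then fetch the object."""
    node = dht["index:" + attribute]      # the attribute name locates the root
    lookups = 1
    while not node["leaf"]:
        # Keys equal to a separator live in the right-hand child.
        child = bisect.bisect_right(node["keys"], value)
        node = dht[node["children"][child]]
        lookups += 1
    if value not in node["keys"]:
        return None, lookups
    object_id = node["object_ids"][node["keys"].index(value)]
    return dht[object_id], lookups + 1
```

The lookup count grows with the height of the tree, which is why a wide fan-out per node matters in this setting.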


Prefix Hash Table (distributed Trie)

A Trie is an alternative data structure in which every path (from the root) contains a prefix of the key. Basically, every node in the Trie contains all the data whose key is prefixed by it. Berkeley and Intel Research have a paper describing this mechanism.

1. Lookup a key
To locate a particular key, we start with its one-digit prefix and do a DHT lookup to see if we get a leaf node. If so, we search within this leaf node, as we know the key must be contained inside. If it is not a leaf node, we extend the prefix by one more digit and repeat the process.
# Locate the key next to the input key
def locate(key)
  leaf = locate_leaf_node(key)
  leaf.locate(key)
end

# Locate the leaf node containing the input key by trying
# successively longer prefixes of the key
def locate_leaf_node(key)
  (1..key.length).each do |i|
    node = DHT.lookup(key[0, i])
    return node if node.is_leaf?
  end
  raise "no leaf node found for key #{key}"
end

2. Range Query
A range query can be performed by first locating the leaf node that contains the start key and then walking in ascending order until we exceed the end key. Note that we can walk from one leaf node to the next by following the leaf node chain.
def range(startkey, endkey)
  result = []
  leaf = locate_leaf_node(startkey)
  until leaf.nil?
    result.concat(leaf.range(startkey, endkey))
    # Stop once this leaf already covers the end key
    break if leaf.largestkey >= endkey
    leaf = leaf.nextleaf
  end
  result
end
To speed up the search, we can use a parallel search mechanism. Instead of walking sequentially from the start key, we can find the common prefix of the start key and the end key (as we know all the results are under its subtree) and perform a parallel search of the leaf nodes under this subtree.

3. Insert and Delete keys
To insert a new key, we first identify the leaf node that should contain the key. If the leaf node has available capacity (fewer than B keys), we simply add the key there. Otherwise, we split the leaf node into two branches and redistribute its existing keys to the newly created child nodes.

To delete a key, we similarly identify the leaf node that contains the key and remove it there. This may leave some parent nodes with fewer than B + 1 keys under them, in which case we may need to merge some child nodes.
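
The insert-and-split step can be sketched in Python as below. This is a simplified model under stated assumptions: keys are fixed-length binary strings, the root sits at the empty prefix, a dict plays the DHT, and `B`, `KEY_BITS` and all function names are invented for the example. Deletion, merging and the leaf chain are omitted.

```python
B = 2          # assumed leaf capacity
KEY_BITS = 4   # assumed fixed key length, in binary digits

# Stand-in for the DHT: prefix -> node. The root is the empty prefix.
dht = {"": {"leaf": True, "keys": []}}

def locate_leaf_prefix(key):
    """Try longer and longer prefixes until the DHT returns a leaf."""
    for i in range(len(key) + 1):
        node = dht.get(key[:i])
        if node is not None and node["leaf"]:
            return key[:i]
    raise KeyError(key)

def store(prefix, keys):
    """Store keys under prefix, splitting into two children while over capacity."""
    if len(keys) <= B or len(prefix) == KEY_BITS:
        dht[prefix] = {"leaf": True, "keys": keys}
        return
    dht[prefix] = {"leaf": False, "keys": []}   # the leaf becomes an internal node
    for digit in "01":
        child = prefix + digit
        store(child, [k for k in keys if k.startswith(child)])

def insert(key):
    prefix = locate_leaf_prefix(key)
    store(prefix, sorted(set(dht[prefix]["keys"]) | {key}))

for key in ["0001", "0010", "1100", "0111"]:
    insert(key)
```

After these four inserts the trie has leaves at prefixes "00", "01" and "1", while "" and "0" have become internal nodes.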


Combining Multiple Search Criteria

When we have multiple criteria in the search, each criterion may use a different index that resides on a different set of machines in the DHT. Multiple criteria can be combined using boolean operators such as OR / AND. Performing an OR operation is very straightforward because we just need to union the results of the individual index searches, which are performed separately. On the other hand, performing an AND operation is trickier because we need to deal with the situation where each individual criterion has a large number of matches but their intersection is small. The challenge is: how can we efficiently perform an intersection between two potentially very large sets?

One naive implementation is to send all matched object ids of each criterion to a server that performs the set intersection. If each data set is large, this approach may consume a lot of bandwidth sending all the potential object ids across.

A number of techniques are described in this paper.

1. Bloom Filter
Instead of sending the whole set of matched object ids, we can send a more compact representation called a "Bloom filter", which can be used for testing set membership. The test has zero false negatives but has a controllable probability p of false positives.

For minimizing bandwidth, we typically pick the machine with the larger set as the sending machine and perform the intersection on the receiving machine, which has the smaller set.

Notice that the false positives can be completely eliminated by sending the matched results from the Set2 machine back to the Set1 machine, which double-checks their membership in Set1. In most cases, 100% precision is not needed and a small probability of false positives is acceptable.
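
The exchange can be sketched in Python as follows. The `BloomFilter` class here is a minimal illustration (SHA-256 with a seed prefix as the hash family, a Python int as the bit array); the sizes and names are assumptions, not parameters from the paper.

```python
import hashlib

class BloomFilter:
    def __init__(self, num_bits=1024, num_hashes=3):
        self.num_bits = num_bits
        self.num_hashes = num_hashes
        self.bits = 0  # Python int used as a bit array

    def _positions(self, item):
        for seed in range(self.num_hashes):
            digest = hashlib.sha256(f"{seed}:{item}".encode()).digest()
            yield int.from_bytes(digest[:8], "big") % self.num_bits

    def add(self, item):
        for pos in self._positions(item):
            self.bits |= 1 << pos

    def __contains__(self, item):
        return all((self.bits >> pos) & 1 for pos in self._positions(item))


def intersect(set1, set2):
    """Intersect two id sets held on different machines by shipping a filter."""
    # Machine 1: summarize set1 and send only the filter.
    summary = BloomFilter()
    for object_id in set1:
        summary.add(object_id)
    # Machine 2: keep the ids that may be in set1 (never misses a real member).
    candidates = {object_id for object_id in set2 if object_id in summary}
    # Optional round trip back to machine 1 removes the false positives.
    exact = candidates & set1
    return candidates, exact
```

`candidates` is what the receiving machine can compute on its own; `exact` is the result after the optional double check on the sending machine.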

2. Caching
It is possible that certain search criteria are very popular and will be issued over and over again. The corresponding Bloom filters of these hot spots can be cached on the receiving machine. Since a Bloom filter has a small footprint, we can cache many Bloom filters for popular search criteria.

3. Incremental fetch
If the client doesn't need the full set of matched results, we can stream the data back to the client using a cursor mode. Basically, at the sending side, set1 is sorted and broken into smaller chunks, with a Bloom filter computed and attached to each chunk. At the receiving side, every element of set2 is checked against the Bloom filter of each chunk.

Notice that we save computation at the sending side (computing the Bloom filter for one chunk rather than for the whole of set1) at the cost of doing more at the receiving side (since we need to repeat the check of the whole of set2 for each chunk of set1). The assumption is that the client only needs a small subset of all the matched data.

An optimization is to mark the key range of each chunk in set1 so that the set2 side only needs to check the objects that fall within that range.
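
A rough Python sketch of the cursor idea, including the range optimization, is shown below. To stay short it uses a plain `set` per chunk where the text uses a Bloom filter, so it has no false positives; the function name and chunk size are assumptions.

```python
def stream_intersection(set1, set2, chunk_size=3):
    """Yield matches chunk by chunk so the client can stop early."""
    ordered1 = sorted(set1)
    ordered2 = sorted(set2)
    for start in range(0, len(ordered1), chunk_size):
        chunk = ordered1[start:start + chunk_size]
        # Sending side: a small membership summary per chunk. A plain set
        # stands in for the per-chunk Bloom filter.
        summary = set(chunk)
        low, high = chunk[0], chunk[-1]
        # Receiving side: only test the set2 elements inside the chunk's range.
        yield [x for x in ordered2 if low <= x <= high and x in summary]

cursor = stream_intersection({1, 2, 3, 4, 5, 6, 7}, {2, 6, 7, 9})
first_page = next(cursor)   # work for later chunks has not been done yet
```

Because the function is a generator, the summaries for later chunks are only computed if the client keeps asking for more.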

Input/Output with files

The following is a simple program to show reading, writing and appending to files. You can read more about this feature at cplusplus.com.




//Program tested on Microsoft Visual Studio 2008 - Zahid Ghadialy
#include <iostream>
#include <fstream>
#include <string>

using namespace std;

int main()
{
    string line;

    cout << "\nFile 1 operations - Writing to Hello.txt" << endl;
    ofstream file1; //OUTPUT file stream
    file1.open("Hello.txt"); //Open the file
    if (file1.is_open()) //Check if open before writing
    {
        file1 << "Writing something something to a file" << endl;
        file1 << "This is enough for the time being" << endl;
        //Reading back with getline(file1, line) is NOT POSSIBLE here
        //because an ofstream is write-only
    }
    file1.close(); //Close file1

    cout << "\nFile 2 operations - Reading from Hello.txt" << endl;
    ifstream file2; //INPUT file stream
    file2.open("Hello.txt"); //Open the file
    if (file2.is_open()) //Check if open
    {
        //Loop on getline itself; testing eof() before reading
        //would process one extra, empty line at the end
        while (getline(file2, line))
        {
            cout << line << endl;
        }
    }
    file2.close(); //Close file2

    cout << "\nFile 3 operations - Appending and Reading from Hello.txt" << endl;
    fstream file3; //INPUT and OUTPUT file stream
    file3.open("Hello.txt", ios::in | ios::out); //Open the file
    if (file3.is_open()) //Check if open
    {
        //We want to put a new line at the end of the file
        file3.seekp(0, ios::end);
        file3 << "This is new line added file 3" << endl;
        //Now reset the file pointer to the start of the file
        file3.seekg(0, ios::beg);
        while (getline(file3, line))
        {
            cout << line << endl;
        }
    }
    file3.close(); //Close file3

    return 0;
}








The output is as follows:

Increment And Decrement Operator : prefix and postfix

Increment And Decrement Operator in C++

In the C++ programming language, increasing a value by 1 is called incrementing and decreasing a value by 1 is called decrementing. Since 1 is the most common value to add to or subtract from a variable, C++ provides special operators for this in addition to the short hand assignment operators. The increment operator (++) increases the variable's value by 1 and the decrement operator (--) decreases the value by 1.

A C++ program example that demonstrates the use of the increment and decrement operators by comparing them with the short hand assignment operators.


#include <iostream>

int main ()
{

    using std::cout;
    using std::endl;

    int b = 4, c = 7;

    cout << "The value of b is : " << b << endl;
    b = b + 1;
    cout << "The value of b is : " << b << endl;
    b += 1;
    cout << "The value of b is : " << b << endl;
    b++;
    cout << "The value of b is : " << b << endl;

    cout << endl;

    cout << "The value of c is : " << c << endl;
    c = c - 1;
    cout << "The value of c is : " << c << endl;
    c -= 1;
    cout << "The value of c is : " << c << endl;
    c--;
    cout << "The value of c is : " << c << endl;

    return 0;
}


Prefixing And Postfixing In Increment And Decrement Operator.

Both the increment (++) and decrement (--) operators come in two varieties: prefix and postfix. In prefix form the operator is written before the variable's name (++a or --a), and in postfix form it is written after the variable's name (a++ or a--).

In a simple statement it doesn't matter which one you choose. It does matter in a more complex statement where you increment or decrement a value and also use the result, for example by assigning it to a variable in the same statement. The prefix operator changes the variable first and yields the new value, so the new value is assigned. The postfix operator yields the old value, so the old value is assigned even though the variable itself has changed. The following example makes it clear.

A C++ program example that demonstrates the similarities and differences between the prefix and postfix operators.


#include <iostream>

int main ()
{

    using std::cout;
    using std::endl;

    int a = 10;

    a++;
    cout << "The value of a is : " << a << endl;
    ++a;
    cout << "The value of a is : " << a << endl;
    cout << "The value of a is : " << a++ << endl;
    cout << "The value of a is : " << ++a << endl;

    cout << endl;

    int b = 3;
    a = b++;
    cout << "The value of a is : " << a << endl;
    cout << "The value of b is : " << b << endl;
    a = ++b;
    cout << "The value of a is : " << a << endl;
    cout << "The value of b is : " << b << endl;

    return 0;
}


Another C++ program example that demonstrates the use of the postfix and prefix operators.


#include <iostream>

int main ()
{

    using namespace std;

    int age = 18;

    cout << "I was " << age++ << " years old." << endl;
    cout << "Now I am " << age << " years old." << endl;
    cout << "One year passes...." << endl;
    cout << "I am " << ++age << " years old." << endl;

    return 0;
}

Short Hand Assignment Operator in C++

Short Hand Assignment Operator in C++ Programming

Short hand assignment operators are also known as compound assignment operators. The advantage of using a short hand assignment operator is that it requires less typing and names the variable only once, which makes the code more concise.

A C++ Program example without using short hand assignment operator.


#include <iostream>

int main ()
{

    using std::cout;
    using std::endl;

    int a = 3;
    cout << "Value of a is : "<< a << endl;

    a = a + 1;
    cout << "Value of a is : "<< a << endl;

    a = a - 1;
    cout << "Value of a is : "<< a << endl;

    a = a * 2;
    cout << "Value of a is : " << a << endl;

    a = a / 2;
    cout << "Value of a is : " << a << endl;

    a = a % 2;
    cout << "Value of a is : " << a << endl;

    return 0;
}


A C++ Program example that uses short hand assignment operator


#include <iostream>

int main ()
{

    using std::cout;
    using std::endl;

    int a = 3;
    cout << "Value of a is : " << a << endl;

    a += 1;
    cout << "Value of a is : " << a << endl;

    a -= 1;
    cout << "Value of a is : " << a << endl;

    a *= 2;
    cout << "Value of a is : " << a << endl;

    a /= 2;
    cout << "Value of a is : " << a << endl;

    a %= 2;
    cout << "Value of a is : " << a << endl;

    return 0;
}


Note :

From the above two examples it is clear that the statement
a = a + 1 is the same as a += 1 and
a = a - 1 is the same as a -= 1 and
a = a * 2 is the same as a *= 2 and
a = a / 2 is the same as a /= 2 and
a = a % 2 is the same as a %= 2

Cloud Computing Patterns

I attended a presentation by Simon Guest from Microsoft on their cloud computing architecture. Although no new concept or idea was introduced, Simon provided an excellent summary of the major patterns of cloud computing.

I have to admit that I am not familiar with Azure and this was my first time hearing a Microsoft cloud computing presentation. I felt Microsoft explained their Azure platform in a very comprehensible way. I am quite impressed.

Simon talked about 5 patterns of cloud computing. Let me summarize them (and mix in a lot of my own thoughts) ...

1. Use Cloud for Scaling
The key idea is to spin machine resources up and down according to workload so that the user pays only for actual usage. There are two types of access patterns: the passive listener model and the active worker model.

The passive listener model uses a synchronous communication pattern where the client pushes a request to the server and synchronously waits for the processing result.
In the passive listener model, machine instances typically sit behind a load balancer. To scale the resources according to the workload, we can use a monitor service that sends NULL client requests and uses the measured response time to scale the number of machine instances up or down.

On the other hand, the active worker model uses an asynchronous communication pattern where the client puts the request into a queue, which is periodically polled by the server. After queuing the request, the client does some other work and comes back later to pick up the result. The client can also provide a callback address to which the server pushes the result after the processing is done.
In the active worker model, the monitor can measure the number of requests sitting in the queue and use that to determine whether machine instances (at the consuming end) need to be spun up or down.
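
As a small illustration of the queue-based rule, the Python function below turns a queue length into a target instance count. The function name, the per-worker capacity and the min/max bounds are assumptions for the sketch, not anything Azure-specific.

```python
import math

def desired_workers(queue_length, per_worker_capacity, min_workers=1, max_workers=20):
    """Number of worker instances needed to drain the current backlog."""
    needed = math.ceil(queue_length / per_worker_capacity)
    # Clamp so we never scale to zero or beyond the budgeted maximum.
    return max(min_workers, min(max_workers, needed))
```

A monitor would call this periodically with the measured queue length and start or stop instances to match the returned number.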


2. Use Cloud for Multi-tenancy
Multi-tenancy is more of a SaaS provider (rather than an enterprise) usage scenario. The key idea is to use the same set of code / software to host the application for different customers (tenants) who may have slightly different requirements in
  • UI branding
  • Business rules for decision criteria
  • Data schema
The approach is to provide sufficient "customization" capability to the customers. The most challenging part is determining which aspects should be opened for customization and which shouldn't. After identifying these configurable parameters, it is straightforward to define configuration metadata to capture them.

3. Use Cloud for Batch processing
This is about running things like statistics computation, report generation, machine learning, analytics ... etc. These tasks are done in batch mode, so it is more economical to use the "pay as you go" model. Batch processing also has a very high tolerance for latency, so it is a perfect candidate for running in the cloud.
Here is an example of how to run a Map/Reduce framework in the cloud. Microsoft hasn't provided a Map/Reduce solution at this moment, but Simon mentioned that Dryad from Microsoft Research may be a future Microsoft solution. Interestingly, Simon also recommended Hadoop.

Of course, one challenge is how to move the data into the cloud in the first place. In my earlier blog, I described some best practices for this.

4. Use Cloud for Storage
The idea is to store data in the cloud without having to worry about DBA tasks. Most cloud vendors provide large-scale key/value stores as well as RDBMS services. Their data storage services also take care of data partitioning, replication ... etc. Building cloud storage is a big topic involving many distributed computing concepts and techniques; I have covered it in a separate blog.

5. Use Cloud for Communication
A queue (or mailbox) service provides a mechanism for different machines to communicate asynchronously via message passing.

Azure also provides a relay service in the cloud, which is quite useful for machines behind different firewalls to communicate. In a typical firewall setup, incoming connections are not allowed, so these machines cannot directly establish a socket to each other. In order for them to communicate, each needs to open a long-lived socket connection to the cloud relay, which routes traffic between these connections.

I have used the same technique in a previous P2P project where users' PCs behind their firewalls needed to communicate, and I know this relay approach works very well.

Impression on Scala

I have been hearing quite a lot of good comments about the Scala programming language. I personally used Java extensively in the past and have switched to Ruby (and some Erlang) in the last 2 years. The following features that I heard about Scala really attract me ...
  • Scala code is compiled to Java byte code and runs natively on JVMs. Code written in Scala immediately enjoys the performance and robustness of Java VM technologies.
  • It is easy to integrate with Java code and libraries, so it immediately enjoys the wide portfolio of existing Java libraries.
  • It has good support for the Actor model, which I believe is an important programming paradigm for multi-core machine architecture.
So I decided to take a Scala tutorial from Dean Wampler today at the QCon conference. This is a summary of my impressions of Scala after the class.

First of all, Scala is a strongly typed language. However, it has a type inference mechanism, so type declarations are often optional. But in some places (like a method signature), a type declaration is mandatory. It is not very clear to me when I have to declare a type.

Having the "val" and "var" declarations for variables is very nice because it makes immutability explicit. In Ruby, you can make an object immutable by sending it the freeze() method, but Scala does this more explicitly.

But I found it confusing that a method can be defined in 2 different ways

class A() {
  def hello {
    ...
  }
}
class A() {
  def hello = {
    ...
  }
}
The MyFunction[+A1, -A2] notation is really confusing to me. I feel a dynamically typed language is much easier.

Being able to drop the opening and closing braces also causes me a lot of confusion.
class Person(givenName: String) {
  var myName = givenName
  def name_=(anotherName: String) = {
    myName = anotherName
  }
}

class Person(givenName: String) {
  var myName = givenName
  def name_=(anotherName: String) = myName = anotherName
}
The special "implicit" conversion method provides a mechanism for developing DSLs (Domain Specific Languages) in Scala, but it also looks very odd to me. Basically, you need to import a SINGLE implicit conversion method that needs to take care of all possible conversions.

That every method whose name ends with ":" has a reversed calling order also seems odd to me.

Traits provide mixins for Scala, but I feel the "Module" mechanism in Ruby has done a better job.

Scala has the notion of a "function" and can pass "functions" as parameters. Again, I feel Ruby blocks have done a better job.

Perhaps due to the JVM's limitations in supporting dynamic languages, Scala is not very strong at meta-programming. Scala doesn't provide the "open class" property where you can modify an existing class (add methods, change method implementations, add classes ... etc.) at run time.

Scala also emulates a number of Erlang features, but I don't feel it does a very clean job. For example, it emulates the pattern matching style of Erlang programming using case classes and the unapply() method, but it seems a little bit odd to me.

Erlang has 2 cool features which I couldn't find in Scala (maybe I am expecting too much)
  • The ability to run two versions of a class at the same time
  • The ability to create and pass function objects to a remote process (kind of like remote code loading)
Overall impression

I have to admit that my impression of Scala is not as good as it was before I attended the tutorial. Scala tries to put different useful programming paradigms into the JVM, but it feels like a forced fit to me. Of course, its close tie to the JVM is still a good reason to use Scala. But from a pure programming perspective, I would prefer to use a combination of Ruby and Erlang rather than Scala.

C String and C++ Strings: Similarities and Differences

I came across this scenario recently when a C-style string (as some people refer to char[]) had to be compared to a C++ string. Even though this is straightforward, I fell into the same trap as a lot of people by forgetting that there is a Nul character at the end of a C-style string. Anyway, here is an example to demonstrate both kinds of string.




//Program tested on Microsoft Visual Studio 2008 - Zahid Ghadialy
//Program to demonstrate length and size of char* and string
//and to show similarity and difference between them
#include <iostream>
#include <string>  //std::string
#include <cstring> //strcmp

using namespace std;

int main()
{
    cout << endl;
    char a[10] = "Hello";
    cout << "Size of a = " << sizeof(a) << endl;
    a[5] = '\0'; //Writing a Nul character does not change the size of the array
    cout << "Size of a = " << sizeof(a) << endl;

    cout << endl;
    char b[] = "Hello";
    cout << "Size of b = " << sizeof(b) << endl; //Note the size
    //Automatic Nul character added

    cout << endl;
    string c("Hello");
    cout << "Size of c = " << sizeof(c) << endl;
    cout << "Length of c = " << c.length() << endl;

    c += " Zahid";
    cout << "Size of modified c = " << sizeof(c) << endl;
    cout << "Length of modified c = " << c.length() << endl;

    //Add an extra Nul character
    cout << endl;
    char d[] = "Hello\0";
    cout << "Size of d = " << sizeof(d) << endl;
    for (size_t i = 0; i < sizeof(d); i++)
    {
        cout << "d[" << i << "] = " << d[i] << endl;
    }

    //When is a char[] similar to string?
    cout << endl;
    char e[] = "Zahid";
    string f = "Zahid\0";
    const char* temp = f.c_str();
    if (strcmp(e, temp) == 0)
    {
        cout << "e == f" << endl;
    }
    else
    {
        cout << "e != f" << endl;
    }

    return 0;
}







The output is as follows:

NOSQL Patterns

Over the last couple of years, we have seen an emerging data storage mechanism for storing data at large scale. These storage solutions differ quite significantly from the RDBMS model and are also known as NOSQL. Some of the key players include ...
These solutions have a number of characteristics in common
  • Key value store
  • Run on a large number of commodity machines
  • Data are partitioned and replicated among these machines
  • Relax the data consistency requirement (because the CAP theorem proves that you cannot get Consistency, Availability and Partition tolerance at the same time)
The aim of this blog is to extract the underlying technologies that these solutions have in common, and to get a deeper understanding of the implications for your application's design. I do not intend to compare the features of these solutions, nor to suggest which one to use.


API model

The underlying data model can be considered as a large Hashtable (key/value store).

The basic form of API access is
  • get(key) -- Extract the value given a key
  • put(key, value) -- Create or Update the value given its key
  • delete(key) -- Remove the key and its associated value
More advanced forms of the API allow user-defined functions to be executed in the server environment
  • execute(key, operation, parameters) -- Invoke an operation on the value (given its key), which is a special data structure (e.g. List, Set, Map ... etc).
  • mapreduce(keyList, mapFunc, reduceFunc) -- Invoke a map/reduce function across a key range.

Machines layout

The underlying infrastructure is composed of a large number (hundreds or thousands) of cheap, commoditized, unreliable machines connected through a network. We call each machine a physical node (PN). Each PN has the same software configuration but may have varying hardware capacity in terms of CPU, memory and disk storage. Within each PN, there is a variable number of virtual nodes (VN) running, according to the available hardware capacity of the PN.


Data partitioning (Consistent Hashing)

Since the overall hashtable is distributed across many VNs, we need a way to map each key to the corresponding VN.

One way is to use
partition = key mod (total_VNs)

The disadvantage of this scheme is that when we alter the number of VNs, the ownership of existing keys changes dramatically, which requires full data redistribution. Most large-scale stores use a "consistent hashing" technique to minimize the amount of ownership change.


In the consistent hashing scheme, the key space is finite and lies on the circumference of a ring. The virtual node ids are also allocated from the same key space. For any key, its owner node is defined as the first virtual node encountered when walking clockwise from that key. If the owner node crashes, all the keys it owns will be adopted by its clockwise neighbor. Therefore, key redistribution happens only within the neighborhood of the crashed node; all other nodes retain the same set of keys.
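
A minimal Python sketch of such a ring follows. Using the first 4 bytes of an MD5 digest as the ring position, and the `Ring` class with its `owner` and `without` methods, are assumptions made for illustration.

```python
import bisect
import hashlib

def ring_position(name):
    """Map a key or a node id onto the ring (a 32-bit space in this sketch)."""
    return int.from_bytes(hashlib.md5(name.encode()).digest()[:4], "big")

class Ring:
    def __init__(self, node_ids):
        self.points = sorted((ring_position(n), n) for n in node_ids)

    def owner(self, key):
        """First virtual node met when walking clockwise from the key."""
        positions = [pos for pos, _ in self.points]
        i = bisect.bisect_left(positions, ring_position(key))
        return self.points[i % len(self.points)][1]   # wrap around the ring

    def without(self, node_id):
        """The ring as it looks after node_id has crashed or left."""
        return Ring([n for _, n in self.points if n != node_id])

ring = Ring(["vn-a", "vn-b", "vn-c", "vn-d"])
keys = [f"key{i}" for i in range(200)]
before = {k: ring.owner(k) for k in keys}
smaller_ring = ring.without("vn-b")
after = {k: smaller_ring.owner(k) for k in keys}
```

Comparing `before` and `after` shows the property described above: only the keys that "vn-b" owned get a new owner, and they all move to the same clockwise neighbor.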


Data replication

To provide high reliability from individually unreliable resources, we need to replicate the data partitions.

Replication not only improves the overall reliability of data, it also helps performance by spreading the workload across multiple replicas.


While read-only requests can be dispatched to any replica, update requests are more challenging because we need to carefully coordinate the updates that happen across these replicas.

Membership Changes

Notice that virtual nodes can join and leave the network at any time without impacting the operation of the ring.

When a new node joins the network
  1. The joining node announces its presence and its id to some well-known VNs (or just broadcasts it).
  2. All the neighbors (left and right side) adjust the key ownership as well as the replica memberships. This is typically done synchronously.
  3. The joining node starts to bulk copy data from its neighbors in parallel, asynchronously.
  4. The membership change is asynchronously propagated to the other nodes.

Notice that other nodes may not have their membership view updated yet, so they may still forward requests to the old nodes. But since these old nodes (which are the neighbors of the newly joined node) have been updated (in step 2), they will forward the requests to the newly joined node.

On the other hand, the newly joined node may still be in the process of downloading the data and not ready to serve yet. We use the vector clock (described below) to determine whether the newly joined node is ready to serve the request, and if not, the client can contact another replica.

When an existing node leaves the network (e.g. crash)
  1. The crashed node no longer responds to gossip messages, so its neighbors learn about it.
  2. The neighbors update the membership and copy data asynchronously.

We haven't talked about how the virtual nodes are mapped onto the physical nodes. Many schemes are possible, with the main goal that virtual node replicas should not sit on the same physical node. One simple scheme is to assign virtual nodes to physical nodes at random, but check to make sure that a physical node doesn't contain replicas of the same key range.
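
The simple random scheme can be sketched in Python as below. The function name, the partition labels and the fixed seed are assumptions; the only rule enforced is that no two replicas of one key range share a physical node.

```python
import random

def assign_virtual_nodes(partitions, replicas, physical_nodes, seed=0):
    """Place each replica of each partition on a random physical node,
    never putting two replicas of the same partition on one machine."""
    if replicas > len(physical_nodes):
        raise ValueError("need at least as many physical nodes as replicas")
    rng = random.Random(seed)
    placement = {}
    for partition in partitions:
        # sample() picks distinct machines, so the check holds by construction.
        placement[partition] = rng.sample(physical_nodes, replicas)
    return placement

placement = assign_virtual_nodes(
    ["keyAB", "keyBC", "keyCD"], 3, ["pn1", "pn2", "pn3", "pn4", "pn5"]
)
```

This sketch ignores hardware capacity; a fuller version would weight the choice by how many virtual nodes each physical node can host.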

Notice that machine crashes happen at the physical node level, and each physical node has many virtual nodes running on it. So when a single physical node crashes, the workload (of its multiple virtual nodes) is scattered across many physical machines. Therefore, the increased workload due to a physical node crash is evenly balanced.


Client Consistency

Once we have multiple copies of the same data, we need to worry about how to synchronize them such that the client can has a consistent view of the data.

There is a number of client consistency models
  1. Strict Consistency (one copy serializability): This provides the semantics as if there is only one copy of data. Any update is observed instantaneously.
  2. Read your write consistency: The allows the client to see his own update immediately (and the client can switch server between requests), but not the updates made by other clients
  3. Session consistency: Provide the read-your-write consistency only when the client is issuing the request under the same session scope (which is usually bind to the same server)
  4. Monotonic Read Consistency: This provide the time monotonicity guarantee that the client will only see more updated version of the data in future requests.
  5. Eventual Consistency: This provides the weakness form of guarantee. The client can see an inconsistent view as the update are in progress. This model works when concurrent access of the same data is very unlikely, and the client need to wait for some time if he needs to see his previous update.

Depending on which consistency model is provided, 2 mechanisms need to be arranged ...
  • How the client request is dispatched to a replica
  • How the replicas propagate and apply the updates
There are various models for how these 2 aspects can be handled, with different tradeoffs.

Master Slave (or Single Master) Model

Under this model, each data partition has a single master and multiple slaves. In the model above, B is the master of keyAB and C, D are the slaves. All update requests have to go to the master, where the update is applied and then asynchronously propagated to the slaves. Notice that there is a time window in which data can be lost if the master crashes before it propagates its update to any slave, so some systems wait synchronously for the update to be propagated to at least one slave.

Read requests can go to any replica if the client can tolerate some degree of data staleness. This is how the read workload is distributed among many replicas. If the client cannot tolerate staleness for certain data, it needs to send those reads to the master as well.

Note that this model doesn't mean there is one particular physical node that plays the role of master. The granularity of "mastership" is the virtual node. Each physical node has some virtual nodes acting as masters of some partitions while its other virtual nodes act as slaves of other partitions. Therefore, the write workload is also distributed across different physical nodes, although this is due to partitioning rather than replication.

When a physical node crashes, the masters of certain partitions will be lost. Usually, the most up-to-date slave will be nominated to become the new master.

The master slave model works very well in general when the application has a high read/write ratio. It also works very well when updates are spread evenly across the key range. So it is the predominant model of data replication.

There are 2 ways for the master to propagate updates to the slaves: state transfer and operation transfer. In state transfer, the master passes its latest state to the slave, which then replaces its current state with the latest state. In operation transfer, the master propagates a sequence of operations to the slave, which then applies the operations to its local state.

The state transfer model is more robust against message loss because, as long as a later, more up-to-date message arrives, the replica will still be able to advance to the latest state.

Even in state transfer mode, we don't want to send the full object when updating other replicas, because changes typically happen within a small portion of the object. It would be a waste of network bandwidth to send the unchanged portion of the object, so we need a mechanism to detect and send just the delta (the portion that has changed). One common approach is to break the object into chunks and compute a hash tree of the object. The replicas can then compare their hash trees to figure out which chunks of the object have changed and send only those over.

In operation transfer mode, usually much less data needs to be sent over the network. However, it requires a reliable messaging mechanism with a delivery order guarantee.
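To make the hash tree idea concrete, here is a minimal Python sketch. The helper names (`build_hash_tree`, `changed_chunks`), the fixed chunk size, and the choice of SHA-256 are all assumptions for illustration, and the comparison assumes both copies have the same number of chunks.

```python
import hashlib

def _h(data: bytes) -> bytes:
    return hashlib.sha256(data).digest()

def build_hash_tree(obj: bytes, chunk_size: int = 4):
    """Return a list of levels: levels[0] holds chunk hashes, levels[-1] the root."""
    chunks = [obj[i:i + chunk_size] for i in range(0, len(obj), chunk_size)] or [b""]
    level = [_h(c) for c in chunks]
    levels = [level]
    while len(level) > 1:
        # Each parent hashes the concatenation of up to two children.
        level = [_h(b"".join(level[i:i + 2])) for i in range(0, len(level), 2)]
        levels.append(level)
    return levels

def changed_chunks(tree_a, tree_b):
    """Walk both trees from the root; return indexes of chunks that differ."""
    if len(tree_a[0]) != len(tree_b[0]):
        raise ValueError("this sketch only compares objects with equal chunk counts")
    suspects = [0]  # start at the root
    for depth in range(len(tree_a) - 1, 0, -1):
        children = []
        for idx in suspects:
            if tree_a[depth][idx] != tree_b[depth][idx]:
                # Only descend into subtrees whose hashes disagree.
                for child in (2 * idx, 2 * idx + 1):
                    if child < len(tree_a[depth - 1]):
                        children.append(child)
        suspects = children
    return [i for i in suspects if tree_a[0][i] != tree_b[0][i]]

old = build_hash_tree(b"aaaabbbbccccdddd")
new = build_hash_tree(b"aaaabbbbXXXXdddd")
print(changed_chunks(old, new))
```

When the root hashes match, the walk stops immediately, so two identical replicas exchange only one hash.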


Multi-Master (or No Master) Model

If there are hot spots in a certain key range, and there are intensive write requests, the master slave model will be unable to spread the workload evenly. The multi-master model allows updates to happen at any replica (I think calling it "No-Master" is more accurate).

If any client can issue any update to any server, how do we synchronize the states so that we retain client consistency and every replica eventually gets to the same state? We describe a number of different approaches in the following sections ...

Quorum Based 2PC

To provide "strict consistency", we can use a traditional 2PC protocol to bring all replicas to the same state at every update. Let's say there are N replicas of a piece of data. When the data is updated, there is a "prepare" phase where the coordinator asks every replica to confirm whether it is ready to perform the update. Each replica then writes the data to a log file and, on success, responds to the coordinator.

After gathering positive responses from all replicas, the coordinator initiates the second "commit" phase and asks every replica to commit, and each replica then writes another log entry to confirm the update. Notice that there are some scalability issues, as the coordinator needs to "synchronously" wait for quite a lot of back and forth network roundtrips and disk I/O to complete.

On the other hand, if any one of the replicas crashes, the update will be unsuccessful. As there are more replicas, the chance of at least one of them being down increases. Therefore, replication is hurting availability rather than helping. This makes traditional 2PC an unpopular choice for high throughput transactional systems.

A more efficient way is to use quorum based 2PC (e.g. PAXOS). In this model, the coordinator only needs to update W replicas (rather than all N replicas) synchronously. The coordinator still writes to all N replicas but only waits for positive acknowledgment from any W of the N. This is much more efficient from a probabilistic standpoint.
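To put a number on the "probabilistic standpoint", here is a small Python calculation. It assumes each replica is up independently with the same probability, which is a simplification; the function name `prob_at_least` and the 99% figure are just for illustration.

```python
from math import comb

def prob_at_least(n, w, p_up):
    """Probability that at least w of n independent replicas are up."""
    return sum(comb(n, k) * p_up**k * (1 - p_up)**(n - k) for k in range(w, n + 1))

n, p_up = 3, 0.99
print("all 3 replicas must answer :", prob_at_least(n, 3, p_up))
print("any 2 of 3 replicas suffice:", prob_at_least(n, 2, p_up))
```

With these inputs, requiring all three replicas succeeds about 97% of the time, while waiting for any two of the three succeeds about 99.97% of the time.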

However, since not all replicas are updated, we need to be careful when reading the data to make sure the read reaches at least one replica that has previously been updated successfully. When reading the data, we need to read R replicas and return the copy with the latest timestamp.

For "strict consistency", the important condition is to make sure the read set and the write set overlap, i.e. W + R > N


As you can see, quorum based 2PC can be considered a general 2PC protocol, of which traditional 2PC is the special case where W = N and R = 1. The general quorum-based model allows us to pick W and R according to our tradeoff between read and write workload.

If the user cannot afford to pick W and R large enough, i.e. W + R <= N, then the client is relaxing its consistency model to a weaker one.

If the client can tolerate a more relaxed consistency model, we don't need to use 2PC or the quorum based protocol above. Here we describe a Gossip model where updates are propagated asynchronously via gossip message exchanges, and an anti-entropy protocol applies the updates so that every replica eventually gets to the latest state.
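Here is a minimal in-memory Python sketch of the W / R rule. `QuorumStore` and its `reachable` argument (the list of replica indexes that respond) are invented for this example, and the sketch leaves out the prepare/commit phases entirely; it only shows why overlapping read and write sets return the latest value.

```python
class QuorumStore:
    def __init__(self, n, w, r):
        self.n, self.w, self.r = n, w, r
        # Each replica maps key -> (timestamp, value).
        self.replicas = [dict() for _ in range(n)]

    def is_strict(self):
        """Read and write sets always overlap when W + R > N."""
        return self.w + self.r > self.n

    def write(self, key, value, timestamp, reachable):
        """Send the write to every reachable replica; succeed if at least W accept.
        Note: this sketch does not roll back a write that gathers fewer than W acks."""
        for i in reachable:
            self.replicas[i][key] = (timestamp, value)
        return len(reachable) >= self.w

    def read(self, key, reachable):
        """Ask R reachable replicas and return the value with the latest timestamp."""
        if len(reachable) < self.r:
            raise RuntimeError("not enough replicas to satisfy R")
        answers = [self.replicas[i][key] for i in reachable[:self.r]
                   if key in self.replicas[i]]
        if not answers:
            return None
        return max(answers, key=lambda pair: pair[0])[1]

store = QuorumStore(n=3, w=2, r=2)
store.write("cust:1", "v1", timestamp=1, reachable=[0, 1, 2])
store.write("cust:1", "v2", timestamp=2, reachable=[0, 1])  # replica 2 missed this
print(store.is_strict(), store.read("cust:1", reachable=[1, 2]))
```

Replica 2 still holds the old value, but any read of 2 replicas must include replica 0 or 1, so the newer timestamp wins. With R = 1 the same read could land on replica 2 alone and return the stale value.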

Vector Clock


Vector Clock is a timestamp mechanism that lets us reason about the causal relationship between updates. First of all, each replica keeps a vector clock. Let's say replica i has its clock Vi. Vi[i] is replica i's own logical clock, and every replica follows certain rules to update its vector clock.
  • Whenever an internal operation happens at replica i, it will advance its clock Vi[i]
  • Whenever replica i sends a message to replica j, it will first advance its clock Vi[i] and attach its vector clock Vi to the message
  • Whenever replica j receives a message from replica i, it will first advance its clock Vj[j] and then merge its clock with the clock Vm attached to the message, i.e. Vj[k] = max(Vj[k], Vm[k]) for every k

A partial order relationship can be defined such that Vi > Vj iff for all k, Vi[k] >= Vj[k], and Vi[k] > Vj[k] for at least one k. We can use this partial ordering to derive the causal relationship between updates. The reasoning behind it is
  • The effect of an internal operation will be seen immediately at the same node
  • After receiving a message, the receiving node knows the situation of the sending node at the time the message was sent. The situation includes not only what has happened at the sending node, but also at all the other nodes that the sending node knows about.
  • In other words, Vi[i] reflects the time of the latest internal operation that happened at node i. Vi[k] = 6 reflects that replica i knows the situation of replica k up to its logical clock 6.
Notice that the term "situation" is used here in an abstract sense. Depending on what information is passed in the message, the situation can be different. This affects how the vector clock is advanced. Below, we describe the "state transfer model" and the "operation transfer model", which pass different information in the message and so advance their vector clocks differently.
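The three update rules and the partial order can be written down in a few lines of Python. This is only a sketch: the class name `VectorClock`, the fixed number of replicas, and the string results of `compare` are choices made for this example.

```python
class VectorClock:
    def __init__(self, replica_id, num_replicas):
        self.i = replica_id
        self.v = [0] * num_replicas

    def tick(self):
        """Rule 1: an internal operation advances Vi[i]."""
        self.v[self.i] += 1

    def send(self):
        """Rule 2: advance Vi[i], then attach a copy of the clock to the message."""
        self.tick()
        return list(self.v)

    def receive(self, vm):
        """Rule 3: advance Vj[j], then take the element-wise max with the message clock."""
        self.tick()
        self.v = [max(a, b) for a, b in zip(self.v, vm)]

def compare(va, vb):
    """Return 'before', 'after', 'equal' or 'concurrent' for two clocks."""
    le = all(a <= b for a, b in zip(va, vb))
    ge = all(a >= b for a, b in zip(va, vb))
    if le and ge:
        return "equal"
    if le:
        return "before"
    if ge:
        return "after"
    return "concurrent"

a, b, c = VectorClock(0, 3), VectorClock(1, 3), VectorClock(2, 3)
a.tick()               # internal operation at replica 0
message = a.send()     # replica 0 sends to replica 1
b.receive(message)
c.tick()               # unrelated operation at replica 2
print(compare(message, b.v), compare(c.v, b.v))
```

The message clock is "before" the receiver's clock, so the receive causally follows the send. Replica 2's operation is "concurrent" with both, which is exactly the case where a conflict may need to be resolved.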

Because state always flows from the replica to the client and not the other way round, the client doesn't have an entry in the vector clock. The vector clock contains only one entry for each replica. However, the client also keeps the vector clock from the last replica it contacted. This is important for supporting the client consistency models we describe above. For example, to support monotonic read, the replica makes sure the vector clock attached to the data is no earlier than (>=) the vector clock the client submitted in the request.


Gossip (State Transfer Model)

In a state transfer model, each replica maintains a vector clock as well as a state version tree in which no state is > or < any other (based on vector clock comparison). In other words, the state version tree contains all the conflicting updates.

At query time, the client will attach its vector clock and the replica will send back a subset of the state tree which precedes the client's vector clock (this will provide monotonic read consistency). The client will then advance its vector clock by merging all the versions. This means the client is responsible for resolving the conflict between all these versions, because when the client sends the update later, its vector clock will precede all these versions.


At update time, the client will send its vector clock and the replica will check whether the client state precedes any of its existing versions; if so, it will throw away the client's update.


Replicas also gossip among each other in the background and try to merge their version trees together.
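Here is one way the version bookkeeping could look in Python. It is a sketch under assumptions: versions are kept in a flat list of (clock, value) pairs rather than a tree, clocks are tuples, and the function names are invented. An incoming update is discarded when an existing version already covers it; otherwise it replaces every version it supersedes and sits beside the ones it conflicts with.

```python
def dominates(va, vb):
    """True when clock va is >= vb in every entry and the two clocks differ."""
    return all(a >= b for a, b in zip(va, vb)) and va != vb

def apply_update(versions, clock, value):
    """versions: list of (clock, value) pairs that are mutually concurrent."""
    for existing_clock, _ in versions:
        if existing_clock == clock or dominates(existing_clock, clock):
            return versions  # the update is stale, throw it away
    # Drop the versions this update supersedes, keep the conflicting ones.
    survivors = [(c, v) for c, v in versions if not dominates(clock, c)]
    survivors.append((clock, value))
    return survivors

def merge_versions(mine, theirs):
    """Background gossip: fold another replica's versions into ours."""
    merged = list(mine)
    for clock, value in theirs:
        merged = apply_update(merged, clock, value)
    return merged

versions = [((1, 0), "a"), ((0, 1), "b")]              # two conflicting versions
print(apply_update(versions, (0, 0), "old"))           # stale, unchanged
print(apply_update(versions, (1, 1), "resolved"))      # supersedes both
print(merge_versions(versions, [((2, 0), "a2")]))      # replaces "a", keeps "b"
```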

Gossip (Operation Transfer Model)

In an operation transfer approach, the sequence in which the operations are applied is very important. At a minimum, causal order needs to be maintained. Because of the ordering issue, each replica has to defer executing an operation until all the preceding operations have been executed. Therefore replicas save operation requests to a log file, exchange the logs among each other, and consolidate these operation logs to figure out the right sequence in which to apply the operations to their local store.

"Causal order" means every replica will apply changes to the "causes" before applying changes to the "effects". "Total order" requires that every replica applies the operations in the same sequence.

In this model, each replica keeps a list of vector clocks: Vi is the vector clock of the replica itself and Vj is the vector clock recorded when replica i received replica j's gossip message. There is also a V-state that represents the vector clock of the last updated state.

When a query is submitted by the client, it will also send along its vector clock, which reflects the client's view of the world. The replica will check if it has a view of the state that is later than the client's view.


When an update operation is received, the replica will buffer the update operation until it can be applied to the local state. Every submitted operation is tagged with 2 timestamps: V-client indicates the client's view when it made the update request, and V-@receive is the replica's view when it received the submission.

This update operation request will sit in the queue until the replica has received all the other updates that this one depends on. This condition is met when the vector clock Vi is larger than V-client.


In the background, different replicas exchange their logs of queued updates and update each other's vector clocks. After the log exchange, each replica checks whether certain operations can be applied (when all the operations they depend on have been received) and applies them accordingly. Notice that multiple operations may be ready to apply at the same time; the replica will sort these operations in causal order (by using vector clock comparison) and apply them in the right order.
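The buffering step can be sketched in Python as follows. This is a heavily simplified, single-replica model: there is no log exchange, only the V-client timestamp is kept, and applying an operation simply advances the replica's own clock entry. The names `OpReplica` and `covers` are invented for the example.

```python
def covers(replica_clock, v_client):
    """True when the replica has seen everything the client had seen."""
    return all(r >= c for r, c in zip(replica_clock, v_client))

class OpReplica:
    def __init__(self, replica_id, num_replicas):
        self.replica_id = replica_id
        self.clock = [0] * num_replicas   # Vi
        self.pending = []                 # buffered (V-client, operation) pairs
        self.applied = []                 # operations applied to the local state

    def submit(self, v_client, operation):
        """Buffer the operation, then apply whatever has become ready."""
        self.pending.append((list(v_client), operation))
        self.apply_ready()

    def apply_ready(self):
        progress = True
        while progress:                   # applying one op may unblock another
            progress = False
            for entry in list(self.pending):
                v_client, operation = entry
                if covers(self.clock, v_client):
                    self.pending.remove(entry)
                    self.applied.append(operation)
                    self.clock[self.replica_id] += 1  # assumption: applying is a local event
                    progress = True

replica = OpReplica(replica_id=0, num_replicas=2)
replica.submit([1, 0], "B")   # depends on one earlier operation, so it waits
replica.submit([0, 0], "A")   # no dependencies, applied at once and unblocks B
print(replica.applied)
```

Operation B arrives first but is held in the queue until A, which it depends on, has been applied.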


Concurrent updates at different replicas can also happen, which means there can be multiple valid sequences of operations. In order for different replicas to apply concurrent updates in the same order, we need a total ordering mechanism.

One approach is that whoever does the update first acquires a monotonic sequence number and latecomers follow the sequence. On the other hand, if the operations themselves are commutative, then the order in which they are applied doesn't matter.

After applying the update, the update operation cannot be immediately removed from the queue because the update may not have been fully exchanged with every replica yet. We continuously check the vector clock of each replica after log exchange, and once we confirm that everyone has received this update, we remove it from the queue.

Map Reduce Execution

Notice that the distributed store architecture fits well with distributed processing too. For example, consider processing a Map/Reduce operation over an input key list.

The system will push the map and reduce functions to all the nodes (i.e. moving the processing logic towards the data). The map function is executed on the replicas that own the input keys, and its output is then forwarded to the reduce function, where the aggregation logic is executed.
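A toy, single-process Python version of this flow is shown below. Each node is just a dict standing in for its local store, and the sketch assumes each input key is read from exactly one node (one chosen replica), so nothing is counted twice. The customer records and field names are made up.

```python
from collections import defaultdict

def map_reduce(nodes, input_keys, map_fn, reduce_fn):
    """nodes: list of dicts, each standing in for one node's local store."""
    wanted = set(input_keys)
    intermediate = defaultdict(list)
    for store in nodes:
        # The map function runs on the node that owns the key.
        for key, record in store.items():
            if key in wanted:
                for out_key, out_value in map_fn(key, record):
                    intermediate[out_key].append(out_value)
    # The map output is forwarded to the reduce function for aggregation.
    return {k: reduce_fn(k, values) for k, values in intermediate.items()}

nodes = [
    {"c1": {"zip": "94301", "income": 250}, "c2": {"zip": "94301", "income": 90}},
    {"c3": {"zip": "10001", "income": 300}},
]

def count_by_zip(key, customer):
    yield customer["zip"], 1

def total(zip_code, counts):
    return sum(counts)

print(map_reduce(nodes, ["c1", "c2", "c3"], count_by_zip, total))
```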


Handling Deletes

In a multi-master replication system, where we use vector clock timestamps to determine causal order, we need to handle "delete" very carefully so that we don't lose the timestamp information associated with the deleted object; otherwise we cannot even reason about when to apply the delete.

Therefore, we typically handle delete as a special update by marking the object as "deleted" but still keeping its metadata / timestamp information around. After a long enough time that we are confident every replica has marked this object deleted, we garbage collect the deleted object to reclaim its space.
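A minimal Python sketch of this "mark, then collect later" approach follows. The class name `TombstoneStore`, the explicit `now` argument, and the fixed grace period are assumptions; a real system would need some evidence that every replica has seen the delete before collecting.

```python
class TombstoneStore:
    def __init__(self):
        # key -> {"value", "clock", "deleted", "deleted_at"}
        self.items = {}

    def put(self, key, value, clock):
        self.items[key] = {"value": value, "clock": clock,
                           "deleted": False, "deleted_at": None}

    def delete(self, key, clock, now):
        """Delete is a special update: drop the value, keep the clock."""
        self.items[key] = {"value": None, "clock": clock,
                           "deleted": True, "deleted_at": now}

    def get(self, key):
        item = self.items.get(key)
        if item is None or item["deleted"]:
            return None
        return item["value"]

    def collect_garbage(self, now, grace_period):
        """Physically remove tombstones older than the grace period."""
        expired = [k for k, item in self.items.items()
                   if item["deleted"] and now - item["deleted_at"] >= grace_period]
        for key in expired:
            del self.items[key]
        return expired

store = TombstoneStore()
store.put("cust:1", {"name": "Ann"}, clock=(1, 0))
store.delete("cust:1", clock=(2, 0), now=100.0)
print(store.get("cust:1"), store.items["cust:1"]["clock"])
print(store.collect_garbage(now=250.0, grace_period=100.0))
```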


Storage Implementation

One strategy is to make the storage implementation pluggable, e.g. a local MySQL DB, Berkeley DB, the filesystem or even an in-memory hashtable can be used as the storage mechanism.

Another strategy is to implement the storage in a highly scalable way. Here are some techniques that I learned from CouchDB and Google BigTable.

CouchDB has an MVCC model that uses a copy-on-modify approach. Any update causes a private copy of the data to be made, which in turn means the index also needs to be modified, causing a private copy of the index as well, all the way up to the root pointer.

Notice that the update happens in an append-only mode where the modified data is appended to the file and the old data becomes garbage. Periodic garbage collection is done to compact the data. Here is how the model is implemented in memory and on disk

In the Google BigTable model, the data is broken down into multiple generations and the memory is used to hold the newest generation. Any query will search the in-memory data as well as all the data sets on disk and merge all the returned results. Fast detection of whether a generation contains a key can be done by checking a bloom filter.

When an update happens, both the in-memory data and the commit log are written, so that if the machine crashes before the in-memory data is flushed to disk, it can be recovered from the commit log.
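As a very rough illustration of the append-only idea only (this is not CouchDB's actual file format or its B-tree index), here is a toy Python store in which updates are appended, old records become garbage, and compaction rewrites the log with live records only. The class name `AppendOnlyStore` and the list standing in for a file are assumptions.

```python
class AppendOnlyStore:
    def __init__(self):
        self.log = []     # stands in for the append-only file
        self.index = {}   # key -> offset of the latest record in the log

    def put(self, key, value):
        # Never overwrite in place: append, then repoint the index.
        self.log.append((key, value))
        self.index[key] = len(self.log) - 1

    def get(self, key):
        offset = self.index.get(key)
        return None if offset is None else self.log[offset][1]

    def compact(self):
        """Rewrite the log with only the records the index still points at."""
        live_offsets = sorted(self.index.values())
        new_log = [self.log[offset] for offset in live_offsets]
        self.index = {key: i for i, (key, _) in enumerate(new_log)}
        self.log = new_log

store = AppendOnlyStore()
store.put("a", 1)
store.put("b", 2)
store.put("a", 3)            # the first record for "a" is now garbage
print(len(store.log), store.get("a"))
store.compact()
print(len(store.log), store.get("a"), store.get("b"))
```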
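The generation idea, the bloom filter check and the commit log can be put together in a small Python sketch. Everything here is an in-memory stand-in: `GenerationalStore`, `BloomFilter`, the flush threshold and the bloom filter sizes are all invented for illustration, and "disk" generations are plain dicts.

```python
import hashlib

class BloomFilter:
    def __init__(self, size=256, hashes=3):
        self.size, self.hashes, self.bits = size, hashes, 0

    def _positions(self, key):
        for i in range(self.hashes):
            digest = hashlib.sha256(f"{i}:{key}".encode()).digest()
            yield int.from_bytes(digest[:8], "big") % self.size

    def add(self, key):
        for p in self._positions(key):
            self.bits |= 1 << p

    def might_contain(self, key):
        # False means "definitely absent"; True means "possibly present".
        return all((self.bits >> p) & 1 for p in self._positions(key))

class GenerationalStore:
    def __init__(self, flush_threshold=2):
        self.commit_log = []      # stands in for the on-disk commit log
        self.memtable = {}        # newest generation, held in memory
        self.generations = []     # older generations, newest last: (bloom, data)
        self.flush_threshold = flush_threshold

    def put(self, key, value):
        self.commit_log.append((key, value))   # write the log first
        self.memtable[key] = value
        if len(self.memtable) >= self.flush_threshold:
            self.flush()

    def flush(self):
        bloom = BloomFilter()
        for key in self.memtable:
            bloom.add(key)
        self.generations.append((bloom, dict(self.memtable)))
        self.memtable, self.commit_log = {}, []  # flushed data no longer needs the log

    def get(self, key):
        if key in self.memtable:
            return self.memtable[key]
        for bloom, data in reversed(self.generations):  # newest generation wins
            if bloom.might_contain(key) and key in data:
                return data[key]
        return None

    def recover(self):
        """Rebuild the in-memory generation from the commit log after a crash."""
        self.memtable = dict(self.commit_log)

store = GenerationalStore()
store.put("a", 1)
store.put("b", 2)     # reaches the threshold, so this generation is flushed
store.put("a", 3)     # lives only in memory and in the commit log
store.memtable = {}   # simulate a crash that loses the in-memory data
store.recover()
print(store.get("a"), store.get("b"), store.get("z"))
```

For a point lookup the newest generation that has the key wins; a range query would instead merge results from every generation, as described above.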

C++ Constant : Types And Uses

What is Constant ?

Like variables, constants are data storage locations. As the name implies, a constant's value does not change: it stays the same throughout the program, unlike a variable, whose value can be changed anywhere in the program.

There are two types of constant in C++. They are as follows :
1) Literal Constant
float PI=3.14;
A value that is typed directly into the program is called a literal constant.
Here 3.14 is a literal constant. You cannot assign a value to 3.14.
2) Symbolic Constant
A symbolic constant is represented by a name.

There are two ways to declare a symbolic constant. They are as follows :
1) By using the preprocessor directive #define.
This is the old way of declaring a constant. It is now generally discouraged in C++.
2) By using the keyword const.
This is the preferred way to declare a constant.

A C++ program example that demonstrates the use of a constant declared with the preprocessor directive #define


/* Area Of Circle Program */

#include <iostream>

#define PI 3.14

using std::cout;
using std::cin;
using std::endl;

int main ()
{

    int r;

    cout << "Find the area of circle." << endl;
    cout << "Enter radius : ";
    cin >> r;

    float area = PI * r * r;

    cout << "The area of circle of radius " << r << " is "
    << area << endl;

    return 0;
}


A C++ program example that demonstrates the use of a constant declared with the keyword const


/* Area Of Circle Program */

#include <iostream>

using std::cout;
using std::cin;
using std::endl;

const float PI = 3.14;

int main ()
{

    int r;

    cout << "Find the area of circle." << endl;
    cout << "Enter radius : ";
    cin >> r;

    float area = PI * r * r;

    cout << "The area of circle of radius " << r << " is " << area
    << endl;

    return 0;
}


Another C++ program example that demonstrates the use of a constant


/* Program that calculates total income of the year */

#include <iostream>

using std::cout;
using std::endl;

int main ()
{

    const int salary = 20000;
    float tax = (float) 10 / 100 * salary;        // tax is 10% of salary
    float monthlyIncome = salary - tax;
    // bonus is 5% of salary
    float yearlyBonus = (float) 5 / 100 * salary;
    float yearlyIncome = (monthlyIncome*12) + yearlyBonus;

    cout << "My yearly income is " << yearlyIncome << endl;

    return 0;
}


In the above program example, salary is declared as a constant of type int. You can assign a value to a constant only at declaration time. This value cannot be changed later in the program. If you try, you will get a compiler error such as " assignment of read-only variable 'salary' " and your program won't compile.

C++ Notes :

(1) The way to declare a string constant with #define :
#define HOBBY "Programming"
A string constant must be enclosed in double quotes.

(2) The way to declare a character constant with #define :
#define AGREE 'y'
A character constant must be enclosed in single quotes.

(3) Numeric data is not enclosed in quotes.

(4) The advantage of using the const keyword is that you can create constants of various data types by stating the type explicitly.
For example :
const unsigned short int myVal = 40;

Designing Your Own Lightbox in Javascript

In today's web 2.0 world, use of Lightbox is very common. While Lightbox and fancybox (similar to the former) are great scripts with wide uses, creating a script similar to these is never a bad idea. If you want to learn, read on; otherwise use one of those scripts, they’re great and easy to use.


For those of you who haven’t heard about the script or don’t know what they do, see the following image:


Screenshot of Blackbox - Our own Lightbox clone


Chances are you have seen it somewhere or other. These scripts are generally used to display some content in a kind of dialog box (a modal one, for those of you who're geeks) while the rest of the content gets blackened. Looks great? Yes it does!


Okay, for those of you still here, I wanna confess that I didn’t put enough time into learning how those scripts actually work. I just got an idea myself the other day and thought it just might work. This is not to say that I have invented some new way, it’s just that I don’t know how those scripts work but I know one way that gives similar results.


As you can see from the above image, there is not much to a simple Lightbox clone: we have (1) a blackening effect and (2) the content box.




  1. Blackening Effect: For this I’ll create a “div” element on the fly and set its properties such that it has a black color and some transparency. A large z-index makes it float on top of the rest of the content, so the content behind it (with a normal z-index) cannot be interacted with anymore. We’ll fill the current screen with this “div”, which requires us to place the element at the topmost and leftmost coordinates relative to the current viewable area. This will be (0, 0) when the page isn’t scrolled at all.


    We’ll also have to size the element to have it span the whole viewable area of the browser.


    These two things will make sure that no matter where we have scrolled in a page and whatever be the window size, this black overlay element always covers the current viewport.




  2. Content Box: A nicely styled box with a close button is all we need. We’ll place it at the center of the screen. Since we have calculated the topmost and leftmost coordinates relative to the current viewport and we also have the current viewport’s dimensions, we can easily position this at the center, no brainer! We’ll give this a z-index larger than the black overlay element so that it is on top of everything.


    Besides this, we’ll also have to take care that these two elements move along with the page in case the user tries to scroll the page while our lightbox is open. This will make sure that (1) the black overlay element always fills the screen and (2) the content box is always at the center.




Sounds pretty simple? Well, it is! I’ll call this Blackbox, you may call it whatever you feel like. Here is the code (Demo here):



<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd">

<html xmlns="http://www.w3.org/1999/xhtml">
<head>
<meta http-equiv="Content-Type" content="text/html; charset=utf-8" />
<title>Blackbox - A very simple Lightbox clone</title>

<script type="text/javascript">
/*
* Script: Blackbox (very simple Lightbox clone)
* Author: Arvind Gupta (contact@arvindgupta.co.in)
* Date: 14-Nov-09
* Copyright: 2009 Arvind Gupta

* You may freely use this script wherever
* you want and in whatever way you wish
* but please don't remove this note.
*
*/

// OBJECTS

// Black overlay element
var darkbox;
// Content box
var content;

// FUNCTIONS
function init()
{
// Set "onScroll" event handler

window.onscroll = scroll_box;
}

function open()
{
// Create elements
darkbox = document.createElement('div');
content = document.createElement('div');

// Style them with the existing ids
darkbox.id = 'darkbox';
content.id = 'content';

// FILL CONTENT BOX

// Have the close button
content.innerHTML = '<a style="position: absolute; top: -30px; right: -30px; text-decoration: none;" href="javascript:close();"><img style="border: none;" src="fancy_closebox.png" /></a>';
// The main content

content.innerHTML += '<div id="main_content"><h1>Hello</h1><p>Hello World!<br /> How is this looking?</p></div>';

// Add these elements to the body

document.body.appendChild(darkbox);
document.body.appendChild(content);

// Calculate coordinates and such
var pos_top = document.documentElement.scrollTop;
var pos_left = document.documentElement.scrollLeft;
var screen_width = document.documentElement.clientWidth;
var screen_height = document.documentElement.clientHeight;

// Place the "darkbox" element and give it the size

darkbox.style.top = pos_top + 'px';
darkbox.style.left = pos_left + 'px';
darkbox.style.height = screen_height + 'px';
darkbox.style.width = screen_width + 'px';

// Now place the content box at the center
content.style.left = (pos_left + (screen_width / 2.0) - (content.offsetWidth / 2.0)) + 'px';
content.style.top = (pos_top + (screen_height / 2.0) - (content.offsetHeight / 2.0)) + 'px';
}


function scroll_box ()
{
// If "Darkbox" open
if(darkbox != null)
{
// Find new topmost, leftmost position w.r.t the current viewport
// Also find new window size

var pos_top = document.documentElement.scrollTop;
var pos_left = document.documentElement.scrollLeft;
var screen_width = document.documentElement.clientWidth;
var screen_height = document.documentElement.clientHeight;

// Positions elements accordingly
darkbox.style.top = pos_top + 'px';
darkbox.style.left = pos_left + 'px';
darkbox.style.height = screen_height + 'px';
darkbox.style.width = screen_width + 'px';

content.style.left = (pos_left + (screen_width / 2.0) - (content.offsetWidth / 2.0)) + 'px';
content.style.top = (pos_top + (screen_height / 2.0) - (content.offsetHeight / 2.0)) + 'px';
}
}


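function close()
{
// Nothing to do if the box is not open
if(darkbox == null) return;

// Delete elements
document.body.removeChild(darkbox);
document.body.removeChild(content);

// Forget the removed elements so that scroll_box() skips them
darkbox = null;
content = null;
}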
</script>

<style>
#darkbox {
position: absolute;
top: 0px;
left: 0px;
z-index: 1000;
opacity: 0.6;
filter:alpha(opacity=60);
background: #000;
}

#content {
position: absolute;
z-index: 1001;
background: #fff;
border: 10px solid #000;
width: 500px;
height: 300px;
}
#content #main_content {
overflow: auto;
width: 500px;
height: 300px;
}

</style>
</head>

<body onload="init();">
<a href="javascript:open()">Open Box</a>
</body>
</html>

