
BI at large scale

As more and more data is collected from pretty much everything a user does, such as transaction activity, social interactions, and information searches, enterprises have been actively looking into ways to turn this vast amount of raw data into useful information.

BI process flow

It includes the following stages of processing:
  1. ETL: Extract operational data (inside enterprise or external sources) into data warehouse (typically organized in Star/Snowflake schema with Fact and Dimension tables).
  2. Data exploration: Get insight into data using simple visualization tools (e.g. histogram, summary statistics) or sophisticated OLAP tools (slice, dice, rollup, drilldown)
  3. Report generation: Produce executive reports
  4. Data mining: Extract patterns of the underlying data to form models (e.g. bayesian networks, linear regression, neural networks, decision trees, support vector machines, nearest neighbors, association rules, principal component analysis)
  5. Feedback: The model will be used to assist business decision making (predicting the future)
The gap in processing big data
Many data mining and machine learning algorithms are available in both commercial packages (e.g. SAS, SPSS) as well as open source libraries (e.g. Weka, R). Nevertheless, most of these ML algorithm implementations are based on fitting all data in memory and are not designed to process big data (e.g. terabyte-scale data volumes).

On the other hand, massively parallel processing platforms such as Hadoop Map/Reduce have, over the last few years, been proven capable of processing terabyte or even petabyte ranges of data. Although many sequential algorithms can be restructured to run in map/reduce, including a large portion of machine learning algorithms, a corresponding implementation of most ML algorithms is not yet available in this massively parallel form.

Approach 1: Apache Mahout
One approach is to "re-implement" the ML algorithms in Map/Reduce, and this is the path of the Apache Mahout project. Mahout seems to have implemented an impressive list of algorithms, although I haven't used them for my projects yet.

Approach 2: Ensemble of parallel independent learners
This is an alternative path that doesn't require re-implementing existing algorithms. It works in the following way (a minimal sketch follows the list).
  1. Draw samples from the big data set into many smaller sample data sets, each of which can fit into the memory of a single learner.
  2. Assign each sample data set to an individual learner, which uses an existing algorithm to learn a model. After learning, each learner keeps its own learned model.
  3. When a decision / prediction request is received, each learner comes up with its own prediction, and the results are combined in some way (e.g. for a classification task, the learners vote for the predicted class and the majority wins; for regression, the average of the estimated values is used as the output).
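
Here is a minimal Python sketch of this ensemble approach. The learner_factory, the sample sizes, and the fit/predict interface are assumptions for illustration; any existing single-machine algorithm (e.g. from Weka, R, or a scikit-learn style library) could sit behind them.

import random
from collections import Counter

def train_ensemble(records, labels, learner_factory, n_learners=10, sample_size=10000):
    # Each learner gets its own sample that fits in a single machine's memory
    learners = []
    for _ in range(n_learners):
        idx = random.sample(range(len(records)), min(sample_size, len(records)))
        model = learner_factory()                      # hypothetical: any existing algorithm
        model.fit([records[i] for i in idx],           # assumed fit/predict style interface
                  [labels[i] for i in idx])
        learners.append(model)
    return learners

def predict_by_vote(learners, record):
    # Classification: each learner votes and the majority wins
    votes = Counter(model.predict([record])[0] for model in learners)
    return votes.most_common(1)[0][0]

def predict_by_average(learners, record):
    # Regression: average the individual estimates
    return sum(model.predict([record])[0] for model in learners) / len(learners)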

I also found this approach can smoothly fade out outdated models. As user behavior may change over time, so does the validity of a learned model. With this ensemble approach, I can have multiple learners, each learning its model periodically. Every time a prediction is needed, I pick the latest k models and combine the final prediction using a time-decayed, weighted voting scheme. An outdated model automatically slides out of the k-size window.

One gotcha of the sampling approach is the handling of rare events (since you may lose those rare events in sampling). In this case, stratified sampling (instead of simple random sampling) should be used.
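
For example, a minimal stratified-sampling sketch in Python (the per-class sampling rates are made-up numbers for illustration):

import random
from collections import defaultdict

def stratified_sample(records, labels, rate_per_class, default_rate=0.01):
    # Sample each class (stratum) separately so rare events survive the sampling step
    by_class = defaultdict(list)
    for x, y in zip(records, labels):
        by_class[y].append(x)

    sample_x, sample_y = [], []
    for cls, items in by_class.items():
        # e.g. rate_per_class = {"fraud": 1.0} keeps every rare fraud record
        k = max(1, int(len(items) * rate_per_class.get(cls, default_rate)))
        for x in random.sample(items, k):
            sample_x.append(x)
            sample_y.append(cls)
    return sample_x, sample_y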

Map Reduce and Stream Processing

The Hadoop Map/Reduce model is very good at processing large amounts of data in parallel. It provides a general partitioning mechanism (based on the key of the data) to distribute the aggregation workload across different machines. Basically, map/reduce algorithm design is all about how to select the right key for the record at each stage of processing.
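
To make the point concrete, here is a toy in-memory simulation (not Hadoop's actual API) showing how the choice of key drives the grouping:

from collections import defaultdict

def run_mapreduce(records, mapper, reducer):
    # A toy shuffle: group every emitted value under its key, then reduce per key
    groups = defaultdict(list)
    for r in records:
        for key, value in mapper(r):
            groups[key].append(value)
    return {key: reducer(key, values) for key, values in groups.items()}

# Keying by site aggregates all hits of a site on one reducer;
# keying by (site, hour) instead spreads the same work across more reducers.
def map_by_site(hit):
    yield hit["site"], 1

def map_by_site_and_hour(hit):
    yield (hit["site"], hit["hour"]), 1

def count(key, values):
    return sum(values)

hits = [{"site": "a.com", "hour": 1}, {"site": "a.com", "hour": 2}, {"site": "b.com", "hour": 1}]
print(run_mapreduce(hits, map_by_site, count))           # {'a.com': 2, 'b.com': 1}
print(run_mapreduce(hits, map_by_site_and_hour, count))  # {('a.com', 1): 1, ('a.com', 2): 1, ('b.com', 1): 1}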

However, "time dimension" has a very different characteristic compared to other dimensional attributes of data, especially when real-time data processing is concerned. It presents a different set of challenges to the batch oriented, Map/Reduce model.
  1. Real-time processing demands a very low latency of response, which means there isn't too much data accumulated at the "time" dimension for processing.
  2. Data collected from multiple sources may not have all arrived at the point of aggregation.
  3. In the standard Map/Reduce model, the reduce phase cannot start until the map phase is completed, and all the intermediate data is persisted on disk before being downloaded to the reducer. All of this adds significant latency to the processing.
Here is a more detailed description of this high-latency characteristic of Hadoop.

Although Hadoop Map/Reduce is designed for batch-oriented workloads, certain applications, such as fraud detection, ad display, and network monitoring, require real-time responses while processing large amounts of data, and people have started looking at various ways of tweaking Hadoop to fit a more real-time processing environment. Here I try to look at some techniques for performing low-latency parallel processing based on the Map/Reduce model.


General stream processing model

In this model, data is produced at various OLTP systems, which update the transaction data store and also asynchronously send additional data for analytic processing. The analytic processing writes its output to a decision model, which feeds information back to the OLTP systems for real-time decision making.

Notice the "asynchronous nature" of the analytic processing which is decoupled from the OLTP system, this way the OLTP system won't be slow down waiting for the completion of the analytic processing. Nevetheless, we still need to perform the analytic processing ASAP, otherwise the decision model will not be very useful if it doesn't reflect the current picture of the world. What latency is tolerable is application specific.

Micro-batch in Map/Reduce


One approach is to cut the data into small batches based on a time window (e.g. every hour) and submit the data collected in each batch to a Map/Reduce job. A staging mechanism is needed so that the OLTP application can continue independently of the analytic processing. A job scheduler is used to regulate the producer and consumer so each of them can proceed independently.
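
A rough sketch of such a scheduler loop in Python; staging_queue.drain() and submit_job() are hypothetical stand-ins for the staging mechanism and the Map/Reduce job submission:

import time

BATCH_WINDOW_SECS = 3600   # e.g. a one-hour micro-batch

def micro_batch_loop(staging_queue, submit_job):
    # The OLTP side keeps appending to the staging area; this consumer loop
    # drains it once per window, so producer and consumer proceed independently.
    while True:
        window_start = time.time()
        batch = staging_queue.drain()     # hypothetical: take everything staged so far
        if batch:
            submit_job(batch)             # hypothetical: kick off a Map/Reduce job on the batch
        elapsed = time.time() - window_start
        time.sleep(max(0, BATCH_WINDOW_SECS - elapsed))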

Continuous Map/Reduce

Here let's imagine some possible modifications of the Map/Reduce execution model to cater for real-time stream processing. I am not trying to worry about backward compatibility with Hadoop, which is the approach that the Hadoop Online Prototype (HOP) is taking.

Long running
The first modification is to make the mapper and reducer long-running. Therefore, we cannot wait for the end of the map phase before starting the reduce phase, as the map phase never ends. This implies the mapper pushes data to the reducer once it completes its processing, and lets the reducer sort the data. A downside of this approach is that it offers no opportunity to run the combine() function on the map side to reduce bandwidth utilization. It also shifts more workload to the reducer, which now needs to do the sorting.

Notice there is a tradeoff between latency and optimization. Optimization requires more data to be accumulated at the source (i.e. the mapper) so local consolidation (i.e. combine) can be performed. Unfortunately, low latency requires the data to be sent ASAP, so not much accumulation can be done.

HOP suggests an adaptive flow control mechanism such that data is pushed out to the reducer ASAP until the reducer is overloaded and pushes back (using some sort of flow control protocol). Then the mapper buffers the processed messages and performs combine() before sending them to the reducer. This approach automatically shifts the aggregation workload back and forth between the reducer and the mapper.

Time Window: Slice and Range
This is a "time slice" concept and a "time range" concept. "Slice" defines a time window where result is accumulated before the reduce processing is executed. This is also the minimum amount of data that the mapper should accumulate before sending to the reducer.

"Range" defines the time window where results are aggregated. It can be a landmark window where it has a well-defined starting point, or a jumping window (consider a moving landmark scenario). It can also be a sliding window where is a fixed size window from the current time is aggregated.

After receiving a specific time slice from every mapper, the reducer can start the aggregation processing and combine the result with the previous aggregation result. The slice size can be dynamically adjusted based on the amount of data sent by the mappers.

Incremental processing
Notice that the reducer needs to compute the aggregated slice value after receiving all records of the same slice from all mappers. After that, it calls the user-defined merge() function to merge the slice value with the range value. In case the range needs to be refreshed (e.g. reaching a jumping window boundary), the init() function is called to get a fresh range value. If the range value needs to be updated (when a certain slice value falls outside a sliding range), the unmerge() function is invoked.

Here is an example of how we keep track of the average hit rate (i.e. total hits per hour) within a 24-hour sliding window, with an update happening every hour (i.e. a one-hour slice).
# Called at each hit record
map(k1, hitRecord) {
  site = hitRecord.site
  # Look up the current slice of the particular key
  slice = lookupSlice(site)
  if (now - slice.time > 60.minutes) {
    # Notify the reducer that the whole slice of this site has been sent
    advance(site, slice)
    slice = lookupSlice(site)
  }
  emitIntermediate(site, slice, 1)
}

combine(site, slice, countList) {
  hitCount = 0
  for count in countList {
    hitCount += count
  }
  # Send the partial count to the downstream node
  emitIntermediate(site, slice, hitCount)
}

# Called when the reducer has received the full slice from all mappers
reduce(site, slice, countList) {
  hitCount = 0
  for count in countList {
    hitCount += count
  }
  sv = SliceValue.new
  sv.hitCount = hitCount
  return sv
}

# Called at each jumping window boundary
init(slice) {
  rangeValue = RangeValue.new
  rangeValue.hitCount = 0
  return rangeValue
}

# Called after each reduce()
merge(rangeValue, slice, sliceValue) {
  rangeValue.hitCount += sliceValue.hitCount
}

# Called when a slice falls out of the sliding window
unmerge(rangeValue, slice, sliceValue) {
  rangeValue.hitCount -= sliceValue.hitCount
}

Scalable System Design Patterns

Looking back 2.5 years after my previous post on scalable system design techniques, I've observed the emergence of a set of commonly used design patterns. Here is my attempt to capture and share them.

Load Balancer

In this model, there is a dispatcher that determines which worker instance will handle a request, based on different policies. The application should preferably be "stateless" so that any worker instance can handle the request.

This pattern is deployed in almost every medium to large web site setup.
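
A minimal sketch of the dispatcher, assuming a simple round-robin policy and stateless workers represented as plain callables (worker_a and worker_b in the usage line are placeholders):

import itertools

class RoundRobinDispatcher:
    def __init__(self, workers):
        # Workers are stateless callables, so any of them can serve any request
        self._next_worker = itertools.cycle(workers)

    def handle(self, request):
        # Other policies (least-connections, random, hashing) plug in the same way
        return next(self._next_worker)(request)

# Usage: dispatcher = RoundRobinDispatcher([worker_a, worker_b]); dispatcher.handle(request)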



Scatter and Gather

In this model, the dispatcher multicasts the request to all workers in the pool. Each worker computes a local result and sends it back to the dispatcher, which consolidates them into a single response and sends it back to the client.

This pattern is used in search engines like Yahoo and Google to handle users' keyword search requests, etc.
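
A sketch of scatter-and-gather using thread-based fan-out; consolidate() is a placeholder for whatever merging (e.g. re-ranking local search hits) the application needs:

from concurrent.futures import ThreadPoolExecutor

def scatter_gather(workers, request, consolidate):
    # Fan the request out to every worker in parallel...
    with ThreadPoolExecutor(max_workers=len(workers)) as pool:
        partial_results = list(pool.map(lambda w: w(request), workers))
    # ...then merge the partial results into a single response
    return consolidate(partial_results)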



Result Cache

In this model, the dispatcher first looks up whether the same request has been made before and tries to return the previous result, in order to save the actual execution.

This pattern is commonly used in large enterprise applications. Memcached is a very commonly deployed cache server.
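
A sketch of the dispatcher-side lookup, assuming a memcached-style cache client that exposes get() and set():

def cached_dispatch(cache, execute, request_key, request):
    result = cache.get(request_key)        # e.g. a memcached client's get()
    if result is None:
        result = execute(request)          # cache miss: perform the actual execution
        cache.set(request_key, result)     # remember it for subsequent identical requests
    return result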



Shared Space

This model is also known as "Blackboard"; all workers monitor information in the shared space and contribute partial knowledge back to the blackboard. The information is continuously enriched until a solution is reached.

This pattern is used in JavaSpace and also in the commercial product GigaSpace.



Pipe and Filter

This model is also known as "Data Flow Programming"; all workers connected by pipes where data is flow across.

This pattern is a very common EAI pattern.
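
Generators give a compact sketch of pipe-and-filter in Python; each function is a filter and chaining them forms the pipe (the log format here is made up for illustration):

def parse(lines):
    for line in lines:
        yield line.strip().split(",")

def keep_errors(records):
    for rec in records:
        if rec[0] == "ERROR":
            yield rec

def render(records):
    for rec in records:
        yield " | ".join(rec)

# Chain the filters into a pipeline and pull data through it
for out in render(keep_errors(parse(["ERROR,disk full", "INFO,ok"]))):
    print(out)            # prints: ERROR | disk full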



Map Reduce

This model targets batch jobs where disk I/O is the major bottleneck. It uses a distributed file system so that disk I/O can be done in parallel.

This pattern is used in many of Google's internal applications, and is implemented in the open source Hadoop parallel processing framework. I also find this pattern applicable in many application design scenarios.



Bulk Synchronous Parallel

This model is based on lock-step execution across all workers, coordinated by a master. Each worker repeats the following steps until the exit condition is reached, i.e. when there are no more active workers (a toy sketch follows below).
  1. Each worker reads data from its input queue
  2. Each worker performs local processing based on the data read
  3. Each worker pushes its local result along its direct connections
This pattern has been used in Google's Pregel graph processing model as well as the Apache Hama project.
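
A toy single-process sketch of the superstep loop; the worker objects (with a name and a compute() method returning outgoing messages) are assumptions for illustration:

def bsp_run(workers, max_supersteps=100):
    inboxes = {w.name: [] for w in workers}
    for step in range(max_supersteps):
        outboxes = {w.name: [] for w in workers}
        any_message = False
        for w in workers:
            # 1. read from the input queue, 2. local processing,
            # 3. push local results along direct connections (collected into outboxes)
            for target, message in w.compute(inboxes[w.name], step):
                outboxes[target].append(message)
                any_message = True
        if not any_message:
            break                 # exit condition: no worker stays active
        inboxes = outboxes        # the barrier: messages are delivered at the next superstep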



Execution Orchestrator

This model is based on an intelligent scheduler / orchestrator that schedules ready-to-run tasks (based on a dependency graph) across a cluster of dumb workers.

This pattern is used in Microsoft's Dryad project.
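
A sketch of the orchestrator's core loop, assuming run_on_any_worker() is a placeholder that ships a task to the cluster and blocks until it finishes:

from collections import deque

def orchestrate(tasks, dependencies, run_on_any_worker):
    # dependencies maps each task to the set of tasks it must wait for
    remaining = {t: set(dependencies.get(t, ())) for t in tasks}
    ready = deque(t for t, deps in remaining.items() if not deps)
    while ready:
        finished = ready.popleft()
        run_on_any_worker(finished)          # hypothetical: dispatch to a dumb worker
        # Release any task whose last prerequisite just completed
        for task, deps in remaining.items():
            if finished in deps:
                deps.discard(finished)
                if not deps:
                    ready.append(task)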



Although I have tried to cover the whole set of commonly used design patterns for building large scale systems, I am sure I have missed some other important ones. Please drop me a comment and feedback.

Also, there is a whole set of scalability patterns around the data tier that I haven't covered here. This includes some very basic patterns underlying NOSQL, and it is worth taking a deep look at some leading implementations.
