Solving TF-IDF using Map-Reduce

TF-IDF (Term Frequency, Inverse Document Frequency) is a basic technique for computing the relevancy of a document with respect to a particular term.

"Term" is a generalized element contains within a document. A "term" is a generalized idea of what a document contains. (e.g. a term can be a word, a phrase, or a concept).

Intuitively, the relevancy of a document to a term can be calculated from how often that term shows up in the document (ie: the count of the term in that document divided by the total number of terms in it). We call this the "term frequency".

On the other hand, if this is a very common term that appears in many other documents, then its relevancy should be reduced. The measure here is the fraction of documents containing the term (ie: the count of documents having this term divided by the total number of documents). We call this the "document frequency".

The overall relevancy of a document with respect to a term can be computed using both the term frequency and document frequency.

relevancy = term frequency * log (1 / document frequency)

This is called tf-idf. A "document" can then be considered as a multi-dimensional vector where each dimension represents a term, with its tf-idf as the value.
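To plug in some hypothetical numbers: a term appearing 5 times in a 100-word document has a term frequency of 5/100 = 0.05. If that term appears in 10 out of 1,000 documents, its document frequency is 0.01, so relevancy = 0.05 * log(1/0.01) = 0.05 * log(100) ≈ 0.23 (using the natural log, as in the code below).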

Compute TF-IDF using Map/Reduce

To extract the terms from a document, the following process is common (a minimal sketch of these steps appears below):
  • Extract words by tokenizing the input stream
  • Make the words case-insensitive (e.g. transform them to all lower case)
  • Apply n-grams to extract phrases (e.g. a statistically frequent n-gram is likely a phrase)
  • Filter out stop words
  • Apply stemming (e.g. transform cats to cat)
To keep the terms simple, each word itself is a term in our example below.
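As an illustration, here is a minimal Java sketch of the tokenize / lowercase / stop-word steps (the class name and the tiny stop-word list are made up for this example; n-gram extraction and stemming are omitted):

import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public class TermExtractor {

    // A toy stop-word list; a real pipeline would use a much larger one.
    private static final Set<String> STOP_WORDS = Set.of("the", "a", "is", "of");

    public static List<String> extractTerms(String text) {
        return Arrays.stream(text.toLowerCase().split("\\W+"))  // tokenize + lower case
                     .filter(w -> !w.isEmpty())
                     .filter(w -> !STOP_WORDS.contains(w))      // drop stop words
                     .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(extractTerms("The cat sat on the mat"));  // [cat, sat, on, mat]
    }
}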

We use multiple rounds of Map/Reduce to gradually compute …
  1. the word count per word/doc combination
  2. the total number of words per doc
  3. the total number of docs per word
… and finally combine all three to compute the TF-IDF. A plain-Java sketch of this data flow follows.
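To make the rounds concrete before turning to PIG, here is a small, self-contained Java sketch of the same three aggregations over an in-memory corpus (the class name and corpus contents are made up for illustration):

import java.util.HashMap;
import java.util.Map;

public class TfIdfRounds {

    public static void main(String[] args) {
        // A tiny in-memory corpus: docId -> already-extracted terms
        Map<String, String[]> docs = Map.of(
                "data.txt",  new String[]{"hello", "world", "hello"},
                "data2.txt", new String[]{"hello", "pig"});

        // Round 1: word count per word/doc combination (key: word|docId)
        Map<String, Long> wordCount = new HashMap<>();
        // Round 2: total number of words per doc (key: docId)
        Map<String, Long> wordCountPerDoc = new HashMap<>();
        for (Map.Entry<String, String[]> doc : docs.entrySet()) {
            for (String word : doc.getValue()) {
                wordCount.merge(word + "|" + doc.getKey(), 1L, Long::sum);
                wordCountPerDoc.merge(doc.getKey(), 1L, Long::sum);
            }
        }

        // Round 3: total number of docs per word (key: word)
        Map<String, Long> docCountPerWord = new HashMap<>();
        for (String key : wordCount.keySet()) {
            docCountPerWord.merge(key.split("\\|")[0], 1L, Long::sum);
        }

        // Finally combine the three statistics into the TF-IDF per word/doc
        long totalDocs = docs.size();
        for (Map.Entry<String, Long> entry : wordCount.entrySet()) {
            String word = entry.getKey().split("\\|")[0];
            String docId = entry.getKey().split("\\|")[1];
            double tf = (double) entry.getValue() / wordCountPerDoc.get(docId);
            double idf = Math.log((double) totalDocs / docCountPerWord.get(word));
            System.out.printf("%s %s %.4f%n", word, docId, tf * idf);
        }
    }
}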



Implementation in Apache PIG

There are many ways to implement the Map/Reduce paradigm above. Apache Hadoop is a popular approach using Java or other programming languages (ie: via Hadoop Streaming).

Apache PIG is another approach, based on a higher-level language with parallel-processing constructs built in. Here are the three rounds of map/reduce logic implemented as a PIG script:
REGISTER rickyudf.jar

/* Build up the input data stream */
A1 = LOAD 'dirdir/data.txt' AS (words:chararray);
DocWordStream1 =
    FOREACH A1 GENERATE
        'data.txt' AS docId,
        FLATTEN(TOKENIZE(words)) AS word;

A2 = LOAD 'dirdir/data2.txt' AS (words:chararray);
DocWordStream2 =
    FOREACH A2 GENERATE
        'data2.txt' AS docId,
        FLATTEN(TOKENIZE(words)) AS word;

A3 = LOAD 'dirdir/data3.txt' AS (words:chararray);
DocWordStream3 =
    FOREACH A3 GENERATE
        'data3.txt' AS docId,
        FLATTEN(TOKENIZE(words)) AS word;

InStream = UNION DocWordStream1,
                 DocWordStream2,
                 DocWordStream3;

/* Round 1: word count per word/doc combination */
B = GROUP InStream BY (word, docId);
Round1 = FOREACH B GENERATE
             group AS wordDoc,
             COUNT(InStream) AS wordCount;

/* Round 2: total word count per doc */
C = GROUP Round1 BY wordDoc.docId;
WW = GROUP C ALL;
C2 = FOREACH WW GENERATE
         FLATTEN(C),
         COUNT(C) AS totalDocs;
Round2 = FOREACH C2 GENERATE
             FLATTEN(Round1),
             SUM(Round1.wordCount) AS wordCountPerDoc,
             totalDocs;

/* Round 3: Compute the total doc count per word */
D = GROUP Round2 BY wordDoc.word;
D2 = FOREACH D GENERATE
         FLATTEN(Round2),
         COUNT(Round2) AS docCountPerWord;
Round3 = FOREACH D2 GENERATE
             $0.word AS word,
             $0.docId AS docId,
             com.ricky.TFIDF(wordCount,
                             wordCountPerDoc,
                             totalDocs,
                             docCountPerWord) AS tfidf;

/* Order the output by relevancy */
ORDERRound3 = ORDER Round3 BY word ASC,
                            tfidf DESC;
DUMP ORDERRound3;
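Assuming the script were saved as tfidf.pig (a filename chosen here for illustration) with rickyudf.jar in the same directory, it could be run in Pig's local mode:

pig -x local tfidf.pig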



Here is the corresponding User Defined Function in Java (contained in rickyudf.jar):
package com.ricky;

import java.io.IOException;

import org.apache.pig.EvalFunc;
import org.apache.pig.data.Tuple;

public class TFIDF extends EvalFunc<Double> {

    @Override
    public Double exec(Tuple input) throws IOException {
        // The four counts computed by the three map/reduce rounds above
        long wordCount = (Long) input.get(0);
        long wordCountPerDoc = (Long) input.get(1);
        long totalDocs = (Long) input.get(2);
        long docCountPerWord = (Long) input.get(3);

        // tf: how often the word occurs within this document
        double tf = (wordCount * 1.0) / wordCountPerDoc;
        // idf: log of the inverse of the document frequency
        double idf = Math.log((totalDocs * 1.0) / docCountPerWord);

        return tf * idf;
    }
}
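Note that a word appearing in every document gets idf = log(1) = 0, so its tf-idf is zero no matter how often it occurs; this is exactly the damping of very common terms described earlier.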
