Spark & HITS

A template for the hubs-and-authorities algorithm, HITS (Hyperlink-Induced Topic Search).

At each step in this notebook, the recommended functions for implementing the MapReduce data-flow pattern are listed.

In [ ]:
import sys
from pyspark import SparkConf, SparkContext

User parameters

In [ ]:
# the number of iterations
NUM_ITERATIONS = 40
In [ ]:
# input files
big_file = 'graph-full.txt'
small_file = 'graph-small.txt'

Start Spark

In [ ]:
# prevent the "cannot run multiple SparkContexts at once" error
sc = SparkContext.getOrCreate()
sc.stop()
In [ ]:
conf = SparkConf()
sc = SparkContext(conf=conf)

Load Data

In [ ]:
# load the data file as (source, destination) edge pairs; switch small_file to big_file for the full graph
data = sc.textFile(small_file).map(lambda line: line.split('\t')).map(lambda cols: (cols[0], cols[1]))
In [ ]:
print("%d lines" % data.count())

STEP 1: obtain key-value pairs for L and LT

groupByKey()

  • group the values of a key-value RDD by key
  • return a new RDD of (key, iterable of values) pairs

map()

  • apply an operation to every element of an RDD
  • return a new RDD that contains the results
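
For instance, a minimal toy sketch of these two (output order may vary):

In [ ]:
# groupByKey: collect all values for each key into one iterable
print(sc.parallelize([(1, 2), (1, 3), (2, 3)]).groupByKey().mapValues(list).collect())
# -> [(1, [2, 3]), (2, [3])]
# map: apply a function to every element
print(sc.parallelize([1, 2, 3]).map(lambda x: x * x).collect())  # [1, 4, 9]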

Caching

Spark also supports pulling data sets into a cluster-wide in-memory cache. This is very useful when data is accessed repeatedly, such as when querying a small “hot” dataset or when running an iterative algorithm like PageRank.

.cache()

In [ ]:
L =           # key-value pairs for L-matrix
LT =          # key-value pairs for transpose of L-matrix
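
One possible way to fill in the blanks above (a sketch, not the required representation): store L as adjacency lists keyed by source node and LT as adjacency lists keyed by destination node. The distinct() call is an assumption that duplicate edges should count once, and .cache() keeps both RDDs in memory across the iterations.

In [ ]:
edges = data.distinct()                                       # assumption: collapse duplicate edges
L = edges.groupByKey().cache()                                # (source, [destinations])
LT = edges.map(lambda e: (e[1], e[0])).groupByKey().cache()   # (destination, [sources])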

STEP 2: start with an h (hubbiness) vector of all 1's

If I have an RDD of key-value pairs and I want to get only the key part, what is the most efficient way of doing it?

Answer: yourRDD.keys()

map()

  • apply an operation to every element of an RDD
  • return a new RDD that contains the results
In [ ]:
h =           # initial hubbiness vector
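
A minimal sketch for the cell above, assuming the L built in STEP 1 (and assuming every node has at least one outgoing edge, so it appears among L's keys):

In [ ]:
# one (node, 1.0) entry per source node in L
# assumption: every node appears as a source in L
h = L.keys().map(lambda n: (n, 1.0))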

STEP 3: compute vectors h (hubbiness) and a (authority) iteratively in mutual recursion

Repeat until convergence (or use a fixed number of iterations):

  1. Compute a = LTh.
  2. Scale so the largest component is 1.
  3. Compute h = La.
  4. Scale again so the largest component is 1.

If I have an RDD of key-value pairs and I want to get only the value part, what is the most efficient way of doing it?

Answer: yourRDD.values()

join()

When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (V, W)) pairs with all pairs of elements for each key.
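
For instance:

In [ ]:
# toy illustration of join(): only matching keys survive
x = sc.parallelize([('a', 1), ('b', 2)])
y = sc.parallelize([('a', 'u'), ('a', 'v')])
print(x.join(y).collect())  # [('a', (1, 'u')), ('a', (1, 'v'))]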

flatMap()

  • apply an operation to every element of an RDD, producing zero or more results per element
  • return a new RDD that contains all of the results, flattened out of their containers

reduceByKey()

  • combine the elements of a key-value RDD by key and then
  • apply a reduce operation to pairs of values
  • until only a single value remains for each key.
  • return the results in a new RDD

max()

  • returns the largest element in an RDD

mapValues()

  • apply an operation to the value of every element of a key-value RDD, leaving the keys unchanged
  • return a new RDD that contains the results
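
A toy sketch chaining these four operations, mirroring one half-step of the iteration below: flatMap emits one contribution per pair, reduceByKey sums the contributions per key, and max with mapValues rescales so the largest component is 1.

In [ ]:
contrib = sc.parallelize([(1, [2, 3]), (2, [3])]).flatMap(lambda kv: [(d, 1.0) for d in kv[1]])
summed = contrib.reduceByKey(lambda x, y: x + y)    # [(2, 1.0), (3, 2.0)]
m = summed.values().max()                           # 2.0
print(summed.mapValues(lambda v: v / m).collect())  # [(2, 0.5), (3, 1.0)]
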
In [ ]:
for _ in range(NUM_ITERATIONS):
    a =           # compute a = LTh
    a_max =       # obtain maximum value in a
    a =           # scale a so the largest component is 1
    
    h =           # compute h = La
    h_max =       # obtain maximum value in h
    h =           # scale h so the largest component is 1
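
One possible loop body (a sketch, assuming the L, LT, and h representations from the earlier sketches; any equivalent data flow works):

In [ ]:
from operator import add

for _ in range(NUM_ITERATIONS):
    # a = LTh: every edge i -> j contributes h_i to a_j
    a = L.join(h).flatMap(lambda kv: [(j, kv[1][1]) for j in kv[1][0]]).reduceByKey(add)
    a_max = a.values().max()
    a = a.mapValues(lambda v: v / a_max)

    # h = La: every edge i -> j contributes a_j to h_i
    h = LT.join(a).flatMap(lambda kv: [(i, kv[1][1]) for i in kv[1][0]]).reduceByKey(add)
    h_max = h.values().max()
    h = h.mapValues(lambda v: v / h_max)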

STEP 4: List the nodes with the highest/lowest hubbiness/authority score

Tips:

  • In order to sort by value, you may swap the keys and values and then sort the vectors by key.
  • You may again use caching with .cache().
In [ ]:
# TO-DO: implement four for loops (highest/lowest hubbiness, highest/lowest authority) and print out the results
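
One of the four loops as a sketch, following the swap-and-sort tip (the cutoff of 5 nodes is an assumption; the other three loops are symmetric, using ascending=True for the lowest scores and the a vector for authority):

In [ ]:
# cache the final vectors, since each is scanned more than once
h.cache()
a.cache()
# highest hubbiness: swap to (score, node), sort descending, take the top 5
for score, node in h.map(lambda kv: (kv[1], kv[0])).sortByKey(ascending=False).take(5):
    print("node %s: hubbiness %f" % (node, score))
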
In [ ]:
sc.stop()