sudo apt install default-jdk
java -version
sudo mkdir /opt/spark/
sudo mv spark-3.3.2-bin-hadoop3/ /opt/spark/
pip install pyspark
vi ~/.bashrc
export SPARK_HOME=/opt/spark/spark-3.3.2-bin-hadoop3
export PATH=$SPARK_HOME/bin:$PATH
source ~/.bashrc
To use Spark from a Jupyter notebook, add one of the following variants to ~/.bashrc.
For a notebook on your local machine:
export PYSPARK_DRIVER_PYTHON=jupyter
export PYSPARK_DRIVER_PYTHON_OPTS="notebook"
For a remote notebook reachable directly on a given port:
export PYSPARK_DRIVER_PYTHON=jupyter
export PYSPARK_DRIVER_PYTHON_OPTS="notebook --no-browser --port=<port> --ip='*'"
For a remote notebook accessed through an SSH tunnel:
export PYSPARK_DRIVER_PYTHON=jupyter
export PYSPARK_DRIVER_PYTHON_OPTS="notebook --no-browser"
Don't forget to run source ~/.bashrc at the end.
pyspark
Note that remote access to the Jupyter notebook requires an SSH tunnel. On Windows machines, you can use PuTTY to set it up. In Linux environments, the following command can be used (the local port comes first in the -L specification):
ssh -N -L <local_port>:localhost:<port> <user>@<server>
Finally, you can open the notebook in your browser:
http://localhost:<local_port>
import random
import re
The following cell suppresses the spurious warnings produced by newer Jupyter notebook versions.
%%html
<style>
div.output_stderr {
display: none;
}
</style>
PySpark can be used from standalone Python scripts by creating a SparkContext. You can set configuration properties by passing a SparkConf object to SparkContext.
Documentation: pyspark package
from pyspark import SparkContext, SparkConf
# only one SparkContext can be active at a time, so stop any existing one first
sc = SparkContext.getOrCreate()
sc.stop()
# create a Spark configuration (default settings)
conf = SparkConf()
# create a Spark context
sc = SparkContext(conf=conf)
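The empty SparkConf above just uses the defaults. Properties can be set on the conf before the context is created; a minimal sketch, where the app name, master, and memory setting are illustrative choices rather than required values:
conf = (SparkConf()
        .setAppName("rdd-tutorial")           # hypothetical application name
        .setMaster("local[*]")                # run locally on all available cores
        .set("spark.executor.memory", "1g"))  # example property
# the configured object is then passed exactly as above:
# sc = SparkContext(conf=conf)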
Spark is RDD-centric! RDD stands for Resilient Distributed Dataset, where resilient means fault-tolerant: lost partitions can be recomputed from the lineage of transformations that produced them.
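As a quick illustration (a sketch assuming the sc created above), an RDD can also be built directly from an in-memory Python collection:
# create an RDD from a Python list
rdd = sc.parallelize([1, 2, 3, 4, 5])
rdd.count()  # action: returns 5
rdd.take(2)  # action: returns [1, 2]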
Some useful actions (collect(), count(), take(), foreach()) are demonstrated below on two small input files:
file1.txt:
Apple,Amy
Butter,Bob
Cheese,Chucky
Dinkel,Dieter
Egg,Edward
Oxtail,Oscar
Anchovie,Alex
Avocado,Adam
Apple,Alex
Apple,Adam
Dinkel,Dieter
Doughboy,Pilsbury
McDonald,Ronald
file2.txt:
Wendy,
Doughboy,Pillsbury
McDonald,Ronald
Cheese,Chucky
# input files
file1 = 'file1.txt'
file2 = 'file2.txt'
# load data
data1 = sc.textFile(file1)
data2 = sc.textFile(file2)
data1.collect()
['Apple,Amy', 'Butter,Bob', 'Cheese,Chucky', 'Dinkel,Dieter', 'Egg,Edward', 'Oxtail,Oscar', 'Anchovie,Alex', 'Avocado,Adam', 'Apple,Alex', 'Apple,Adam', 'Dinkel,Dieter', 'Doughboy,Pilsbury', 'McDonald,Ronald']
print("file1: %d lines" % data1.count())
file1: 13 lines
data1.take(3)
['Apple,Amy', 'Butter,Bob', 'Cheese,Chucky']
data2.collect()
['Wendy,', 'Doughboy,Pillsbury', 'McDonald,Ronald', 'Cheese,Chucky']
print("file2: %d lines" % data2.count())
file2: 4 lines
data2.take(3)
['Wendy,', 'Doughboy,Pillsbury', 'McDonald,Ronald']
Note: the following produces its output in the Jupyter notebook server log, not in the notebook cell, because foreach() is executed on the workers!
# prints each element; the output appears in the notebook server log
data2.foreach(print)
Cheese,Chucky
Wendy,
Doughboy,Pillsbury
McDonald,Ronald
map(): Return a new RDD by applying a function to each element of this RDD.
data = sc.textFile(file1)
data
file1.txt MapPartitionsRDD[10] at textFile at NativeMethodAccessorImpl.java:0
data.take(3)
['Apple,Amy', 'Butter,Bob', 'Cheese,Chucky']
data.map(lambda line: line.split(',')).take(3)
[['Apple', 'Amy'], ['Butter', 'Bob'], ['Cheese', 'Chucky']]
flatMap(): Return a new RDD by first applying a function to all elements of this RDD, and then flattening the results.
data = sc.textFile(file1)
data.take(4)
['Apple,Amy', 'Butter,Bob', 'Cheese,Chucky', 'Dinkel,Dieter']
data.flatMap(lambda line: line.split(',')).take(7)
['Apple', 'Amy', 'Butter', 'Bob', 'Cheese', 'Chucky', 'Dinkel']
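Compare this with map() above: map() produced one list per line, whereas flatMap() flattens everything into a single RDD of words.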
mapValues(): Pass each value in the key-value pair RDD through a map function without changing the keys; this also retains the original RDD's partitioning.
Only works with pair RDDs.
data = sc.textFile(file1)
data = data.map(lambda line: line.split(','))
data.take(3)
[['Apple', 'Amy'], ['Butter', 'Bob'], ['Cheese', 'Chucky']]
data.collect()
[['Apple', 'Amy'], ['Butter', 'Bob'], ['Cheese', 'Chucky'], ['Dinkel', 'Dieter'], ['Egg', 'Edward'], ['Oxtail', 'Oscar'], ['Anchovie', 'Alex'], ['Avocado', 'Adam'], ['Apple', 'Alex'], ['Apple', 'Adam'], ['Dinkel', 'Dieter'], ['Doughboy', 'Pilsbury'], ['McDonald', 'Ronald']]
data = data.map(lambda pair: (pair[0], pair[1]))
data.take(3)
[('Apple', 'Amy'), ('Butter', 'Bob'), ('Cheese', 'Chucky')]
data.mapValues(lambda name: name.lower()).take(3)
[('Apple', 'amy'), ('Butter', 'bob'), ('Cheese', 'chucky')]
flatMapValues(): Pass each value in the key-value pair RDD through a flatMap function without changing the keys; this also retains the original RDD's partitioning.
Only works with pair RDDs.
data = sc.textFile(file1)
data = data.map(lambda line: line.split(','))
data = data.map(lambda pair: (pair[0], pair[1]))
data.take(3)
[('Apple', 'Amy'), ('Butter', 'Bob'), ('Cheese', 'Chucky')]
data.flatMapValues(lambda name: name.lower()).take(9)
[('Apple', 'a'), ('Apple', 'm'), ('Apple', 'y'), ('Butter', 'b'), ('Butter', 'o'), ('Butter', 'b'), ('Cheese', 'c'), ('Cheese', 'h'), ('Cheese', 'u')]
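Because a Python string is iterable, flatMapValues() here splits each name into individual characters, producing one (key, character) pair per letter.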
filter(): Return a new RDD containing only the elements that satisfy a predicate.
data = sc.textFile(file1)
data.take(3)
['Apple,Amy', 'Butter,Bob', 'Cheese,Chucky']
data.filter(lambda line: re.match(r'^[AEIOU]', line)).take(3)
['Apple,Amy', 'Egg,Edward', 'Oxtail,Oscar']
data.filter(lambda line: re.match(r'^[AEIOU]', line)).collect()
['Apple,Amy', 'Egg,Edward', 'Oxtail,Oscar', 'Anchovie,Alex', 'Avocado,Adam', 'Apple,Alex', 'Apple,Adam']
data.filter(lambda line: re.match(r'.+[y]$', line)).take(3)
['Apple,Amy', 'Cheese,Chucky', 'Doughboy,Pilsbury']
data.filter(lambda line: re.search(r'[x]$', line)).take(3)
['Anchovie,Alex', 'Apple,Alex']
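Note the difference between the two regex helpers: re.match() anchors at the beginning of the string (hence the .+ before [y]$ above), while re.search(), used in the last example, scans the whole string.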
groupByKey(): Group the values for each key in the RDD into a single sequence. Hash-partitions the resulting RDD with numPartitions partitions.
Only works with pair RDDs.
data = sc.textFile(file1)
data = data.map(lambda line: line.split(','))
data.take(3)
[['Apple', 'Amy'], ['Butter', 'Bob'], ['Cheese', 'Chucky']]
data = data.map(lambda pair: (pair[0], pair[1]))
data.take(3)
[('Apple', 'Amy'), ('Butter', 'Bob'), ('Cheese', 'Chucky')]
data.groupByKey().take(1)
[('Apple', <pyspark.resultiterable.ResultIterable at 0x7f2ba8087b50>)]
for pair in data.groupByKey().take(1):
    print("%s: %s" % (pair[0], ",".join([n for n in pair[1]])))
Apple: Amy,Alex,Adam
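Instead of iterating, the ResultIterable can also be materialized with mapValues(); a short sketch combining the two:
# turn each group's iterable into a plain list for readable output
data.groupByKey().mapValues(list).take(1)
# -> [('Apple', ['Amy', 'Alex', 'Adam'])]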
reduceByKey(): Merge the values for each key using an associative and commutative reduce function. This will also perform the merging locally on each mapper before sending results to a reducer, similarly to a "combiner" in MapReduce.
data = sc.textFile(file1)
data = data.map(lambda line: line.split(","))
data = data.map(lambda pair: (pair[0], pair[1]))
data.take(3)
[('Apple', 'Amy'), ('Butter', 'Bob'), ('Cheese', 'Chucky')]
data.reduceByKey(lambda v1, v2: v1 + ":" + v2).take(6)  # concatenate all values sharing a key
[('Apple', 'Amy:Alex:Adam'), ('Butter', 'Bob'), ('Dinkel', 'Dieter:Dieter'), ('Doughboy', 'Pilsbury'), ('Cheese', 'Chucky'), ('Egg', 'Edward')]
sortBy(): Sorts this RDD by the given keyfunc.
data = sc.textFile(file1)
data = data.map(lambda line: line.split(","))
data = data.map(lambda pair: (pair[0], pair[1]))
data.collect()
[('Apple', 'Amy'), ('Butter', 'Bob'), ('Cheese', 'Chucky'), ('Dinkel', 'Dieter'), ('Egg', 'Edward'), ('Oxtail', 'Oscar'), ('Anchovie', 'Alex'), ('Avocado', 'Adam'), ('Apple', 'Alex'), ('Apple', 'Adam'), ('Dinkel', 'Dieter'), ('Doughboy', 'Pilsbury'), ('McDonald', 'Ronald')]
data.sortBy(lambda pair: pair[1][1]).take(10)  # sort by the second letter of each value
[('Egg', 'Edward'), ('Avocado', 'Adam'), ('Apple', 'Adam'), ('Cheese', 'Chucky'), ('Dinkel', 'Dieter'), ('Dinkel', 'Dieter'), ('Doughboy', 'Pilsbury'), ('Anchovie', 'Alex'), ('Apple', 'Alex'), ('Apple', 'Amy')]
sortByKey(): Sorts this RDD, which is assumed to consist of (key, value) pairs.
data = sc.textFile(file1)
data = data.map(lambda line: line.split(","))
data = data.map(lambda pair: (pair[0], pair[1]))
data.collect()
[('Apple', 'Amy'), ('Butter', 'Bob'), ('Cheese', 'Chucky'), ('Dinkel', 'Dieter'), ('Egg', 'Edward'), ('Oxtail', 'Oscar'), ('Anchovie', 'Alex'), ('Avocado', 'Adam'), ('Apple', 'Alex'), ('Apple', 'Adam'), ('Dinkel', 'Dieter'), ('Doughboy', 'Pilsbury'), ('McDonald', 'Ronald')]
data.sortByKey().take(6)
[('Anchovie', 'Alex'), ('Apple', 'Amy'), ('Apple', 'Alex'), ('Apple', 'Adam'), ('Avocado', 'Adam'), ('Butter', 'Bob')]
subtract(): Return each value in self that is not contained in other.
data1 = sc.textFile(file1)
data1.collect()
['Apple,Amy', 'Butter,Bob', 'Cheese,Chucky', 'Dinkel,Dieter', 'Egg,Edward', 'Oxtail,Oscar', 'Anchovie,Alex', 'Avocado,Adam', 'Apple,Alex', 'Apple,Adam', 'Dinkel,Dieter', 'Doughboy,Pilsbury', 'McDonald,Ronald']
data1.count()
13
data2 = sc.textFile(file2)
data2.collect()
['Wendy,', 'Doughboy,Pillsbury', 'McDonald,Ronald', 'Cheese,Chucky']
data2.count()
4
data1.subtract(data2).collect()
['Egg,Edward', 'Doughboy,Pilsbury', 'Oxtail,Oscar', 'Apple,Alex', 'Apple,Amy', 'Butter,Bob', 'Anchovie,Alex', 'Avocado,Adam', 'Dinkel,Dieter', 'Dinkel,Dieter', 'Apple,Adam']
data1.subtract(data2).count()
11
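Note that subtract() compares whole elements rather than keys: file1's 'Doughboy,Pilsbury' survives because it differs from file2's 'Doughboy,Pillsbury' by a single letter.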
join(): Return an RDD containing all pairs of elements with matching keys in self and other. Each pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in self and (k, v2) is in other.
data1 = sc.textFile(file1).map(lambda line: line.split(',')).map(lambda pair: (pair[0], pair[1]))
data1.collect()
[('Apple', 'Amy'), ('Butter', 'Bob'), ('Cheese', 'Chucky'), ('Dinkel', 'Dieter'), ('Egg', 'Edward'), ('Oxtail', 'Oscar'), ('Anchovie', 'Alex'), ('Avocado', 'Adam'), ('Apple', 'Alex'), ('Apple', 'Adam'), ('Dinkel', 'Dieter'), ('Doughboy', 'Pilsbury'), ('McDonald', 'Ronald')]
data1.count()
13
data2 = sc.textFile(file2).map(lambda line: line.split(',')).map(lambda pair: (pair[0], pair[1]))
data2.collect()
[('Wendy', ''), ('Doughboy', 'Pillsbury'), ('McDonald', 'Ronald'), ('Cheese', 'Chucky')]
data2.count()
4
data1.join(data2).collect()
[('Doughboy', ('Pilsbury', 'Pillsbury')), ('McDonald', ('Ronald', 'Ronald')), ('Cheese', ('Chucky', 'Chucky'))]
data1.join(data2).count()
3
data1.fullOuterJoin(data2).take(2)
[('Dinkel', ('Dieter', None)), ('Dinkel', ('Dieter', None))]
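fullOuterJoin() keeps keys that appear in either RDD and fills the missing side with None, as seen for Dinkel above. The one-sided variants behave analogously; a brief sketch using the standard leftOuterJoin() (output order may vary):
# keep every key from data1; keys missing from data2 get None on the right
data1.leftOuterJoin(data2).take(2)
# e.g. [('Dinkel', ('Dieter', None)), ('Dinkel', ('Dieter', None))]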
# stop Spark context
sc.stop()
We will now count the occurrences of each word. Word count is the typical "Hello, world!" application for Spark; the map/reduce model is particularly well suited to counting words in a document.
# create a Spark context
sc = SparkContext(conf=conf)
# read the target file into an RDD
lines = sc.textFile(file1)
lines.take(3)
['Apple,Amy', 'Butter,Bob', 'Cheese,Chucky']
The flatMap() operation first converts each line into a list of words, and then makes each of the words an element in the new RDD.
# split the lines into individual words
words = lines.flatMap(lambda l: re.split(r'[^\w]+', l))
words.take(3)
['Apple', 'Amy', 'Butter']
The map() operation replaces each word with a tuple of that word and the number 1. The result is a pair RDD where the words are the keys and every value is 1.
# replace each word with a tuple of that word and the number 1
pairs = words.map(lambda w: (w, 1))
pairs.take(3)
[('Apple', 1), ('Amy', 1), ('Butter', 1)]
The reduceByKey() operation adds up the values for each key (word) until a single total per word remains.
# group the elements of the RDD by key (word) and add up their values
counts = pairs.reduceByKey(lambda n1, n2: n1 + n2)
counts.take(3)
[('Apple', 3), ('Amy', 1), ('Butter', 1)]
# sort the elements by values in descending order
counts.sortBy(lambda pair: pair[1], ascending=False).take(10)
[('Apple', 3), ('Dinkel', 2), ('Alex', 2), ('Dieter', 2), ('Adam', 2), ('Amy', 1), ('Butter', 1), ('Chucky', 1), ('Edward', 1), ('Doughboy', 1)]
Note that the code above can also be chained into a single pipeline:
sorted_counts = (lines.flatMap(lambda l: re.split(r'[^\w]+', l)) # words
.map(lambda w: (w, 1)) # pairs
.reduceByKey(lambda n1, n2: n1 + n2) # counts
.sortBy(lambda pair: pair[1], ascending=False)) # sorted counts
sorted_counts.take(10)
[('Apple', 3), ('Dinkel', 2), ('Alex', 2), ('Dieter', 2), ('Adam', 2), ('Amy', 1), ('Butter', 1), ('Chucky', 1), ('Edward', 1), ('Doughboy', 1)]
# stop Spark context
sc.stop()