Spark notes
From Simson Garfinkel
Spark References
Videos
- GU ANLY 502, 2017 (from the course I taught at Georgetown)
Books
- Learning Spark
- Spark: The Definitive Guide
- High Performance Spark
- Python for Data Analysis: Data Wrangling with Pandas, NumPy, and IPython
Spark Ideas
Spark on macOS
You can run Spark locally on a Mac without having a cluster. Spark will use all of your available CPUs.
To set it up:
- Install Anaconda
- conda install pyspark
File Management
Adding files to the nodes (the file is downloaded to every node that runs tasks for the job):
sc.addFile(filename)
- https://medium.com/@rbahaguejr/adding-python-files-to-pyspark-job-b725e02c8ab2
Tuning
- https://stackoverflow.com/questions/37871194/how-to-tune-spark-executor-number-cores-and-executor-memory
- https://blog.cloudera.com/blog/2015/03/how-to-tune-your-apache-spark-jobs-part-2/
- http://site.clairvoyantsoft.com/understanding-resource-allocation-configurations-spark-application/
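The tuning articles above size executors with a rule of thumb. The Cloudera post works through a six-node cluster with 16 cores and 64 GB per node. The sketch below reproduces that arithmetic under the post's heuristics, which are assumptions rather than Spark defaults:

- leave 1 core and 1 GB per node for the OS and Hadoop daemons
- use about 5 cores per executor
- give one executor's worth of resources to the YARN ApplicationMaster
- subtract roughly 7% of executor memory for off-heap overhead

The function name `size_executors` is hypothetical. Check your Spark version's `spark.executor.memoryOverhead` default before relying on the overhead fraction.

```python
import math

def size_executors(nodes, cores_per_node, mem_gb_per_node,
                   cores_per_executor=5, overhead_fraction=0.07):
    """Rule-of-thumb executor sizing (heuristic, after the Cloudera tuning post)."""
    usable_cores = cores_per_node - 1      # reserve 1 core per node for OS/Hadoop daemons
    usable_mem_gb = mem_gb_per_node - 1    # reserve 1 GB per node for the same
    executors_per_node = usable_cores // cores_per_executor
    # One executor slot is left for the YARN ApplicationMaster
    num_executors = nodes * executors_per_node - 1
    # Split node memory across its executors, then subtract off-heap overhead
    mem_per_executor = usable_mem_gb / executors_per_node
    executor_memory_gb = math.floor(mem_per_executor * (1 - overhead_fraction))
    return num_executors, cores_per_executor, executor_memory_gb

n, c, m = size_executors(nodes=6, cores_per_node=16, mem_gb_per_node=64)
print(f"--num-executors {n} --executor-cores {c} --executor-memory {m}G")
```

The printed flags are starting points for `spark-submit` on YARN, not settings to copy blindly. Measure the job and adjust.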
Spark Practice
- https://developerzen.com/best-practices-writing-production-grade-pyspark-jobs-cb688ac4d20f (mostly about packaging and a shared context)
Demos
Find out which nodes your code is running on, entirely from within Spark (using Amazon EMR). The session below was run on a 2-node cluster.
```
$ sudo pip-3.4 install ipython
$ cat func.py
def myfun(a):
    import socket,os
    return socket.gethostname()+"-"+str(os.getpid())
[hadoop@ip-10-239-83-234 ~]$ PYSPARK_DRIVER_PYTHON=ipython3 PYSPARK_PYTHON=python34 pyspark --py-files func.py
Python 3.4.3 (default, Sep 1 2016, 23:33:38)
Type 'copyright', 'credits' or 'license' for more information
IPython 6.2.0 -- An enhanced Interactive Python. Type '?' for help.
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
17/09/21 18:11:49 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.
17/09/21 18:12:05 WARN CredentialsLegacyConfigLocationProvider: Found the legacy config profiles file at [/home/hadoop/.aws/config]. Please move it to the latest default location [~/.aws/credentials].
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.2.0
      /_/

Using Python version 3.4.3 (default, Sep 1 2016 23:33:38)
SparkSession available as 'spark'.

In [1]: from func import myfun

In [2]: myfun(10)
Out[2]: 'ip-10-239-83-234-24121'

In [5]: a = sc.parallelize(range(1,10)).map(lambda num:myfun(num))

In [6]: a.take(10)
Out[6]:
['ip-10-144-32-86-32083',
 'ip-10-144-32-86-32083',
 'ip-10-144-32-86-32091',
 'ip-10-144-32-86-32091',
 'ip-10-144-32-86-32088',
 'ip-10-144-32-86-32088',
 'ip-10-144-32-86-32094',
 'ip-10-144-32-86-32094',
 'ip-10-144-32-86-32094']

In [10]: sc.parallelize(range(1,10)).map(lambda num:(myfun(num),1)).reduceByKey(lambda a,b:a+b).collect()
Out[10]:
[('ip-10-144-32-86-32274', 3),
 ('ip-10-144-32-86-32271', 2),
 ('ip-10-144-32-86-32264', 2),
 ('ip-10-144-32-86-32267', 2)]

In [11]: sc.parallelize(range(1,1000)).map(lambda num:(myfun(num),1)).reduceByKey(lambda a,b:a+b).collect()
Out[11]:
[('ip-10-144-32-86-32287', 249),
 ('ip-10-144-32-86-32290', 250),
 ('ip-10-144-32-86-32296', 250),
 ('ip-10-144-32-86-32284', 250)]

In [12]: sc.parallelize(range(1,1000*1000)).map(lambda num:(myfun(num),1)).reduceByKey(lambda a,b:a+b).collect()
Out[12]:
[('ip-10-144-32-86-32323', 249999),
 ('ip-10-144-32-86-32330', 250000),
 ('ip-10-144-32-86-32320', 250000),
 ('ip-10-144-32-86-32326', 250000)]
```