Difference between revisions of "Spark notes"

From Simson Garfinkel
Jump to navigationJump to search
m
 
(3 intermediate revisions by the same user not shown)
Line 1: Line 1:
= Spark References =
== Videos ==
* [https://www.youtube.com/playlist?list=PLIl5cUcspvNOS8wVc_O9Z5jzRXzkoTxa8 GU ANLY 502, 2017] (From the course I taught at Georgetown)
== Books ==
* [https://www.amazon.com/Learning-Spark-Lightning-Fast-Data-Analysis/dp/1449358624/ref=sr_1_3?keywords=high+performance+spark&qid=1563459239&s=gateway&sr=8-3 Learning Spark]
* [https://www.amazon.com/Spark-Definitive-Guide-Processing-Simple/dp/1491912219/ref=sr_1_4?keywords=high+performance+spark&qid=1563459239&s=gateway&sr=8-4 Spark: The Definitive Guide]
* [https://www.amazon.com/High-Performance-Spark-Practices-Optimizing/dp/1491943203/ref=sr_1_1?keywords=high+performance+spark&qid=1563459239&s=gateway&sr=8-1 High Performance Spark]
* [https://www.amazon.com/Python-Data-Analysis-Wrangling-IPython/dp/1491957662/ref=sr_1_3?keywords=python+for+data+analytics&qid=1563459260&s=gateway&sr=8-3 Python for Data Analysis: Data Wrangling with Pandas, NumPy, and IPython]
=Spark Ideas=
==Spark on MacOS==
==Spark on MacOS==
1. Install anaconda.
You can run Spark locally on a Mac without having a cluster. Spark will use all of your available CPUs.
2. pip install pyspark
 
Use this:
 
# Install anaconda
# conda install pyspark




==File Management==
Adding files to the nodes:
Adding files to the nodes:


Line 9: Line 24:


* https://medium.com/@rbahaguejr/adding-python-files-to-pyspark-job-b725e02c8ab2
* https://medium.com/@rbahaguejr/adding-python-files-to-pyspark-job-b725e02c8ab2
==Tutorials==
* https://medium.com/@rbahaguejr/adding-python-files-to-pyspark-job-b725e02c8ab2
* https://medium.com/@rbahaguejr/adding-python-files-to-pyspark-job-b725e02c8ab2


Line 20: Line 32:




==Good Blog Entries==
==Spark Practce==
* https://developerzen.com/best-practices-writing-production-grade-pyspark-jobs-cb688ac4d20f  (most about packaging and a shared context)
* https://developerzen.com/best-practices-writing-production-grade-pyspark-jobs-cb688ac4d20f  (most about packaging and a shared context)
==Demos==
Find the nodes where you are running, entirely from within Spark (using EMR). Below was run on a 2-node cluster.
<pre>
$ sudo pip-3.4 install ipython
$ cat func.py
def myfun(a):
    import socket,os
    return socket.gethostname()+"-"+str(os.getpid())
[hadoop@ip-10-239-83-234 ~]$ PYSPARK_DRIVER_PYTHON=ipython3 PYSPARK_PYTHON=python34 pyspark --py-files func.py
Python 3.4.3 (default, Sep  1 2016, 23:33:38)
Type 'copyright', 'credits' or 'license' for more information
IPython 6.2.0 -- An enhanced Interactive Python. Type '?' for help.
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
17/09/21 18:11:49 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.
17/09/21 18:12:05 WARN CredentialsLegacyConfigLocationProvider: Found the legacy config profiles file at [/home/hadoop/.aws/config]. Please move it to the latest default location [~/.aws/credentials].
Welcome to
      ____              __
    / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
  /__ / .__/\_,_/_/ /_/\_\  version 2.2.0
      /_/
Using Python version 3.4.3 (default, Sep  1 2016 23:33:38)
SparkSession available as 'spark'.
In [1]: from func import myfun
In [2]: myfun(10)
Out[2]: 'ip-10-239-83-234-24121'
In [5]: a = sc.parallelize(range(1,10)).map(lambda num:myfun(num))
In [6]: a.take(10)
Out[6]:
['ip-10-144-32-86-32083',
'ip-10-144-32-86-32083',
'ip-10-144-32-86-32091',
'ip-10-144-32-86-32091',
'ip-10-144-32-86-32088',
'ip-10-144-32-86-32088',
'ip-10-144-32-86-32094',
'ip-10-144-32-86-32094',
'ip-10-144-32-86-32094']
In [10]: sc.parallelize(range(1,10)).map(lambda num:(myfun(num),1)).reduceByKey(lambda a,b:a+b).collect()
Out[10]:
[('ip-10-144-32-86-32274', 3),
('ip-10-144-32-86-32271', 2),
('ip-10-144-32-86-32264', 2),
('ip-10-144-32-86-32267', 2)]
In [11]: sc.parallelize(range(1,1000)).map(lambda num:(myfun(num),1)).reduceByKey(lambda a,b:a+b).collect()
Out[11]:
[('ip-10-144-32-86-32287', 249),
('ip-10-144-32-86-32290', 250),
('ip-10-144-32-86-32296', 250),
('ip-10-144-32-86-32284', 250)]
In [12]: sc.parallelize(range(1,1000*1000)).map(lambda num:(myfun(num),1)).reduceByKey(lambda a,b:a+b).collect()
Out[12]:
[('ip-10-144-32-86-32323', 249999),
('ip-10-144-32-86-32330', 250000),
('ip-10-144-32-86-32320', 250000),
('ip-10-144-32-86-32326', 250000)]
</pre>

Latest revision as of 05:29, 6 May 2020

Spark References

Videos

Books

Spark Ideas

Spark on MacOS

You can run Spark locally on a Mac without having a cluster. Spark will use all of your available CPUs.

Use this:

  1. Install anaconda
  2. conda install pyspark


File Management

Adding files to the nodes:

   sc.addFile(filename)

Tuning


Spark Practce

Demos

Find the nodes where you are running, entirely from within Spark (using EMR). Below was run on a 2-node cluster.

$ sudo pip-3.4 install ipython
$ cat func.py
def myfun(a):
    import socket,os
    return socket.gethostname()+"-"+str(os.getpid())

[hadoop@ip-10-239-83-234 ~]$ PYSPARK_DRIVER_PYTHON=ipython3 PYSPARK_PYTHON=python34 pyspark --py-files func.py
Python 3.4.3 (default, Sep  1 2016, 23:33:38)
Type 'copyright', 'credits' or 'license' for more information
IPython 6.2.0 -- An enhanced Interactive Python. Type '?' for help.
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
17/09/21 18:11:49 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.
17/09/21 18:12:05 WARN CredentialsLegacyConfigLocationProvider: Found the legacy config profiles file at [/home/hadoop/.aws/config]. Please move it to the latest default location [~/.aws/credentials].
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.2.0
      /_/

Using Python version 3.4.3 (default, Sep  1 2016 23:33:38)
SparkSession available as 'spark'.

In [1]: from func import myfun

In [2]: myfun(10)
Out[2]: 'ip-10-239-83-234-24121'

In [5]: a = sc.parallelize(range(1,10)).map(lambda num:myfun(num))

In [6]: a.take(10)
Out[6]:
['ip-10-144-32-86-32083',
 'ip-10-144-32-86-32083',
 'ip-10-144-32-86-32091',
 'ip-10-144-32-86-32091',
 'ip-10-144-32-86-32088',
 'ip-10-144-32-86-32088',
 'ip-10-144-32-86-32094',
 'ip-10-144-32-86-32094',
 'ip-10-144-32-86-32094']

In [10]: sc.parallelize(range(1,10)).map(lambda num:(myfun(num),1)).reduceByKey(lambda a,b:a+b).collect()
Out[10]:
[('ip-10-144-32-86-32274', 3),
 ('ip-10-144-32-86-32271', 2),
 ('ip-10-144-32-86-32264', 2),
 ('ip-10-144-32-86-32267', 2)]

In [11]: sc.parallelize(range(1,1000)).map(lambda num:(myfun(num),1)).reduceByKey(lambda a,b:a+b).collect()
Out[11]:
[('ip-10-144-32-86-32287', 249),
 ('ip-10-144-32-86-32290', 250),
 ('ip-10-144-32-86-32296', 250),
 ('ip-10-144-32-86-32284', 250)]

In [12]: sc.parallelize(range(1,1000*1000)).map(lambda num:(myfun(num),1)).reduceByKey(lambda a,b:a+b).collect()
Out[12]:
[('ip-10-144-32-86-32323', 249999),
 ('ip-10-144-32-86-32330', 250000),
 ('ip-10-144-32-86-32320', 250000),
 ('ip-10-144-32-86-32326', 250000)]