Apache Spark 1.2 with PySpark (Spark Python API) Wordcount using CDH5
The Spark Python API (PySpark) exposes the Spark programming model to Python (see the Spark Python Programming Guide).
[Figure: PySpark architecture diagram, from the PySpark Internals wiki]
PySpark is built on top of Spark's Java API. Data is processed in Python and cached / shuffled in the JVM. In the Python driver program, SparkContext uses Py4J to launch a JVM and create a JavaSparkContext. Py4J is only used on the driver for local communication between the Python and Java SparkContext objects; large data transfers are performed through a different mechanism. RDD transformations in Python are mapped to transformations on PythonRDD objects in Java. On remote worker machines, PythonRDD objects launch Python subprocesses and communicate with them using pipes, sending the user's code and the data to be processed.
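To make the Py4J handoff concrete, here is a minimal Py4J sketch. This is not PySpark's launcher code; it assumes a Java-side GatewayServer is already listening on Py4J's default port (PySpark starts its own gateway for you), but the calling pattern is the same one the driver uses to manipulate the JavaSparkContext:

from py4j.java_gateway import JavaGateway

# Connect to a JVM-side GatewayServer (assumed to be running already;
# PySpark launches and manages its own).
gateway = JavaGateway()

# Instantiate and call a JVM object from Python, the same way the
# Python SparkContext drives its JavaSparkContext.
random = gateway.jvm.java.util.Random()
print(random.nextInt(100))  # the Random object lives in the JVM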
Here is our code:
from __future__ import print_function

import sys
from operator import add

from pyspark import SparkContext

if __name__ == "__main__":
    if len(sys.argv) != 2:
        print("Usage: wordcount <file>", file=sys.stderr)
        exit(-1)

    sc = SparkContext(appName="PythonWordCount")

    # Read the input into an RDD of lines (minPartitions=1).
    lines = sc.textFile(sys.argv[1], 1)

    # Split each line into words, pair each word with 1,
    # then sum the 1s per distinct word.
    counts = lines.flatMap(lambda x: x.split(' ')) \
                  .map(lambda x: (x, 1)) \
                  .reduceByKey(add)

    # collect() brings the (word, count) pairs back to the driver.
    output = counts.collect()
    for (word, count) in output:
        print("%s: %i" % (word, count))

    sc.stop()
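To see what the three transformations compute, here is a plain-Python sketch of the same pipeline run on two made-up lines of text (the names here are ordinary local variables, not RDDs, and no Spark is required):

from collections import Counter

lines = ["Two roads diverged", "roads in a wood"]

# flatMap(lambda x: x.split(' ')) -> one flat list of words
words = [w for line in lines for w in line.split(' ')]

# map(lambda x: (x, 1)) followed by reduceByKey(add) is equivalent
# to summing the 1s per distinct word:
counts = Counter(words)

print(sorted(counts.items()))
# [('Two', 1), ('a', 1), ('diverged', 1), ('in', 1), ('roads', 2), ('wood', 1)]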
We put the input file (Robert Frost's "The Road Not Taken") into HDFS:
$ sudo su hdfs
bash-4.1$ hadoop fs -copyFromLocal two_roads.txt /user/hdfs
bash-4.1$ hadoop fs -ls /user/hdfs
Found 3 items
drwxr-xr-x   - hdfs supergroup          0 2015-05-02 17:00 /user/hdfs/.Trash
drwxrwxrwx   - hdfs supergroup          0 2015-04-05 13:25 /user/hdfs/flume-channel
-rw-r--r--   1 hdfs supergroup        731 2015-05-02 16:58 /user/hdfs/two_roads.txt
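As a quick sanity check, the upload can also be verified from Python by shelling out to hadoop fs -test -e, which exits with status 0 when the path exists; this is just a sketch, not part of the wordcount job:

import subprocess

# Exit status 0 means /user/hdfs/two_roads.txt exists in HDFS.
rc = subprocess.call(["hadoop", "fs", "-test", "-e", "/user/hdfs/two_roads.txt"])
print("exists" if rc == 0 else "missing")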
Before running the script, point PYTHONPATH at the Spark Python libraries so that pyspark can be imported:

export SPARK_HOME=/usr/lib/spark
export PYTHONPATH=$SPARK_HOME/python
export PYTHONPATH=$SPARK_HOME/python/build/:$PYTHONPATH
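If you prefer not to depend on shell exports, the same setup can be done at the top of the script itself. A sketch, assuming the CDH5 paths used above:

import os
import sys

# Mirror the shell exports so 'from pyspark import SparkContext' resolves.
os.environ.setdefault("SPARK_HOME", "/usr/lib/spark")
sys.path.insert(0, os.path.join(os.environ["SPARK_HOME"], "python"))
sys.path.insert(0, os.path.join(os.environ["SPARK_HOME"], "python/build"))

from pyspark import SparkContext  # now importable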
[cloudera@quickstart MYSPARK]$ python pySparkWordCount.py hdfs:///user/hdfs/two_roads.txt
15/05/02 20:53:22 WARN Utils: Your hostname, quickstart.cloudera resolves to a loopback address: 127.0.0.1; using 10.0.2.15 instead (on interface eth0)
15/05/02 20:53:22 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
15/05/02 20:53:22 INFO SecurityManager: Changing view acls to: cloudera
15/05/02 20:53:22 INFO SecurityManager: Changing modify acls to: cloudera
15/05/02 20:53:22 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(cloudera); users with modify permissions: Set(cloudera)
15/05/02 20:53:23 INFO Slf4jLogger: Slf4jLogger started
15/05/02 20:53:23 INFO Remoting: Starting remoting
15/05/02 20:53:23 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@10.0.2.15:44296]
15/05/02 20:53:23 INFO Remoting: Remoting now listens on addresses: [akka.tcp://sparkDriver@10.0.2.15:44296]
15/05/02 20:53:23 INFO Utils: Successfully started service 'sparkDriver' on port 44296.
15/05/02 20:53:23 INFO SparkEnv: Registering MapOutputTracker
15/05/02 20:53:23 INFO SparkEnv: Registering BlockManagerMaster
15/05/02 20:53:23 INFO DiskBlockManager: Created local directory at /tmp/spark-local-20150502205323-05c0
15/05/02 20:53:23 INFO MemoryStore: MemoryStore started with capacity 267.3 MB
15/05/02 20:53:24 INFO HttpFileServer: HTTP File server directory is /tmp/spark-150ceac1-29b0-46da-b772-912a71b5f689
15/05/02 20:53:24 INFO HttpServer: Starting HTTP Server
15/05/02 20:53:24 INFO Utils: Successfully started service 'HTTP file server' on port 59341.
15/05/02 20:53:25 INFO Utils: Successfully started service 'SparkUI' on port 4040.
15/05/02 20:53:25 INFO SparkUI: Started SparkUI at http://10.0.2.15:4040
15/05/02 20:53:25 INFO AppClient$ClientActor: Connecting to master spark://quickstart.cloudera:7077...
15/05/02 20:53:25 INFO SparkDeploySchedulerBackend: Connected to Spark cluster with app ID app-20150502205325-0007
15/05/02 20:53:25 INFO AppClient$ClientActor: Executor added: app-20150502205325-0007/0 on worker-20150430084112-10.0.2.15-7078 (10.0.2.15:7078) with 1 cores
15/05/02 20:53:25 INFO SparkDeploySchedulerBackend: Granted executor ID app-20150502205325-0007/0 on hostPort 10.0.2.15:7078 with 1 cores, 512.0 MB RAM
15/05/02 20:53:25 INFO AppClient$ClientActor: Executor updated: app-20150502205325-0007/0 is now LOADING
15/05/02 20:53:25 INFO AppClient$ClientActor: Executor updated: app-20150502205325-0007/0 is now RUNNING
15/05/02 20:53:26 INFO NettyBlockTransferService: Server created on 33968
15/05/02 20:53:26 INFO BlockManagerMaster: Trying to register BlockManager
15/05/02 20:53:26 INFO BlockManagerMasterActor: Registering block manager 10.0.2.15:33968 with 267.3 MB RAM, BlockManagerId(<driver>, 10.0.2.15, 33968)
15/05/02 20:53:26 INFO BlockManagerMaster: Registered BlockManager
15/05/02 20:53:29 INFO EventLoggingListener: Logging events to hdfs://quickstart.cloudera:8020/user/spark/applicationHistory/app-20150502205325-0007
15/05/02 20:53:30 INFO SparkDeploySchedulerBackend: SchedulerBackend is ready for scheduling beginning after reached minRegisteredResourcesRatio: 0.0
15/05/02 20:53:30 INFO SparkDeploySchedulerBackend: Registered executor: Actor[akka.tcp://sparkExecutor@10.0.2.15:57280/user/Executor#-871914476] with ID 0
15/05/02 20:53:31 INFO MemoryStore: ensureFreeSpace(259846) called with curMem=0, maxMem=280248975
15/05/02 20:53:31 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 253.8 KB, free 267.0 MB)
15/05/02 20:53:31 INFO MemoryStore: ensureFreeSpace(21134) called with curMem=259846, maxMem=280248975
15/05/02 20:53:31 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 20.6 KB, free 267.0 MB)
15/05/02 20:53:31 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 10.0.2.15:33968 (size: 20.6 KB, free: 267.2 MB)
15/05/02 20:53:31 INFO BlockManagerMaster: Updated info of block broadcast_0_piece0
15/05/02 20:53:31 INFO SparkContext: Created broadcast 0 from textFile at NativeMethodAccessorImpl.java:-2
15/05/02 20:53:31 INFO BlockManagerMasterActor: Registering block manager 10.0.2.15:53772 with 267.3 MB RAM, BlockManagerId(0, 10.0.2.15, 53772)
15/05/02 20:53:31 INFO FileInputFormat: Total input paths to process : 1
15/05/02 20:53:31 INFO SparkContext: Starting job: collect at pySparkWordCount.py:17
15/05/02 20:53:31 INFO DAGScheduler: Registering RDD 4 (reduceByKey at pySparkWordCount.py:16)
15/05/02 20:53:31 INFO DAGScheduler: Got job 0 (collect at pySparkWordCount.py:17) with 1 output partitions (allowLocal=false)
15/05/02 20:53:31 INFO DAGScheduler: Final stage: Stage 1(collect at pySparkWordCount.py:17)
15/05/02 20:53:31 INFO DAGScheduler: Parents of final stage: List(Stage 0)
15/05/02 20:53:31 INFO DAGScheduler: Missing parents: List(Stage 0)
15/05/02 20:53:31 INFO DAGScheduler: Submitting Stage 0 (PairwiseRDD[4] at reduceByKey at pySparkWordCount.py:16), which has no missing parents
15/05/02 20:53:31 INFO MemoryStore: ensureFreeSpace(7792) called with curMem=280980, maxMem=280248975
15/05/02 20:53:31 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 7.6 KB, free 267.0 MB)
15/05/02 20:53:31 INFO MemoryStore: ensureFreeSpace(5007) called with curMem=288772, maxMem=280248975
15/05/02 20:53:31 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 4.9 KB, free 267.0 MB)
15/05/02 20:53:31 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 10.0.2.15:33968 (size: 4.9 KB, free: 267.2 MB)
15/05/02 20:53:31 INFO BlockManagerMaster: Updated info of block broadcast_1_piece0
15/05/02 20:53:31 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:838
15/05/02 20:53:31 INFO DAGScheduler: Submitting 1 missing tasks from Stage 0 (PairwiseRDD[4] at reduceByKey at pySparkWordCount.py:16)
15/05/02 20:53:31 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
15/05/02 20:53:31 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, 10.0.2.15, ANY, 1308 bytes)
15/05/02 20:53:32 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 10.0.2.15:53772 (size: 4.9 KB, free: 267.3 MB)
15/05/02 20:53:33 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 10.0.2.15:53772 (size: 20.6 KB, free: 267.2 MB)
15/05/02 20:53:35 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 3378 ms on 10.0.2.15 (1/1)
15/05/02 20:53:35 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
15/05/02 20:53:35 INFO DAGScheduler: Stage 0 (reduceByKey at pySparkWordCount.py:16) finished in 3.554 s
15/05/02 20:53:35 INFO DAGScheduler: looking for newly runnable stages
15/05/02 20:53:35 INFO DAGScheduler: running: Set()
15/05/02 20:53:35 INFO DAGScheduler: waiting: Set(Stage 1)
15/05/02 20:53:35 INFO DAGScheduler: failed: Set()
15/05/02 20:53:35 INFO DAGScheduler: Missing parents for Stage 1: List()
15/05/02 20:53:35 INFO DAGScheduler: Submitting Stage 1 (PythonRDD[7] at collect at pySparkWordCount.py:17), which is now runnable
15/05/02 20:53:35 INFO MemoryStore: ensureFreeSpace(4576) called with curMem=293779, maxMem=280248975
15/05/02 20:53:35 INFO MemoryStore: Block broadcast_2 stored as values in memory (estimated size 4.5 KB, free 267.0 MB)
15/05/02 20:53:35 INFO MemoryStore: ensureFreeSpace(2802) called with curMem=298355, maxMem=280248975
15/05/02 20:53:35 INFO MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 2.7 KB, free 267.0 MB)
15/05/02 20:53:35 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on 10.0.2.15:33968 (size: 2.7 KB, free: 267.2 MB)
15/05/02 20:53:35 INFO BlockManagerMaster: Updated info of block broadcast_2_piece0
15/05/02 20:53:35 INFO SparkContext: Created broadcast 2 from broadcast at DAGScheduler.scala:838
15/05/02 20:53:35 INFO DAGScheduler: Submitting 1 missing tasks from Stage 1 (PythonRDD[7] at collect at pySparkWordCount.py:17)
15/05/02 20:53:35 INFO TaskSchedulerImpl: Adding task set 1.0 with 1 tasks
15/05/02 20:53:35 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 1, 10.0.2.15, PROCESS_LOCAL, 1056 bytes)
15/05/02 20:53:35 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on 10.0.2.15:53772 (size: 2.7 KB, free: 267.2 MB)
15/05/02 20:53:35 INFO MapOutputTrackerMasterActor: Asked to send map output locations for shuffle 0 to sparkExecutor@10.0.2.15:57280
15/05/02 20:53:35 INFO MapOutputTrackerMaster: Size of output statuses for shuffle 0 is 138 bytes
15/05/02 20:53:35 INFO DAGScheduler: Stage 1 (collect at pySparkWordCount.py:17) finished in 0.161 s
15/05/02 20:53:35 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 1) in 175 ms on 10.0.2.15 (1/1)
15/05/02 20:53:35 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool
15/05/02 20:53:35 INFO DAGScheduler: Job 0 finished: collect at pySparkWordCount.py:17, took 3.977545 s
: 6
all: 1
just: 1
less: 1
hence:: 1
Had: 1
yellow: 1
leads: 1
not: 1
trodden: 1
Oh,: 1
had: 1
fair,: 1
better: 1
to: 1
sorry: 1
has: 1
them: 1
far: 1
wood,: 2
telling: 1
one: 3
Because: 1
morning: 1
by,: 1
where: 1
difference.: 1
other,: 1
sigh: 1
wear,: 1
really: 1
stood: 1
both: 2
for: 2
ever: 1
day!: 1
knowing: 1
be: 2
step: 1
wanted: 1
come: 1
on: 1
about: 1
could: 2
passing: 1
black.: 1
Yet: 1
first: 1
equally: 1
Somewhere: 1
Two: 2
traveled: 1
down: 1
another: 1
roads: 2
should: 1
grassy: 1
Though: 1
there: 1
long: 1
way: 2
was: 1
that: 3
took: 2
traveler,: 1
same,: 1
with: 1
And: 6
made: 1
this: 1
worn: 1
leaves: 1
and: 3
claim: 1
bent: 1
ages: 2
it: 2
To: 1
as: 5
diverged: 2
in: 3
undergrowth;: 1
if: 1
Then: 1
no: 1
perhaps: 1
travel: 1
how: 1
I,: 1
shall: 1
I: 8
lay: 1
a: 3
kept: 1
back.: 1
looked: 1
doubted: 1
In: 1
the: 8
having: 1
15/05/02 20:53:35 INFO SparkUI: Stopped Spark web UI at http://10.0.2.15:4040
15/05/02 20:53:35 INFO DAGScheduler: Stopping DAGScheduler
15/05/02 20:53:35 INFO SparkDeploySchedulerBackend: Shutting down all executors
15/05/02 20:53:35 INFO SparkDeploySchedulerBackend: Asking each executor to shut down
15/05/02 20:53:36 ERROR TaskSchedulerImpl: Lost executor 0 on 10.0.2.15: remote Akka client disassociated
15/05/02 20:53:37 INFO MapOutputTrackerMasterActor: MapOutputTrackerActor stopped!
15/05/02 20:53:37 INFO MemoryStore: MemoryStore cleared
15/05/02 20:53:37 INFO BlockManager: BlockManager stopped
15/05/02 20:53:37 INFO BlockManagerMaster: BlockManagerMaster stopped
15/05/02 20:53:37 INFO RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
15/05/02 20:53:37 INFO RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.
15/05/02 20:53:37 INFO SparkContext: Successfully stopped SparkContext
15/05/02 20:53:37 INFO Remoting: Remoting shut down
15/05/02 20:53:37 INFO RemoteActorRefProvider$RemotingTerminator: Remoting shut down.
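Note the very first line of the word counts, ": 6". Splitting on a single space turns leading spaces and runs of whitespace into empty tokens, so the empty string itself is counted six times. A hedged fix is to drop empty tokens before pairing, for example by rewriting the pipeline with a filter step (parentheses replace the backslash continuations so each step can carry a comment):

counts = (lines.flatMap(lambda x: x.split(' '))
               .filter(lambda w: w)       # drop empty tokens from repeated spaces
               .map(lambda w: (w, 1))
               .reduceByKey(add))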