# Spark Something Practical
This guide assumes your `data.csv` is located at:

```
hdfs:///data/data.csv
```

Everything below runs inside `spark-shell`.

## 0. Check HDFS Path (Before Starting)

```bash
hdfs dfs -ls /data
```

You must see:

```
/data/data.csv
```

## 1. Read the File (Basic RDD Operation)
Spark loads the file as an RDD (Resilient Distributed Dataset):
Scala:

- For Windows

```scala
val rdd = sc.textFile("hdfs:///data/data.csv")
rdd.take(10).foreach(println)
```

- For Mac

```scala
val rdd = sc.textFile("hdfs://localhost:9000/<foldername>/data.csv")
rdd.take(10).foreach(println)
```

## 2. Word Count (Spark Equivalent of MapReduce WordCount)
```scala
val words = rdd.flatMap(_.split("\\W+")).filter(_.nonEmpty).map(_.toLowerCase)
val counts = words.map((_,1)).reduceByKey(_+_)
counts.take(20).foreach(println)
```

## 3. Search / Grep (Spark Equivalent of MapReduce grep)
```scala
val myword = rdd.filter(_.contains("Eminem"))
myword.take(10).foreach(println)
```

## 4. Sorting the Dataset (Spark Equivalent of MR Sort)
Sort entire file alphabetically:
```scala
val sorted = rdd.sortBy(x => x)
sorted.take(20).foreach(println)
```

## 5. Count Lines / Count Words / Count Characters
Count lines:

```scala
rdd.count()
```

Count words: reuse the `words` RDD from the word-count step above, then `words.count()` gives the total number of words.

Count characters (per line):

```scala
rdd.map(_.length).take(20)
```

Count total characters:

```scala
rdd.map(_.length).reduce(_ + _)
```

## 7. Top 10 Most Frequent Words (Simple Analytics)
```scala
counts.takeOrdered(10)(Ordering.by(-_._2))
```

## 8. Convert Your CSV Into a Structured DataFrame
If your `data.csv` has simple comma-separated fields, use DataFrames:

- For Windows

```scala
val df = spark.read.option("header","false").csv("hdfs:///data/data.csv")
df.show(10)
```

- For Mac

```scala
val df = spark.read.option("header","false").csv("hdfs://localhost:9000/<foldername>/data.csv")
df.show(10)
```

The DataFrame API makes Spark much easier to work with.
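If your CSV's first row actually contains column names, a small variation of the same read (a sketch, assuming the Windows-style path used above; adjust for your setup) lets Spark name and type the columns for you:

```scala
// Sketch: treat the first line as a header and let Spark infer column types.
// "hdfs:///data/data.csv" is the example path from above -- change as needed.
val df2 = spark.read.option("header","true").option("inferSchema","true").csv("hdfs:///data/data.csv")

df2.printSchema() // shows the inferred column names and types
df2.show(10)
```

With a header and inferred schema, the later examples can refer to real column names instead of the positional `_c0`, `_c1`, ... defaults.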
## 9. DataFrame Analytics
Count rows:

```scala
df.count()
```

Filter rows:

```scala
df.filter($"_c0".contains("error")).show()
```

Sort:

```scala
df.orderBy($"_c0".asc).show()
```

Add a new column:

```scala
import org.apache.spark.sql.functions.length // not auto-imported in spark-shell
df.withColumn("len", length($"_c0")).show()
```

## 10. Save Output Back to HDFS
RDD:

- For Windows

```scala
counts.saveAsTextFile("hdfs:///output/wordcount_out")
```

- For Mac

```scala
counts.saveAsTextFile("hdfs://localhost:9000/output/wordcount_out")
```

DataFrame:

- For Windows

```scala
df.write.csv("hdfs:///output/df_out")
```

- For Mac

```scala
df.write.csv("hdfs://localhost:9000/output/df_out")
```

## 11. Full Simple Pipeline Example
- For Windows

```scala
val file = sc.textFile("hdfs:///data/data.csv")
val cleaned = file.flatMap(_.split("\\W+")).filter(_.nonEmpty).map(_.toLowerCase)
val wc = cleaned.map((_,1)).reduceByKey(_+_)
wc.saveAsTextFile("hdfs:///output/my_wordcount")
```

- For Mac

```scala
val file = sc.textFile("hdfs://localhost:9000/<foldername>/data.csv")
val cleaned = file.flatMap(_.split("\\W+")).filter(_.nonEmpty).map(_.toLowerCase)
val wc = cleaned.map((_,1)).reduceByKey(_+_)
wc.saveAsTextFile("hdfs://localhost:9000/output/my_wordcount")
```

Now just exit from Scala (`:quit`) and check the output:
```bash
hdfs dfs -ls /output/my_wordcount
```

You should see something like:

```
/output/my_wordcount/_SUCCESS
/output/my_wordcount/part-00000
/output/my_wordcount/part-00001
```

Spark creates multiple output files, one per partition.
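If a single output file is easier to work with, one option (a sketch, sensible only for small results since it funnels everything through one partition; the output path here is illustrative) is to coalesce before saving:

```scala
// Sketch: force a single partition so Spark writes one part file.
// Only do this for small results -- all data moves to one executor.
// "my_wordcount_single" is an example path, not from the guide above.
wc.coalesce(1).saveAsTextFile("hdfs:///output/my_wordcount_single")
```

Alternatively, `hdfs dfs -getmerge /output/my_wordcount merged.txt` concatenates the part files into one local file without changing the Spark job.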
View the results:

```bash
hdfs dfs -cat /output/my_wordcount/part-00000
```

or

```bash
hdfs dfs -cat /output/my_wordcount/part-00001
```

or, to preview all partitions at once:

```bash
hdfs dfs -cat /output/my_wordcount/part-* | head
```
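One caveat when re-running any of the save steps: `saveAsTextFile` and `df.write` fail if the output directory already exists. Remove it first (the path below matches the pipeline example above):

```bash
# Delete the old output directory before re-running the Spark job
hdfs dfs -rm -r /output/my_wordcount
```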