3 : Apache Spark 2017 10 20 2017 10 20 1 / 32
2011 1.8ZB 2020 35ZB 1ZB = 10 21 = 1,000,000,000,000 GB Word Excel XML CSV JSON text... 2017 10 20 2 / 32
CPU SPECfp Pentium G3420 77.6 8,946 Xeon Gold 6128 1,470 22 Xeon Platinum 8180 1,770 130 PRIMERGY RX1330 64GB 19 PRIMERGY RX2530 768GB 45 PRIMERGY RX4770 1536GB 180 1TB SATA 200MB/s 2500 SAS 300MB/s 5.1 SSD 600MB/s 10 2017 10 20 3 / 32
CPU 2017 10 20 4 / 32
R Excel Excel 105 2650 1 2017 10 20 5 / 32
Mapreduce MapReduce Hadoop/Spark Map: Reduce: Map Hadoop/Spark1.x Spark 2.x 2017 10 20 6 / 32
Apache Spark 2017 10 20 7 / 32
twitter (JSON) /project/bigdata-lab/bda/tweet_20171004.json 150GB (10/15 ) 2017.10.4 2017.10.13 twitter 1% spark % time cat /project/bigdata-lab/bda/tweet_20171004.json wc -l 26511626 cat tweet_20171004.json 0.13s user 46.46s system 4% cpu 18:29.32 total wc -l 19.21s user 29.22s system 4% cpu 18:29.32 total 18 20 I/O 2017 10 20 8 / 32
JSON CSV XML JavaScript {"id":915179801954025472, "text":" \n ","place":null,"lang":"ja"} { :, :, :, :,...} { :, :{ :{ :, :,...}, :,...}} 2017 10 20 9 / 32
{...,"lang":"ja",...} % lang : ja % cat /project/bigdata-lab/bda/tweet_20171004.json grep \"lang\":\"ja\" w ) 2017 10 20 10 / 32
Spark MLlib GraphX Spark Scala, Java, Python, R Hadoop HDFS Hive Text CSV JSON 2017 10 20 11 / 32
Hadoop Hadoop Spark bda1node0x (x 01 18 ) bda1node01 04 ssh bda1node05 $ ssh bda1node05 Last login: Mon Oct 16 16:00:40 2017 from mm-dhcp-128-012.naist.jp 2017 10 19 12:12:14 JST [ysuzuki@bda1node05 ~]$ mandara $ cat /project/bigdata-lab/bda/tweet_20171004.json head [Tue Oct 03 20:40:40 JST 2017]Establishing connection. [Tue Oct 03 20:40:42 JST 2017]Connection established. [Tue Oct 03 20:40:42 JST 2017]Receiving status stream. {"in_reply_to_status_id_str":null,"in_reply_to_status_id":null, "created_at":"tue Oct 03 11:40:42 +0000 2017","in_reply_to _user_id_str":null,"source":"<a href=\"http://twitter.com/download/andro 2017 10 20 12 / 32
HDFS Hadoop Spark Linux HDFS HDFS % hadoop fs -put /project/bigdata-lab/bda/tweet_20171004.json HDFS % hadoop fs -ls -h Found 4 items drwx------ - ysuzuki is-staff 0 2017-10-18 09:00.Trash drwxr-xr-x - ysuzuki is-staff 0 2017-10-16 18:59.sparkStaging drwx------ - ysuzuki is-staff 0 2017-01-02 19:32.staging -rw-r--r-- 3 ysuzuki is-staff 152.1 G 2017-10-16 13:42 tweet_20171004.json 2017 10 20 13 / 32
Spark Spark Scala Java Python R Python spark % pyspark2 --master yarn % pyspark2 % spark2-shell --master yarn scala % pyspark2 --driver-memory 16g --executor-memory 16g --master yarn Welcome to / / / / \ \/ _ \/ _ / / _/ / /. /\_,_/_/ /_/\_\ version 2.0.0.cloudera2 /_/ Using Python version 2.6.6 (r266:84292, Aug 18 2016 08:36:59) SparkSession available as spark. >>> http://bda1node03.naist.jp:8088 2017 10 20 14 / 32
DataFrame Spark (DataFrame) (select,filter,groupby) (join) (read, show) https://spark.apache.org/docs/2.0.0/api/python/index.html 2017 10 20 15 / 32
>>> df = spark.read.json("/user/ysuzuki/tweet_20171004.json") df tweet_20171004.json >>> df.count() 33573798 2017 10 20 16 / 32
>>> df.filter("lang= ja ").count() 6600510 ja en es groupby 2017 10 20 17 / 32
>>> df.groupby("lang").count().sort("count", ascending=false).show() +----+-------+ lang count +----+-------+ en 7057160 ja 4458359 es 2002657 ar 1731097 und 1447410 ko 1307743 pt 1280456 th 759078... +----+-------+ only showing top 20 rows 2017 10 20 18 / 32
1) 2) Spark 2017 10 20 19 / 32
count.py count.py from pyspark.sql import SparkSession spark = SparkSession \.builder \.appname("app example") \.config("master", "yarn") \.getorcreate() df = spark.read.json("tweet_20171004.json") print df.count() count.py % spark2-submit count.py tee output.txt % cat output.txt 33573798 2017 10 20 20 / 32
>>> from pyspark.sql.functions import explode,split >>> df.filter("lang = en ").select("text").distinct().select(explode(split( text, ))).groupby("col").count().sort("count",ascending=false).show() +----+-------+ col count +----+-------+ RT 1735023 the 1218371 to 1136336 a 883388 I 737669... be 248032 me 239061 +----+-------+ only showing top 20 rows 2017 10 20 21 / 32
>>> df.groupby("text").count().show() +--------------------+-----+ text count +--------------------+-----+ Big economic call... 1 @TomiLahren reall... 1 RT @PossumPastor:... 2 RT @dsmesk:... 13 count >>> df.groupby("text").count().sort("count",ascending=false).show() +--------------------+-----+ text count +--------------------+-----+ RT @akiko_lawson:... 4284 RT @Kaepernick7:... 3747 RT @RodriguezDaGo... 3419 RT @TheRealNyha:... 3305 2017 10 20 22 / 32
>>> df.groupby("text").count().sort("count",ascending=false).show(20,false) +--------------------------------------------------------------------------- text +--------------------------------------------------------------------------- RT @akiko_lawson: #L 10/15 1 #L (^^) 3 10/13 10:59 # 4284 RT @Kaepernick7: I appreciate you @Eminem https://t.co/nwavbwsokq Eminem 2017 10 20 23 / 32
>>> df.groupby("user.name").count().sort("count",ascending=false).show(20,false) +----+-----+ name count +----+-----+. 53716 17986 15129-14615 ; 11380 10926 9044 ID 2017 10 20 24 / 32
ID >>> a = df.groupby("user.id").count().sort("count",ascending=false) >>> a.show() +------------------+-----+ id count +------------------+-----+ 115639376 9038 4823945834 3736 1662830792 3445 856385582401966080 3242 2669983818 3015 796251890908434432 2663 104120518 1592 2017 10 20 25 / 32
ID >>> b = df.select("user.id","user.name").distinct() >>> b.show() +------------------+--------------------+ id name +------------------+--------------------+ 767396295665299456 859974072834310144 [6 ] 535819067 Vibes 517553112 Liseth Valencia R. 2017 10 20 26 / 32
a b >>> c = a.join(b, a.id == b.id, inner ).select("name","count") >>> c.show() +------------------+-----+ name count +------------------+-----+ 9038 McDonalds Japan 3736 3445 3242 Test Account1 3015 (NESCAF ) 2663 1592 1496 2017 10 20 27 / 32
text >>> df.where(col( text ).like("% %")).select( text ).show() +--------------------+ text +--------------------+... RT @keigomi29:... RT @shunchoukatsu... RT @tkq12:... RT @pentabutabu:... 2017 10 20 28 / 32
CSV >>> c.write.csv("output_csv") >>> exit $ hadoop fs -get output_csv output_csv Excel utf-8 Excel 2017 10 20 29 / 32
2017 10 20 30 / 32
3 20 1 2017 10 20 31 / 32
Apache Spark 100GB 2017 10 20 32 / 32