【PySpark入門】第4弾 実戦!RDDを操作してプログラムを作ってみる

記事タイトルとURLをコピーする

はじめに

こんにちは。孔子の80代目子孫兼ディベロップメントサービス課の孔です。最近、チアシードを買ってます。朝ヨーグルト+チアシード+バナナを食べてますが、すごく健康がよくなったような気がします。健康って、悪くなったらつらいのに、よくなってもあまり感じれないもんですね。幸せもなくなったらつらいのに、ある時はなかなか気づけないものなので、健康って幸せだなと思いました。孔子もそういってた気がします。

前回のブログ、3. RDDの理解ではRDDがどのようなものなのか及びいろいろ代表的なAPIを見てみました。RDDの概念と使い方のイメージがいまいちの方は、再度前回のブログをご確認いただけると幸いです。今回は、前回学んだ使い方をもとに、いくつかプログラムを実際作ってもう少し親しんでもらう会にしたらいいかと思い、実戦編を用意しました。要件をみて、どのようにRDDを操作すればプログラムが作れるか、工夫しながら手を動かしてみましょう。それでは、始めます!

実際プログラムを作ってみよう

あなたはあるイベントのアンケートの結果を集計する仕事を任されました。イベントは3日間行われ、大量のアンケートの回答をもらっています。アンケート結果はCSVファイル(後ほどデータ段落にデータを記載しておきます)でマーケティング部から渡されました。アンケート結果は、以下の項目が記載されています。

  • 名前

  • メールアドレス

  • 年齢

  • 性別

  • イベント満足度(5段階)

  • イベント会場評価(5段階)

提供されたCSVファイルから、あなたは以下の結果を知るためのプログラムを作る必要があります。(計3つ作成)

  1. イベント満足度・イベント会場評価の評点が計9点以上の来場客の名前・メールアドレス一覧(3日間全部)

  2. 男性・女性のイベント満足度の平均点(各日毎に)

  3. 年代(20代・30代・40代・50代・60代)によるイベント満足度の平均点およびどの年代からの評価が最もよかったのか(各日毎に)

今回のお仕事がうまくできれば、部長がツナマヨおにぎりをおごってくれるらしいです。頑張りましょう!(ツナマヨ好きじゃない方はご自身の好きなおにぎりの具をいれてください。ちなみに私は高菜が一番好きです)

準備

2. PySparkの環境構築構築したコンテナ環境にウェブブラウザからアクセスします。アクセスができましたら、ターミナルを開き、以下のコマンドを実行します。

$ git clone -b pyspark/vol4 https://github.com/kongyunbae/blog-src.git

git cloneをしたら、blog-srcフォルダーに入ります。srcディレクトリ配下には本ブログに記載された、私が考えたコードが入っています。それを見ないで先に書いてみたい!と思った方は、blog-src/dataにサンプルデータを載せていますので、それを使って要件1~3を満たすプログラムを書いてみてください。

※ 注意

コードを書く際には、必ず以下のコードを冒頭に入れてください。

from pyspark import SparkConf, SparkContext
 
conf = SparkConf().setMaster('local').setAppName('questionnaire_analysis_1')
sc = SparkContext(conf=conf)

SparkConf、SparkContextに関する説明が載ったリンクは本ブログの最下段の「参考サイト」に載せておきますので、ご参考ください。(※1, ※2)

また、テキストファイルからRDDを生成するメソッドはsc.textFile(Path)となります。以下サンプルです。

from pyspark import SparkConf, SparkContext
  
conf = SparkConf().setMaster('local').setAppName('questionnaire_analysis_1')
sc = SparkContext(conf=conf)

origin_rdd_1 = sc.textFile('/home/jovyan/blog-src/data/questionnaire_data_1.txt')  # 1日目のアンケート結果RDD
origin_rdd_2 = sc.textFile('/home/jovyan/blog-src/data/questionnaire_data_2.txt')  # 2日目のアンケート結果RDD
origin_rdd_3 = sc.textFile('/home/jovyan/blog-src/data/questionnaire_data_3.txt')  # 3日目のアンケート結果RDD

データ

やってみる

今回は1個目のリクエストだけ解説しますが、同じ手順に従ってやれば2個目、3個目のリクエストも解決できます。考え方だけ参考いただいて、2,3個目のリクエストは直接実装してみましょう!(2021.8.13記載:Githubに2個目、3個目のリクエストの実装も載せる予定ですが、まだ未実装となります。できる限り早めにコードを実装してアップロードします。)

1個目のリクエスト

1個目のリクエストは、「イベント満足度・イベント会場評価の評点が計9点以上の来場客の名前・メールアドレス一覧(3日間全部)」でした。順序は以下となります(もちろん、他の方法もあります!これは一例です)

  1. 各来場客に一意となるキーを選定する
  2. サンプルデータを読み込み、目的に合った形のRDDが生成できる形に整形する(総評点が出せるようにする)
  3. RDDを生成する
  4. 各キーごとのイベント満足度・イベント会場評価の合計した値を出す
  5. 合計した値が9以上の項目だけをフィルタリングする
  6. 名前とメールアドレスを抽出したRDDを生成し、5の結果物とJoinする
  7. 一覧を出す

それでは、実際作ってみましょう。まずは各来場客にキーを選定しましょう。メールアドレスは絶対被ることがないので、よさそうですね。キーを決めたので、総評点(イベント満足度・イベント会場評価の合計点)が出せる形にRDDを整形しましょう。今回は(メールアドレス, [イベント満足度, イベント会場評価])の形にして、mapValuesを使って合計値を求めようと考えています。そのため、(メールアドレス, [イベント満足度, イベント会場評価])の形に最初に生成したRDDを整形します。ここまでのコードが以下となります。

from pyspark import SparkConf, SparkContext
 
conf = SparkConf().setMaster('local').setAppName('questionnaire_analysis_1')
sc = SparkContext(conf=conf)
 
# 3日分のデータを読み込んでRDD作成
origin_rdd_1 = sc.textFile('/home/jovyan/blog-src/data/questionnaire_data_1.txt')
origin_rdd_2 = sc.textFile('/home/jovyan/blog-src/data/questionnaire_data_2.txt')
origin_rdd_3 = sc.textFile('/home/jovyan/blog-src/data/questionnaire_data_3.txt')
 
# 3日分のデータを一つのRDDにまとめる
origin_rdd = origin_rdd_1.union(origin_rdd_2)
origin_rdd = origin_rdd.union(origin_rdd_3)
 
# (メールアドレス, [イベント満足度, イベント会場評価])の形に変換する
new_rdd = origin_rdd.map(lambda x: x.split(','))
new_rdd = new_rdd.map(lambda x: (x[1], [x[4], x[5]]))

new_rddを生成する際に、最初split関数を使っていますが、これはtextFileを使用してRDDを生成すると一つの行の文字列が一塊のレコードになってしまうためです。textFileを使ってRDDを生成する際に気を付けないといけないポイントなので、覚えておくといいですね。

このように、map関数を使えば簡単に使用したい形にRDDを修正することができます。それでは、総評点を計算してみましょう。これはmapValues関数を使うと、簡単に計算ができます。

# 総評点を計算する
new_rdd = new_rdd.mapValues(lambda x: int(x[0]) + int(x[1]))

lambda式のx, yはリストの中に入っているそれぞれの要素(イベント満足度・イベント会場評価)になります。その合計値を返すので、ここまで処理が終わったらnew_rddの中身は(メールアドレス: 合計点)になっています。

また、ここで気を付けないといけないポイントは、Pythonは静的型付け言語でないため、intを使ってしっかりキャスティングをしてあげる必要があることです。stringになってて、ちゃんと動いてるように見えるけど何かがうまくいかない…ってことも起きうるので、気を付けてください。

それでは、次は合計した値が9以上のものを探さないといけないですね。今回はfilter関数を使って合計値が9以上のレコードを抽出してみます。コードとしては以下となります。

# 総評点が9以上のレコードを抽出する
new_rdd = new_rdd.filter(lambda x: x[1] >= 9)

lambda関数が受け取る引数xには(メールアドレス: 合計点)のタプルが渡されます。filter関数は式がTrueであるレコードのみを残す関数のため、x[1] >= 9がTrueであるレコード、つまり合計値が9以上のレコードのみが抽出されます。

これで、ほぼ完成ですね。残りは名前とメールアドレスだけを持った新しいRDDを生成し、今まで作ってきたnew_rddと結合すれば完成です。その式が以下となります。

# 結果物生成
join_rdd = origin_rdd.map(lambda x: (x.split(',')[1], x.split(',')[0]))
result_rdd = join_rdd.join(new_rdd).map(lambda x: (x[1][0], x[0]))
 
# 結果物出力
result = result_rdd.collect()
print(result)

これで完成です。コードの全体像は以下となります。

from pyspark import SparkConf, SparkContext
  
conf = SparkConf().setMaster('local').setAppName('questionnaire_analysis_1')
sc = SparkContext(conf=conf)
  
# 3日分のデータを読み込んでRDD作成
origin_rdd_1 = sc.textFile('/home/jovyan/blog-src/data/questionnaire_data_1.txt')
origin_rdd_2 = sc.textFile('/home/jovyan/blog-src/data/questionnaire_data_2.txt')
origin_rdd_3 = sc.textFile('/home/jovyan/blog-src/data/questionnaire_data_3.txt')
  
# 3日分のデータを一つのRDDにまとめる
origin_rdd = origin_rdd_1.union(origin_rdd_2)
origin_rdd = origin_rdd.union(origin_rdd_3)
 
# (メールアドレス, [イベント満足度, イベント会場評価])の形に変換する
new_rdd = origin_rdd.map(lambda x: x.split(','))
new_rdd = new_rdd.map(lambda x: (x[1], [x[4], x[5]]))
# 総評点を計算する
new_rdd = new_rdd.mapValues(lambda x: int(x[0]) + int(x[1]))
# 総評点が9以上のレコードを抽出する
new_rdd = new_rdd.filter(lambda x: x[1] >= 9)
 
# 結果物生成
join_rdd = origin_rdd.map(lambda x: (x.split(',')[1], x.split(',')[0]))
result_rdd = join_rdd.join(new_rdd).map(lambda x: (x[1][0], x[0]))
 
# 結果物出力
result = result_rdd.collect()
print(result)

結果は以下のような形になります。

[('高橋 修平', 'suzukisayuri@fujii.jp'), ('山口 舞', 'satokyosuke@takahashi.org'), ('山崎 陽子', 'hiroshi86@yamaguchi.jp'), ('伊藤 くみ子', 'abeyoko@hotmail.com'), ('岡本 直樹', 'minoruabe@nishimura.com'), ('吉田 京助', 'rei95@yamazaki.com'), ('前田 英樹', 'akira52@hotmail.com'), ...(以下省略)]

最後に

今回は簡単なプログラムを作って、どのようにRDDを使ってデータを操るかを見てみました。いかがでしたか?大体のイメージがつかめられたら、実際いろいろなプログラムを作ってみて、より理解を深めたらより勉強になるかと思います。次回からはSpark SQLについてみていきます。RDDを使ってデータを操作することもできますが、おなじみのSQLを使ってもデータを変換したりできる、とてもいい便利グッズになります。

Spark SQLを理解するためには、まずDataFrameという、もう一つのPySparkで使用されるオブジェクトを理解する必要があるため、DataFrameの話もする予定です。それでは、お楽しみに!

参考サイト

孔 允培 (執筆記事の一覧)

アプリケーションサービス部 ディベロップメントサービス課

孔子の80代目子孫