【PySpark入門】第7弾 PySpark運用Tips!

記事タイトルとURLをコピーする

はじめに

こんにちは。孔子の80代目子孫兼ディベロップメントサービス課の孔です。私は毎日納豆を食べていますが、最近遊びの一環で、ビニールはがすときの納豆の糸をどこまで伸ばせるか、毎日実験をしています。このような繊細な作業は丁寧にやらないといけない、という認識が強いかもしれませんが、実はビニールをスピーディーにはがすと、糸がより伸びるんですね。実験のおかげで厨房を汚していますが、健康にいいものなので大丈夫と割り切ってます。

前回までのブログでは、Sparkの主要概念を主に見て、実際どのように使用するかにフォーカスを当ててみてみました。今回は実際プログラムを書く際に、知っておくといい知識と概念をいくつかご紹介いたします。

Spark Web UI

SparkはウェブブラウザーでSparkジョブがどのような状態なのか、ノードのストレージはどうかなど、Sparkが動くにあたってモニタリングすべき事項をUIで提供する機能があります。実際運用を始めると、何か問題が発生したか、各Executorはどのような状態なのかなどをモニタリングするのは必須となります。その際に利用するといい機能ですね。イメージとしては、以下のようなものになります。

f:id:swx-kong:20210818175115p:plain (※2)

結構いろいろなものがモニタリングできるので、詳細を知りたい方は以下の公式ページからどのようなものが確認できるのか、見てみるといいかと思います。

https://spark.apache.org/docs/latest/web-ui.html

CacheとPersist

まず紹介する概念はCacheとPersistです。3. RDDの理解で、RDDの特徴として遅延処理があったこと、覚えていますか?DataFrameももちろんRDDを基盤として作られているため、遅延処理が基本となります。しかし途中までの計算を行って、その結果を保存しておくと、同じ処理を複数回処理しなくても済みますね。このようなケースが発生する際に使用されるメソッドがCacheとPersistとなります。(実は同じ説明が3. RDDの理解の「遅延評価(Lazy Evaluation)」段落に記載されています)

違いを見てみると、Cacheはメモリに一時的にデータを保存するメソッドであることに対して、Persistは分散処理を行っている各ノードの指定したストレージレベル(※1)に合わせて保存するのが違いとなります。ほぼ名前通りですね。 使用するメリットは、「コストの節約」につきます。再計算するために必要なコストがなくなった分、リソース・時間の節約が可能になります。

使用方法は簡単で、RDDもしくはDataFrameオブジェクトに対してメソッドを使用するだけです。

lines = sc.textFile("data.txt")
lineLengths = lines.map(lambda s: len(s))
totalLength = lineLengths.reduce(lambda a, b: a + b)
 
lineLengths.cache()
lineLengths.persist()

繰り返して使いたい演算結果を保持したRDD、DataFrameがある場合、リソースの節約効果が大きく得られるのであればこちらのメソッドを検討してみてください。

Partition

分散処理において、Partitionの概念も避けては通れない道となります。分散処理を行う上で、並列処理は必須となります。複数のノードに分けたデータの塊を送り、それぞれのノードで分散して並列で処理をすることで、Sparkは分散処理を実現しています。「分けたデータの塊」のことをSparkではPartitionと呼んでいます。このPartitionを正しく制御するだけでもパフォーマンスの向上、メモリーイシューに対応ができますので、常に意識することが大事です。

基本的に、Sparkはノードに搭載されたCPUコア数と同数のPartitionを生成し、各ノードに各Partitionを配置して、各Partitionに対してタスクを生成することで分散処理を行います。Partition数は指定する方法はいろいろありますが、一つ例を挙げるとスクリプトのspark.confで設定する方法があります。

spark.conf.set("spark.sql.shuffle.partitions", "10")  # partition 10で設定

また、処理の途中でもPartition数を変更することも可能です。

# Change DataFrame partitions to 10
newDF = df.repartition(10)

Partitionが多すぎると1Partitionに1タスクを生成するSparkの仕様上、タスクの作成・スケジューリング処理・管理などにリソースをとられすぎて性能が低下する原因となります。

逆にPartitionが少なすぎるとCPUを遊ばせたりすることで最適な並列処理ができないことで、非効率的なリソース使用が発生する恐れがあります。

適切にPartition数を維持するのが性能最適化の重要な要素となります。

最後に

いかがでしょうか。このPySpark入門シリーズではSparkで使われる基本的な概念の説明ましたが、皆さんのお役に立てたでしょうか。これSparkジョブの設定方法などまだまだ紹介しきれなかったものも多いです。ただし今回のシリーズは「実際どのように使うのか」や「この動作どうやって実現できるのか」などの情報よりは、「Sparkを利用するにあたって必要となる概念をしっかり理解する」ことを目的で作成していますので、実際使用する際に必要となる知識は割愛させていただきました(Qiitaに勝る情報を出せる自信がなかったためです(´;ω;`)ウッ…)

これで、今まで混乱していた知識がまとまった!とか、RDDとかDataFrameとかSparkで使われる概念全然わからなかったのに、助かった!と思ってくださる方がいらっしゃればとてもうれしいです。

それでは、Spark StreamingやSpark MLなどライブラリの紹介とか、AWSでPySparkどうやって使うのなどなどの知識もいつかまた紹介しますので、もし今回のシリーズがよかったらまた読みに来てくださるとうれしいです。

それでは、今まで読んでくださってありがとうございました!またお会いしましょう!

参考サイト

孔 允培 (執筆記事の一覧)

アプリケーションサービス部 ディベロップメントサービス課

孔子の80代目子孫