はじめに
こんにちは。孔子の80代目子孫兼ディベロップメントサービス課の孔です。Youtubeをうろうろしてたら、「毎日1%成長すると、1年で37倍成長できる」というサムネを見て、今日も1%成長できるようにがんばろう…と思って頑張ってブログを書いています。
前回のブログでは、DataFrameがどのようなものが見てみました。その特徴の中の一つとしてSpark SQLを使用できる、がありましたね。今回のブログではそのSpark SQLがどのようなものなのか、そして実際どのように使用できるのかを見ていきましょう。
Spark SQLの概要
Spark SQL(※1)は文字通りSparkでSQL(※2 SQLがどのようなものかわからない方は「※2」をご参照ください)が使用できる、Sparkのライブラリであり、構造化されたデータを処理するために使われるものとなります。Spark SQLはDataFrameオブジェクトに対して使用することができます。DataFrameはRDBと似たようなオブジェクトだと、前回のブログで紹介しましたが、そのためSQL文を使ってRDBでデータを操作することとほぼ同じ方法でSparkでデータを操作するができます。
また、RDBでSQL文を書くときにいろいろ考慮しなければならないパフォーマンスを出すためのロジックなども、DataFrameを使用しているためパフォーマンス最適化が行われていますので、より簡単にコードの作成ができるというのもメリットとなります。
もちろん、多様なデータソース(Parquetファイル(※3)、JSON、Hive(※4)テーブル、JDBC(※5)などなど)からデータ読み込みもDataFrameをもとにしているので使用可能ですので、とても汎用性が高いのも特徴となります。それでは、実際どのように使用するのか、見てみましょう。
実際使ってみよう
まずはDataFrameを生成しよう
まず、使用する際にはいままで同様SparkSessionオブジェクトを作成してエントリーポイントを作成します。
from pyspark.sql import SparkSession spark = SparkSession \ .builder \ .appName("example") \ .config("spark.some.config.option", "some-value") \ .getOrCreate()
そして、Spark SQLはDataFrameに対して使用可能なので、DataFrameを作成する必要があります。
# spark is an existing SparkSession df = spark.read.load("パス", format="ファイル形式")
loadの引数には、例えばspark.read.load("examples/src/main/resources/people.json", format="json")
のような形でファイル指定が可能です。また、必要に応じてオプションを付けることも可能です。例えば以下のような形です。
df = spark.read.load("examples/src/main/resources/people.csv", format="csv", sep=":", inferSchema="true", header="true")
DataFrameにSQLクエリを発行して操作してみよう
それでは、DataFrameに対してSQLクエリを発行してみましょう。以下の2通りで発行することが可能です。
SQLをそのまま書く
まずはSQLを書きなじんだ人に嬉しい、SQL文をそのまま利用する書き方になります。以下の例を見てください。
df.createOrReplaceTempView("people") sqlDF = spark.sql("SELECT * FROM people")
SQL文をそのまま引数に渡していますね。SQL文をSpark SQLが解析し、どのような操作か理解して内部で最適化したロジックを実行してくれます。クエリ文が複数行になる場合は以下のような書き方をすると読みやすくなります。
df.createOrReplaceTempView("people") query = """ SELECT * FROM people FILTER age > 20 """ sqlDF = spark.sql("SELECT * FROM people")
SQL文をメソッドとして書く
もう一つの方法はSQLの関数をDataFrameのメソッドを書く感覚の書き方になります。以下の例をみてください。
# Select only the "name" column df.select("name").show() # +-------+ # | name| # +-------+ # |Michael| # | Andy| # | Justin| # +-------+ # Select people older than 21 df.filter(df['age'] > 21).show() # +---+----+ # |age|name| # +---+----+ # | 30|Andy| # +---+----+ # Count people by age df.groupBy("age").count().show() # +----+-----+ # | age|count| # +----+-----+ # | 19| 1| # |null| 1| # | 30| 1| # +----+-----+
show()
メソッドはDataFrameのレコードを標準出力するメソッドになります。上の例ではDataFrameに対して、df.selectやdf.filterのように、SQLの関数をそのままメソッドとして使用して、SQL文と同じ結果を得ています。このような書き方でもSpark SQLを使ってDataFrameのデータを操作することが可能です。ただし、メソッドが多くなると読みづらくなるので、もし複雑な処理が必要な場合には先ほど紹介した「SQLをそのまま書く」方法を検討してください。
最後に
すでにSQLが書ける方にとって、Spark SQLはとても嬉しいライブラリですね。さらに、性能最適化や分散処理を自動的にいい感じにやってくれる点で、SQLが書ける方にとってデータ分析においてSparkは良い選択肢の一つになれるのではないかと思います。
それでは、次回はSparkを使用する上で、どのようなところを気を付けないといけないのか、をみていきたいと思います。少し深い話になってしまうので、基礎的な概念だけ紹介しようと思っていたシリーズの趣旨から少し離れるかもしれませんが、実際Sparkを使ったプログラムを作成する上で最低限知っておくべきポイントをいくつか紹介できればと思います。
それでは、次回もお楽しみに!
参考サイト
前回の記事
※1. Spark SQLドキュメント
※2. 入門者でもわかるSQLを使って表を作ってみよう!【SQL文の書き方付き】
※3. Databricks用語集
※4. Apache Hive
※5. Oracle Database JDBC開発者ガイド