読者です 読者をやめる 読者になる 読者になる

sekaie engineers' blog

セカイエ株式会社が主催するエンジニア勉強会について

GCP の DataProc で MLlib を使う

はじめに

おはようございます。こんにちは。こんばんわ。

佐々木です。

Google Cloud Platform の DataProc で MLlib を使ってみたのでメモ書きします。

ちなみに機械学習とかは全然疎いです。

DataProc と MLLib

DataProc は Google Cloud Platform にある Spark / Hadoop が使えるマネージサービスです

cloud.google.com

MLLib は Spark に載ってる機械学習用のライブラリです

spark.apache.org

前提

  • Google Cloud Platform の DataProc API を有効にしておく
  • Google Cloud Storage も使っているので有効にしておく
  • gcloud command はインストール済

MLlib の学習用データを作成する

今回 MLlib で扱うデータは BigQuery のデータから抽出します。

今回は MLlib の決定木を使ってみます。

学習用データは csv で入力することにします。

SQLは擬似的なもので、アクセス数と購入の有無を取れるようなイメージです

SELECT
 IF(orders.user_id IS NOT NULL, 1, 0) AS result,
 COUNT(access_logs.user_id) AS cnt
FROM access_logs.access_20160711 access_logs
LEFT OUTER JOIN test.order_log_20160711 orders
  ON access_logs.user_id = orders.user_id
GROUP BY access_logs.user_id

こんな感じの SQL を書いてこれを Google Cloud Storage に export します

MLLib 用の python script を書く

DataProc で用意されていた Python Script をちょっと改修して書きました

#!/usr/bin/python
import pyspark
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.tree import DecisionTree, DecisionTreeModel
from pyspark.mllib.util import MLUtils

def parseRow(row):
    tokens = row.split(',')
    label = tokens[0]
    features = tokens[1:]
    return LabeledPoint(label, features)

sc = pyspark.SparkContext()

data = sc.textFile("gs://test/orders.csv")
labeled_points = data.map(parseRow)

(trainingData, testData) = labeled_points.randomSplit([0.7, 0.3])
model = DecisionTree.trainClassifier(trainingData, numClasses=2, categoricalFeaturesInfo={}, impurity='gini', maxDepth=5, maxBins=32)
predictions = model.predict(testData.map(lambda x: x.features))
labelsAndPredictions = testData.map(lambda lp: lp.label).zip(predictions)
testErr = labelsAndPredictions.filter(lambda (v, p): v != p).count() / float(testData.count())
print('Test Error = ' + str(testErr))
print('Learned classification tree model:')
print(model.toDebugString())

DataProc のクラスタを作成する

これは GCP Console でも gcloud command でもどちらでもいいですが、クラスタを作成します

特に何も困らず作成出来ると思います

DataProc に JOB を投げる

作成したクラスタに上記の python script を投げます

$ gcloud dataproc jobs submit pyspark --cluster cluster-1 order.py

DecisionTreeModel classifier of depth 5 with 23 nodes
  If (feature 0 <= 18.0)
   If (feature 0 <= 10.0)
    If (feature 0 <= 4.0)
     If (feature 0 <= 3.0)
      Predict: 0.0
     Else (feature 0 > 3.0)
      Predict: 1.0
    Else (feature 0 > 4.0)
     Predict: 0.0
   Else (feature 0 > 10.0)
    If (feature 0 <= 11.0)
     Predict: 1.0
    Else (feature 0 > 11.0)
     If (feature 0 <= 16.0)
      Predict: 0.0
     Else (feature 0 > 16.0)
      If (feature 0 <= 17.0)
       Predict: 1.0
      Else (feature 0 > 17.0)
       Predict: 0.0
  Else (feature 0 > 18.0)
   If (feature 0 <= 68.0)
    If (feature 0 <= 44.0)
     If (feature 0 <= 37.0)
      If (feature 0 <= 20.0)
       Predict: 0.0
      Else (feature 0 > 20.0)
       Predict: 1.0
     Else (feature 0 > 37.0)
      Predict: 0.0
    Else (feature 0 > 44.0)
     Predict: 1.0
   Else (feature 0 > 68.0)
    Predict: 0.0

うーん。学習用データが少なくてちょっと残念な感じになってしまいました。

もうちょっとデータを増やしてやる必要があるみたいですね。

しかし、今回の目的の DataProc で MLlib を利用して決定木を作成することはできました。

おわりに

DataProc をつかってデータを処理するのは楽しいですね。

DataProc と Bigquery あたりを連携して定常的にデータ分析ができる仕組みを作りたいなと思います。

また何か進捗があったらこのブログで報告させていただければと思います。

ほな!