- MNIST Demo
- 名词
- 训练一个 Spark Pipeline
- 加载数据
- 构建 ML Data Pipeline
- 训练一个随机森林模型
- 序列化 ML Data Pipeline 和 RF Model 为 Bundle.ML
- 反序列化为 MLeap 和评分新数据
MNIST Demo
本教程会向你展示如何使用 MLeap 和 Bundle.ML 组件来导出一个 Spark ML Pipeline,并在完全不依赖 Spark Context 的前提下,使用 MLeap 来转换新数据。
我们会构建一个基于 MNIST 数据集训练,包含一个 Vector Assembler、一个 Binarizer、一个 PCA 以及一个 Random Forest Model,用于手写图像分类的 Pipeline。这个练习的目的不是为了训练得到一个最优模型,而是演示在 Spark 中训练一个 Pipeline 然后在 Spark 之外部署这个 Pipeline(数据处理 + 算法)是多么得简单。
本教程的代码分为两个部分:
- Spark ML Pipeline 代码:原生 Spark 代码,用于训练 ML Pipeline,而后把它序列化成 Bundle.ML。
- MLeap 代码:加载一个序列化后的 Bundle 到 MLeap,然后用其转换 Leap Frame。
开始之前,我们先来了解一些术语:
名词
- Estimator:真正意义上的机器学习算法,基于 Data Frame 训练 Transformer 并产生一个模型。
- 模型:在 Spark 里面,模型是代码和元数据,它基于训练过的算法对新数据进行评分。
- Transformer:任何用于转换 Data Frame 的都被叫做 Transformer,对于训练一个 Estimator 来说 Transformer 不是必须的(例如一个 Binarizer)。
- LeapFrame:一种 Data Frame 的数据结构,用于存储数据以及相关联的 Schema。
训练一个 Spark Pipeline
加载数据
// Note that we are taking advantage of com.databricks:spark-csv package to load the dataimport org.apache.spark.ml.feature.{VectorAssembler,StringIndexer,IndexToString, Binarizer}import org.apache.spark.ml.classification.{RandomForestClassificationModel, RandomForestClassifier}import org.apache.spark.ml.evaluation.{MulticlassClassificationEvaluator}import org.apache.spark.ml.{Pipeline,PipelineModel}import org.apache.spark.ml.feature.PCA// MLeap/Bundle.ML Serialization Librariesimport ml.combust.mleap.spark.SparkSupport._import resource._import ml.combust.bundle.BundleFileimport org.apache.spark.ml.bundle.SparkBundleContextval datasetPath = "./mleap-demo/data/mnist/mnist_train.csv"var dataset = spark.sqlContext.read.format("com.databricks.spark.csv").option("header", "true").option("inferSchema", "true").load(datasetPath)val testDatasetPath = "./mleap-demo/data/mnist/mnist_test.csv"var test = spark.sqlContext.read.format("com.databricks.spark.csv").option("inferSchema", "true").option("header", "true").load(testDatasetPath)
你可以下载训练和测试数据集(存放在 S3 上),当然你必须要修改成自己的 datasetPath 和 testDatasetPath。
原始数据托管在 Yann LeCun 的网站上
构建 ML Data Pipeline
// Define Dependent and Independent Featuresval predictionCol = "label"val labels = Seq("0","1","2","3","4","5","6","7","8","9")val pixelFeatures = (0 until 784).map(x => s"x$x").toArrayval layers = Array[Int](pixelFeatures.length, 784, 800, labels.length)val vector_assembler = new VectorAssembler().setInputCols(pixelFeatures).setOutputCol("features")val stringIndexer = { new StringIndexer().setInputCol(predictionCol).setOutputCol("label_index").fit(dataset)}val binarizer = new Binarizer().setInputCol(vector_assembler.getOutputCol).setThreshold(127.5).setOutputCol("binarized_features")val pca = new PCA().setInputCol(binarizer.getOutputCol).setOutputCol("pcaFeatures").setK(10)val featurePipeline = new Pipeline().setStages(Array(vector_assembler, stringIndexer, binarizer, pca))// Transform the raw data with the feature pipeline and persist itval featureModel = featurePipeline.fit(dataset)val datasetWithFeatures = featureModel.transform(dataset)// Select only the data needed for training and persist itval datasetPcaFeaturesOnly = datasetWithFeatures.select(stringIndexer.getOutputCol, pca.getOutputCol)val datasetPcaFeaturesOnlyPersisted = datasetPcaFeaturesOnly.persist()
我们本想让 Pipeline 包含随机森林模型,但目前有一个 Bug (SPARK-16845) 让我们暂时没法这么做(这个问题会在 2.2.0 中得到修复)。
训练一个随机森林模型
// You can optionally experiment with CrossValidator and MulticlassClassificationEvaluator to determine optimal// settings for the random forestval rf = new RandomForestClassifier().setFeaturesCol(pca.getOutputCol).setLabelCol(stringIndexer.getOutputCol).setPredictionCol("prediction").setProbabilityCol("probability").setRawPredictionCol("raw_prediction")val rfModel = rf.fit(datasetPcaFeaturesOnlyPersisted)
序列化 ML Data Pipeline 和 RF Model 为 Bundle.ML
import org.apache.spark.ml.mleap.SparkUtilval pipeline = SparkUtil.createPipelineModel(uid = "pipeline", Array(featureModel, rfModel))val sbc = SparkBundleContext().withDataset(rfModel.transform(datasetWithFeatures))for(bf <- managed(BundleFile("jar:file:/tmp/mnist-spark-pipeline.zip"))) {pipeline.writeBundle.save(bf)(sbc).get}
反序列化为 MLeap 和评分新数据
这一步的目的是展示如何反序列一个 bundle 然后使用它来对 Leap Frame 进行评分,而无需任何 Spark 依赖。你可以从我们的 S3 存储桶下载这个 mnist.json。
import ml.combust.mleap.runtime.MleapSupport._import ml.combust.mleap.runtime.MleapContext.defaultContextimport java.io.File// load the Spark pipeline we saved in the previous sectionval mleapPipeline = (for(bf <- managed(BundleFile("jar:file:/tmp/mnist-spark-pipeline.zip"))) yield {bf.loadMleapBundle().get.root}).tried.get
从我们的 mleap-demo Git 仓库中加载一个样例 Leap Frame(data/mnist.json)。
import ml.combust.mleap.runtime.serialization.FrameReaderval s = scala.io.Source.fromURL("file:///./mleap-demo/mnist.json").mkStringval bytes = s.getBytes("UTF-8")val frame = FrameReader("ml.combust.mleap.json").fromBytes(bytes)// transform the dataframe using our pipelineval frame2 = mleapPipeline.transform(frame).getval data = frame2.dataset
接下来你可以从这里拿到更多的示例和 Notebook。
