- 基础 Demo
- 构建和导出 MLeap Bundle
- 导入 MLeap Bundle
基础 Demo
基础 Demo 会引导你使用 Spark 来构建 ML Pipeline,导出 Pipeline 为 MLeap Bundle,以及随后在 MLeap Runtime 中使用它来转换 Data Frame。
构建和导出 MLeap Bundle
本章节我们会通过编码来创建一个简单的 Spark ML Pipeline,然后将其导出成 MLeap Bundle。我们的 Pipeline 非常简单,它在一个离散特征上进行字符串索引,然后使用一个二分器将结果转为 0 或 1。这个 Pipeline 没有实际的用途,但能够展示出从 Spark ML Pipeline 构建得到 MLeap Bundle 是多么容易。
import ml.combust.bundle.BundleFileimport ml.combust.mleap.spark.SparkSupport._import org.apache.spark.ml.Pipelineimport org.apache.spark.ml.bundle.SparkBundleContextimport org.apache.spark.ml.feature.{Binarizer, StringIndexer}import org.apache.spark.sql._import org.apache.spark.sql.functions._import resource._val datasetName = "./mleap-docs/assets/spark-demo.csv"val dataframe: DataFrame = spark.sqlContext.read.format("csv").option("header", true).load(datasetName).withColumn("test_double", col("test_double").cast("double"))// User out-of-the-box Spark transformers like you normally wouldval stringIndexer = new StringIndexer().setInputCol("test_string").setOutputCol("test_index")val binarizer = new Binarizer().setThreshold(0.5).setInputCol("test_double").setOutputCol("test_bin")val pipelineEstimator = new Pipeline().setStages(Array(stringIndexer, binarizer))val pipeline = pipelineEstimator.fit(dataframe)// then serialize pipelineval sbc = SparkBundleContext().withDataset(pipeline.transform(dataframe))for(bf <- managed(BundleFile("jar:file:/tmp/simple-spark-pipeline.zip"))) {pipeline.writeBundle.save(bf)(sbc).get}
训练数据集可以从这里获取。
注意:由于 GitBook 不允许用户直接点击链接下载,请右键另存为。
导入 MLeap Bundle
本节中我们会加载上一节生成的 MLeap Bundle 到 MLeap Runtime 中。我们将会使用 MLeap Runtime 来转换一帧 Leap Frame。
import ml.combust.bundle.BundleFileimport ml.combust.mleap.runtime.MleapSupport._import resource._// load the Spark pipeline we saved in the previous sectionval bundle = (for(bundleFile <- managed(BundleFile("jar:file:/tmp/simple-spark-pipeline.zip"))) yield {bundleFile.loadMleapBundle().get}).opt.get// create a simple LeapFrame to transformimport ml.combust.mleap.runtime.frame.{DefaultLeapFrame, Row}import ml.combust.mleap.core.types._// MLeap makes extensive use of monadic types like Tryval schema = StructType(StructField("test_string", ScalarType.String),StructField("test_double", ScalarType.Double)).getval data = Seq(Row("hello", 0.6), Row("MLeap", 0.2))val frame = DefaultLeapFrame(schema, data)// transform the dataframe using our pipelineval mleapPipeline = bundle.rootval frame2 = mleapPipeline.transform(frame).getval data2 = frame2.dataset// get data from the transformed rows and make some assertionsassert(data2(0).getDouble(2) == 1.0) // string indexer outputassert(data2(0).getDouble(3) == 1.0) // binarizer output// the second rowassert(data2(1).getDouble(2) == 2.0)assert(data2(1).getDouble(3) == 0.0)
搞定!这个例子非常简单。你很可能不会像我们那样手动去构建 Spark ML Pipeline,而是使用 Estimator 和 Pipeline 基于你的数据来训练得到有用的模型。更高级的例子,可以参见我们的 MNIST Demo 章节。
