Spark Machine Learning 4: Classification Models (spark-shell)
Published: 2019-06-28


  • Linear models
    • Logistic regression: logistic loss
    • Linear support vector machine (SVM): hinge loss
  • Naive Bayes
  • Decision trees
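
As a rough illustration of the two loss functions named in the outline, here is a minimal plain-Scala sketch. It assumes labels encoded as -1/+1 and a raw linear score w·x; the object and method names are made up for this example and are not part of MLlib.

```scala
object Losses {
  // Logistic loss for a label y in {-1, +1} and a raw linear score (w dot x)
  def logisticLoss(y: Double, score: Double): Double =
    math.log1p(math.exp(-y * score))

  // Hinge loss for the same label convention; zero once the margin y * score reaches 1
  def hingeLoss(y: Double, score: Double): Double =
    math.max(0.0, 1.0 - y * score)
}
```

Both losses shrink as the margin y * score grows, but the hinge loss is exactly zero for margins of 1 or more, while the logistic loss only approaches zero.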

0 Preparing the data

kaggle2.blob.core.windows.net/competitions-data/kaggle/3526/train.tsv

sed 1d train.tsv > train_noheader.tsv

0 Runtime environment

cd /Users/erichan/Garden/spark-1.5.1-bin-cdh4
bin/spark-shell --name my_mlib --packages org.jblas:jblas:1.2.4-SNAPSHOT --driver-memory 4G --executor-memory 4G --driver-cores 2

import org.apache.spark.mllib.feature._
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
import org.apache.spark.mllib.linalg.distributed.RowMatrix
import org.apache.spark.rdd.RDD
import org.apache.spark.mllib.optimization._
import org.apache.spark.mllib.classification._
import org.apache.spark.mllib.evaluation._
import org.apache.spark.mllib.tree.DecisionTree
import org.apache.spark.mllib.tree.configuration.Algo
import org.apache.spark.mllib.tree.impurity._

1 Extracting features

val PATH = "/Users/erichan/sourcecode/book/Spark机器学习"
val rawData = sc.textFile(PATH + "/train_noheader.tsv")
val records = rawData.map(line => line.split("\t"))
records.first

Array[String] = Array("http://www.bloomberg.com/news/2010-12-23/ibm-predicts-holographic-calls-air-breathing-batteries-by-2015.html", "4042", "{""title"":""IBM Sees Holographic Calls Air Breathing Batteries ibm sees holographic calls, air-breathing batteries"",""body"":""A sign stands outside the International Business Machines Corp IBM Almaden Research Center campus in San Jose California Photographer Tony Avelar Bloomberg Buildings stand at the International Business Machines Corp IBM Almaden Research Center campus in the Santa Teresa Hills of San Jose California Photographer Tony Avelar Bloomberg By 2015 your mobile phone will project a 3 D image of anyone who calls and your laptop will be powered by kinetic energy At least that s what International Business Machines Corp sees ...

val data = records.map { r =>
  val trimmed = r.map(_.replaceAll("\"", ""))
  val label = trimmed(r.size - 1).toInt
  val features = trimmed.slice(4, r.size - 1).map(d => if (d == "?") 0.0 else d.toDouble)
  LabeledPoint(label, Vectors.dense(features))
}
data.cache
val numData = data.count

numData: Long = 7395

// note that some of our data contains negative feature values. For naive Bayes we convert these to zeros
val nbData = records.map { r =>
  val trimmed = r.map(_.replaceAll("\"", ""))
  val label = trimmed(r.size - 1).toInt
  val features = trimmed.slice(4, r.size - 1).map(d => if (d == "?") 0.0 else d.toDouble).map(d => if (d < 0) 0.0 else d)
  LabeledPoint(label, Vectors.dense(features))
}
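
The per-field cleaning rule used in both datasets above (strip quotes, treat "?" as missing and map it to 0.0, and for naive Bayes clamp negatives to 0.0) can be isolated into a small function. This is a plain-Scala sketch; `FeatureParsing` and `parseFeature` are hypothetical names introduced only for illustration.

```scala
object FeatureParsing {
  // Strip double quotes, map the missing-value marker "?" to 0.0,
  // and optionally clamp negative values to 0.0 (as done for naive Bayes)
  def parseFeature(raw: String, clampNegative: Boolean = false): Double = {
    val cleaned = raw.replaceAll("\"", "")
    val value = if (cleaned == "?") 0.0 else cleaned.toDouble
    if (clampNegative && value < 0) 0.0 else value
  }
}
```

Substituting 0.0 for a missing value is a simplification; it treats "unknown" the same as a genuine zero.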

2 Training classification models

2.1 Logistic regression model

// train a Logistic Regression model
val numIterations = 10
val maxTreeDepth = 5
val lrModel = LogisticRegressionWithSGD.train(data, numIterations)

2.2 SVM model

val svmModel = SVMWithSGD.train(data, numIterations)

2.3 Naive Bayes

val nbModel = NaiveBayes.train(nbData)

2.4 Decision tree

val dtModel = DecisionTree.train(data, Algo.Classification, Entropy, maxTreeDepth)

3 Using the classification models

3.1 Prediction

Taking the logistic regression model as an example:

val dataPoint = data.first
val prediction = lrModel.predict(dataPoint.features)

prediction: Double = 1.0

val trueLabel = dataPoint.label

trueLabel: Double = 0.0

val predictions = lrModel.predict(data.map(lp => lp.features))
predictions.take(5)

Array[Double] = Array(1.0, 1.0, 1.0, 1.0, 1.0)

4 Evaluating performance

4.1 Accuracy of the logistic regression model

val lrTotalCorrect = data.map { point =>
  if (lrModel.predict(point.features) == point.label) 1 else 0
}.sum
val lrAccuracy = lrTotalCorrect / numData

lrAccuracy: Double = 0.5146720757268425

4.2 Accuracy of the SVM model

val svmTotalCorrect = data.map { point =>
  if (svmModel.predict(point.features) == point.label) 1 else 0
}.sum
val svmAccuracy = svmTotalCorrect / numData

svmAccuracy: Double = 0.5146720757268425

4.3 Accuracy of naive Bayes

val nbTotalCorrect = nbData.map { point =>
  if (nbModel.predict(point.features) == point.label) 1 else 0
}.sum
val nbAccuracy = nbTotalCorrect / numData

nbAccuracy: Double = 0.5803921568627451

4.4 Accuracy of the decision tree

// decision tree threshold needs to be specified
val dtTotalCorrect = data.map { point =>
  val score = dtModel.predict(point.features)
  val predicted = if (score > 0.5) 1 else 0
  if (predicted == point.label) 1 else 0
}.sum
val dtAccuracy = dtTotalCorrect / numData

dtAccuracy: Double = 0.6482758620689655
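
The four accuracy computations above share one pattern: count the predictions that equal the true label and divide by the number of examples. A minimal sketch on plain Scala collections, with no Spark dependency (the `Accuracy` object is a name invented for this example):

```scala
object Accuracy {
  // Fraction of positions where the predicted label equals the true label
  def accuracy(predicted: Seq[Double], actual: Seq[Double]): Double = {
    require(predicted.size == actual.size && predicted.nonEmpty, "inputs must be non-empty and aligned")
    val correct = predicted.zip(actual).count { case (p, a) => p == a }
    correct.toDouble / predicted.size
  }
}
```

Note that every figure in this section is measured on the same data the model was trained on, so it says little about generalization.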

4.5 ROC curve and AUC

val metrics = Seq(lrModel, svmModel).map { model =>
  val scoreAndLabels = data.map { point =>
    (model.predict(point.features), point.label)
  }
  val metrics = new BinaryClassificationMetrics(scoreAndLabels)
  (model.getClass.getSimpleName, metrics.areaUnderPR, metrics.areaUnderROC)
}
val nbMetrics = Seq(nbModel).map { model =>
  val scoreAndLabels = nbData.map { point =>
    val score = model.predict(point.features)
    (if (score > 0.5) 1.0 else 0.0, point.label)
  }
  val metrics = new BinaryClassificationMetrics(scoreAndLabels)
  (model.getClass.getSimpleName, metrics.areaUnderPR, metrics.areaUnderROC)
}
val dtMetrics = Seq(dtModel).map { model =>
  val scoreAndLabels = data.map { point =>
    val score = model.predict(point.features)
    (if (score > 0.5) 1.0 else 0.0, point.label)
  }
  val metrics = new BinaryClassificationMetrics(scoreAndLabels)
  (model.getClass.getSimpleName, metrics.areaUnderPR, metrics.areaUnderROC)
}
val allMetrics = metrics ++ nbMetrics ++ dtMetrics
allMetrics.foreach { case (m, pr, roc) =>
  println(f"$m, Area under PR: ${pr * 100.0}%2.4f%%, Area under ROC: ${roc * 100.0}%2.4f%%")
}

LogisticRegressionModel, Area under PR: 75.6759%, Area under ROC: 50.1418%
SVMModel, Area under PR: 75.6759%, Area under ROC: 50.1418%
NaiveBayesModel, Area under PR: 68.0851%, Area under ROC: 58.3559%
DecisionTreeModel, Area under PR: 74.3081%, Area under ROC: 64.8837%
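
One thing worth noticing: the code above feeds hard 0/1 predictions, not continuous scores, into the metrics (as far as I can tell, these MLlib models return thresholded labels from `predict` unless the threshold is cleared). With only two distinct score values, the ROC curve has a single interior point, and the area under it reduces to (TPR + 1 - FPR) / 2. The following plain-Scala sketch computes that quantity; `HardLabelAuc` is a hypothetical name, and this is not MLlib's implementation.

```scala
object HardLabelAuc {
  // pairs are (predicted label, true label), both either 0.0 or 1.0
  def auc(pairs: Seq[(Double, Double)]): Double = {
    val positives = pairs.filter(_._2 == 1.0)
    val negatives = pairs.filter(_._2 == 0.0)
    require(positives.nonEmpty && negatives.nonEmpty, "need examples of both classes")
    val tpr = positives.count(_._1 == 1.0).toDouble / positives.size // true positive rate
    val fpr = negatives.count(_._1 == 1.0).toDouble / negatives.size // false positive rate
    // Trapezoid area under the piecewise-linear curve (0,0) -> (fpr,tpr) -> (1,1)
    (tpr + 1.0 - fpr) / 2.0
  }
}
```

A model that predicts 1.0 for almost every example has TPR and FPR both close to 1, which gives an area close to 0.5. That is consistent with the logistic regression and SVM rows above.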

5 Improvement and tuning

5.1 Feature standardization

val vectors = data.map(lp => lp.features)
val matrix = new RowMatrix(vectors)
val matrixSummary = matrix.computeColumnSummaryStatistics()
println(matrixSummary.mean)
println(matrixSummary.min)
println(matrixSummary.max)
println(matrixSummary.variance)
println(matrixSummary.numNonzeros)

[0.4122580529952672,2.761823191986608,0.4682304732861389,0.21407992638350232,0.09206236071899916,0.04926216043908053,2.255103452212041,-0.10375042752143335,0.0,0.05642274498417851,0.02123056118999324,0.23377817665490194,0.2757090373659236,0.615551048005409,0.6603110209601082,30.07707910750513,0.03975659229208925,5716.598242055447,178.75456389452353,4.960649087221096,0.17286405047031742,0.10122079189276552]

[0.0,0.0,0.0,0.0,0.0,0.0,0.0,-1.0,0.0,0.0,0.0,0.045564223,-1.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0]

[0.999426,363.0,1.0,1.0,0.980392157,0.980392157,21.0,0.25,0.0,0.444444444,1.0,0.716883117,113.3333333,1.0,1.0,100.0,1.0,207952.0,4997.0,22.0,1.0,1.0]

[0.10974244167559001,74.30082476809639,0.04126316989120241,0.02153343633200108,0.009211817450882448,0.005274933469767946,32.53918714591821,0.09396988697611545,0.0,0.0017177410346628928,0.020782634824610638,0.0027548394224293036,3.683788919674426,0.2366799607085986,0.22433071201674218,415.8785589543846,0.03818116876739597,7.877330081138463E7,32208.116247426184,10.45300904576431,0.03359363403832393,0.006277532884214705]

[5053.0,7354.0,7172.0,6821.0,6160.0,5128.0,7350.0,1257.0,0.0,7362.0,157.0,7395.0,7355.0,4552.0,4883.0,7347.0,294.0,7378.0,7395.0,6782.0,6868.0,7235.0]

val scaler = new StandardScaler(withMean = true, withStd = true).fit(vectors)
val scaledData = data.map(lp => LabeledPoint(lp.label, scaler.transform(lp.features)))
println(data.first.features)
println(scaledData.first.features)
println((0.789131 - 0.41225805299526636)/math.sqrt(0.1097424416755897))

[0.789131,2.055555556,0.676470588,0.205882353,0.047058824,0.023529412,0.443783175,0.0,0.0,0.09077381,0.0,0.245831182,0.003883495,1.0,1.0,24.0,0.0,5424.0,170.0,8.0,0.152941176,0.079129575]

[1.137647336497678,-0.08193557169294771,1.0251398128933331,-0.05586356442541689,-0.4688932531289357,-0.3543053263079386,-0.3175352172363148,0.3384507982396541,0.0,0.828822173315322,-0.14726894334628504,0.22963982357813484,-0.14162596909880876,0.7902380499177364,0.7171947294529865,-0.29799681649642257,-0.2034625779299476,-0.03296720969690391,-0.04878112975579913,0.9400699751165439,-0.10869848852526258,-0.2788207823137022]

1.137647336497682
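
The manual check above is the standardization formula applied to the first feature: subtract the column mean, then divide by the square root of the column variance. A minimal plain-Scala sketch of the same transformation for a single column follows. It assumes the unbiased sample variance (dividing by n - 1) and maps a zero-variance column to 0.0; both choices are assumptions of this sketch, and `Standardize` is a made-up name.

```scala
object Standardize {
  // Standardize one feature column: (x - mean) / sample standard deviation
  def standardize(column: Seq[Double]): Seq[Double] = {
    require(column.size > 1, "need at least two values to estimate the variance")
    val n = column.size
    val mean = column.sum / n
    val variance = column.map(x => (x - mean) * (x - mean)).sum / (n - 1)
    val std = math.sqrt(variance)
    // A constant column has zero variance; emit 0.0 rather than dividing by zero
    column.map(x => if (std == 0.0) 0.0 else (x - mean) / std)
  }
}
```

For example, `Standardize.standardize(Seq(1.0, 2.0, 3.0))` has mean 2 and sample variance 1, so it yields -1.0, 0.0 and 1.0.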

Retraining with the standardized data

val lrModelScaled = LogisticRegressionWithSGD.train(scaledData, numIterations)
val lrTotalCorrectScaled = scaledData.map { point =>
  if (lrModelScaled.predict(point.features) == point.label) 1 else 0
}.sum
val lrAccuracyScaled = lrTotalCorrectScaled / numData
// lrAccuracyScaled: Double = 0.6204192021636241
val lrPredictionsVsTrue = scaledData.map { point =>
  (lrModelScaled.predict(point.features), point.label)
}
val lrMetricsScaled = new BinaryClassificationMetrics(lrPredictionsVsTrue)
val lrPr = lrMetricsScaled.areaUnderPR
val lrRoc = lrMetricsScaled.areaUnderROC
println(f"${lrModelScaled.getClass.getSimpleName}\nAccuracy: ${lrAccuracyScaled * 100}%2.4f%%\nArea under PR: ${lrPr * 100.0}%2.4f%%\nArea under ROC: ${lrRoc * 100.0}%2.4f%%")

LogisticRegressionModel
Accuracy: 62.0419%
Area under PR: 72.7254%
Area under ROC: 61.9663%

5.2 Additional features

val categories = records.map(r => r(3)).distinct.collect.zipWithIndex.toMap
val numCategories = categories.size
val dataCategories = records.map { r =>
  val trimmed = r.map(_.replaceAll("\"", ""))
  val label = trimmed(r.size - 1).toInt
  val categoryIdx = categories(r(3))
  val categoryFeatures = Array.ofDim[Double](numCategories)
  categoryFeatures(categoryIdx) = 1.0
  val otherFeatures = trimmed.slice(4, r.size - 1).map(d => if (d == "?") 0.0 else d.toDouble)
  val features = categoryFeatures ++ otherFeatures
  LabeledPoint(label, Vectors.dense(features))
}
println(dataCategories.first)

(0.0,[0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.789131,2.055555556,0.676470588,0.205882353,0.047058824,0.023529412,0.443783175,0.0,0.0,0.09077381,0.0,0.245831182,0.003883495,1.0,1.0,24.0,0.0,5424.0,170.0,8.0,0.152941176,0.079129575])
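
The first 14 entries of the vector above are the 1-of-k (one-hot) encoding of the category column: every distinct category gets an index, and an example's vector is all zeros except for a 1.0 at its category's index. A small plain-Scala sketch of that encoding follows; `OneHot`, `buildIndex` and `encode` are names invented for this illustration, and the category strings in the usage note are made up.

```scala
object OneHot {
  // Assign each distinct category value an index, in order of first appearance
  def buildIndex(values: Seq[String]): Map[String, Int] =
    values.distinct.zipWithIndex.toMap

  // Encode one value as a vector with a single 1.0 at the category's index
  def encode(value: String, index: Map[String, Int]): Array[Double] = {
    val vec = Array.ofDim[Double](index.size)
    vec(index(value)) = 1.0
    vec
  }
}
```

With `val idx = OneHot.buildIndex(Seq("business", "sports", "business"))`, calling `OneHot.encode("sports", idx)` gives a two-element array with the 1.0 in the second position. A value that is missing from the index would throw, so unseen categories need separate handling.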

Standardizing the extended features

// standardize the feature vectors
val scalerCats = new StandardScaler(withMean = true, withStd = true).fit(dataCategories.map(lp => lp.features))
val scaledDataCats = dataCategories.map(lp => LabeledPoint(lp.label, scalerCats.transform(lp.features)))
println(dataCategories.first.features)
println(scaledDataCats.first.features)

[0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.789131,2.055555556,0.676470588,0.205882353,0.047058824,0.023529412,0.443783175,0.0,0.0,0.09077381,0.0,0.245831182,0.003883495,1.0,1.0,24.0,0.0,5424.0,170.0,8.0,0.152941176,0.079129575]

[-0.02326210589837061,2.7207366564548514,-0.4464212047941535,-0.22052688457880879,-0.028494000387023734,-0.2709990696925828,-0.23272797709480803,-0.2016540523193296,-0.09914991930875496,-0.38181322324318134,-0.06487757239262681,-0.6807527904251456,-0.20418221057887365,-0.10189469097220732,1.137647336497678,-0.08193557169294771,1.0251398128933331,-0.05586356442541689,-0.4688932531289357,-0.3543053263079386,-0.3175352172363148,0.3384507982396541,0.0,0.828822173315322,-0.14726894334628504,0.22963982357813484,-0.14162596909880876,0.7902380499177364,0.7171947294529865,-0.29799681649642257,-0.2034625779299476,-0.03296720969690391,-0.04878112975579913,0.9400699751165439,-0.10869848852526258,-0.2788207823137022]

Training the logistic regression model with the extended features

val lrModelScaledCats = LogisticRegressionWithSGD.train(scaledDataCats, numIterations)
val lrTotalCorrectScaledCats = scaledDataCats.map { point =>
  if (lrModelScaledCats.predict(point.features) == point.label) 1 else 0
}.sum
val lrAccuracyScaledCats = lrTotalCorrectScaledCats / numData
val lrPredictionsVsTrueCats = scaledDataCats.map { point =>
  (lrModelScaledCats.predict(point.features), point.label)
}
val lrMetricsScaledCats = new BinaryClassificationMetrics(lrPredictionsVsTrueCats)
val lrPrCats = lrMetricsScaledCats.areaUnderPR
val lrRocCats = lrMetricsScaledCats.areaUnderROC
println(f"${lrModelScaledCats.getClass.getSimpleName}\nAccuracy: ${lrAccuracyScaledCats * 100}%2.4f%%\nArea under PR: ${lrPrCats * 100.0}%2.4f%%\nArea under ROC: ${lrRocCats * 100.0}%2.4f%%")

LogisticRegressionModel
Accuracy: 66.5720%
Area under PR: 75.7964%
Area under ROC: 66.5483%

5.3 Using the correct data format

Build a dataset that uses only the 1-of-k encoded category feature:

val dataNB = records.map { r =>
  val trimmed = r.map(_.replaceAll("\"", ""))
  val label = trimmed(r.size - 1).toInt
  val categoryIdx = categories(r(3))
  val categoryFeatures = Array.ofDim[Double](numCategories)
  categoryFeatures(categoryIdx) = 1.0
  LabeledPoint(label, Vectors.dense(categoryFeatures))
}

Retrain the naive Bayes model and evaluate its performance

val nbModelCats = NaiveBayes.train(dataNB)
val nbTotalCorrectCats = dataNB.map { point =>
  if (nbModelCats.predict(point.features) == point.label) 1 else 0
}.sum
val nbAccuracyCats = nbTotalCorrectCats / numData
val nbPredictionsVsTrueCats = dataNB.map { point =>
  (nbModelCats.predict(point.features), point.label)
}
val nbMetricsCats = new BinaryClassificationMetrics(nbPredictionsVsTrueCats)
val nbPrCats = nbMetricsCats.areaUnderPR
val nbRocCats = nbMetricsCats.areaUnderROC
println(f"${nbModelCats.getClass.getSimpleName}\nAccuracy: ${nbAccuracyCats * 100}%2.4f%%\nArea under PR: ${nbPrCats * 100.0}%2.4f%%\nArea under ROC: ${nbRocCats * 100.0}%2.4f%%")

NaiveBayesModel
Accuracy: 60.9601%
Area under PR: 74.0522%
Area under ROC: 60.5138%

5.4 Tuning model parameters

5.4.1 Linear models

Underlying optimization technique: stochastic gradient descent (SGD)

Helper function: train a model from the given inputs

// helper function to train a logistic regression model
def trainWithParams(input: RDD[LabeledPoint], regParam: Double, numIterations: Int, updater: Updater, stepSize: Double) = {
  val lr = new LogisticRegressionWithSGD
  lr.optimizer.setNumIterations(numIterations).setUpdater(updater).setRegParam(regParam).setStepSize(stepSize)
  lr.run(input)
}

Helper function: compute the AUC for the given input data and classification model

// helper function to create AUC metric
def createMetrics(label: String, data: RDD[LabeledPoint], model: ClassificationModel) = {
  val scoreAndLabels = data.map { point =>
    (model.predict(point.features), point.label)
  }
  val metrics = new BinaryClassificationMetrics(scoreAndLabels)
  (label, metrics.areaUnderROC)
}

Tuning the number of iterations

// cache the data to increase speed of multiple runs against the dataset
scaledDataCats.cache
// num iterations
val iterResults = Seq(1, 5, 10, 50).map { param =>
  val model = trainWithParams(scaledDataCats, 0.0, param, new SimpleUpdater, 1.0)
  createMetrics(s"$param iterations", scaledDataCats, model)
}
iterResults.foreach { case (param, auc) => println(f"$param, AUC = ${auc * 100}%2.2f%%") }

1 iterations, AUC = 64.97%
5 iterations, AUC = 66.62%
10 iterations, AUC = 66.55%
50 iterations, AUC = 66.81%

Tuning the step size

// step size
val numIterations = 10
val stepResults = Seq(0.001, 0.01, 0.1, 1.0, 10.0).map { param =>
  val model = trainWithParams(scaledDataCats, 0.0, numIterations, new SimpleUpdater, param)
  createMetrics(s"$param step size", scaledDataCats, model)
}
stepResults.foreach { case (param, auc) => println(f"$param, AUC = ${auc * 100}%2.2f%%") }

0.001 step size, AUC = 64.95%
0.01 step size, AUC = 65.00%
0.1 step size, AUC = 65.52%
1.0 step size, AUC = 66.55%
10.0 step size, AUC = 61.92%

Investigating the regularization parameter with SquaredL2Updater

// regularization
val regResults = Seq(0.001, 0.01, 0.1, 1.0, 10.0).map { param =>
  val model = trainWithParams(scaledDataCats, param, numIterations, new SquaredL2Updater, 1.0)
  createMetrics(s"$param L2 regularization parameter", scaledDataCats, model)
}
regResults.foreach { case (param, auc) => println(f"$param, AUC = ${auc * 100}%2.2f%%") }

0.001 L2 regularization parameter, AUC = 66.55%
0.01 L2 regularization parameter, AUC = 66.55%
0.1 L2 regularization parameter, AUC = 66.63%
1.0 L2 regularization parameter, AUC = 66.04%
10.0 L2 regularization parameter, AUC = 35.33%
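
To make the roles of the step size and the L2 regularization parameter concrete, here is a generic single gradient step for logistic regression in plain Scala. It is a textbook-style sketch under stated assumptions (labels in {0, 1}, one example per step, constant step size) and does not reproduce MLlib's updaters, step-size schedule or mini-batching. `GradientStep` and its methods are hypothetical names.

```scala
object GradientStep {
  def sigmoid(z: Double): Double = 1.0 / (1.0 + math.exp(-z))

  // Gradient of the logistic loss for one example with label y in {0, 1}
  def gradient(w: Array[Double], x: Array[Double], y: Double): Array[Double] = {
    val margin = w.zip(x).map { case (wi, xi) => wi * xi }.sum
    val error = sigmoid(margin) - y
    x.map(_ * error)
  }

  // One update with an L2 penalty: w <- w - stepSize * (gradient + regParam * w)
  def step(w: Array[Double], grad: Array[Double], stepSize: Double, regParam: Double): Array[Double] =
    w.zip(grad).map { case (wi, gi) => wi - stepSize * (gi + regParam * wi) }
}
```

In this form a larger step size moves the weights further per update, while a larger regularization parameter pulls them harder toward zero. A penalty that is too strong leaves the model unable to fit the data, which may be what happens in the last row above.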

5.4.2 Decision tree

Helper function: takes the tree depth and the impurity measure

// investigate decision tree
def trainDTWithParams(input: RDD[LabeledPoint], maxDepth: Int, impurity: Impurity) = {
  DecisionTree.train(input, Algo.Classification, impurity, maxDepth)
}

Using the Entropy impurity

// investigate tree depth impact for Entropy impurity
val dtResultsEntropy = Seq(1, 2, 3, 4, 5, 10, 20).map { param =>
  val model = trainDTWithParams(data, param, Entropy)
  val scoreAndLabels = data.map { point =>
    val score = model.predict(point.features)
    (if (score > 0.5) 1.0 else 0.0, point.label)
  }
  val metrics = new BinaryClassificationMetrics(scoreAndLabels)
  (s"$param tree depth", metrics.areaUnderROC)
}
dtResultsEntropy.foreach { case (param, auc) => println(f"$param, AUC = ${auc * 100}%2.2f%%") }

1 tree depth, AUC = 59.33%
2 tree depth, AUC = 61.68%
3 tree depth, AUC = 62.61%
4 tree depth, AUC = 63.63%
5 tree depth, AUC = 64.88%
10 tree depth, AUC = 76.26%
20 tree depth, AUC = 98.45%

Using the Gini impurity

// investigate tree depth impact for Gini impurity
val dtResultsGini = Seq(1, 2, 3, 4, 5, 10, 20).map { param =>
  val model = trainDTWithParams(data, param, Gini)
  val scoreAndLabels = data.map { point =>
    val score = model.predict(point.features)
    (if (score > 0.5) 1.0 else 0.0, point.label)
  }
  val metrics = new BinaryClassificationMetrics(scoreAndLabels)
  (s"$param tree depth", metrics.areaUnderROC)
}
dtResultsGini.foreach { case (param, auc) => println(f"$param, AUC = ${auc * 100}%2.2f%%") }

1 tree depth, AUC = 59.33%
2 tree depth, AUC = 61.68%
3 tree depth, AUC = 62.61%
4 tree depth, AUC = 63.63%
5 tree depth, AUC = 64.89%
10 tree depth, AUC = 78.37%
20 tree depth, AUC = 98.87%
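
For reference, the two impurity measures compared above can be written down directly from their standard definitions. This plain-Scala sketch takes the per-class example counts at a tree node; `NodeImpurity` is a name made up for this example and is unrelated to MLlib's `Impurity` trait.

```scala
object NodeImpurity {
  // Entropy in bits: -sum(p * log2(p)) over the classes present at the node
  def entropy(counts: Seq[Double]): Double = {
    val total = counts.sum
    counts.filter(_ > 0).map { c =>
      val p = c / total
      -p * (math.log(p) / math.log(2.0))
    }.sum
  }

  // Gini impurity: 1 - sum(p^2) over the classes
  def gini(counts: Seq[Double]): Double = {
    val total = counts.sum
    1.0 - counts.map { c => val p = c / total; p * p }.sum
  }
}
```

Both measures are zero for a pure node and largest for an even class split, which is in line with the two sets of results above being so similar. Since these AUC values are computed on the training data, the very high figures at depth 20 most likely reflect overfitting.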

5.4.3 Naive Bayes

Helper function: takes the lambda parameter

// investigate Naive Bayes parameters
def trainNBWithParams(input: RDD[LabeledPoint], lambda: Double) = {
  val nb = new NaiveBayes
  nb.setLambda(lambda)
  nb.run(input)
}
val nbResults = Seq(0.001, 0.01, 0.1, 1.0, 10.0).map { param =>
  val model = trainNBWithParams(dataNB, param)
  val scoreAndLabels = dataNB.map { point =>
    (model.predict(point.features), point.label)
  }
  val metrics = new BinaryClassificationMetrics(scoreAndLabels)
  (s"$param lambda", metrics.areaUnderROC)
}
nbResults.foreach { case (param, auc) => println(f"$param, AUC = ${auc * 100}%2.2f%%") }

0.001 lambda, AUC = 60.51%
0.01 lambda, AUC = 60.51%
0.1 lambda, AUC = 60.51%
1.0 lambda, AUC = 60.51%
10.0 lambda, AUC = 60.51%
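
The lambda parameter is an additive (Laplace) smoothing constant: it is added to every feature count before the counts are normalized into probabilities, so that a feature never seen with a class does not get probability zero. A minimal plain-Scala sketch of that idea for the feature counts within one class follows; `Smoothing` and `smoothedProbs` are hypothetical names, and this is not MLlib's code.

```scala
object Smoothing {
  // Additive smoothing: (count + lambda) / (total + lambda * numberOfFeatures)
  def smoothedProbs(featureCounts: Seq[Double], lambda: Double): Seq[Double] = {
    val total = featureCounts.sum
    val k = featureCounts.size
    featureCounts.map(c => (c + lambda) / (total + lambda * k))
  }
}
```

When every category has plenty of examples, changing lambda within this range barely moves the estimates, which would be one plausible reading of the identical AUC values above.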

5.4.4 Cross-validation

Splitting the data into training and test sets

// illustrate cross-validation
// create a 60% / 40% train/test data split
val trainTestSplit = scaledDataCats.randomSplit(Array(0.6, 0.4), 123)
val train = trainTestSplit(0)
val test = trainTestSplit(1)

Model performance on the test set

val regResultsTest = Seq(0.0, 0.001, 0.0025, 0.005, 0.01).map { param =>
  val model = trainWithParams(train, param, numIterations, new SquaredL2Updater, 1.0)
  createMetrics(s"$param L2 regularization parameter", test, model)
}
regResultsTest.foreach { case (param, auc) => println(f"$param, AUC = ${auc * 100}%2.6f%%") }

0.0 L2 regularization parameter, AUC = 66.480874%
0.001 L2 regularization parameter, AUC = 66.480874%
0.0025 L2 regularization parameter, AUC = 66.515027%
0.005 L2 regularization parameter, AUC = 66.515027%
0.01 L2 regularization parameter, AUC = 66.549180%

Model performance on the training set

// training set results
val regResultsTrain = Seq(0.0, 0.001, 0.0025, 0.005, 0.01).map { param =>
  val model = trainWithParams(train, param, numIterations, new SquaredL2Updater, 1.0)
  createMetrics(s"$param L2 regularization parameter", train, model)
}
regResultsTrain.foreach { case (param, auc) => println(f"$param, AUC = ${auc * 100}%2.6f%%") }

0.0 L2 regularization parameter, AUC = 66.260311%
0.001 L2 regularization parameter, AUC = 66.260311%
0.0025 L2 regularization parameter, AUC = 66.260311%
0.005 L2 regularization parameter, AUC = 66.238294%
0.01 L2 regularization parameter, AUC = 66.238294%
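
What this section performs is a single seeded hold-out split rather than full k-fold cross-validation. The idea can be sketched on a plain Scala collection: draw one random number per element and send the element to the training set with the given probability. This only illustrates the principle and is not how `randomSplit` is implemented; `HoldoutSplit` is a name invented here.

```scala
import scala.util.Random

object HoldoutSplit {
  // Assign each element to the training set with probability trainFraction;
  // the fixed seed makes the split reproducible
  def split[A](data: Seq[A], trainFraction: Double, seed: Long): (Seq[A], Seq[A]) = {
    val rng = new Random(seed)
    // Draw the random flag once per element, then partition on the stored flag
    val flagged = data.toVector.map(x => (x, rng.nextDouble() < trainFraction))
    val (train, test) = flagged.partition(_._2)
    (train.map(_._1), test.map(_._1))
  }
}
```

Because each element is assigned independently, the resulting sizes are only approximately 60% and 40%. Comparing the test-set and training-set tables above is what shows whether a parameter setting generalizes.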

Reposted from: http://euitl.baihongyu.com/
