Linear models
Download the training data:

kaggle2.blob.core.windows.net/competitions-data/kaggle/3526/train.tsv

Remove the header row:

sed 1d train.tsv > train_noheader.tsv
cd /Users/erichan/Garden/spark-1.5.1-bin-cdh4
bin/spark-shell --name my_mlib --packages org.jblas:jblas:1.2.4-SNAPSHOT --driver-memory 4G --executor-memory 4G --driver-cores 2

import org.apache.spark.mllib.feature._
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
import org.apache.spark.mllib.linalg.distributed.RowMatrix
import org.apache.spark.rdd.RDD
import org.apache.spark.mllib.optimization._
import org.apache.spark.mllib.classification._
import org.apache.spark.mllib.evaluation._
import org.apache.spark.mllib.tree.DecisionTree
import org.apache.spark.mllib.tree.configuration.Algo
import org.apache.spark.mllib.tree.impurity._
val PATH = "/Users/erichan/sourcecode/book/Spark机器学习"
val rawData = sc.textFile(PATH + "/train_noheader.tsv")
val records = rawData.map(line => line.split("\t"))
records.first
Array[String] = Array("http://www.bloomberg.com/news/2010-12-23/ibm-predicts-holographic-calls-air-breathing-batteries-by-2015.html", "4042", "{""title"":""IBM Sees Holographic Calls Air Breathing Batteries ibm sees holographic calls, air-breathing batteries"",""body"":""A sign stands outside the International Business Machines Corp IBM Almaden Research Center campus in San Jose California Photographer Tony Avelar Bloomberg Buildings stand at the International Business Machines Corp IBM Almaden Research Center campus in the Santa Teresa Hills of San Jose California Photographer Tony Avelar Bloomberg By 2015 your mobile phone will project a 3 D image of anyone who calls and your laptop will be powered by kinetic energy At least that s what International Business Machines Corp sees ...
val data = records.map { r =>
  val trimmed = r.map(_.replaceAll("\"", ""))
  val label = trimmed(r.size - 1).toInt
  val features = trimmed.slice(4, r.size - 1).map(d => if (d == "?") 0.0 else d.toDouble)
  LabeledPoint(label, Vectors.dense(features))
}
data.cache
val numData = data.count
numData: Long = 7395
// note that some of our data contains negative feature values. For naive Bayes we convert these to zeros
val nbData = records.map { r =>
  val trimmed = r.map(_.replaceAll("\"", ""))
  val label = trimmed(r.size - 1).toInt
  val features = trimmed.slice(4, r.size - 1).map(d => if (d == "?") 0.0 else d.toDouble)
    .map(d => if (d < 0) 0.0 else d)
  LabeledPoint(label, Vectors.dense(features))
}
// train a logistic regression model
val numIterations = 10
val maxTreeDepth = 5
val lrModel = LogisticRegressionWithSGD.train(data, numIterations)
val svmModel = SVMWithSGD.train(data, numIterations)
val nbModel = NaiveBayes.train(nbData)
val dtModel = DecisionTree.train(data, Algo.Classification, Entropy, maxTreeDepth)
Take the logistic regression model as an example:
val dataPoint = data.first
val prediction = lrModel.predict(dataPoint.features)
prediction: Double = 1.0
val trueLabel = dataPoint.label
trueLabel: Double = 0.0
val predictions = lrModel.predict(data.map(lp => lp.features))
predictions.take(5)
Array[Double] = Array(1.0, 1.0, 1.0, 1.0, 1.0)
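Every prediction in the sample is 1.0. A quick sanity check (not in the original text) is to compare the distribution of true labels with the distribution of predictions; if the model always outputs the positive class, its accuracy will simply equal the positive-class frequency:

// count how often each label and each prediction occurs
val labelCounts = data.map(_.label).countByValue()
val predictionCounts = lrModel.predict(data.map(_.features)).countByValue()
println(labelCounts)
println(predictionCounts)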
val lrTotalCorrect = data.map { point =>
  if (lrModel.predict(point.features) == point.label) 1 else 0
}.sum
val lrAccuracy = lrTotalCorrect / numData
lrAccuracy: Double = 0.5146720757268425
val svmTotalCorrect = data.map { point =>
  if (svmModel.predict(point.features) == point.label) 1 else 0
}.sum
val svmAccuracy = svmTotalCorrect / numData
svmAccuracy: Double = 0.5146720757268425
val nbTotalCorrect = nbData.map { point =>
  if (nbModel.predict(point.features) == point.label) 1 else 0
}.sum
val nbAccuracy = nbTotalCorrect / numData
nbAccuracy: Double = 0.5803921568627451
// decision tree returns a raw score, so a threshold needs to be specified
val dtTotalCorrect = data.map { point =>
  val score = dtModel.predict(point.features)
  val predicted = if (score > 0.5) 1 else 0
  if (predicted == point.label) 1 else 0
}.sum
val dtAccuracy = dtTotalCorrect / numData
dtAccuracy: Double = 0.6482758620689655
val metrics = Seq(lrModel, svmModel).map { model =>
  val scoreAndLabels = data.map { point =>
    (model.predict(point.features), point.label)
  }
  val metrics = new BinaryClassificationMetrics(scoreAndLabels)
  (model.getClass.getSimpleName, metrics.areaUnderPR, metrics.areaUnderROC)
}
val nbMetrics = Seq(nbModel).map { model =>
  val scoreAndLabels = nbData.map { point =>
    val score = model.predict(point.features)
    (if (score > 0.5) 1.0 else 0.0, point.label)
  }
  val metrics = new BinaryClassificationMetrics(scoreAndLabels)
  (model.getClass.getSimpleName, metrics.areaUnderPR, metrics.areaUnderROC)
}
val dtMetrics = Seq(dtModel).map { model =>
  val scoreAndLabels = data.map { point =>
    val score = model.predict(point.features)
    (if (score > 0.5) 1.0 else 0.0, point.label)
  }
  val metrics = new BinaryClassificationMetrics(scoreAndLabels)
  (model.getClass.getSimpleName, metrics.areaUnderPR, metrics.areaUnderROC)
}
val allMetrics = metrics ++ nbMetrics ++ dtMetrics
allMetrics.foreach { case (m, pr, roc) =>
  println(f"$m, Area under PR: ${pr * 100.0}%2.4f%%, Area under ROC: ${roc * 100.0}%2.4f%%")
}
LogisticRegressionModel, Area under PR: 75.6759%, Area under ROC: 50.1418%
SVMModel, Area under PR: 75.6759%, Area under ROC: 50.1418%
NaiveBayesModel, Area under PR: 68.0851%, Area under ROC: 58.3559%
DecisionTreeModel, Area under PR: 74.3081%, Area under ROC: 64.8837%

Inspect the feature columns using RowMatrix summary statistics:

val vectors = data.map(lp => lp.features)
val matrix = new RowMatrix(vectors)
val matrixSummary = matrix.computeColumnSummaryStatistics()
println(matrixSummary.mean)
println(matrixSummary.min)
println(matrixSummary.max)
println(matrixSummary.variance)
println(matrixSummary.numNonzeros)
[0.4122580529952672,2.761823191986608,0.4682304732861389,0.21407992638350232,0.09206236071899916,0.04926216043908053,2.255103452212041,-0.10375042752143335,0.0,0.05642274498417851,0.02123056118999324,0.23377817665490194,0.2757090373659236,0.615551048005409,0.6603110209601082,30.07707910750513,0.03975659229208925,5716.598242055447,178.75456389452353,4.960649087221096,0.17286405047031742,0.10122079189276552]
[0.0,0.0,0.0,0.0,0.0,0.0,0.0,-1.0,0.0,0.0,0.0,0.045564223,-1.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0]
[0.999426,363.0,1.0,1.0,0.980392157,0.980392157,21.0,0.25,0.0,0.444444444,1.0,0.716883117,113.3333333,1.0,1.0,100.0,1.0,207952.0,4997.0,22.0,1.0,1.0]
[0.10974244167559001,74.30082476809639,0.04126316989120241,0.02153343633200108,0.009211817450882448,0.005274933469767946,32.53918714591821,0.09396988697611545,0.0,0.0017177410346628928,0.020782634824610638,0.0027548394224293036,3.683788919674426,0.2366799607085986,0.22433071201674218,415.8785589543846,0.03818116876739597,7.877330081138463E7,32208.116247426184,10.45300904576431,0.03359363403832393,0.006277532884214705]
[5053.0,7354.0,7172.0,6821.0,6160.0,5128.0,7350.0,1257.0,0.0,7362.0,157.0,7395.0,7355.0,4552.0,4883.0,7347.0,294.0,7378.0,7395.0,6782.0,6868.0,7235.0]
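As an aside (not in the original), the same column statistics are available through MLlib's stat package without constructing a RowMatrix:

import org.apache.spark.mllib.stat.Statistics
// colStats returns a MultivariateStatisticalSummary, like computeColumnSummaryStatistics
val summary = Statistics.colStats(vectors)
println(summary.mean)
println(summary.variance)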
val scaler = new StandardScaler(withMean = true, withStd = true).fit(vectors)
val scaledData = data.map(lp => LabeledPoint(lp.label, scaler.transform(lp.features)))
println(data.first.features)
println(scaledData.first.features)
// sanity check: standardization is (x - mean) / sqrt(variance) per column
println((0.789131 - 0.41225805299526636) / math.sqrt(0.1097424416755897))
[0.789131,2.055555556,0.676470588,0.205882353,0.047058824,0.023529412,0.443783175,0.0,0.0,0.09077381,0.0,0.245831182,0.003883495,1.0,1.0,24.0,0.0,5424.0,170.0,8.0,0.152941176,0.079129575]
[1.137647336497678,-0.08193557169294771,1.0251398128933331,-0.05586356442541689,-0.4688932531289357,-0.3543053263079386,-0.3175352172363148,0.3384507982396541,0.0,0.828822173315322,-0.14726894334628504,0.22963982357813484,-0.14162596909880876,0.7902380499177364,0.7171947294529865,-0.29799681649642257,-0.2034625779299476,-0.03296720969690391,-0.04878112975579913,0.9400699751165439,-0.10869848852526258,-0.2788207823137022]
1.137647336497682
Retrain with the standardized features
val lrModelScaled = LogisticRegressionWithSGD.train(scaledData, numIterations)
val lrTotalCorrectScaled = scaledData.map { point =>
  if (lrModelScaled.predict(point.features) == point.label) 1 else 0
}.sum
val lrAccuracyScaled = lrTotalCorrectScaled / numData
// lrAccuracyScaled: Double = 0.6204192021636241
val lrPredictionsVsTrue = scaledData.map { point =>
  (lrModelScaled.predict(point.features), point.label)
}
val lrMetricsScaled = new BinaryClassificationMetrics(lrPredictionsVsTrue)
val lrPr = lrMetricsScaled.areaUnderPR
val lrRoc = lrMetricsScaled.areaUnderROC
println(f"${lrModelScaled.getClass.getSimpleName}\nAccuracy: ${lrAccuracyScaled * 100}%2.4f%%\nArea under PR: ${lrPr * 100.0}%2.4f%%\nArea under ROC: ${lrRoc * 100.0}%2.4f%%")
LogisticRegressionModel
Accuracy: 62.0419%
Area under PR: 72.7254%
Area under ROC: 61.9663%

Add the category column (r(3)) as a 1-of-k encoded feature:

val categories = records.map(r => r(3)).distinct.collect.zipWithIndex.toMap
val numCategories = categories.size
val dataCategories = records.map { r =>
  val trimmed = r.map(_.replaceAll("\"", ""))
  val label = trimmed(r.size - 1).toInt
  val categoryIdx = categories(r(3))
  val categoryFeatures = Array.ofDim[Double](numCategories)
  categoryFeatures(categoryIdx) = 1.0
  val otherFeatures = trimmed.slice(4, r.size - 1).map(d => if (d == "?") 0.0 else d.toDouble)
  val features = categoryFeatures ++ otherFeatures
  LabeledPoint(label, Vectors.dense(features))
}
println(dataCategories.first)
(0.0,[0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.789131,2.055555556,0.676470588,0.205882353,0.047058824,0.023529412,0.443783175,0.0,0.0,0.09077381,0.0,0.245831182,0.003883495,1.0,1.0,24.0,0.0,5424.0,170.0,8.0,0.152941176,0.079129575])
Apply the standardization transform
// standardize the feature vectors
val scalerCats = new StandardScaler(withMean = true, withStd = true).fit(dataCategories.map(lp => lp.features))
val scaledDataCats = dataCategories.map(lp => LabeledPoint(lp.label, scalerCats.transform(lp.features)))
println(dataCategories.first.features)
println(scaledDataCats.first.features)
[0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.789131,2.055555556,0.676470588,0.205882353,0.047058824,0.023529412,0.443783175,0.0,0.0,0.09077381,0.0,0.245831182,0.003883495,1.0,1.0,24.0,0.0,5424.0,170.0,8.0,0.152941176,0.079129575]
[-0.02326210589837061,2.7207366564548514,-0.4464212047941535,-0.22052688457880879,-0.028494000387023734,-0.2709990696925828,-0.23272797709480803,-0.2016540523193296,-0.09914991930875496,-0.38181322324318134,-0.06487757239262681,-0.6807527904251456,-0.20418221057887365,-0.10189469097220732,1.137647336497678,-0.08193557169294771,1.0251398128933331,-0.05586356442541689,-0.4688932531289357,-0.3543053263079386,-0.3175352172363148,0.3384507982396541,0.0,0.828822173315322,-0.14726894334628504,0.22963982357813484,-0.14162596909880876,0.7902380499177364,0.7171947294529865,-0.29799681649642257,-0.2034625779299476,-0.03296720969690391,-0.04878112975579913,0.9400699751165439,-0.10869848852526258,-0.2788207823137022]
Train the logistic regression model with the expanded features
val lrModelScaledCats = LogisticRegressionWithSGD.train(scaledDataCats, numIterations)
val lrTotalCorrectScaledCats = scaledDataCats.map { point =>
  if (lrModelScaledCats.predict(point.features) == point.label) 1 else 0
}.sum
val lrAccuracyScaledCats = lrTotalCorrectScaledCats / numData
val lrPredictionsVsTrueCats = scaledDataCats.map { point =>
  (lrModelScaledCats.predict(point.features), point.label)
}
val lrMetricsScaledCats = new BinaryClassificationMetrics(lrPredictionsVsTrueCats)
val lrPrCats = lrMetricsScaledCats.areaUnderPR
val lrRocCats = lrMetricsScaledCats.areaUnderROC
println(f"${lrModelScaledCats.getClass.getSimpleName}\nAccuracy: ${lrAccuracyScaledCats * 100}%2.4f%%\nArea under PR: ${lrPrCats * 100.0}%2.4f%%\nArea under ROC: ${lrRocCats * 100.0}%2.4f%%")
LogisticRegressionModel
Accuracy: 66.5720%
Area under PR: 75.7964%
Area under ROC: 66.5483%

Build a dataset for naive Bayes using only the 1-of-k encoded category feature
val dataNB = records.map { r =>
  val trimmed = r.map(_.replaceAll("\"", ""))
  val label = trimmed(r.size - 1).toInt
  val categoryIdx = categories(r(3))
  val categoryFeatures = Array.ofDim[Double](numCategories)
  categoryFeatures(categoryIdx) = 1.0
  LabeledPoint(label, Vectors.dense(categoryFeatures))
}
Retrain the naive Bayes model and evaluate its performance
val nbModelCats = NaiveBayes.train(dataNB)
val nbTotalCorrectCats = dataNB.map { point =>
  if (nbModelCats.predict(point.features) == point.label) 1 else 0
}.sum
val nbAccuracyCats = nbTotalCorrectCats / numData
val nbPredictionsVsTrueCats = dataNB.map { point =>
  (nbModelCats.predict(point.features), point.label)
}
val nbMetricsCats = new BinaryClassificationMetrics(nbPredictionsVsTrueCats)
val nbPrCats = nbMetricsCats.areaUnderPR
val nbRocCats = nbMetricsCats.areaUnderROC
println(f"${nbModelCats.getClass.getSimpleName}\nAccuracy: ${nbAccuracyCats * 100}%2.4f%%\nArea under PR: ${nbPrCats * 100.0}%2.4f%%\nArea under ROC: ${nbRocCats * 100.0}%2.4f%%")
NaiveBayesModel
Accuracy: 60.9601%
Area under PR: 74.0522%
Area under ROC: 60.5138%

Basic optimization technique: stochastic gradient descent (SGD)
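To make the heading concrete: SGD repeatedly nudges the weight vector against the gradient of the loss on a sample, scaled by the step size. The following is a minimal sketch of a single update for logistic loss, not MLlib's actual implementation; the names sigmoid and sgdStep are illustrative only.

// Sketch: one SGD update for logistic loss, w := w - stepSize * (sigmoid(w . x) - y) * x
def sigmoid(z: Double): Double = 1.0 / (1.0 + math.exp(-z))

def sgdStep(w: Array[Double], x: Array[Double], y: Double, stepSize: Double): Array[Double] = {
  val margin = w.zip(x).map { case (wi, xi) => wi * xi }.sum  // dot product w . x
  val multiplier = sigmoid(margin) - y                        // gradient scale for logistic loss
  w.zip(x).map { case (wi, xi) => wi - stepSize * multiplier * xi }
}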
Helper function: train a model for the given input and parameters
// helper function to train a logistic regression model
def trainWithParams(input: RDD[LabeledPoint], regParam: Double, numIterations: Int, updater: Updater, stepSize: Double) = {
  val lr = new LogisticRegressionWithSGD
  lr.optimizer.setNumIterations(numIterations).setUpdater(updater).setRegParam(regParam).setStepSize(stepSize)
  lr.run(input)
}
辅助函数:根据输入数据和分类模型,计算AUC
// helper function to create AUC metric
def createMetrics(label: String, data: RDD[LabeledPoint], model: ClassificationModel) = {
  val scoreAndLabels = data.map { point =>
    (model.predict(point.features), point.label)
  }
  val metrics = new BinaryClassificationMetrics(scoreAndLabels)
  (label, metrics.areaUnderROC)
}
Tuning the number of iterations
// cache the data to increase speed of multiple runs against the dataset
scaledDataCats.cache
// num iterations
val iterResults = Seq(1, 5, 10, 50).map { param =>
  val model = trainWithParams(scaledDataCats, 0.0, param, new SimpleUpdater, 1.0)
  createMetrics(s"$param iterations", scaledDataCats, model)
}
iterResults.foreach { case (param, auc) => println(f"$param, AUC = ${auc * 100}%2.2f%%") }
1 iterations, AUC = 64.97%
5 iterations, AUC = 66.62%
10 iterations, AUC = 66.55%
50 iterations, AUC = 66.81%

Beyond a handful of iterations the AUC barely changes, so extra iterations buy little here.

Tuning the step size
// step size
val numIterations = 10
val stepResults = Seq(0.001, 0.01, 0.1, 1.0, 10.0).map { param =>
  val model = trainWithParams(scaledDataCats, 0.0, numIterations, new SimpleUpdater, param)
  createMetrics(s"$param step size", scaledDataCats, model)
}
stepResults.foreach { case (param, auc) => println(f"$param, AUC = ${auc * 100}%2.2f%%") }
0.001 step size, AUC = 64.95%
0.01 step size, AUC = 65.00%
0.1 step size, AUC = 65.52%
1.0 step size, AUC = 66.55%
10.0 step size, AUC = 61.92%

A step size that is too large overshoots the optimum and degrades the AUC.

Investigating the regularization parameter with SquaredL2Updater
// regularization
val regResults = Seq(0.001, 0.01, 0.1, 1.0, 10.0).map { param =>
  val model = trainWithParams(scaledDataCats, param, numIterations, new SquaredL2Updater, 1.0)
  createMetrics(s"$param L2 regularization parameter", scaledDataCats, model)
}
regResults.foreach { case (param, auc) => println(f"$param, AUC = ${auc * 100}%2.2f%%") }
0.001 L2 regularization parameter, AUC = 66.55%
0.01 L2 regularization parameter, AUC = 66.55%
0.1 L2 regularization parameter, AUC = 66.63%
1.0 L2 regularization parameter, AUC = 66.04%
10.0 L2 regularization parameter, AUC = 35.33%

Very heavy regularization (10.0) clearly underfits.

Helper function: takes a tree depth and an impurity measure
// investigate decision tree
def trainDTWithParams(input: RDD[LabeledPoint], maxDepth: Int, impurity: Impurity) = {
  DecisionTree.train(input, Algo.Classification, impurity, maxDepth)
}
Using Entropy impurity
// investigate tree depth impact for Entropy impurity
val dtResultsEntropy = Seq(1, 2, 3, 4, 5, 10, 20).map { param =>
  val model = trainDTWithParams(data, param, Entropy)
  val scoreAndLabels = data.map { point =>
    val score = model.predict(point.features)
    (if (score > 0.5) 1.0 else 0.0, point.label)
  }
  val metrics = new BinaryClassificationMetrics(scoreAndLabels)
  (s"$param tree depth", metrics.areaUnderROC)
}
dtResultsEntropy.foreach { case (param, auc) => println(f"$param, AUC = ${auc * 100}%2.2f%%") }
1 tree depth, AUC = 59.33%
2 tree depth, AUC = 61.68%
3 tree depth, AUC = 62.61%
4 tree depth, AUC = 63.63%
5 tree depth, AUC = 64.88%
10 tree depth, AUC = 76.26%
20 tree depth, AUC = 98.45%

Note that these AUCs are computed on the training data, so the near-perfect result at depth 20 signals overfitting rather than genuine accuracy.

Using Gini impurity
// investigate tree depth impact for Gini impurity
val dtResultsGini = Seq(1, 2, 3, 4, 5, 10, 20).map { param =>
  val model = trainDTWithParams(data, param, Gini)
  val scoreAndLabels = data.map { point =>
    val score = model.predict(point.features)
    (if (score > 0.5) 1.0 else 0.0, point.label)
  }
  val metrics = new BinaryClassificationMetrics(scoreAndLabels)
  (s"$param tree depth", metrics.areaUnderROC)
}
dtResultsGini.foreach { case (param, auc) => println(f"$param, AUC = ${auc * 100}%2.2f%%") }
1 tree depth, AUC = 59.33%
2 tree depth, AUC = 61.68%
3 tree depth, AUC = 62.61%
4 tree depth, AUC = 63.63%
5 tree depth, AUC = 64.89%
10 tree depth, AUC = 78.37%
20 tree depth, AUC = 98.87%

Helper function: takes the lambda (additive smoothing) parameter
// investigate Naive Bayes parameters
def trainNBWithParams(input: RDD[LabeledPoint], lambda: Double) = {
  val nb = new NaiveBayes
  nb.setLambda(lambda)
  nb.run(input)
}
val nbResults = Seq(0.001, 0.01, 0.1, 1.0, 10.0).map { param =>
  val model = trainNBWithParams(dataNB, param)
  val scoreAndLabels = dataNB.map { point =>
    (model.predict(point.features), point.label)
  }
  val metrics = new BinaryClassificationMetrics(scoreAndLabels)
  (s"$param lambda", metrics.areaUnderROC)
}
nbResults.foreach { case (param, auc) => println(f"$param, AUC = ${auc * 100}%2.2f%%") }
0.001 lambda, AUC = 60.51%
0.01 lambda, AUC = 60.51%
0.1 lambda, AUC = 60.51%
1.0 lambda, AUC = 60.51%
10.0 lambda, AUC = 60.51%

Lambda has no measurable effect on the AUC here.

Splitting into training and test sets
// illustrate cross-validation
// create a 60% / 40% train/test data split
val trainTestSplit = scaledDataCats.randomSplit(Array(0.6, 0.4), 123)
val train = trainTestSplit(0)
val test = trainTestSplit(1)
Model performance on the test set
val regResultsTest = Seq(0.0, 0.001, 0.0025, 0.005, 0.01).map { param =>
  val model = trainWithParams(train, param, numIterations, new SquaredL2Updater, 1.0)
  createMetrics(s"$param L2 regularization parameter", test, model)
}
regResultsTest.foreach { case (param, auc) => println(f"$param, AUC = ${auc * 100}%2.6f%%") }
0.0 L2 regularization parameter, AUC = 66.480874%
0.001 L2 regularization parameter, AUC = 66.480874%
0.0025 L2 regularization parameter, AUC = 66.515027%
0.005 L2 regularization parameter, AUC = 66.515027%
0.01 L2 regularization parameter, AUC = 66.549180%

Model performance on the training set
// training set results
val regResultsTrain = Seq(0.0, 0.001, 0.0025, 0.005, 0.01).map { param =>
  val model = trainWithParams(train, param, numIterations, new SquaredL2Updater, 1.0)
  createMetrics(s"$param L2 regularization parameter", train, model)
}
regResultsTrain.foreach { case (param, auc) => println(f"$param, AUC = ${auc * 100}%2.6f%%") }
0.0 L2 regularization parameter, AUC = 66.260311%
0.001 L2 regularization parameter, AUC = 66.260311%
0.0025 L2 regularization parameter, AUC = 66.260311%
0.005 L2 regularization parameter, AUC = 66.238294%
0.01 L2 regularization parameter, AUC = 66.238294%
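The 60/40 split above is a single hold-out evaluation. For a more robust estimate one could average the AUC over several folds using MLlib's MLUtils.kFold utility. A minimal sketch, reusing the trainWithParams and createMetrics helpers defined earlier (the fold count and regularization value are arbitrary choices for illustration):

import org.apache.spark.mllib.util.MLUtils

// 3-fold cross-validation of AUC for a single regularization setting
val folds = MLUtils.kFold(scaledDataCats, 3, 123)
val aucs = folds.map { case (cvTrain, cvTest) =>
  val model = trainWithParams(cvTrain, 0.005, numIterations, new SquaredL2Updater, 1.0)
  createMetrics("cv", cvTest, model)._2
}
println(f"Mean AUC over folds: ${aucs.sum / aucs.size * 100}%2.4f%%")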