spark初步探索
今天我阅读了《基于spark下一代机器学习》这本书,里面spark与sql进行了结合,关于传统领域cv,nlp我认为使用大数据处理并不如python处理方便快捷。学习了里面的基础操作,之前学习的spl也可以进行相对的结合。
在书中我会使用了XGBoost,LightGBM,等模型,并且学习了大数据经i行数据预处理的方法
import org.apache.spark.sql.SparkSession
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql.functions._
import ml.dmlc.xgboost4j.scala.{XGBoost, DMatrix}
object XGBoostExample {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder
.appName("XGBoostExample")
.getOrCreate()
// 加载数据集
val data = spark.read.option("header", "true").csv("path/to/your/data.csv")
// 选择特征和标签
val featureCols = data.columns.dropRight(1) // 假设最后一列是标签
val assembler = new VectorAssembler().setInputCols(featureCols).setOutputCol("features")
val transformedData = assembler.transform(data).select(col("features"), col("label").cast("float"))
// 划分训练集和测试集
val Array(trainData, testData) = transformedData.randomSplit(Array(0.8, 0.2), seed = 42)
// 将数据转换为 DMatrix
val dtrain = new DMatrix(trainData.rdd.map(row => (row.getAs[org.apache.spark.ml.linalg.Vector]("features").toArray, row.getAs[Float]("label"))).collect())
val dtest = new DMatrix(testData.rdd.map(row => (row.getAs[org.apache.spark.ml.linalg.Vector]("features").toArray, row.getAs[Float]("label"))).collect())
// 设置 XGBoost 参数
val params = Map(
"eta" -> 0.1,
"max_depth" -> 3,
"objective" -> "binary:logistic",
"silent" -> 1,
"nthread" -> 4
)
// 训练模型
val bst = XGBoost.train(dtrain, params, 100)
// 进行预测
val preds = bst.predict(dtest)
// 评估模型
val labels = dtest.getLabel
val accuracy = preds.zip(labels).count { case (pred, label) => Math.round(pred) == label } / labels.length.toDouble
println(s"Accuracy: $accuracy")
// 停止 Spark 会话
spark.stop()
}
}