【大数据学习 | Spark-Core】Spark的kryo序列化
1. 前言
由于大多数Spark计算的内存性质,Spark程序可能会受到集群中任何资源(CPU,网络带宽或内存)的瓶颈。通常,如果内存资源足够,则瓶颈是网络带宽。
数据序列化,这对于良好的网络性能至关重要。
在Spark的架构中,在网络中传递的或者缓存在内存、硬盘中的对象需要进行序列化操作。比如:
1)分发给Executor上的Task
2)广播变量
3)Shuffle过程中的数据缓存
等操作,序列化起到了重要的作用,将对象序列化为慢速格式或占用大量字节的格式将大大减慢计算速度。通常,这是优化Spark应用程序的第一件事。
spark 序列化分两种:一种是Java 序列化; 另一种是 Kryo 序列化。
2. Java序列化
定义UserInfo类
public class UserInfo{
private String name = "hainiu"; // java实现了序列化
private int age = 10; // java实现了序列化
private Text addr = new Text("beijing"); // 没有实现java的 Serializable接口
public UserInfo() {
}
@Override
public String toString() {
return "UserInfo{" +
"name='" + name + '\'' +
", age=" + age +
", addr=" + addr +
'}';
}
}
java实现序列化的一般方法:
1)让类实现Serializable接口
当使用Serializable方案的时候,你的对象必须继承Serializable接口,类中的属性如果有实例那也必须是继承Serializable 可序列化的;
package com.hainiu.sparkcore
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object SerDemo {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SerDemo")
val sc = new SparkContext(conf)
val rdd: RDD[String] = sc.parallelize(List("aa","aa","bb","aa"),2)
val broad: Broadcast[UserInfo] = sc.broadcast(new UserInfo)
val pairRdd: RDD[(String, UserInfo)] = rdd.map(f => {
val userInfo: UserInfo = broad.value
(f, userInfo)
})
// 因为groupByKey有shuffle,需要序列化
val groupRdd: RDD[(String, Iterable[UserInfo])] = pairRdd.groupByKey()
val arr: Array[(String, Iterable[UserInfo])] = groupRdd.collect()
for(t <- arr){
println(t)
}
}
}
2)static和transient修饰的属性不会被序列化,可以通过在属性上加 static 或 transient 修饰来解决序列化问题。
static修饰的是类的状态,而不是对象状态,所以不存在序列化问题;
这样导致数据丢失。
给addr 属性用 transient 修饰,导致反序列化后数据丢失
java 序列化弊端:
1)如果引入第三方类对象作为属性,如果对象没有实现序列化,那这个类也不能序列化;
2)用 transient 修饰 的属性,反序列化后数据丢失;
3)Java序列化很灵活(支持所有对象的序列化)但性能较差,同时序列化后占用的字节数也较多(包含了序列化版本号、类名等信息);
3. Kryo 序列化
由于java序列化性能问题,spark 引入了Kryo序列化机制。
Spark 也推荐用 Kryo序列化机制。Kryo序列化机制比Java序列化机制性能提高10倍左右,Spark之所以没有默认使用Kryo作为序列化类库,是因为它不支持所有对象的序列化,同时Kryo需要用户在使用前注册需要序列化的类型,不够方便。
1)开启序列化
spark 默认序列化方式 是 用java序列化。
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") // classOf[KryoSerializer].getName 一样效果
2)配置序列化参数
当开启序列化后,需要配置 【spark.kryo.registrationRequired】属性为true,默认是false,如果是false,Kryo序列化时性能有所下降。
注册有两种方式:
第一种:
// 开启Kryo序列化
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
// 要求主动注册
conf.set("spark.kryo.registrationRequired", "true")
// 方案1:
val classes: Array[Class[_]] = Array[Class[_]](
classOf[UserInfo],
classOf[Text],
Class.forName("scala.reflect.ClassTag$GenericClassTag"),
classOf[Array[UserInfo]]
)
//将上面的类注册
conf.registerKryoClasses(classes)
第二种:
封装一个自定义注册类,然后把自定义注册类注册给Kryo。
a)自定义注册类:
class MyRegistrator extends KryoRegistrator {
override def registerClasses(kryo: Kryo): Unit = {
kryo.register(classOf[UserInfo])
kryo.register(classOf[Text])
kryo.register(Class.forName("scala.reflect.ClassTag$GenericClassTag"))
kryo.register(classOf[Array[UserInfo]])
}
}
b)配置自定义注册类
// 开启Kryo序列化
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
// 要求主动注册
conf.set("spark.kryo.registrationRequired", "true")
// 设置注册类
conf.set("spark.kryo.registrator",classOf[MyRegistrator].getName)
代码:
package spark05
import com.esotericsoftware.kryo.Kryo
import java05.UserInfo
import org.apache.hadoop.io.Text
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.serializer.KryoRegistrator
import org.apache.spark.{SparkConf, SparkContext}
object SerDemo {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SerDemo")
// 开启Kryo序列化
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
// 要求主动注册
conf.set("spark.kryo.registrationRequired", "true")
// 方案1:
// val classes: Array[Class[_]] = Array[Class[_]](
// classOf[UserInfo],
// classOf[Text],
// Class.forName("scala.reflect.ClassTag$GenericClassTag"),
// classOf[Array[UserInfo]]
// )
//将上面的类注册
// conf.registerKryoClasses(classes)
// 方案2
conf.set("spark.kryo.registrator",classOf[MyRegistrator].getName)
val sc = new SparkContext(conf)
val rdd: RDD[String] = sc.parallelize(List("aa","aa","bb","aa"),2)
val user = new UserInfo
val broad: Broadcast[UserInfo] = sc.broadcast(user)
val rdd2: RDD[(String, UserInfo)] = rdd.map(f => {
val user2: UserInfo = broad.value
(f, user2)
})
// 目的是让rdd产生shuffle
val arr: Array[(String, Iterable[UserInfo])] = rdd2.groupByKey().collect()
arr.foreach(println)
}
}
class MyRegistrator extends KryoRegistrator {
override def registerClasses(kryo: Kryo): Unit = {
kryo.register(classOf[UserInfo])
kryo.register(classOf[Text])
kryo.register(Class.forName("scala.reflect.ClassTag$GenericClassTag"))
kryo.register(classOf[Array[UserInfo]])
}
}