机器学习---pySpark代码开发
1、eclipse开发pySpark程序
在eclipse中开发pySpark程序,需要安装pydev插件。
1).eclipse安装python插件,安装完成后重启。
2). 在window--->preferences中找到python interpreter配置安装python的路径:
3).新建python项目:
2、pyCharm开发pySpark程序
pyCharm是专为开发python的工具,在pyCharm中直接可以编写pySpark代码。
3、IDEA开发pySpark程序
IDEA中开发pySpark代码需要安装python插件。
4、python wordcount
pySpark WordCount 案例:
#coding:utf-8
#从pyspark中导入相应的包
from pyspark import SparkConf
from pyspark import SparkContext
def show(x):
print x
if __name__ == '__main__':
conf = SparkConf()
conf.setAppName("wordcount")
conf.setMaster("local")
sc = SparkContext(conf=conf)
lines = sc.textFile("../../data/words", 2)
print "lines rdd partition length = %d"%(lines.getNumPartitions())
words = lines.flatMap(lambda line:line.split(" "), True)
pairWords = words.map(lambda word : (word,1),True)
result = pairWords.reduceByKey(lambda v1,v2:v1+v2, 3)
print "result rdd partition length = %d"%(result.getNumPartitions())
result.foreach(lambda t :show(t))
#将结果保存到文件
result.saveAsTextFile("../../data/wc-result")
eclipse console 控制台乱码问题:
eclipse控制台只支持GBK编码。运行时需要修改编码,运行python文件时,
右键->Run As->Run Configurations->Common->Encoding 改为GBK,没有GBK就直接输入运行。
5、python开发Spark原理
使用python api编写pyspark代码提交运行时,为了不破坏spark原有的运行架构,会将写好的代码首先在python解析器中运行(cpython),Spark代码归根结底是运行在JVM中的,这里python借助Py4j实现Python和Java的交互,即通过Py4j将pyspark代码“解析”到JVM中去运行。例如,在pyspark代码中实例化一个SparkContext对象,那么通过py4j最终在JVM中会创建scala的SparkContext对象及后期对象的调用、在JVM中数据处理消息的日志会返回到python进程中、如果在代码中会回收大量结果数据到Driver端中,也会通过socket通信返回到python进程中。这样在python进程和JVM进程之间就有大量通信。
python开发spark,需要进行大量的进程间的通信,如果通信量过大,会出现“socket write error”错误,应尽量少使用回收数据类算子,也可以调节回收日志的级别,降低进程之间的通信。
6、python开发Spark问题
1)、关于安装路径
jdk,anaconda及python的安装路径中不能有空格和中文。
2)、指定spark使用的python版本
如果使用的anaconda更换了python3.5.x版本,之后在开发工具中指定了python解析器为3.5.x版本之后,运行python spark 代码时spark默认的使用的python版本使环境变量中指定的版本。会导致与指定的python解析器的python版本不一致。这时需要在环境变量中指定下PYSPARK_PYTHON环境变量即可,值为指定的python3.5.x python解析器路径。如: