当前位置: 首页 > article >正文

机器学习---pySpark案例

1、统计PV,UV

1.if __name__ == '__main__':
2.    conf = SparkConf()
3.    conf.setMaster("local")
4.    conf.setAppName("test")
5.    sc = SparkContext(conf=conf)
6.
7.    #pv
8.    sc.textFile("./pvuv").map(lambda line:(line.split("\t")[4],1)).reduceByKey(lambda v1,v2:v1+v2).sortBy(lambda tp:tp[1],ascending=False).foreach(print)
9.
10.    #uv
11.sc.textFile("./pvuv").map(lambda line:line.split("\t")[1]+"_"+line.split("\t")[4]).distinct().map(lambda one:(one.split("_")[1],1)).reduceByKey(lambda v1,v2:v1+v2).sortBy(lambda tp:tp[1],ascending=False).foreach(print)

2、统计除了某个地区外的UV

1.if __name__ == '__main__':
2.    conf = SparkConf()
3.    conf.setMaster("local")
4.    conf.setAppName("test")
5.    sc = SparkContext(conf=conf)
6.
7.    #uv
8.    sc.textFile("./pvuv").filter(lambda line:line.split("\t")[3]=='beijing').map(lambda line:line.split("\t")[1]+"_"+line.split("\t")[4]).distinct().map(lambda one:(one.split("_")[1],1)).reduceByKey(lambda v1,v2:v1+v2).sortBy(lambda tp:tp[1],ascending=False).foreach(print)

3、统计每个网站最活跃的top2地区

1.def get_top2_local(one):
2.    site = one[0]
3.    local_iterable = one[1]
4.
5.    local_dic = {}
6.    for local in local_iterable:
7.      if local in local_dic:
8.        local_dic[local] += 1
9.      else:
10.        local_dic[local] = 1
11.
12.    sorted_list = sorted(local_dic.items(),key = lambda x:x[1],reverse= True)
13.    return_list = []
14.    if(len(sorted_list)>=2):
15.      for i in range(0,2):
16.        return_list.append(sorted_list[i])
17.      else:
18.        return_list = sorted_list
19.
20.    return return_list
21.
22.
23.if __name__ == '__main__':
24.    conf = SparkConf()
25.    conf.setMaster("local")
26.    conf.setAppName("test")
27.    sc = SparkContext(conf=conf)
28.
29.    #统计每个网站最活跃的top2地区
30.    lines = sc.textFile("./pvuv")
31.    site_local = lines.map(lambda line:(line.split("\t")[4],line.split("\t")[3]))
32.    site_localIterable = site_local.groupByKey()
33.    sorted_result = site_localIterable.map(lambda one:get_top2_local(one))
34.    sorted_result.foreach(print)
35. 

4、统计每个网站最热门的操作

1.def get_hot_operator(one):
2.    site = one[0]
3.    operator_iterable = one[1]
4.
5.    operator_dic = {}
6.    for operator in operator_iterable:
7.      if operator in operator_dic:
8.        operator_dic[operator] += 1
9.      else:
10.        operator_dic[operator] = 1
11.
12.    sorted_list = sorted(operator_dic.items(),key = lambda x:x[1],reverse= True)
13.    return_list = []
14.    if(len(sorted_list)>=2):
15.    for i in range(0,1):
16.      return_list.append(sorted_list[i])
17.    else:
18.      return_list = sorted_list
19.
20.    return return_list
21.
22.
23.if __name__ == '__main__':
24.    conf = SparkConf()
25.    conf.setMaster("local")
26.    conf.setAppName("test")
27.    sc = SparkContext(conf=conf)
28.
29.    #统计每个网站最热门的操作
30.    lines = sc.textFile("./pvuv")
31.    site_operator = lines.map(lambda line:(line.split("\t")[4],line.split("\t")[5]))
32.    site_operatorIterable = site_operator.groupByKey()
33.    sorted_result = site_operatorIterable.map(lambda one:get_hot_operator(one))
34.    sorted_result.foreach(print)
35.

5、统计每个网站下最活跃的top3用户

1.def get_uid_site_count(one):
2.    uid = one[0]
3.    site_iterable = one[1]
4.
5.    site_dic = {}
6.    for site in site_iterable:
7.      if site in site_dic:
8.        site_dic[site] += 1
9.      else:
10.        site_dic[site] = 1
11.
12.    return_list = []
13.    for site,count in site_dic.items():
14.      return_list.append((site,(uid,count)))
15.    return return_list
16.
17.def get_top3_uid(one):
18.    site = one[0]
19.    uid_count_iterable = one[1]
20.    top3_uid = ['','','']
21.    for tp in uid_count_iterable:
22.      uid = tp[0]
23.      count = tp[1]
24.        for i in range(0,len(top3_uid)):
25.          if(top3_uid[i]==''):
26.            top3_uid[i] = tp
27.            break
28.          elif(count > top3_uid[i][1]):
29.            for j in range(2,i,-1):
30.              top3_uid[j] = top3_uid[j-1]
31.              top3_uid[i] = tp
32.              break
33.
34.       return top3_uid
35.
36.
37.
38.if __name__ == '__main__':
39.    conf = SparkConf()
40.    conf.setMaster("local")
41.    conf.setAppName("test")
42.    sc = SparkContext(conf=conf)
43.
44.    #统计每个网站最活跃的top3用户
45.    lines = sc.textFile("./pvuv")
46.    uid_site = lines.map(lambda line:(line.split("\t")[2],line.split("\t")[4]))
47.    uid_siteIterable = uid_site.groupByKey()
48.    uid_site_count = uid_siteIterable.flatMap(lambda one:get_uid_site_count(one))
49.    top3_uid_info = uid_site_count.groupByKey().map(lambda one:get_top3_uid(one))
50.    top3_uid_info.foreach(print)


http://www.kler.cn/a/160486.html

相关文章:

  • Nginx配置VTS模块-对接Promethues监控
  • 单体 vs 微服务 怎么选?
  • 中等难度——python实现电子宠物和截图工具
  • 快速、可靠且高性价比的定制IP模式提升芯片设计公司竞争力
  • Ubuntu Server挂载AWS S3成一个本地文件夹
  • Centos9 + Docker 安装 MySQL8.4.0 + 定时备份数据库到本地
  • 深入理解Vue.js中的this:解析this关键字及其使用场景
  • uniapp实战 —— 分类导航【详解】
  • 设置webstorm和idea符合Alibaba规范
  • 【Docker】从零开始:17.Dockerfile基本概念
  • 指定分隔符对字符串进行分割 numpy.char.split()
  • 自然语言处理(NLP)技术-AI生成版
  • Flinksql bug :Illegal mixing of types in CASE or COALESCE statement
  • 按天批量创建间隔分区表(DM8:达梦数据库)
  • 【PTA-C语言】编程练习4 - 数组Ⅰ
  • HarmonyOS4.0从零开始的开发教程08构建列表页面
  • 17、XSS——session攻击
  • 【动态规划】LeetCode-面试题 17.16. 按摩师
  • 配置阿里云CLI-aliyun命令与安装ossutil
  • 数据结构之交换排序
  • Flink优化——数据倾斜(二)
  • ssh远程连接服务器
  • ELK的日志解决方案
  • PACS源码,医学影像传输系统源码,全院级应用,支持放射、超声、内窥镜、病理等影像科室,且具备多种图像处理及三维重建功能
  • Kafka 的消息格式:了解消息结构与序列化
  • 2023字节跳动软件测试工程师面试题及答案分享