当前位置: 首页 > news >正文

Spark算子 - Python

目录

第一关 Transformation - map

第二关 Transformation - mapPartitions

第三关 Transformation - filter

第四关 Transformation - flatMap

第五关 Transformation - distinct

第六关 Transformation - sortBy

第七关 Transformation - sortByKey

第八关 Transformation - mapValues

第九关 Transformations - reduceByKey

第十关 Actions - 常用算子

第一关 Transformation - map

from pyspark import SparkContext # -*- coding: UTF-8 -*- if __name__ == "__main__": #********** Begin **********# # 1.初始化 SparkContext,该对象是 Spark 程序的入口 sc = SparkContext("local", "Simple App") # 2.创建一个1到5的列表List data = [1, 2, 3, 4, 5] # 3.通过 SparkContext 并行化创建 rdd rdd = sc.parallelize(data) # 4.使用rdd.collect() 收集 rdd 的元素。 print(rdd.collect()) """ 使用 map 算子,将 rdd 的数据 (1, 2, 3, 4, 5) 按照下面的规则进行转换操作,规则如下: 需求: 偶数转换成该数的平方 奇数转换成该数的立方 """ # 5.使用 map 算子完成以上需求 rdd_map = rdd.map(lambda x: x * x if x % 2 == 0 else x * x * x) # 6.使用rdd.collect() 收集完成 map 转换的元素 print(rdd_map.collect()) # 7.停止 SparkContext sc.stop() #********** End **********#

第二关 Transformation - mapPartitions

# -*- coding: UTF-8 -*- from pyspark import SparkContext #********** Begin **********# def f(iterator): list = [] for x in iterator: list.append((x, len(x))) return list #********** End **********# if __name__ == "__main__": # 1.初始化 SparkContext,该对象是 Spark 程序的入口 sc = SparkContext("local", "Simple App") # 2. 一个内容为("dog", "salmon", "salmon", "rat", "elephant")的列表List data = ["dog", "salmon", "salmon", "rat", "elephant"] # 3.通过 SparkContext 并行化创建 rdd rdd = sc.parallelize(data) # 4.使用rdd.collect() 收集 rdd 的元素。 print(rdd.collect()) """ 使用 mapPartitions 算子,将 rdd 的数据 ("dog", "salmon", "salmon", "rat", "elephant") 按照下面的规则进行转换操作,规则如下: 需求: 将字符串与该字符串的长度组合成一个元组,例如: dog --> (dog,3) salmon --> (salmon,6) """ # 5.使用 mapPartitions 算子完成以上需求 partitions = rdd.mapPartitions(f) # 6.使用rdd.collect() 收集完成 mapPartitions 转换的元素 print(partitions.collect()) # 7.停止 SparkContext sc.stop() #********** End **********#

第三关 Transformation - filter

# -*- coding: UTF-8 -*- from pyspark import SparkContext if __name__ == "__main__": #********** Begin **********# # 1.初始化 SparkContext,该对象是 Spark 程序的入口 sc = SparkContext("local", "Simple App") # 2.创建一个1到8的列表List data = [1, 2, 3, 4, 5, 6, 7, 8] # 3.通过 SparkContext 并行化创建 rdd rdd = sc.parallelize(data) # 4.使用rdd.collect() 收集 rdd 的元素。 print(rdd.collect()) """ 使用 filter 算子,将 rdd 的数据 (1, 2, 3, 4, 5, 6, 7, 8) 按照下面的规则进行转换操作,规则如下: 需求: 过滤掉rdd中的奇数 """ # 5.使用 filter 算子完成以上需求 rdd_filter = rdd.filter(lambda x: x % 2 == 0) # 6.使用rdd.collect() 收集完成 filter 转换的元素 print(rdd_filter.collect()) # 7.停止 SparkContext sc.stop() #********** End **********#

第四关 Transformation - flatMap

# -*- coding: UTF-8 -*- from pyspark import SparkContext if __name__ == "__main__": #********** Begin **********# # 1.初始化 SparkContext,该对象是 Spark 程序的入口 sc = SparkContext("local", "Simple App") # 2.创建一个[[1, 2, 3], [4, 5, 6], [7, 8, 9]] 的列表List data = [[1, 2, 3], [4, 5, 6], [7, 8, 9]] # 3.通过 SparkContext 并行化创建 rdd rdd = sc.parallelize(data) # 4.使用rdd.collect() 收集 rdd 的元素。 print(rdd.collect()) """ 使用 flatMap 算子,将 rdd 的数据 ([1, 2, 3], [4, 5, 6], [7, 8, 9]) 按照下面的规则进行转换操作,规则如下: 需求: 合并RDD的元素,例如: ([1,2,3],[4,5,6]) --> (1,2,3,4,5,6) ([2,3],[4,5],[6]) --> (1,2,3,4,5,6) """ # 5.使用 filter 算子完成以上需求 flat_map = rdd.flatMap(lambda x: x) # 6.使用rdd.collect() 收集完成 filter 转换的元素 print(flat_map.collect()) # 7.停止 SparkContext sc.stop() #********** End **********#

第五关 Transformation - distinct

# -*- coding: UTF-8 -*- from pyspark import SparkContext if __name__ == "__main__": #********** Begin **********# # 1.初始化 SparkContext,该对象是 Spark 程序的入口 sc = SparkContext("local", "Simple App") # 2.创建一个内容为(1, 2, 3, 4, 5, 6, 5, 4, 3, 2, 1)的列表List data = [1, 2, 3, 4, 5, 6, 5, 4, 3, 2, 1] # 3.通过 SparkContext 并行化创建 rdd rdd = sc.parallelize(data) # 4.使用rdd.collect() 收集 rdd 的元素 print(rdd.collect()) """ 使用 distinct 算子,将 rdd 的数据 (1, 2, 3, 4, 5, 6, 5, 4, 3, 2, 1) 按照下面的规则进行转换操作,规则如下: 需求: 元素去重,例如: 1,2,3,3,2,1 --> 1,2,3 1,1,1,1, --> 1 """ # 5.使用 distinct 算子完成以上需求 distinctResult = rdd.distinct() # 6.使用rdd.collect() 收集完成 distinct 转换的元素 print(distinctResult.collect()) # 7.停止 SparkContext sc.stop() #********** End **********#

第六关 Transformation - sortBy

# -*- coding: UTF-8 -*- from pyspark import SparkContext if __name__ == "__main__": # ********** Begin **********# # 1.初始化 SparkContext,该对象是 Spark 程序的入口 sc = SparkContext("local", "Simple App") # 2.创建一个内容为(1, 3, 5, 7, 9, 8, 6, 4, 2)的列表List data = [1, 3, 5, 7, 9, 8, 6, 4, 2] # 3.通过 SparkContext 并行化创建 rdd rdd = sc.parallelize(data) # 4.使用rdd.collect() 收集 rdd 的元素 print(rdd.collect()) """ 使用 sortBy 算子,将 rdd 的数据 (1, 3, 5, 7, 9, 8, 6, 4, 2) 按照下面的规则进行转换操作,规则如下: 需求: 元素排序,例如: 5,4,3,1,2 --> 1,2,3,4,5 """ # 5.使用 sortBy 算子完成以上需求 sort_result = rdd.sortBy(lambda x: x) # 6.使用rdd.collect() 收集完成 sortBy 转换的元素 print(sort_result.collect()) # 7.停止 SparkContext sc.stop() #********** End **********#

第七关 Transformation - sortByKey

# -*- coding: UTF-8 -*- from pyspark import SparkContext if __name__ == "__main__": # ********** Begin **********# # 1.初始化 SparkContext,该对象是 Spark 程序的入口 sc = SparkContext("local", "Simple App") # 2.创建一个内容为[(B',1),('A',2),('C',3)]的列表List data = [('B', 1), ('A', 2), ('C', 3)] # 3.通过 SparkContext 并行化创建 rdd rdd = sc.parallelize(data) # 4.使用rdd.collect() 收集 rdd 的元素 print(rdd.collect()) """ 使用 sortByKey 算子,将 rdd 的数据 ('B', 1), ('A', 2), ('C', 3) 按照下面的规则进行转换操作,规则如下: 需求: 元素排序,例如: [(3,3),(2,2),(1,1)] --> [(1,1),(2,2),(3,3)] """ # 5.使用 sortByKey 算子完成以上需求 sort_by_key = rdd.sortByKey() # 6.使用rdd.collect() 收集完成 sortByKey 转换的元素 print(sort_by_key.collect()) # 7.停止 SparkContext sc.stop() # ********** End **********#

第八关 Transformation - mapValues

# -*- coding: UTF-8 -*- from pyspark import SparkContext if __name__ == "__main__": # ********** Begin **********# # 1.初始化 SparkContext,该对象是 Spark 程序的入口 sc = SparkContext("local", "Simple App") # 2.创建一个内容为[("1", 1), ("2", 2), ("3", 3), ("4", 4), ("5", 5)]的列表List data = [("1", 1), ("2", 2), ("3", 3), ("4", 4), ("5", 5)] # 3.通过 SparkContext 并行化创建 rdd rdd = sc.parallelize(data) # 4.使用rdd.collect() 收集 rdd 的元素 print(rdd.collect()) """ 使用 mapValues 算子,将 rdd 的数据 ("1", 1), ("2", 2), ("3", 3), ("4", 4), ("5", 5) 按照下面的规则进行转换操作,规则如下: 需求: 元素(key,value)的value进行以下操作: 偶数转换成该数的平方 奇数转换成该数的立方 """ # 5.使用 mapValues 算子完成以上需求 values = rdd.mapValues(lambda x: x * x if x % 2 == 0 else x * x * x) # 6.使用rdd.collect() 收集完成 mapValues 转换的元素 print(values.collect()) # 7.停止 SparkContext sc.stop() # ********** End **********#

第九关 Transformations - reduceByKey

# -*- coding: UTF-8 -*- from pyspark import SparkContext if __name__ == "__main__": # ********** Begin **********# # 1.初始化 SparkContext,该对象是 Spark 程序的入口 sc = SparkContext("local", "Simple App") # 2.创建一个内容为[("python", 1), ("scala", 2), ("python", 3), ("python", 4), ("java", 5)]的列表List data = [("python", 1), ("scala", 2), ("python", 3), ("python", 4), ("java", 5)] # 3.通过 SparkContext 并行化创建 rdd rdd = sc.parallelize(data) # 4.使用rdd.collect() 收集 rdd 的元素 print(rdd.collect()) """ 使用 reduceByKey 算子,将 rdd 的数据[("python", 1), ("scala", 2), ("python", 3), ("python", 4), ("java", 5)] 按照下面的规则进行转换操作,规则如下: 需求: 元素(key-value)的value累加操作,例如: (1,1),(1,1),(1,2) --> (1,4) (1,1),(1,1),(2,2),(2,2) --> (1,2),(2,4) """ # 5.使用 reduceByKey 算子完成以上需求 result = rdd.reduceByKey(lambda x, y: x + y) # 6.使用rdd.collect() 收集完成 reduceByKey 转换的元素 print(result.collect()) # 7.停止 SparkContext sc.stop() # ********** End **********#

第十关 Actions - 常用算子

# -*- coding: UTF-8 -*- from pyspark import SparkContext if __name__ == "__main__": # ********** Begin **********# # 1.初始化 SparkContext,该对象是 Spark 程序的入口 sc = SparkContext("local", "Simple App") # 2.创建一个内容为[1, 3, 5, 7, 9, 8, 6, 4, 2]的列表List data = [1, 3, 5, 7, 9, 8, 6, 4, 2] # 3.通过 SparkContext 并行化创建 rdd rdd = sc.parallelize(data) # 4.收集rdd的所有元素并print输出 print(rdd.collect()) # 5.统计rdd的元素个数并print输出 print(rdd.count()) # 6.获取rdd的第一个元素并print输出 print(rdd.first()) # 7.获取rdd的前3个元素并print输出 print(rdd.take(3)) # 8.聚合rdd的所有元素并print输出 print(rdd.reduce(lambda x, y: x + y)) # 9.停止 SparkContext sc.stop() # ********** End **********#
http://www.cnnetsun.cn/news/2619079.html

相关文章:

  • 完全免费!不用花一分钱调用 GPT4!公司代码不会泄露!断网也能用!
  • uVision调试器C++开发限制与解决方案
  • 基于SQLite的本地化二次智能决策系统设计与实现
  • 3分钟解锁网易云音乐NCM格式:Windows用户必备的免费图形化解密工具终极指南
  • U盘版小龙虾教程
  • 【他山之石】盖瑞·查普曼《爱的五种语言》导读
  • 手把手教你用Claude Code打造自己的视频创作智能体(1)
  • 从数据看板到决策智能体:基于因果推断与约束优化的自动化策略生成实战
  • 记录AI学习之路Day01 Vibe Coding
  • 上海办公室装修省钱技巧
  • 自条件化与非自回归吸引子:提升端到端说话人日志模型性能
  • Switch游戏画面电脑同步终极指南:5分钟实现高清直播录制
  • 从提示词工程到上下文工程:构建AI就绪的项目心智模型
  • 角色驱动AI编程工作流:从概念到实践,构建你的虚拟开发团队
  • 深度解析signature_pad:HTML5 Canvas平滑签名绘制技术实现与高级优化
  • NCCL性能调优必看:如何通过环境变量NCCL_TOPO_FILE与源码理解自定义机器拓扑
  • 美少女万华镜1-4下载2026最新
  • 多模态输入总报错?Gemini最新v1.5 API兼容性全解析,92%开发者忽略的4个元数据校验盲区
  • 告别APK/IPA文件图标混乱!ApkShellext2让Windows资源管理器完美显示应用图标
  • 如何高效提取网页媒体资源:猫抓资源嗅探工具完全指南
  • 批处理脚本实现语音计算器:Windows自动化入门实践
  • 别再硬算方差了!用Delta方法5分钟搞定样本标准差的标准误(附R/Python代码)
  • 电脑文件杂乱无从下手?一文讲透通用文件分类方法与实用管理工具
  • 电源动态测试到底有没有必要?负载固定为什么还要测瞬态响应?(工程师必看)
  • 别再混淆min和argmin了!用Python和NumPy代码实例讲透机器学习里的这两个关键操作
  • 3个步骤+20个模板:用Obsidian搭建你的第二大脑知识管理系统
  • 简历工具哪家强?8款市场热门产品深度测评,避坑指南与实战建议
  • 推荐效果停滞不前?Gemini策略迭代已进入“微调临界点”——48小时紧急升级清单
  • 完全掌握BG3模组管理器:专业解决博德之门3模组冲突的实战指南
  • 全面解析开源项目:高效实现Switch游戏画面跨平台传输的完整指南