当前位置: 首页 > news >正文

Spark SQL详解(二):RDD转换DataFrame与Spark SQL读写数据库

摘要:本文深入讲解Spark SQL中RDD与DataFrame的互转机制,包括反射推断模式和编程式定义模式两种转换方式。同时系统讲解Spark SQL通过JDBC连接MySQL数据库的完整流程,涵盖依赖导入、数据读取、数据写入等实战操作,配合完整的Scala代码示例和常见错误排查。


一、RDD转换DataFrame

Spark提供了两种方法将RDD转换为DataFrame:利用反射机制推断RDD模式,以及使用编程方式定义RDD模式。两种方法各有适用场景,开发者可根据实际情况选择。

1.1 方法一:利用反射机制推断RDD模式

原理:通过定义case class,利用Spark的隐式转换机制,自动将RDD[CaseClass]转换为DataFrame。

核心特点

  • 简洁高效,代码量少
  • 自动推断字段名和类型
  • 必须使用case class(普通class不支持)
  • case class必须定义在main方法之外

完整代码实现:

importorg.apache.spark.rdd.RDDimportorg.apache.spark.sql.{DataFrame,SparkSession}objectRDDToDFByReflection{// case class必须放到main方法之外,伴生对象下// 因为隐式转换时会通过 伴生对象名.case类名 来调用caseclassPerson(name:String,age:Long)defmain(args:Array[String]):Unit={valspark=SparkSession.builder().master("local[*]").appName("RDD-To-DF-Reflection").getOrCreate()// 导入隐式转换,这里的spark是SparkSession对象,不是org.apache.spark包importspark.implicits._// 1. 读取文本文件,解析为RDD[Person]valrdd:RDD[Person]=spark.sparkContext.textFile("data/sql/people.txt").map(line=>line.split(",")).map(t=>Person(t(0).trim,t(1).trim.toLong))// 2. 隐式转换:RDD[Person] -> DataFramevaldf:DataFrame=rdd.toDF()// 3. 注册临时视图,执行SQL查询df.createOrReplaceTempView("people")spark.sql("SELECT * FROM people WHERE age > 20").show()spark.stop()}}

输入数据(people.txt):

Tom, 21 Mike, 25 Andy, 18

运行结果:

+----+---+ |name|age| +----+---+ | Tom| 21| |Mike| 25| +----+---+

关键注意事项:

注意点说明错误后果
case class位置必须放在main方法之外,伴生对象下编译报错,找不到case类
implicits导入import spark.implicits._中的spark是SparkSession对象导入错误将无法隐式转换
数据类型匹配case class字段类型需与数据匹配类型转换异常
空值处理数值类型建议用Long/DoubleInt可能溢出

数据流转图解:

文本文件: "Tom, 21" "Mike, 25" "Andy, 18" ↓ textFile + map + map RDD[Person]: Person("Tom", 21) Person("Mike", 25) Person("Andy", 18) ↓ toDF() (隐式转换) DataFrame: +----+---+ |name|age| +----+---+ | Tom| 21| |Mike| 25| |Andy| 18| +----+---+

1.2 方法二:使用编程方式定义RDD模式

原理:通过StructType定义Schema(表头),通过Row定义每条记录,最后调用createDataFrame将两者拼接。

核心特点

  • 无需定义case class,更灵活
  • 适合动态Schema场景(字段不确定)
  • 代码稍繁琐,但不易出错
  • 运行时类型安全

完整代码实现:

importorg.apache.spark.rdd.RDDimportorg.apache.spark.sql.{DataFrame,Row,SparkSession}importorg.apache.spark.sql.types.{IntegerType,StringType,StructField,StructType}objectRDDToDFByProgramming{defmain(args:Array[String]):Unit={valspark=SparkSession.builder().master("local[*]").appName("RDD-To-DF-Programming").getOrCreate()// Step 1: 制作表头 - 定义Schema结构valschema:StructType=StructType(Array(StructField("name",StringType,nullable=true),StructField("age",IntegerType,nullable=true)))// Step 2: 制作表中记录 - 读取文件生成RDD[Row]valrowRDD:RDD[Row]=spark.sparkContext.textFile("data/sql/people.txt").map(_.split(",")).map(attr=>Row(attr(0).trim,attr(1).trim.toInt))// Step 3: 拼接表头和记录 - 创建DataFramevalpeopleDF:DataFrame=spark.createDataFrame(rowRDD,schema)// 注册临时视图并查询peopleDF.createOrReplaceTempView("people")spark.sql("SELECT * FROM people WHERE age > 20").show()spark.stop()}}

运行结果(同上):

+----+---+ |name|age| +----+---+ | Tom| 21| |Mike| 25| +----+---+

三个核心步骤详解:

步骤操作代码作用
1制作表头StructType(Array(StructField(...)))定义字段名、类型、可空性
2制作记录RDD[Row]将原始数据转换为Row对象
3拼接合并spark.createDataFrame(rowRDD, schema)将Schema和RowRDD合并为DataFrame

Row对象的创建方式:

// 方式1:按位置传入值(需与Schema顺序一致)valrow1=Row("Tom",21)valrow2=Row("Mike",25)// 方式2:通过索引访问值valname=row1.getString(0)// "Tom"valage=row1.getInt(1)// 21// 方式3:类型安全的获取(推荐)valname=row1.getAs[String](0)valage=row1.getAs[Int](1)

1.3 两种方法对比

特性反射推断模式编程式定义模式
代码量较多
灵活性低(需预定义case class)高(动态定义Schema)
类型安全编译时检查运行时检查
适用场景字段固定的结构化数据字段动态变化的数据
性能相同(底层都转为RDD)相同
错误排查相对困难相对容易
case class必须不需要

选择建议:

  • 字段固定、类型明确 → 反射推断模式(代码简洁)
  • 字段动态、Schema不确定 → 编程式定义模式(灵活可控)

二、Spark SQL读写MySQL数据库

Spark SQL通过JDBC连接器可以方便地读写关系型数据库,本节以MySQL为例进行讲解。

2.1 导入依赖

在项目的pom.xml中添加MySQL JDBC驱动依赖:

<dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.31</version></dependency>

版本注意事项:

MySQL版本JDBC驱动类URL格式
MySQL 5.xcom.mysql.jdbc.Driverjdbc:mysql://host:3306/db
MySQL 8.xcom.mysql.cj.jdbc.Driverjdbc:mysql://host:3306/db?serverTimezone=UTC

注意:MySQL 8.0必须使用com.mysql.cj.jdbc.Driver,使用旧驱动会报错。


2.2 读取MySQL数据

通过spark.read.format("jdbc")读取数据库表数据。

完整代码:

importorg.apache.spark.sql.{DataFrame,SparkSession}objectReadMySQL{defmain(args:Array[String]):Unit={valspark=SparkSession.builder().master("local[*]").appName("Read-MySQL").getOrCreate()// 方式1:使用format("jdbc") + option链式配置valmysqlDF:DataFrame=spark.read.format("jdbc").option("url","jdbc:mysql://localhost:3306/spark").option("driver","com.mysql.cj.jdbc.Driver").option("dbtable","student").option("user","root").option("password","your_password").load()// 方式2:使用jdbc()方法(更简洁)valmysqlDF2=spark.read.jdbc("jdbc:mysql://localhost:3306/spark","student",properties)mysqlDF.show()spark.stop()}}

JDBC常用配置选项:

选项必填说明示例
urlJDBC连接URLjdbc:mysql://localhost:3306/spark
driverJDBC驱动类名com.mysql.cj.jdbc.Driver
dbtable表名或SQL子查询student(SELECT * FROM student WHERE age>20) tmp
user数据库用户名root
password数据库密码123456
partitionColumn分区列(用于并行读取)id
lowerBound分区下界1
upperBound分区上界10000
numPartitions并行分区数4
fetchsize每次获取行数1000

运行结果:

+---+----+---+---+ | id|name|age|sex| +---+----+---+---+ | 1| Tom| 21| 男| | 2|Andy| 20| 女| +---+----+---+---+

并行读取优化:

// 通过分区列实现并行读取,提升大数据量读取性能valdf=spark.read.format("jdbc").option("url","jdbc:mysql://localhost:3306/spark").option("driver","com.mysql.cj.jdbc.Driver").option("dbtable","student").option("user","root").option("password","123456").option("partitionColumn","id")// 按id列分区.option("lowerBound","1")// 最小id.option("upperBound","1000")// 最大id.option("numPartitions","4")// 分4个分区并行读取.load()

2.3 向MySQL写入数据

通过df.write.mode().jdbc()将DataFrame数据写入数据库。

完整代码:

importorg.apache.spark.rdd.RDDimportorg.apache.spark.sql.{DataFrame,Row,SparkSession}importorg.apache.spark.sql.types.{IntegerType,StringType,StructField,StructType}importjava.util.PropertiesobjectWriteMySQL{defmain(args:Array[String]):Unit={valspark=SparkSession.builder().master("local[*]").appName("Write-MySQL").getOrCreate()// Step 1: 准备要写入的数据(从RDD创建)valrdd:RDD[Array[String]]=spark.sparkContext.parallelize(Array("3 Mike 22 男","4 Cindy 23 女")).map(_.split(" "))// Step 2: 定义Schema(表头)valschema:StructType=StructType(Array(StructField("id",IntegerType,true),StructField("name",StringType,true),StructField("age",IntegerType,true),StructField("sex",StringType,true)))// Step 3: 创建Row RDD(记录)valrowRDD:RDD[Row]=rdd.map(stu=>Row(stu(0).toInt,stu(1),stu(2).toInt,stu(3)))// Step 4: 创建DataFramevaldf:DataFrame=spark.createDataFrame(rowRDD,schema)// Step 5: 配置JDBC连接参数valprop=newProperties()prop.put("user","root")prop.put("password","your_password")prop.put("driver","com.mysql.cj.jdbc.Driver")// Step 6: 写入数据(append模式追加)df.write.mode("append").jdbc("jdbc:mysql://localhost:3306/spark","spark.student",prop)// 验证写入结果valresult=spark.read.format("jdbc").option("url","jdbc:mysql://localhost:3306/spark").option("driver","com.mysql.cj.jdbc.Driver").option("dbtable","spark.student").option("user","root").option("password","your_password").load()result.show()spark.stop()}}

写入模式说明:

模式说明适用场景
append追加数据到已有表增量写入
overwrite先删除表数据再写入全量覆盖
ignore表存在则忽略,不写入避免重复写入
errorIfExists表存在则报错(默认)防止误操作

写入前数据库表结构:

CREATETABLEspark.student(idINTPRIMARYKEY,nameVARCHAR(50),ageINT,sexVARCHAR(10));

写入后数据:

+---+-----+---+---+ | id| name|age|sex| +---+-----+---+---+ | 1| Tom| 21| 男| | 2| Andy| 20| 女| | 3| Mike| 22| 男| | 4|Cindy| 23| 女| +---+-----+---+---+

2.4 读写数据库完整流程图

读取流程: MySQL数据库 ↓ JDBC连接 (url, driver, user, password) ↓ spark.read.format("jdbc").option(...).load() ↓ DataFrame ↓ 数据处理/分析 写入流程: 原始数据 (RDD/集合/文件) ↓ 定义Schema + 创建Row RDD ↓ spark.createDataFrame(rowRDD, schema) ↓ DataFrame ↓ df.write.mode("append").jdbc(url, table, properties) ↓ MySQL数据库

三、常见问题排查

3.1 ClassNotFoundException: com.mysql.cj.jdbc.Driver

原因:缺少MySQL JDBC驱动依赖,或驱动类名错误。

解决

  1. 确认pom.xml中已添加mysql-connector-java依赖
  2. 确认MySQL 8.x使用com.mysql.cj.jdbc.Driver,5.x使用com.mysql.jdbc.Driver
  3. 提交集群任务时,使用--jars参数携带驱动jar包
spark-submit--jarsmysql-connector-java-8.0.31.jar your_app.jar

3.2 时区错误:The server time zone value ‘xxx’ is unrecognized

原因:MySQL 8.0默认时区与JDBC驱动不匹配。

解决:在URL中添加时区参数

.option("url","jdbc:mysql://localhost:3306/spark?serverTimezone=UTC")

3.3 写入时表不存在

原因:目标表未提前创建。

解决

  • 方式1:提前在MySQL中创建表
  • 方式2:使用df.write.mode("overwrite").option("createTableOptions", "...").jdbc(...)自动创建

3.4 数据类型不匹配

原因:DataFrame字段类型与数据库表字段类型不兼容。

解决

  • 检查Schema定义与数据库表结构是否一致
  • 注意Spark的IntegerType对应MySQL的INTLongType对应BIGINT
  • 字符串长度不足时,调整MySQL字段的VARCHAR长度

四、总结

本文系统讲解了RDD与DataFrame的转换以及Spark SQL的数据库操作:

核心知识点

  1. RDD转DataFrame两种方法

    • 反射推断模式:定义case class +import spark.implicits._+rdd.toDF()
    • 编程式定义模式StructType定义Schema +RDD[Row]创建记录 +createDataFrame()拼接
  2. Spark SQL读取MySQL

    • 导入mysql-connector-java依赖
    • 使用spark.read.format("jdbc").option(...).load()
    • 关键参数:url、driver、dbtable、user、password
  3. Spark SQL写入MySQL

    • 准备数据为DataFrame格式
    • 使用df.write.mode("append").jdbc(url, table, properties)
    • 支持append/overwrite/ignore/errorIfExists四种模式

方法选择指南

场景推荐方法原因
字段固定、类型明确反射推断模式代码简洁,自动推断
字段动态、Schema不确定编程式定义模式灵活可控,运行时安全
读取数据库全表format(“jdbc”)标准JDBC方式
大数据量读取JDBC + 分区参数并行读取,提升性能
增量写入数据库write.mode(“append”)不破坏已有数据
全量覆盖写入write.mode(“overwrite”)替换旧数据
http://www.cnnetsun.cn/news/2733444.html

相关文章:

  • WarcraftHelper终极教程:魔兽争霸3优化工具完全指南
  • 智能积分不是锦上添花,而是AI商业化的最后一块拼图(附Gartner认证架构图谱)
  • 快速构建轻量级Windows 11系统:Tiny11Builder系统镜像精简指南
  • CocosCreator ScrollView优化新思路:像原生App一样丝滑的长列表是如何炼成的?
  • 解密Windows平台RTMP流媒体服务器的3种高效部署方案
  • FPGA与Arduino并行通信:构建高性能硬件协同处理平台
  • 【AI工具与智能反馈整合实战指南】:20年架构师亲授5大落地陷阱与3步闭环优化法
  • 破除系统围墙!实测实在Agent智能体市场高频自动化场景模板
  • PUBG-Logitech压枪脚本终极指南:图像识别与鼠标宏的完美融合
  • Arduino蓝牙巡线坦克:从硬件搭建到App Inventor遥控开发全攻略
  • 从电路原理到PCB实战:硬件设计与调试全流程指南
  • ImageEN 8.3.0 全源码包(XE10.4 Win32实测可用),含扫描控制、DICOM处理与多格式编解码
  • 计算机组成原理 | 磁盘存储器
  • 有没有“一站式答辩解决方案”的PPT软件?要求:模板商务大气,附赠问答资料(答辩稿+答辩资料清单+答辩问答+问答应对策略)
  • 基于Arduino的简易雷达系统:从环境感知到智能避障的实践指南
  • 从零打造教学级Arduino WiFi开发板:硬件设计、焊接与物联网应用实战
  • 一次深度核查:那些被广泛引用的GEO品牌,居然不存在
  • 泸州福宝古镇人文溯源:从徐家坝聚落蜕变成川黔边贸重镇
  • 从零设计声光报警器:电路设计入门实战指南
  • 如何用Meep FDTD实现高效的光子器件仿真与优化
  • Windows 11终极瘦身指南:免费开源工具Win11Debloat让你的系统重获新生
  • DankDroneDownloader:分布式固件版本控制系统的架构设计与实现
  • 为什么92%的智能勋章项目失败?——资深CTO揭密AI工具选型的4个致命盲区
  • 构建脑肿瘤患者全周期支持体系:从信息导航到家庭康复的实践指南
  • 【AI举报系统实战指南】:2024年最权威的5大智能举报工具集成方案,错过再等一年
  • 华硕笔记本终极控制方案:G-Helper完整使用指南与性能优化教程
  • 深度探索ComfyUI:5个创意工作流构建指南与扩展生态解析
  • 字节AI Agent开发面试全解析:15道高频问题+深度答案
  • 3分钟掌握GitHub文件精准下载:告别克隆整个仓库的烦恼
  • 办公 Agent 与现有 OA 系统集成的实战方案