当前位置: 首页 > news >正文

古法CDC:AWS Aurora MySQL使用AWS DMS构建数据管道到数据湖(Apache Iceberg)

问题

需要再AWS的数据湖里面对数据进行CRUD,这里就需要使用Apache Iceberg来管理数据。

这了我们使用AWS DMS服务来构建CDC,将MySQL中的数据搬到s3,然后,使用Lambda来触发CDC更新数据到Athena(Apache Iceberg)中使用。

一图胜千言

修改MySQL参数

  • binlog_format:ROW
  • binlog_row_image:Full

这是MySQL审计日志最全的方式。


创建mysql用户

# CDC权限CREATEUSER'dms_user'@'%'IDENTIFIEDBY'dms_User349';GRANTREPLICATIONCLIENT,REPLICATIONSLAVEON*.*TO'dms_user'@'%';# 业务库和业务表grantselectonmydb.*todms_user;# 使用 MySQL 特定的迁移前评估grantselectonmysql.usertodms_user;grantselectonmysql.dbtodms_user;grantselectonmysql.tables_privtodms_user;grantselectonmysql.role_edgestodms_user;#only for MySQL version 8.0.11 and highergrantselectonperformance_schema.replication_connection_statustodms_user;#Required for primary instance validation - MySQL version 5.7 and higher only# RDS 运行 MySQL 特定的迁移前评估grantselectonmysql.rds_configurationtodms_user;#Required for binary log retention check# 如果参数BatchEnable是必需的true,则需要授予权限grantcreatetemporarytableson*.*todms_user;FLUSHPRIVILEGES;

Amazon Secrets Manager

我这里的mysql是使用了AWS Secrets Manager进行用户名和密码托管轮转的。这里就不重点介绍了,我们主要关注AWS DMS数据迁移任务怎么创建。

AWS DMS

创建源端点


或者,如下方式创建:

注意,这里连接mysql启用了必须使用SSL方式才能连接,需要手动将Aurora MySQL中的证书手动下载添加到DMS中。在如下页面中找到AWS Aurora MySQL中的证书,如下页面:

https://docs.amazonaws.cn/AmazonRDS/latest/AuroraUserGuide/UsingWithRDS.SSL.html#UsingWithRDS.SSL.RegionCertificates-BJS
运行如下命令,验证一下下载的SSL证书:

keytool-printcert-v-fileglobal-bundle.pem

我们在AWS DMS证书管理页面,导入上面MySQL的证书给DMS,如下图:


证书配置成功后,下面重新创建源端点:

创建目标端点

s3创建内网端点


这里的路由表,你如果不知道怎么设置,或者后续无法写数据到s3的话,你可以一股脑全部把路由表都够选上也可以。

Athena的vpc 端点创建

Athena vpc端点安全组创建


这里的athena vpc 端点的安全组中的出入站规则都是指定了dms的安全组。下面就可以开始创建athena的vpc内网端点了,如下图:

接下来创建s3目标端点,注意这里使用到的s3桶名,最好以aws-glue-开头,方便以后glue中任务读取。如下图:

参数配置如下:

{"DataFormat":"parquet","ParquetVersion":"PARQUET_2_0","CompressionType":"GZIP","IncludeOpForFullLoad":true,"GlueCatalogGeneration":true,"TimestampColumnName":"last_updated_ts","UseTaskStartTimeForFullLoadTimestamp":true,"CdcMaxBatchInterval":3600,"CdcMinFileSize":64000,"DatePartitionEnabled":true,"DatePartitionSequence":"YYYYMMDD","DatePartitionDelimiter":"SLASH","DatePartitionTimezone":"Asia/Shanghai"}

源和目标端点都创建之后,如下图:

这个目标S3端点会创建一个角色,这个角色缺失"s3:DeleteObject",权限,需要我们自己手动加一下这个权限。还得添加s3解密,glue和athena相关权限配置:

{"Version":"2012-10-17","Statement":[{"Sid":"AllObjectActions","Effect":"Allow","Action":["s3:PutObject","s3:GetObject","s3:DeleteObject","s3:GetObjectVersion"],"Resource":["arn:aws-cn:s3:::aws-glue-dev-bronze/*"],"Condition":{"StringEquals":{"aws:ResourceAccount":"111112234434"}}},{"Sid":"ListBucketActions","Effect":"Allow","Action":["s3:ListBucket"],"Resource":"*","Condition":{"StringEquals":{"aws:ResourceAccount":"111112234434"}}},{"Sid":"GetBucketActions","Effect":"Allow","Action":["s3:GetBucketLocation","s3:GetBucketVersioning"],"Resource":["arn:aws-cn:s3:::aws-glue-dev-bronze"],"Condition":{"StringEquals":{"aws:ResourceAccount":"111112234434"}}},{"Sid":"EnableBucketHTTPSOnly","Action":"s3:*","Effect":"Deny","Resource":["arn:aws-cn:s3:::aws-glue-dev-bronze/*","arn:aws-cn:s3:::aws-glue-dev-bronze"],"Condition":{"Bool":{"aws:SecureTransport":false}}},{"Sid":"AllowUseOfTheKey","Effect":"Allow","Action":["kms:Encrypt","kms:Decrypt","kms:ReEncrypt*","kms:GenerateDataKey*","kms:DescribeKey"],"Resource":"*"},{"Effect":"Allow","Action":["glue:CreateDatabase","glue:GetDatabase","glue:GetDatabases","glue:CreateTable","glue:DeleteTable","glue:UpdateTable","glue:GetTable","glue:GetTables","glue:BatchCreatePartition","glue:CreatePartition","glue:UpdatePartition","glue:GetPartition","glue:GetPartitions","glue:BatchGetPartition"],"Resource":["arn:aws-cn:glue:*:111112234434:catalog","arn:aws-cn:glue:*:111112234434:database/*","arn:aws-cn:glue:*:111112234434:table/*"]},{"Effect":"Allow","Action":["athena:StartQueryExecution","athena:GetQueryExecution","athena:CreateWorkGroup"],"Resource":"arn:aws-cn:athena:*:111112234434:workgroup/*"}]}

就可以开始CDC任务创建了。

开始CDC

创建安全组

创建一个安全组给CDC任务使用:

开始创建复制,如下图:

注意,这里的前缀,不要使用-字符,因为AWS DMS在Athena中创建数据库,使用连字符会失败,应该改成下划线_,需要注意一下。

创建完成,手动启动,如下图:

启动之前,先让AWS云先评估一下,如果没问题,再真正开始CDC任务,如下图:

看到如下AWS DMS任务,表示我们的CDC任务运行成功了:

到此CDC任务运行成功。到这里只是将mysql里面的binlog数据全部搬到数据湖里面的铜牌层,还需要进一步清理数据到银牌层。

银牌层

创建Silver S3桶

Iceberg数据库创建

CREATEDATABASEdev_silver_db;

直接在glue创建银牌层桶。

创建Iceberg表

根据需要处理的原始表结构,来写下面的DDL语句:

CREATETABLEmy_iceberg(last_updated_tsTIMESTAMP,idBIGINT,name STRING)LOCATION's3://aws-glue-dev-silver/dbname/my_iceberg/'TBLPROPERTIES('table_type'='ICEBERG','format'='parquet')

AWS Glue合并任务

AWSGlueServiceRoleDev角色创建


这个角色必须以AWSGlueServiceRole开头,不然AWS Glue任务无法选择这个角色。创建Glue任务如下:

其中配置参数为:

--datalake-formats iceberg--confspark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions--confspark.sql.catalog.glue_catalog.warehouse=s3://aws-glue-dev-silver/dbname/my_iceberg/--confspark.sql.catalog.glue_catalog=org.apache.iceberg.spark.SparkCatalog--confspark.sql.catalog.glue_catalog.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog--confspark.sql.catalog.glue_catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO--confspark.sql.catalog.glue_catalog.glue.lakeformation-enabled=true--confspark.sql.iceberg.handle-timestamp-without-timezone=true

任务源代码如下:

importsysfromawsglue.transformsimport*fromawsglue.utilsimportgetResolvedOptionsfrompyspark.contextimportSparkContextfromawsglue.contextimportGlueContextfromawsglue.jobimportJobfrompyspark.sql.typesimportTimestampType## @params: [JOB_NAME]args=getResolvedOptions(sys.argv,['JOB_NAME'])sc=SparkContext()glueContext=GlueContext(sc)spark=glueContext.spark_session job=Job(glueContext)job.init(args['JOB_NAME'],args)# ============================================# 第一步:从 Bronze 表读取增量数据# ============================================# 注意:transformation_ctx 对 Glue 书签机制至关重要,请保持唯一且不变[reference:0]myIcebergIncrementalInputDF=glueContext.create_dynamic_frame.from_catalog(database="dev_bronze",# 修改为你的 Glue 数据库名table_name="my_iceberg",# 修改为你的 Bronze 表名transformation_ctx="myIcebergIncrementalInputDF").toDF()# ============================================# 第二步:去重 - 按 id 分组,保留 last_updated_ts 最新的记录[reference:1]# ============================================windowSpec=Window.partitionBy(myIcebergIncrementalInputDF.id).orderBy(col("last_updated_ts").desc())rankedDF=myIcebergIncrementalInputDF.withColumn("row_num",row_number().over(windowSpec))deduplicatedDF=rankedDF.filter(col("row_num")==1).drop("row_num")# ============================================# 第三步:清洗 - 数据类型转换、字段拆分等[reference:2]# ============================================deduplicatedDF=deduplicatedDF.withColumn("last_updated_ts",col("last_updated_ts").cast(TimestampType()))# 注册为临时视图,供 MERGE 语句使用[reference:3]deduplicatedDF.createOrReplaceTempView("deduplicated_view")# ============================================# 第四步:执行 MERGE 到 Silver 层 Iceberg 表[reference:4]# ============================================merge_sql=""" MERGE INTO glue_catalog.dev_silver_db.my_iceberg AS target USING deduplicated_view AS source ON target.id = source.id WHEN MATCHED AND source.op = 'U' THEN UPDATE SET target.last_updated_ts = source.last_updated_ts, target.name = source.name WHEN MATCHED AND source.op = 'D' THEN DELETE WHEN NOT MATCHED THEN INSERT ( last_updated_ts, id, name ) VALUES ( source.last_updated_ts, source.id, source.name ) """spark.sql(merge_sql)job.commit()

lambda触发函数

创建一个AWS Lambda函数,源代码如下:

importjsonimportboto3deflambda_handler(event,context):glue_client=boto3.client('glue')glue_job_name='dev-b2s-xxxx'try:# Start the Glue jobresponse=glue_client.start_job_run(JobName=glue_job_name)print(f"Glue job started:{response['JobRunId']}")exceptExceptionase:print(f"Error starting Glue job:{str(e)}")raiseereturn{'statusCode':200,'body':json.dumps('Hello from Lambda!')}

注意,这里需要给aws lambda的角色添加启动glue任务的权限,类似如下:

{"Version":"2012-10-17","Statement":[{"Effect":"Allow","Action":["glue:StartJobRun"],"Resource":"arn:aws-cn:glue:cn-north-1:111112234434:job/*"}]}

然后,我们回到bronze层的s3桶,创建一个当文件创建的事件来激活调用这个lambda函数,如下图:

总结

没有搞过大数据的朋友,需要注意一下如下两个概念:

  • Iceberg是表格式
  • Parquet是文件格式

这就是AWS里面的将mysql里面数据搬到数据湖的方式。AWS DMS+AWS Lambda+AWS Glue。这里主要就是借助iceberg表来进行增量更新。

参考

  • How to Merge your Relational Database into a Data Lake on AWS
  • Real-Time CDC Pipeline: MySQL to Apache Iceberg Using AWS DMS Serverless, S3, and Lambda.
  • Step 5: Configure an Amazon DMS Target Endpoint
  • Using Amazon S3 as a target for AWS Database Migration Service

  • How to MERGE your Database into a Data Lake on AWS | Change Data Capture | Apache Iceberg
  • Apache Iceberg vs Parquet – File Formats vs Table Formats
  • Change data capture
  • 使用与 MySQL 兼容的数据库作为源 Amazon DMS
  • CREATE TABLE AS
  • How to MERGE your Database into a Data Lake on AWS | Change Data Capture | Apache Iceberg
  • AWS Aurora MySQL证书地址
  • 为 MySQL-compatible、PostgreSQL 或 SQL Server 端点启用 SSL
  • How can I use AWS DMS to migrate data to Amazon S3 in Parquet format?
  • Athena JDBC 3.x 驱动程序
  • Create an AWS Glue Data Catalog with AWS DMS
  • Configuring VPC endpoints for AWS DMS
  • Migrating MSSQL to AWS using Data Migration Service with CDC Migration
  • 在 Amazon Glue 中使用 Iceberg 框架
http://www.cnnetsun.cn/news/3128049.html

相关文章:

  • 一个装X的架构师,通过建文件夹就能亮瞎你的狗眼... ——传说中的弦哥
  • 【Java从入门到入土】45:性能调优实战:从理论到实践
  • 线性密码分析实战:从S盒线性逼近表到SPN网络密钥恢复
  • 卷积的学习
  • 1、JavaScript入门和语法类型
  • 从GitHub Copilot到企业级审查中枢:构建可审计、可回溯、可问责的AI审查流水线
  • 工业级-40°C~125°C+10µA静态电流:SN74LVC1G07DBVR的低功耗宽温逻辑器件
  • 2026免费图片去水印工具推荐在线无水印无需下载
  • 性能测试,监控CPU和内存脚本
  • 数字印花机研发调研报告
  • AI加速卡与工控机集成优化 2026 软硬件协同实操指南
  • Enhanced Motion Forecasting with Plug-and-Play Multimodal Large Language Models
  • 多Agent协作:辩论、投票与分工——AI模型中的协同新范式
  • 数据中台建设方案
  • 佛山个人开发者为私人诊所搭建官网
  • 软件测试入门——第二十一课(接口测试入门)
  • 智能汽车SoC架构与开发实战解析
  • 意识、计算与DMTx:NKS理论为意识现实主义留下的缝隙及其实证延伸
  • 【JavaScript 标签(Label)完全指南:语法、使用场景、作用与意义|告别多层循环跳转难题(面试必刷)】
  • AI辅助商业模型画布验证:用数据驱动的聚类、A/B测试与敏感性分析,让创业假设先跑一跑数
  • Cursor实战案例-图形图像-49-高精度印章提取:利用OpenCV实现研报图片中红色公章的抠图、校正与增强
  • 运筹说 第156期 | 大模型基础篇之大模型概述(1):当“大“成为一种革命
  • 同步磁阻电机滑模控制技术解析与应用
  • 计算机毕业设计之jsp课程评价系统
  • 第2章 数据集及分类模型介绍
  • DeepSeek-V4-Pro如何实现国产AI的‘够用性’平权
  • E-Hentai下载器解决方案:完全绕过GP限制的实用技巧
  • E-Hentai下载器终极指南:如何高效批量下载画廊而不消耗GP积分
  • 可白嫖源码---课程设计--毕业设计--flask酒类购物系统[编号:project30576](案例分析)-附源码
  • Java基础快速入门:枚举与注解