当前位置: 首页 > news >正文

flink的streaming api 统计文本中的字段个数

1.flink 的streaming api初步学习

有界数据流处理,文件数据处理。

package com.ycl; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.Collector; public class WordCountStreamDemo { public static void main(String[] args) throws Exception { //1.创建执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //2.读取数据 DataStreamSource<String> lineDS = env.readTextFile("input/word.txt"); //3.处理数据 切分,转换,分组,聚合。 SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOneDS = lineDS.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() { @Override public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception { //按照 空格切分 String[] words = value.split(" "); for (String word : words) { //转换成 二元组 (word,1) Tuple2<String, Integer> wordAndOne = Tuple2.of(word, 1); //通过采集器向下游发送数据 out.collect(wordAndOne); } } }); //3.2 分组 KeyedStream<Tuple2<String, Integer>, String> wordAndOneKS = wordAndOneDS.keyBy(new KeySelector<Tuple2<String, Integer>, String>() { @Override public String getKey(Tuple2<String, Integer> value) throws Exception { return value.f0; } }); //3.3聚合 SingleOutputStreamOperator<Tuple2<String, Integer>> sumDS = wordAndOneKS.sum(1); //4.输出数据 sumDS.print(); //5.执行:类似 sparkstreaming 最后,ssc.start(); env.execute(); } } /** 接口 A,里面有一个方法a(); 正常写法,定义个 class B,实现接口A,方法a() B b = new B(); 匿名实现类: new A(){ a(){} } */

输出结果如下;

前面的编号是,并行度线程编号。

在 DataSet API 里面分组使用的groupBy ; 在streaming里面使用的分组函数是: keyBy;
执行环境: DataSet 是:ExecutionEnvironment, Streaming 是: StreamExecutionEnvironment
调用: DataStream里面 env.execute();是必须调用的。DataSet不用去调用。

http://www.cnnetsun.cn/news/3010183.html

相关文章:

  • HS2-HF Patch:3步完成HoneySelect2游戏终极增强
  • 如何看待anthropic指控阿里 qwen 蒸馏 Claude ?
  • Transformer工程化学习路线图:从手写代码到生产落地
  • 评测:Codex、Manus、Claude Code、OpenClaw 谁才是最强的 Agent
  • PX4神经网络控制:为电力巡检无人机赋能自主线路识别与跟踪的端到端解决方案
  • 火山引擎多模态数据湖的制作思路
  • 纳米堆栈是什么?IBM如何像建城市一样造芯片
  • 慢半拍的 Flink TaskManager——问题不在代码中
  • AI转行不晚:从问题闭环到能力锚点的实战路径
  • 电商评论情感分析驱动的内容推荐系统实战
  • 【从零开始学架构:业务思考】像架构师一样思考:从业务价值出发
  • 海尔智家回报股东:回购是去年5倍,注销是去年10倍
  • 2轴舵机控制板
  • 第6篇:《串口长线乱码排查:TTL电平传5米,信号反射振铃全波形分析》
  • 偏相关系数的计算
  • 软件部署中的持续交付流水线建设
  • 【Java踩坑笔记】【基础语法篇】05_重写equals不重写hashCode会怎样?
  • windows安装Claude
  • Vue 2 vs Vue 3:核心特性与差异全解析
  • UE5.6 GAS学习笔记(2)-->GA篇 [2.分析GA类基本内容]
  • .NET开发者集成YOLO目标检测:yolodotnet实战指南
  • 2026实测|个人免费AI编程工具全对比,vibe coding副业开发者必看
  • 铁电MEMS突触技术:神经形态计算新突破
  • 国企央企官网的工程化设计:多专题内容管理、安全合规与无障碍实现
  • 当智能体真正走进办公室,它的成绩单好看吗?
  • 高阶03:国产EAP vs 进口Applied EAP全维度对比与迁移改造
  • Hermes 上手指南:真实开发里的落地路径
  • Plotly实现印度数字体系(Lac/Crore)数据可视化
  • Agent可,使由之;不可,使知之。
  • Keras Functional API:构建多输入多输出复杂模型的工程实践