Integrating Flink with Spring Boot
2025-06-24 11:42:52
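Source: Xinhuanet
Flink is a unified computing framework that combines batch processing and stream processing. At its core is a streaming dataflow engine that handles data distribution and parallel computation.
Its biggest strength is stream processing, and it is best suited to low-latency data processing.
Typical scenario: processing data in a high-concurrency pipeline with millisecond-level latency while still remaining reliable.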
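To make "low latency" concrete, here is a plain-Java sketch (no Flink APIs; the class BatchVsStream and its methods are made up for illustration) of the same aggregation computed batch-style and stream-style. The list stands in for records arriving one at a time. The batch version returns nothing until the whole input has been read, while the stream version emits an updated result for every record.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration class (not Flink): shows WHEN a result becomes visible.
public class BatchVsStream {

    // Batch: consume the whole bounded input first, then produce one final result.
    static int batchTotal(List<Integer> records) {
        int total = 0;
        for (int r : records) {
            total += r;
        }
        return total; // only available after the last record has been read
    }

    // Stream: update state per record and emit a result immediately,
    // so a consumer sees output without waiting for the input to end.
    static List<Integer> streamTotals(Iterable<Integer> records) {
        List<Integer> emitted = new ArrayList<>();
        int total = 0; // operator state carried from one record to the next
        for (int r : records) {
            total += r;
            emitted.add(total); // one output per input record
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<Integer> input = Arrays.asList(3, 1, 4, 1, 5);
        System.out.println("batch:  " + batchTotal(input));   // batch:  14
        System.out.println("stream: " + streamTotals(input)); // stream: [3, 4, 8, 9, 14]
    }
}
```

In a real unbounded stream the input never ends, so the batch version would never return, whereas the streaming version would push each running total downstream instead of collecting it into a list.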
Environment setup:
① Install Flink
https://nightlies.apache.org/flink/flink-docs-master/zh/docs/try-flink/local_installation/
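② Install Netcat
Netcat (also called nc) is a networking tool that can establish TCP/IP or UDP connections between two computers.
It is used to test ports on a network, transfer files, and debug or probe networks; some versions also support more advanced operations such as encrypted connections and remote administration.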
yum install -y nc    # install the nc command
nc -lk 8888          # listen on socket port 8888 (-k keeps listening after a client disconnects)
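If nc is not available (for example on a Windows development machine), a small Java program can play roughly the same role as nc -lk 8888: listen on a port, accept one client at a time, and forward every line typed on stdin to that client. This is only a sketch; the class LineServer and the method serveOneClient are hypothetical names, and disconnect detection is approximate.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

// Hypothetical helper: a minimal stand-in for `nc -lk <port>`.
public class LineServer {

    // Accept one client and forward lines from `source` to it.
    // Returns true if the client went away while input remains, false once the source is exhausted.
    static boolean serveOneClient(ServerSocket server, BufferedReader source) throws IOException {
        try (Socket client = server.accept();
             PrintWriter out = new PrintWriter(
                     new OutputStreamWriter(client.getOutputStream(), StandardCharsets.UTF_8))) {
            String line;
            while ((line = source.readLine()) != null) {
                out.print(line);
                out.print('\n');        // socketTextStream splits records on '\n' by default
                if (out.checkError()) { // checkError() flushes, then reports write failures;
                    return true;        // a disconnect may surface a line or two late (those lines are lost)
                }
            }
            return false;               // source ended (e.g. Ctrl+D / Ctrl+Z on stdin)
        }
    }

    public static void main(String[] args) throws IOException {
        int port = args.length > 0 ? Integer.parseInt(args[0]) : 8888;
        BufferedReader stdin = new BufferedReader(new InputStreamReader(System.in, StandardCharsets.UTF_8));
        try (ServerSocket server = new ServerSocket(port)) {
            // Like -k: after a client disconnects, wait for the next one.
            while (serveOneClient(server, stdin)) {
                System.err.println("client disconnected, waiting for the next one");
            }
        }
    }
}
```

Run it with java LineServer 8888 on the machine the Flink job connects to, then type lines just as you would into nc.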
Unbounded stream: reading a socket text stream
1. Dependencies
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>springboot-demo</artifactId>
        <groupId>com.et</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>flink</artifactId>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-autoconfigure</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>1.17.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.17.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>1.17.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-base</artifactId>
            <version>1.17.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-files</artifactId>
            <version>1.17.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka</artifactId>
            <version>1.17.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime-web</artifactId>
            <version>1.17.0</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                    <resource>META-INF/spring.handlers</resource>
                                </transformer>
                                <transformer implementation="org.springframework.boot.maven.PropertiesMergingResourceTransformer">
                                    <resource>META-INF/spring.factories</resource>
                                </transformer>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                    <resource>META-INF/spring.schemas</resource>
                                </transformer>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>com.et.flink.job.SocketJob</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
2. SocketJob
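package com.et.flink.job;

import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class SocketJob {

    public static void main(String[] args) throws Exception {
        // Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Set the parallelism; when run locally the default is the machine's number of CPU cores
        env.setParallelism(3);
        // Read a text stream from the socket at the given IP and port.
        // The job only does work once data arrives on the socket.
        DataStreamSource<String> socketDS = env.socketTextStream("172.24.4.193", 8888);
        // Process the data: split, transform, group and aggregate to get the word counts
        SingleOutputStreamOperator<Tuple2<String, Integer>> sum = socketDS
                .flatMap((String value, Collector<Tuple2<String, Integer>> out) -> {
                    String[] words = value.split(" ");
                    for (String word : words) {
                        out.collect(Tuple2.of(word, 1));
                    }
                })
                .setParallelism(2)
                // Provide the type information explicitly: for a lambda passed to flatMap,
                // type erasure means Flink can only infer that the output is a Tuple2,
                // not Tuple2<String, Integer>. Declaring the return type lets it resolve the full type.
                .returns(new TypeHint<Tuple2<String, Integer>>() {})
                // Equivalent alternative (needs org.apache.flink.api.common.typeinfo.Types):
                // .returns(Types.TUPLE(Types.STRING, Types.INT))
                .keyBy(value -> value.f0)
                .sum(1);
        // Print the results
        sum.print();
        // Execute the job
        env.execute();
    }
}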
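The flatMap, keyBy and sum chain with parallelism 3 can be hard to picture. Below is a plain-Java simulation (no Flink APIs; the class KeyedWordCountSketch and its methods are hypothetical) in which each word is routed to one of three "subtasks", each subtask keeps its own per-key counts, and each update is emitted in the n> (word,count) shape that print() uses when parallelism is greater than 1. Assumption: routing here is a simple hashCode modulo 3. Flink actually assigns keys through murmur-hashed key groups, so the subtask numbers will not match a real run.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java sketch of flatMap -> keyBy(f0) -> sum(1) with parallelism 3.
public class KeyedWordCountSketch {

    static final int PARALLELISM = 3;

    // Simplified stand-in for keyBy: the same key always maps to the same subtask (0-based).
    // NOT Flink's real key-group assignment.
    static int subtaskFor(String key) {
        return Math.floorMod(key.hashCode(), PARALLELISM);
    }

    static List<String> run(List<String> lines) {
        // One independent keyed-state map per subtask: a key's count lives only in "its" subtask.
        List<Map<String, Integer>> state = new ArrayList<>();
        for (int i = 0; i < PARALLELISM; i++) {
            state.add(new HashMap<>());
        }
        List<String> printed = new ArrayList<>();
        for (String line : lines) {
            for (String word : line.split(" ")) {                           // flatMap: line -> (word, 1)
                int subtask = subtaskFor(word);                              // keyBy(value -> value.f0)
                int count = state.get(subtask).merge(word, 1, Integer::sum); // sum(1) on keyed state
                // print() prefixes records with the 1-based subtask index when parallelism > 1
                printed.add((subtask + 1) + "> (" + word + "," + count + ")");
            }
        }
        return printed;
    }

    public static void main(String[] args) {
        run(Arrays.asList("abc bcd cde", "bcd cde fgh", "cde fgh hij")).forEach(System.out::println);
    }
}
```

For the three test lines (abc bcd cde / bcd cde fgh / cde fgh hij), the emitted records are, in order, (abc,1), (bcd,1), (cde,1), (bcd,2), (cde,2), (fgh,1), (cde,3), (fgh,2), (hij,1). A real run should print the same set of (word,count) records, since each key's count always climbs 1, 2, 3 in order, but the n> prefixes and the interleaving between different keys will likely differ. Routing by key is what makes this work: all updates for one word land in one subtask, so its running count never has to be merged across subtasks.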
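Testing:
Start the socket stream first, on the host the job connects to (172.24.4.193 in SocketJob):
nc -l 8888
Local run: start the main method directly in IntelliJ IDEA, then type the following lines into the socket stream:
abc bcd cde
bcd cde fgh
cde fgh hij
Cluster run:
Package the project with Maven and upload the resulting jar to the cluster.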