您的位置 首页 > 德语词汇

source是什么意思?用法、例句(第4章 Flink 基础API 二:原算子(source))

大家好,关于source是什么意思?用法、例句很多朋友都还不太明白,不过没关系,因为今天小编就来为大家分享关于第4章 Flink 基础API 二:原算子(source)的知识点,相信应该可以解决大家的一些困惑和问题,如果碰巧可以解决您的问题,还望关注下本站哦,希望对各位有所帮助!

source是什么意思?用法、例句(第4章 Flink 基础API 二:原算子(source))

Flink可以从各种来源获取数据,然后构建DataStream进行转换处理。一般将数据的输入来源称为数据源(datasource),而读取数据的算子就是源算子(sourceoperator)。所以source就是我们整个处理程序的输入端。

根据数据的来源,source可以分为5类:

并行度的角度,source又可以分为非并行的source和并行的source。

并行度只能为1,即只有一个运行时实例,在读取大量数据时效率比较低,通常是用来做一些实验或测试,例如SocketSource;

并行度可以是1到多个,在计算资源足够的前提下,并行度越大,效率越高。例如KafkaSource;

在Flink1.12以前,旧的添加source的方式,是调用执行环境的addSource()方法:

DataStreamstream=env.addSource(...);

方法传入的参数是一个“源函数”(sourcefunction),需要实现SourceFunction接口。

从Flink1.12开始,主要使用流批统一的新Source架构:

DataStreamSourcestream=env.fromSource(…)

Flink直接提供了很多预实现的接口,此外还有很多外部连接工具也帮我们实现了对应的Source,通常情况下足以应对我们的实际需求。

可将一个普通的Java集合、迭代器或者可变参数转换成一个分布式数据流DataStream。基于集合的source一般用于学习测试时编造数据,主要有下面几个API:

非并行的Source,可以将一到多个数据作为可变参数传入到该方法中,返回DataStreamSource。

非并行的Source,可以将一个Collection作为参数传入到该方法中,返回一个DataStreamSource.

并行的Source(并行度也可以通过调用该方法后,再调用setParallelism来设置)通过指定的起始值和结束值来生成数据序列流;

并行的Source(并行度也可以通过调用该方法后,再调用setParallelism来设置)通过指定的起始值和结束值来生成数据序列流;

packageorg.mochi.datastream;\n\nimportorg.apache.flink.streaming.api.datastream.DataStreamSource;\nimportorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;\n\nimportjava.util.Arrays;\n\npublicclassSourceCollection{\npublicstaticvoidmain(String[]args)throwsException{\n\n//1、创建流式处理环境\nStreamExecutionEnvironmentenv=StreamExecutionEnvironment.getExecutionEnvironment();\n\n//2、创建基于集合的source\n//env.fromElements(可变参数);\nDataStreamSource<String>ds1=env.fromElements("spark","flink");\n\n//env.fromCollection(各种集合);\n//\nDataStreamSource<String>ds2=env.fromCollection(Arrays.asList("spark","flink"));\n\n//env.generateSequence(开始,结束);@deprecated\nDataStreamSource<Long>ds3=env.generateSequence(1,3);\n\n//env.fromSequence(开始,结束)\nDataStreamSource<Long>ds4=env.fromSequence(5,7).setParallelism(2);\n\n\n//3、transformation\n\n//4、sink\nds1.print();\nds2.print();\nds3.print();\nds4.print();\n\n//5、execute\nenv.execute();\n\n}\n}

IDE控制台结果如下:

真正的实际应用中,自然不会直接将数据写在代码中。通常情况下,我们会从存储介质中获取数据,一个比较常见的方式就是读取日志文件。这也是批处理中最常见的读取方式。

Flink的的一些读取文件的API已经过时,我们看readFile()底层调用addSource(...)。Flink在Flink1.12从零实现了新的基础框架,在Flink1.13中kafka、hive和filesource已移植到新架构。

新的读取文件方式需要添加文件连接器依赖

<dependency>\n<groupId>org.apache.flink</groupId>\n<artifactId>flink-connector-files</artifactId>\n<version>${flink.version}</version>\n</dependency>

具体代码

packageorg.mochi.datastream;\n\nimportorg.apache.flink.api.common.eventtime.WatermarkStrategy;\nimportorg.apache.flink.connector.file.src.FileSource;\nimportorg.apache.flink.connector.file.src.reader.TextLineInputFormat;\nimportorg.apache.flink.core.fs.Path;\nimportorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;\n\npublicclassSourceFile{\npublicstaticvoidmain(String[]args)throwsException{\n\n//1、创建流式处理环境\nStreamExecutionEnvironmentenv=StreamExecutionEnvironment.getExecutionEnvironment();\n\n////过时的方法读取文件\n//DataStreamSource<String>ds=env.readTextFile("input/words.txt");\n//ds.print();\n\nFileSource<String>fileSource=\nFileSource.forRecordStreamFormat(newTextLineInputFormat(),new\nPath("input/words.txt")).build();\n\nenv.fromSource(fileSource,WatermarkStrategy.noWatermarks(),"file")\n.print();\n\nenv.execute();\n\n}\n}

path可以有多种方式

我们之前用到的读取socket文本流,就是流处理场景。但是这种方式由于吞吐量小、稳定性较差,一般也是用于测试。

DataStreamstream=env.socketTextStream("127.0.0.1",9999);4.2.4、基于kafka的source

Flink官方提供了连接工具第三方的connector,我们以生产环境中常用的flink-connector-kafka举例,它直接帮我们实现了一个消费者FlinkKafkaConsumer,它就是用来读取Kafka数据的SourceFunction。所以想要以Kafka作为数据源获取数据,我们只需要引入Kafka连接器的依赖。Flink官方提供的是一个通用的Kafka连接器,它会自动跟踪最新版本的Kafka客户端。目前最新版本只支持0.10.0版本以上的Kafka。需要导入的依赖如下:

<dependency>\n<groupId>org.apache.flink</groupId>\n<artifactId>flink-connector-kafka</artifactId>\n<version>${flink.version}</version>\n</dependency>

具体代码

packageorg.mochi.source;\n\nimportorg.apache.flink.api.common.eventtime.WatermarkStrategy;\nimportorg.apache.flink.api.common.serialization.SimpleStringSchema;\nimportorg.apache.flink.connector.kafka.source.KafkaSource;\nimportorg.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;\nimportorg.apache.flink.streaming.api.datastream.DataStreamSource;\nimportorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;\n\npublicclassSourceKafka{\npublicstaticvoidmain(String[]args)throwsException{\n\nStreamExecutionEnvironmentenv=\nStreamExecutionEnvironment.getExecutionEnvironment();\n\nKafkaSource<String>kafkaSource=\nKafkaSource.<String>builder()\n.setBootstrapServers("localhost:9092")\n.setTopics("topic_1")\n.setGroupId("mochi")\n.setStartingOffsets(OffsetsInitializer.latest())\n.setValueOnlyDeserializer(newSimpleStringSchema())\n.build();\n\nDataStreamSource<String>stream=env.fromSource(kafkaSource,\nWatermarkStrategy.noWatermarks(),"kafka-source");\n\nstream.print("Kafka");\n\nenv.execute();\n}\n}

这是新的API,flink会把kafka消费者的消费位移记录在算子状态中,这样就实现了消费位移状态的容错,从而可以支持端到端的exactly-once;

自定义的source内容较多,后面单独写一篇“自定义source-Mysql”来说明怎么自己定义connector并在项目中使用。

关于source是什么意思?用法、例句到此分享完毕,希望能帮助到您。

本站涵盖的内容、图片、视频等数据,部分未能与原作者取得联系。若涉及版权问题,请及时通知我们并提供相关证明材料,我们将及时予以删除!谢谢大家的理解与支持!

Copyright © 2023