1.4FlinkHDFSConnector/FlinkHDFS连接器-创新互联
在上一章节已经翻译了Flink Kafka Connector,但由于HDFS作为大多数研究大数据者日常用到的,此章节并添加翻译HDFS的连接器。
创新互联是专业的宜黄网站建设公司,宜黄接单;提供成都网站设计、成都做网站,网页设计,网站设计,建网站,PHP网站建设等专业做网站服务;采用PHP框架,可快速的进行宜黄网站开发网页制作和功能扩展;专业做搜索引擎喜爱的网站,专业的做网站团队,希望更多企业前来合作!此连接器提供了一个Sink,将分区文件写入Hadoop FileSystem支持的任何文件系统。要使用此连接器,请将以下依赖项添加到您的项目中:
请注意,流连接器当前不是二进制分发的一部分。有关如何将程序与程序库打包以进行集群执行的信息,请参阅此处。
折叠文件接收器(Bucketing File Sink)
可以配置压力行为以及写入操作,但我们稍后将会介绍。这是你如何创建一个耐心的病人,默认情况下,它会收敛到按时间分割的滚动文件:
DataStream
input.addSink(new BucketingSink
唯一必需的参数是存储桶的基本路径。可以通过指定自定义bucketer,写入器和批量大小来进一步配置接收器。
默认情况下,当元素到达时,当前的系统时间将会降级,并使用日期时间模式“yyyy-MM-dd-HH”命名这些存储区。此模式将传递给具有当前系统时间的SimpleDateFormat以形成存储桶路径。每当遇到新的日期时,都会创建一个新的桶。例如,如果您有一个包含分钟作为最细粒度的模式,您将每分钟获得一个新的桶。每个桶本身是一个包含几个零件文件的目录:每个并行实例的接收器将创建自己的零件文件,当零件文件变得太大时,槽也将在其他文件旁边创建一个新的零件文件。当桶变得不活动时,打开的零件文件将被刷新并关闭。当最近没有写入时,桶被视为不活动。默认情况下,接收器每分钟检查不活动的桶,并关闭一分钟内未写入的任何桶。可以在BucketingSink上使用setInactiveBucketCheckInterval()和setInactiveBucketThreshold()配置此行为。
您也可以使用BucketingSink上的setBucketer()指定自定义bucketer。如果需要,bucketer可以使用元素或元组的属性来确定bucket目录。
默认的作者是StringWriter。这将调用toString()对传入的元素,并将它们写入部分文件,用换行符分隔。要在BucketingSink上指定一个自定义的作者,请使用setWriter()。如果要编写Hadoop SequenceFiles,可以使用提供的SequenceFileWriter,它也可以配置为使用压缩。
最后一个配置选项是批量大小。这指定何时应该关闭零件文件并启动一个新的零件。 (默认部分文件大小为384 MB)。
例:
DataStream
sinketingSink
sink.setBucketer(new DateTimeBucketer
sink.setWriter(new SequenceFileWriter
sink.setBatchSize(1024 * 1024 * 400); //这是400 MB,
input.addSink(sink);
这将创建一个写入到遵循此模式的桶文件的接收器:
/ base / path / {date-time} / part- {parallel-task} - {count}
其中date-time是从日期/时间格式获取的字符串,parallel-task是并行接收器实例的索引,count是由于批量大小而创建的部分文件的运行数。
另外有需要云服务器可以了解下创新互联scvps.cn,海内外云服务器15元起步,三天无理由+7*72小时售后在线,公司持有idc许可证,提供“云服务器、裸金属服务器、高防服务器、香港服务器、美国服务器、虚拟主机、免备案服务器”等云主机租用服务以及企业上云的综合解决方案,具有“安全稳定、简单易用、服务可用性高、性价比高”等特点与优势,专为企业上云打造定制,能够满足用户丰富、多元化的应用场景需求。
分享文章:1.4FlinkHDFSConnector/FlinkHDFS连接器-创新互联
本文网址:http://pwwzsj.com/article/hhehp.html