收藏 分销(赏)

阿里云 JindoFS+OSS 数据上云实战.pdf

上传人:Stan****Shan 文档编号:1240878 上传时间:2024-04-19 格式:PDF 页数:152 大小:10.85MB
下载 相关 举报
阿里云 JindoFS+OSS 数据上云实战.pdf_第1页
第1页 / 共152页
阿里云 JindoFS+OSS 数据上云实战.pdf_第2页
第2页 / 共152页
阿里云 JindoFS+OSS 数据上云实战.pdf_第3页
第3页 / 共152页
阿里云 JindoFS+OSS 数据上云实战.pdf_第4页
第4页 / 共152页
阿里云 JindoFS+OSS 数据上云实战.pdf_第5页
第5页 / 共152页
点击查看更多>>
资源描述

1、卷首语JindoFS 作为阿里云基于 OSS 的一揽子数据湖存储优化方案,完全兼容 Hadoop/Spark 生态,并针对 Spark、Hive、Flink、Presto 等大数据组件和 AI 生态实现了大量扩展和优化。JindoFS 项目包括 JindoFS OSS 支持、JindoFS 分布式缓存系统(JindoFS Cache 模式)和 JindoFS 分布式存储优化系统(JindoFS Block 模式)。JindoSDK 是各个计算组件可以用来使用 JindoFS 这些优化扩展功能和模式的套件,包括Hadoop Java SDK、Python SDK 和 Fuse/POSIX 支持。

2、JindoSDK 在阿里云 E-MapReduce产品中被深度集成,同时也开放给非 EMR 产品用户在各种 Hadoop/Spark 环境上使用。目录数据迁移5高效迁移 HDFS 海量文件到 OSS5数据无忧:利用 Checksum 迁移 HDFS 数据到 OSS13如何将 HDFS 海量文件归档到 OSS21如何将 Hive 数据按分区归档到 OSS27OSS 访问加速34访问 OSS 这类对象存储最快的方式:JindoFS SDK34Hadoop/Spark 访问 OSS 加速42Flink 高效 sink 写入 OSS49Flume 高效写入 OSS56Presto 如何高效查询 OSS

3、 数据68Impala 如何高效查询 OSS 数据71打开 OSS 多版本:合规和分析两不误76JindoFS 缓存加速81Spark 访问 OSS 透明缓存加速81Presto 访问 OSS 透明缓存加速87指定表和分区来预先缓存,查询分析更高效93云上计算云下数据:HDFS 缓存加速103AI 训练加速112Fluid+JindoFS 对 OSS 上的数据进行训练加速112Fluid+JindoFS 对 HDFS 上的数据进行训练加速121Fluid+JindoFS 对海量小文件的训练加速127JindoTable 计算加速133Spark 对 OSS 上的 Parquet 数据进行查询加

4、速133Spark 对 OSS 上的 ORC 数据进行查询加速138分层更高效,对 Hive 数仓进行热度/冷度统计143对 Hive 数仓表进行高效小文件合并1505数据迁移数据迁移高效迁移 HDFS 海量文件到 OSS作者|阿里云计算平台事业部开源大数据数据湖存储-技术专家 扬礼内容概要:一、DistCp 介绍二、Jindo DistCp 介绍三、性能优化四、实操演示视频链接:https:/ 介绍数据迁移数据迁移目前,从 HDFS 拷贝海量文件到 OSS 上存在着诸多问题。首先是文件数量规模比较大,比如百万/千万级的时候,使用开源的 Hadoop DistCp 会出现超时或 OOM 的情况

5、,这是因为拷贝的文件数量大,花费时间长,而且 OSS 带宽一定的情况下容易出现单个 Task 超时或失败的情况,十分不友好。而且从 HDFS 拷贝到 OSS 上,现有的工具实现都比较耗时,主要是因为 Map/Reduce Task结束会进行 Rename 流程,Rename 操作对于 OSS 的这种对象存储系统非常不友好。其次,现有开源工具无法保证数据拷贝的一致性,比如不能保证 Task 写的所有文件同时对外可见或同时不可见。最后,OSS 的很多功能特性比如写入时进行冷归档,在社区版本都是不支持的。三、性能优化那么 Jindo DistCp 如何优化和解决这些问题?数据迁移数据迁移在 Comm

6、itJob 之前,所有 MapReduce 写的数据都在临时目录中,Job 最终目录不会读到临时数据,在 Job 执行的过程中失败了,只需清理临时目录中的文件即可。V1 在 Job 执行的过程中,每一个产生的文件需要进行两次 ReName 操作,第一次是 CommitT-ask,在 Task 端执行,多个节点中执行的 Task 可以并发进行 Rename。第二次是 CommitJo-b,在 Driver 端执行,是个单点操作。此过程中由于需要将 Job 临时目录中的文件移动到最终目录,会有一个时间窗口,在这个时间窗口中失败会导致部分数据对外可见,时间窗口会随着文件数量的增加而增加。对于 HDF

7、S 这种分布式的文件系统来说,Rename 是一个非常高效的操作,只涉及到 Nameno-de 上相关元数据的修改,所以这个时间窗口非常小,可以满足绝大部分场景的需求。但 S3OSS 的这种公有云上的对象存储并不直接支持 Rename 操作,文件系统级别的 Rename 一般会转化成 copy+delete 的操作,这个代价相对于 HDFS 会大大增加。CommitJob 是在 MapReduce 的 Driver 端执行的,虽然有实现线程级别的并发优化,但是在写入 OSS 的场景中,CommitJob 的时间窗口非常长,文件数量较大时可能达到分钟甚至小时级别,这对于 Job 的性能会产生严重

8、的影响。为了解决写 S3 和 OSS 这种对象存储系统的性能问题,Hadoop 社区引入了 FileOutputCommi-tter V2 版本。V2 版本最大的区别是去掉了 Task 的输出目录,在 CommitTask 的时候将文件直接 Rename 到 Job 最终目录,整个 JobCommit 过程对所有的文件只需进行一次 Rename 操作,而且是在集群节点的所有 Task 上并发执行,消除了 Driver 单点 Rename 的瓶颈,所以数据迁移数据迁移在 Task 的执行过程中,由于通过 MultiPart Upload 相关接口初始化 Upload 和上传分块数据,但是直到 C

9、ommitjob 的时候才调用 CompleteMultiPartUpload 完成传输。根据 MultiPartUpload 的特性,在调用 Complete 之前数据是不可见的,从而保证了数据的一致性。与 FileUploadCommitter 类似,由于有多个文件需要 Complete 的操作,在 CommitJob 的时候也可能会有数据不一致的时间窗口。文件的上传过程都已经在 Task 的分布式操作中完成了,再加上 JobCommitter 是一个非常轻量级的请求,所以这个时间窗口非常短,失败的可能性也比较低,可以满足绝大多数业务场景的需求。相比于 V1,CompleteMultiPa

10、rtUpload 在 Rename 上的代价会小很多,导致数据不一致的窗口也会少很多。相比于 V2 无法保证数据的一致性,Jindo CopyCommitter 更适用对数据一致性有要求的场景。性能方面,这种方式在 Task 过程中能并发写数据到 OSS 并且不需要 Rename 操作,对比 V1和 V2 分别需要两次和一次的 Rename 操作,性能上也有大幅度的提升。我们做了一个 Jindo DistCp 和 Hadoop DistCp 的性能对比。以 HDFS 到 OSS 离线数据迁移为主要场景,利用 Hadoop 自带的测试数据集 TestDFSIO 分别生成 1000 个 10M、1

11、000 个 500M和 1000 个 1G 大小的文件,进行从 HDFS 拷贝数据到 OSS 上的测试过程。测试结果显示,Jindo DistCp 最高可以达到 Hadoop DistCp 1.59 倍的加速效果,这个性能是非常可观的。数据迁移数据迁移数据无忧:利用 Checksum 迁移 HDFS 数据到OSS作者|阿里云计算平台事业部开源大数据数据湖存储-技术专家 焱冰内容概要:一、Jindo DistCp 简述二、Checksum 技术科普三、DistCp 技术解密四、Jindo DistCp 实操演示视频链接:https:/ DistCp 简述数据迁移数据迁移循环冗余校验又称 CRC(

12、Cyclic redundancy check),将待发送的比特串看做是系数为 0 或者 1 的多项式。CRC 编码时,发送方和接收方必须预先商定一个生成多项式 G(x)。发送方将比特串和生成多项式 G(x)进行运算得到校验码,在比特串尾附加校验码,使得带校验码的比特串的多项式能被 G(x)整除。接收方接收到后,除以 G(x),若有余数,则传输有错。CRC 算法是数据校验算法中最常用的。它的优点是算法实现相对简单、运算速度较快。而且错误检错能力很强,因此被广泛应用于通信数据校验。缺点是输出长度较短,容易被伪造,碰撞率高。数据迁移数据迁移在上图的文件系统中,节点 A 的 Block size 是

13、 n,节点 B 的 Block size 是 q,它们两个之间的Checksum 可能就不一样,导致没办法比对。云存储上又是另外一种 Checksum 的计算函数,更加无法比对。数据迁移数据迁移首先 Jindo DistCp 支持多种 Checksum 算法,默认是 CRC32,也支持 CRC64,之后还会扩展支持其他算法。在文件迁移拷贝的时候默认开启 Checksum 比对,特别是 Hadoop 到 OSS 的场景,能够兼容 Hadoop 默认的 CRC32 以及 OSS 默认的 CRC64 算法。如果要禁用 Checksum,可以通过使用 disableChecksum 参数。这里所有的差

14、异比较、增量拷贝的参数使用与 Hadoop DistCp 的使用方式都是相同的。差异比较首先比较文件是否存在,然后比较文件大小,最后比较 Checksum,并且在过程中不会重复计算,因为在拷贝的时候就已经记录下 Checksum。四、Jindo DistCp 实操演示接下来通过实操演示 Jindo DistCp 是如何在数据拷贝和数据迁移中保证数据的完整性和一致性。下载wget https:/smartdata-binary.oss-cn- jar jindo-distcp-3.5.0.jar-src/user/root/random-data-dest oss:/yanbin-hd2-tes

15、t/distcp/-parallelism 10-src:hdfs 的源路径-dest:oss 的目标路径-ossKey:oss 的 AccessKey-ossSecret:oss 的 AccessSecret-ossEndPoint:oss 的 endpoint 信息,可以公网或者内网的 endpoint-parallelism:任务并发大小,根据集群资源可调整Jindo DistCp 高阶使用实战禁用 Checksumhadoop jar jindo-distcp-3.5.0.jar-src/user/root/random-data-dest oss:/yanbin-hd2-test/d

16、istcp/-parallelism 10-disableChecksum增量拷贝hadoop jar jindo-distcp-3.5.0.jar-src/user/root/random-data-dest oss:/yanbin-hd2-test/distcp/-parallelism 10-update差异比较时,启用 CMS 告警export cmsAccessKeyId=export cmsAccessSecret=export cmsRegion=cn-shanghaiexport cmsToken=export cmsLevel=WARNhadoop jar jindo-dis

17、tcp-3.5.0.jar-src/user/root/random-data-dest oss:/yanbin-hd2-test/distcp/-diff-enableCMS点击视频链接 20 分 00 秒开始,观看实操演示。https:/ HDFS 海量文件归档到 OSS作者|阿里云计算平台事业部开源大数据数据湖存储-技术专家 辰石内容概要:一、背景二、功能介绍三、实操演示视频链接:https:/ IDC 机房里构建大数据集群。数据存储就是一个 HDFS 集群,用户把业务数据存储在 HDFS 中,这种架构下 HDFS 的集群容量依赖于本地磁盘。公有云存算一体架构的出现解决了大数据集成运维的

18、难题。公有云的厂商将线下 IDC 机房搬迁到公有云上,把物理主机换成公有云上的 ECS,数据还是存储在 HDFS 集群上。这种架构下,HDFS 集群的容量依赖于 ECS 的个数以及 ECS 上的云盘容量。数据迁移数据迁移上图是 HDFS 和阿里云对象存储 OSS 的成本的对比,OSS 分为标准、低频、归档和冷归档这四种存储类型。OSS 标准平均 0.12 元/GB/每月、OSS 低频 0.08 元/GB/每月、OSS 归档 0.033 元/GB/每月、OSS 冷归档 0.015 元/GB/每月。对于 OSS 归档类型,其存储成本只有 HDFS 的 1/5,冷归档不及 HDFS 成本的 1/10

19、。由此可以看出 OSS 数据存储的成本要远远低于 HDFS。大数据迁移到 OSS 的归档存储,可以减少数据的存储成本,同时也可以腾出更多的 HDFS 的容量存储热数据。如何将 HDFS 数据归档到 OSS 之上呢?主要是借助于阿里云提供的 Jindo DistCp 工具,Jindo DistCp 工具是类似于 Hadoop DistCp 的工具,但有自身的优化以及支持:一是可以全量支持 HDFS/OSS/S3 之间的的数据拷贝场景,只要兼容 S3 的协议就可以支持;二是重点优化了 HDFS/OSS 数据拷贝场景,可以避免 Rename 等复杂操作,拷贝数据的速度会快很多;三是在 DistCp

20、过程中支持 CheckSum 校验或者增量备份等功能。具体详细的介绍可以参考一下上图的链接,有完整的 Jindo DistCp 的介绍。数据迁移数据迁移上图主要介绍 DistCp 详细的使用,DistCp 写入低频、归档和冷归档的数据。红色字体的-Policy ia、-policy archive、-policy coldArchive 对应 OSS 的低频、归档和冷归档。3、HDFS 命令查看数据类型数据从 HDFS 归档到 OSS 上,如何确认在 OSS 上数据的存储类型?EMR 团队扩展了 LS2 扩展命令,除了能常规的显示原来 LS 命令属性,还会把 OSS 存储类型显示出来,图中的链

21、接连接详细介绍了利用 Jindo DistCp 拷贝数据从 HDFS 到 OSS,有兴趣的用户可以看一下。三、实操演示1、准备测试脚本,包含 DistCp 到 OSS 各种存储类型的文件,测试文件大小为 10M。2、执行测试脚本进行数据拷贝。数据迁移数据迁移如何将 Hive 数据按分区归档到 OSS作者|阿里云计算平台事业部开源大数据数据湖存储-技术专家 健身内容概要:一、背景介绍二、功能介绍三、实现原理四、实操演示视频链接:https:/ HDFS 的替代或补充相比存算分离架构,对于已有 HDFS 数据比较平滑,可以逐渐过渡到存算分离架构二、功能介绍数据仓库数据仓库是大数据的典型场景每天的

22、ETL 作业新增大量数据Hive 支持分区表,使用分区可以快速裁剪数据Hive 数仓中大量 Hive 表以时间日期作为分区字段在数仓中很多表的较老的日期分区平常一般不会被访问,可以考虑把这部分数据移出 HDFSHive 的每个分区都有自己的 storage descriptor,可以有单独的存储路径数据迁移数据迁移以上调用 Jindo DistCp 采用了事务模式,保证作业失败的情况下可以回滚。使用 HDFS 文件锁则保证同一时间内,一个分区只有一个 JindoTable 的作业,防止多个 Jindo DistCp 作业同时操作导致失败。先修改分区元数据,再清理 HDFS 数据,则可以确保数据

23、可用。四、实操演示命令:jindo table-moveTo-t -d -c|-fullTable-b/-before -p/-parallel -o/-overWrite-r/-removeSource-e/-explain-l/-logDir 参数说明要移动的表。目标路径,为表级别的路径,分区路径会在这个路径下自动创建。分区过滤条件表达式,支持基本运算符,不支持 udf。根据分区创建时间,创建时间超过给定天数的分区进行移动。整个 moveTo 任务的最大 task 并发度,默认为 1。-o/-overWrite是否覆盖最终目录。分区级别覆盖,不会覆盖本次移动不涉及的分区。-r/-remov

24、eSource是否在移动完成后删除源路径。本地日志目录,默认为/tmp/-e/-explain如果指定 explain 模式,不会触发实际操作,仅打印会同步的分区。1、数据准备数据迁移数据迁移3、检查移动后的分区情况点击视频链接 23 分 02 秒开始,观看实操演示。https:/ 访问加速OSS 访问加速早期的大数据存储架构大多数是选择 Apache HDFS 来构建大规模数仓的,Apache HDFS 有着很好的易用性,可以在比较廉价的存储节点上快速构建分布式存储系统,它的三副本机制也较好地保证了数据的可靠性。但是随着大数据存储的发展,各家大厂存储的数据到达百 PB 级或 EB 级,Apa

25、che HDFS 也逐渐暴露出了一些问题。比如,HDFS 的元数据都是存储在 NameNode JVM 内存里,这样元数据的规模首先就受限于 JVM。一般来说存储 46 亿文件就比较接近 NameNode 元数据的瓶颈了,对应的数据规模大约是 100PB 左右,对应到不同存储类型的节点大约是 10003000 台的规模。当单组 NameNode 的 HDFS 集群到达瓶颈的时候,社区就引入了 HDFS Federation 方案。Federation 指的是建立 HDFS 联邦集群,然后按业务进行拆分,把不同的数据拆分到不同的集群里,可以突破单组 NameNode 的规模限制,能够扩展 HDF

26、S 存储元数据的能力和容量,但是这样也带来了运维成本增加等问题。原来只需要维护一个集群,现在拆分以后需要维护多个集群,一些重要的业务和作业会对多个存储集群有依赖,只有每个存储集群同时都很好地运作的时候,业务或者作业才能流畅运行,所以运维成本也显著增加。HDFS 子集群之间是通过 nameservice 来相互区分的,让业务方大量地去使用 nameservice 也是一个很不好的体验,比较容易出问题。所以这里很多厂商就引入了 NS Router,可以把很多nameservice 挂在同一个 router 的 path 下,只要访问 router 的 path 对应的目录,业务方就能自动使用不同

27、ns,这也是一个比较便捷的方案,但同时也增加了运维难度,一些 router 也会成为系统的瓶颈。随着业界大数据上云的趋势和对象存储能力的不断提高,现在大家更青睐基于云上对象存储来构建数据湖,利用对象存储高可用、高吞吐的特点,来更快速地构建数据湖存储,这也是当前大数据演进的方向。OSS 访问加速OSS 访问加速一些事务写入。OSS 本身是不支持事务性写入的,所以一旦程序出现异常需要恢复数据的情况就很麻烦。JindoFS 对这些数据湖的需求都提供了很好的支持,来满足数据湖上层的计算引擎和应用,让用户能够更好地使用对象存储。同时很多计算引擎比如 SparkSQL、Hive、Presto 和 Impa

28、la 想要更好地使用对象存储,把引擎的性能发挥到极致是需要做工作的,Jindo Table 可以让这些计算引擎更好地使用对象存储存储,同时把分层存储的能力更好地应用起来。还有一方面是安全,对象存储支持通过 AK 方式和 Token 方式授权。直接使用 AK 方式会有泄露的风险,通过 Token 模式集成就可以很好地避免泄露问题的发生。JindoFS 对免密验证的支持是比较丰富的,同时也对接了 Kerberos 和 Ranger 这类安全组件,对数据湖存储进行安全增强。JindoFS 也对接了对象存储的加密特性保证了数据合规。二、JindoFSJindoFS 是一个双模式,它既可以是一个分布式的

29、缓存系统,为计算加速提供优化(cache 模式),也可以是一个分布式的存储系统,是一个完备的存储引擎(block 模式)。Block 模式和 Cache 模式最大的区别就是它们的数据协议,block 有自己的元数据协议即JindoFS 协议,它通过 JindoFS 的 nameservice 提供元数据的服务,满足对外数据性能上的一些需求。Cache 模式可以直接对访问 OSS 进行加速,它也支持 OSS、S3 和其他云厂商的存储。它的特点是能对 OSS 和 S3 协议提供非常好的支持,使得用户能继续按照以前的访问方式,而且没有对底层数据做切分,与 Alluxio 比较类似。OSS 访问加速O

30、SS 访问加速JindoFS SDK 访问 OSS 包含两部分,一部分是 JindoFS Hadoop SDK,HCFS 标准,一般推荐直接替换掉现有 OSS 或 S3 的 Hadoop SDK,使性能得到大幅提升。另外一部分是 JindoF-S Object SDK,可以直接替换掉 OSS Java SDK,获得性能的提升。四、JindoFS SDK 的优势JindoFS SDK 的优势主要体现在以下几方面。OSS 访问加速OSS 访问加速除了 Object 层的性能提升,还有操作层面的性能提升。上图对 Jindo OSS Hadoop SDK 和现有开源 SDK 进行了对比。可以看到 Ji

31、ndo SDK 的性能都有不同程度的提升。OSS 访问加速OSS 访问加速简单来说就是为 OSS 对象存储提供了一种 Hadoop FS 的实现,只需要像访问 Hadoop FS 一样就可以去访问整个 OSS 文件,同时还会映射出 HDFS 这样的树状结构,使得用户可以在Hive、Spark 上面无缝使用 OSS 的数据。关于为什么要使用 JindoSDK,有以下几方面的原因:首先是性能方面,SDK 是基于 Native 的,它的性能要比 Hadoop OSS SDK 更好。其次是良好的兼容性,我们测试过 Hadoop 大多数版本,所有 2.3 以上的版本都是验证通过。接下来是专业团队维护,因

32、为 Jindo SDK 其实也是我们开发团队自己在用的,所以发现问题我们都会及时去修复和更新。最后是功能更新快,及时跟进 OSS 最新的特性和优化。很多 OSS 的最新特性是对用户不透明的,用户很难去做这些优化,我们会及时跟进这些优化,然后更新。二、Hadoop 使用 JindoFS SDK 访问 OSS使用 JindoFS SDK 访问 OSS,首先需要下载最新的 sdk 安装包,并将其安装到 Hadoop 的classpath 下。OSS 访问加速OSS 访问加速配置以上两个操作以后,就可以轻松地访问 Hadoop OSS 了,上图列举了几个常用的命令。还有一个隐藏的功能是 map/red

33、uce 也可以直接访问 OSS 文件。三、Spark 使用 JindoFS SDK 访问 OSS首先下载一个 jar 包,将 SDK 包安装到 Spark 的 classpath 下。在 spark2.0 之后,jar 包都是放在 spark_home/jars 下面,spark 也允许用户去配置第三方的目录,用户可以把 SDK 放到其他目录中。接下来配置 JindoFS SDK。Spark 访问 HDFS 用的就是 Hadoop 的配置,它是从环境中读取Hadoop conf.dll 变量,然后从里面找到 Hadoop 相关的配置。所以如果第一步已经完成了Hadoop 的配置,就直接去把环境

34、变量导到之前配置的 Hadoop 的 Conf 的目录,这样启动Spark 任务以后它默认就能够读 OSS 的数据了。OSS 访问加速OSS 访问加速4、演示 hadoop 命令5、将 jar 包拷贝到 Spark$SPARK_HOME/jars6、演示 Spark 访问 OSSOSS 访问加速OSS 访问加速Flink 高效 sink 写入 OSS作者|阿里云计算平台事业部开源大数据数据湖存储-技术专家 重湖内容概要:一、背景介绍二、功能介绍三、如何配置及使用视频链接:https:/ Flink 简介业内很多人将大数据的计算引擎分成了四代。第一代的计算引擎是 Hadoop 承载的 MapRe

35、duc-e。MapReduce 将计算分为 Map 和 Reduce 两个阶段,对于上层应用来说,需要想方设法拆分算法,甚至在上层应用实现多个 Job 的串联来完成一个完整的算法,既缺乏效率,又对程序设计很不友好。这些问题催生了支持 DAG 框架的产生,支持 DAG 的框架被划分为第二代计算引擎,可以更高效地实现复杂的 MapReduce。接下来就是以 Spark 为代表的第三代计算引擎,第三代计算引擎支持 Job 内的 DAG 规划,支持全内存计算,使复杂 MapReduce 的作业效率得到大幅度提升。与 Hadoop 的 MapReduce 相比,甚至有十倍至百倍的提升。Spark 的出现

36、对大数据生态的发展起到了显著的促进作用,大量的上层应用开始出现,例如对流计算的支持,对SQL 的支持等。Apache Flink 的诞生被认为是第四代引擎,以分布式流计算为核心,同时支持批处理,以高吞吐、低延时、高容错的流计算为核心,主要体现在如下几点:OSS 访问加速OSS 访问加速在这样的背景下,JindoFS 团队为写入 OSS 开发了 JindoFS Flink Connector。这个 Connect-or 接入 Flink 的两阶段 Checkpoint 机制与 Recorverable 机制,实现了高效可容错地写入 OSS。二、功能介绍1、JindoFS Flink Connec

37、tor 整体架构:两阶段 Checkpoint(检查点)机制:第一阶段 MPU(MultiPartUpload,分片上传)写入 OSS第二阶段 MPU 提交Recoverable Writer 可恢复性写入:临时文件以普通文件格式上传 OSSSink 节点状态快照(两阶段 Checkpoint 示意图)OSS 访问加速OSS 访问加速1、在程序中使用 JindoFS Flink Connector确保集群能够访问 OSS Bucket前提:已购买 OSS 产品,OSS 网站链接:https:/ OSS Bucket,例如正确配置密钥或免密服务等使用合适的路径,流式写入 OSS Bucket。写

38、入 OSS 需使用 OSS:/前缀路径,类似于:OSS:/2、在程序中使用 JindoFS Flink Connector:Java在程序中开启 Flink Checkpoint前提:使用可重发的数据源,如 Kafka通过 StreamExecutionEnvironment 对象打开 Checkpoint(示例):建立:StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();打开:env.enableCheckpointing(,CheckpointingMode.EXACTLY_

39、ONCE);示例程序下文中,outputStream 是一个预先形成的 DataStream对象,若需写入 OSS,则可以这样添加 sink:String outputPath=oss:/;StreamingFileSink sink=StreamingFileSink.forRowFormat(new Path(outputPath),new SimpleStringEncoder(UTF-8)OSS 访问加速54).build();outputStream.addSink(sink);上述程序指定将 outputStream 中的 String 内容写入 OSS 路径 oss:/,最后还需

40、用 env.execute()语句执行 Flink 作业,env 是已建立的StreamExecutionEnvironment 对象最后,将 Java 作业打包为 jar 文件,并用 flink run 在集群提交即可3、程序中使用 JindoFS Flink Connector:Pyflink与 Java 示例类似,在 Pyflink 中使用 JindoFS Flink Connector 与写入 HDFS 等其他介质方式相同,只需:将写入路径写作合适的 OSS 路径注意打开 Checkpoint 功能例如,下列 Python 程序定义了一张位于 OSS 的表:然后将其添加到 Stream

41、TableEnvironment t_env 中即可:t_env.sql_update(sink_ddl)55OSS 访问加速4、在程序中使用 JindoFS Flink Connector:更多配置用户通过 flink run 提交 java 或 pyflink 程序时,可以额外自定义一些参数,格式:flink run-m yarn-cluster-yD key1=value1-yD key2=value2.目前支持“熵注入”及“分片上传并行度”两项配置熵注入(entropy injection):功能:将写入路径的一段特定字符串匹配出来,用一段随机的字符串进行替换效果:削弱所谓“片区”(s

42、harding)效应,提高写入效率配置参数:oss.entropy.key=oss.entropy.length=分片上传并行度:配置参数:oss.upload.max.concurrent.uploads默认值:当前可用的处理器数量OSS 访问加速OSS 访问加速E-MapReduce 从 3.16.0 版本开始支持 Apache Flume,客户最常用的场景是在自建的机器上收集日志,收集日志到 Flume 以后,可以做一些实时的计算和实例性的计算,也可以把数据先存储起来,比如存储到 HDFS、OSS 还有 Kafka 等社区已经提供的 Sink,用户也可以定制一些自己的 Sink,比如说

43、Spark、Presto 等。简单介绍一下 Flume 的架构和技术原理。首先 Flume 在每台机器上都有一个 Flume Agent,它是一个节点进程,从本地或者一些端口收集数据,Agent 中负责收集数据的组件称为 Source,每个数据的基本单元称为一个 Event。Event 分为 Header 和 Body,Header 是 Map 定义里面包含了特定的选项和配置,Body 里面是具体的数据。任何情况下,数据都是以 Event 的形式在Flume 中传递,然后通过事务的形式将数据从 Source 发送到 Channel。Channel 充当了 Source 和 Sink 之间的缓冲

44、队列,它有很多种类型,有基于内存的,也有基于文件的。Sink 和具体的存储服务接口相关的组件,主要负责从 Channel 中获取具体的 Event,以事务的方式 commit 到存储服务中。如果 commit 成功了,它就会从 Channel 中移除。如果不成功,就会回退到退回 Channel 中。Flume 可以将多个 Agent 做一个组合,来保证一些特殊使用场景。OSS 访问加速OSS 访问加速下面看一下 Taildir Source 中文文档。首先 Type 必须要指定为 taildir,然后 filegroups,它可以指定被监听的文件夹集合的名称,通过这个名称来指定具体的文件。配置

45、范例见上图,这里指定了两个文件夹名称,f1 和 f2,f1 是单个的绝对路径的文件,f2 是这个路径下匹配出来的一个文件。还有一些其他的配置,比如 batchSize 指定一次性可以写入多少行数据,还有最大一次写入多少行数据。还可以在 header 中指定 key,Flume 的一些高阶的使用比如拦截器或选择器,它的方式就是通过拦截或指定带有某些 header key 的事件,将它们过滤或者向后传输。OSS 访问加速OSS 访问加速常见 SinkLogger Sink:用于测试。Avro Sink:转换成 Avro Event,主要用于连接多个 Flume Agent。HDFS Sink:写入

46、 HDFS,最为常用。Hive Sink:写入 Hive 表或分区,使用 Hive 事务写 events。Kafka Sink:写入 Kafka。上图为 HDFS Sink 参数表。hello world 的配置方法见下图。OSS 访问加速OSS 访问加速使用 JindoFS SDK 上传数据到 OSS,首先必须使用 JindoFS SDK 3.4 以上的版,往 OSS 上写数据是通过 Flush 来实现的,要保证它的事务性就要支持 Flush 接口,而 OSS 本身不支持 Flush,所以我们要通过 JindoFS 来保证,而且,JindoFS SDK 的性能也比较好。其次我们会使用多线程并

47、发的方式,面对一些比较大的文件还会使用 Multiupload 的方式,首先写本地的缓存,然后在 Multiupload 到 OSS 上,这种方式的性能更优。同时通过 Multiuploa-d 来实现 Flush,就是从本地缓存往 OSS 上推,如果作业失败,可以通过命令或指定的 SDK 的调用来恢复这些数据。OSS 的配置方式是指定一个 OSS 的路径,其他的方式与 HDFS 完全一样,具体的步骤在官方文档中有介绍。如果要在 EMR 集群中恢复文件,可以使用 jfs-recover 命令,它的参数见上图。-R 指递归恢复,递归恢复的话会找 list 目录下所有文件来做 recover。fla

48、shStagingPath 指 Flush 写的临时文件的路径,它会记录临时文件的状态。如果是在 EMR 集群外,就需要调用 jindoFileSystem.recover 接口来实现文件的恢复。OSS 访问加速OSS 访问加速3、接下来安装 Flume,并把 Hadoop 相关的 jar 复制到 Flume 中。4、然后配置文件。OSS 访问加速OSS 访问加速7、下载文件以后可以看到,这是一个 20 行的文件,每秒钟打印一次日志。点击视频链接 19:15 秒开始,观看实操演示。https:/ 访问加速OSS 访问加速二、JindoSDK 安装1、下载安装在 GitHub 下载最新的 jar

49、 包 jindofs-sdk-x.x.x.jar,然后在所有 Presto 节点安装 JindoFSSDK。2、配置 JindoFS OSS 实现类将 JindoFS OSS 实现类配置到所有 Presto 节点上的 Hadoop 的 core-site.xml 中,让每个Presto worker 都能访问到 HDFS 兼容接口中的数据,配置示例如下:OSS 访问加速OSS 访问加速Impala 如何高效查询 OSS 数据作者|阿里云计算平台事业部开源大数据平台-技术专家 流影内容概要:一、背景介绍二、Impala 使用 JindoSDK三、实操演示视频链接:https:/ Impala 介

50、绍简单来说,Apache Impala 是对存储在 Hadoop 集群的 PB 级数据进行快速 SQL 查询分析的分布式 MPP 查询框架。Impala 基于 Hive 的大数据分析引擎,直接使用 Hive 元数据库 Metastore,同时 JDBC 插件也兼容 Hive 协议,可以高效访问 HDFS,支持各种存储格式。OSS 访问加速OSS 访问加速下载最新的 jar 包 jindofs-sdk-x.x.x.jar,将 sdk 包安装到 Impala 的 lib 下:cp jindofs-sdk-$version.jar$IMPALA_HOME/lib/2、配置 JindoFS OSS 实

展开阅读全文
相似文档                                   自信AI助手自信AI助手
猜你喜欢                                   自信AI导航自信AI导航
搜索标签

当前位置:首页 > 研究报告 > 其他

移动网页_全站_页脚广告1

关于我们      便捷服务       自信AI       AI导航        获赠5币

©2010-2024 宁波自信网络信息技术有限公司  版权所有

客服电话:4008-655-100  投诉/维权电话:4009-655-100

gongan.png浙公网安备33021202000488号   

icp.png浙ICP备2021020529号-1  |  浙B2-20240490  

关注我们 :gzh.png    weibo.png    LOFTER.png 

客服