spark处理超大文件方法 spark读取hdfs文件规则


spark处理超大文件方法 spark读取hdfs文件规则

文章插图
spark作为现在主流的分布式计算框架 , 已经融入到了很多的产品中作为ETL的解决方案 。而我们如果想要去测试这样的产品就要对分布式计算的原理有个清晰的认知并且也要熟悉分布式计算框架的使用来针对各种ETL场景设计不同的测试数据 。而一般来说我们需要从以下两个角度来进行测试 。
· ETL能兼容各种不同的数据(不同的数据规模 , 数据分布和数据类型)
· ETL处理数据的正确性
测试数据兼容
ETL是按一定规则针对数据进行清洗 , 抽取 , 转换等一系列操作的简写 。那么一般来说他要能够处理很多种不同的数据类型 。我们在生产上遇见的bug有很大一部分占比是生产环境遇到了比较极端的数据导致我们的ETL程序无法处理 。比如:
数据拥有大量分片
在分布式计算中 , 一份数据是由多个散落在HDFS上的文件组成的 ,  这些文件可能散落在不同的机器上 ,  只不过HDFS会给使用者一个统一的视图 , 让使用者以为自己在操作的是一个文件 , 而不是很多个文件 。这是HDFS这种分布式文件系统的存储方式 。而各种分布式计算框架 ,  比如hadoop的MapReduce , 或者是spark 。就会利用这种特性 , 直接读取散落在各个机器上文件并保存在那个节点的内存中(理想状态下 , 如果资源不够可能还是会发生数据在节点间迁移) 。而读取到内存中的数据也是分片的(partition) 。spark默认以128M为单位读取数据 , 如果数据小于这个值会按一个分片存储 , 如果大于这个值就继续往上增长分片 。比如一个文件的大小是130M ,  spark读取它的时候会在内存中分成两个partition(1个128M , 1个2M) 。如果这个文件特别小 , 只有10M , 那它也会被当做一个partition存在内存中 。所以如果一份数据存放在HDFS中 , 这个数据是由10个散落在各个节点的文件组成的 。那么spark在读取的时候 , 就会至少在内存中有10个partition ,  如果每个文件的大小都超过了128M , partition的数量会继续增加 。
而在执行计算的时候 , 这些存储在多个节点内存中的数据会并发的执行数据计算任务 。也就是说我们的数据是存放在多个节点中的内存中的 ,  我们为每一个partition都执行一个计算任务 。所以我们针对一个特别大的数据的计算任务 ,  会首先把数据按partition读取到不同节点的不同的内存中 ,  也就是把数据拆分成很多小的分片放在不同机器的内存中 。然后分别在这些小的分片上执行计算任务 。最后再聚合每个计算任务的结果 。这就是分布式计算的基本原理 。
那么这个时候问题就来了 ,  这种按partition为单位的分布式计算框架 。partition的数量决定着并发的数量 。可以理解为 , 如果数据有100个partition , 就会有100个线程针对这份数据做计算任务 。所以partition的数量代表着计算的并行程度 。但是不是说partition越多越好 , 如果明明数据就很小 ,  我们却拆分了大量的partition的话 , 反而是比较慢的 。而且所有分片的计算结果最后是要聚合在一个地方的 。这些都会造成网络IO的开销(因为数据是在不同的节点之前传输的) 。尤其是在分布式计算中 , 我们有shuffle这个性能杀手(不熟悉这个概念的同学请看我之前的文章) 。在大量的分片下执行shuffle将会是一个灾难 , 因为大量的网络IO会导致集群处于很高的负载甚至瘫痪 。我们曾经碰见过只有500M但是却有7000个分片的数据 , 那一次的结果是针对这个数据并行执行了多个ETL程序后 , 整个hadoop集群瘫痪了 。这是在数据预处理的时候忘记做reparation(重新分片)的结果 。


以上关于本文的内容,仅作参考!温馨提示:如遇健康、疾病相关的问题,请您及时就医或请专业人士给予相关指导!

「四川龙网」www.sichuanlong.com小编还为您精选了以下内容,希望对您有所帮助: