您的位置 首页 > 腾讯云社区

Druid源码阅读(一):Druid Hadoop-based ingestion实现---2011aad

一、Druid Hadoop-based ingestion简介

Apache Druid是一款开源时序OLAP数据库,支持流数据摄入和批数据摄入两种数据写入方式,其中批数据摄入又包括Native batch和Hadoop-based两种方式。根据官方文档[1],Druid推荐使用Native batch方式摄入数据,因为这种方式更灵活,且没有对外部Hadoop集群的依赖。但Hadoop-based数据摄入也有其优势: 1. 生成Segment的离线计算过程可以使用Hadoop集群的计算资源,减少了druid集群的计算压力,做到计算和存储分离;2. 与大数据处理生态对接更方便,前期的数据预处理可以使用Spark、Flink等计算引擎,将处理结果放在某个HDFS目录下即可。

Hadoop-based数据摄入一般是向Druid Overload节点提交一个Json文件,里面会定义数据源、写入的datasource、数据格式等信息,具体语法可参考Druid官方文档[2]。

本文的目的就是对照Druid源码,解析Druid如何通过MapReduce任务完成索引计算并生成Segment文件存储。本文会聚焦于MapReduce任务的执行,略过Druid数据聚合和生成索引的逻辑。数据聚合和生成索引的逻辑相对比较复杂且独立,打算后续在另外的文章中详细描述。

二、MapReduce任务

Druid的Hadoop数据摄入任务实现在indexing-hadoop子工程中,核心代码是IndexGeneratorJob.java这个文件。接下来就深入这个文件,看看Druid如何将HDFS文件中的数据通过MapReduce任务转化为Segment存储下来。

任务构建

现在Spark、Flink框架比较流行,可能很多人已经不认识原始的MapReduce任务代码了。下面的代码就完成了构建一个MapReduce任务并提交给Hadoop集群。可以看到任务使用IndexGeneratorMapper类作为Mapper、使用IndexGeneratorPartitioner作为Partitioner、使用IndexGeneratorCombiner作为Combiner、使用IndexGeneratorReducer作为Reducer,并设置输入输出路径、任务配置参数,最后job.submit()提交任务。

这里job.getConfiguration().set("io.sort.record.percent","0.23")是配置环形缓冲区中用多大比例来保存数据索引,默认值是0.05。

job.getConfiguration().set("io.sort.record.percent", "0.23"); job.setMapperClass(IndexGeneratorMapper.class); job.setMapOutputValueClass(BytesWritable.class); SortableBytes.useSortableBytesAsMapOutputKey(job, IndexGeneratorPartitioner.class); if (config.getSchema().getTuningConfig().getUseCombiner()) { job.setCombinerClass(IndexGeneratorCombiner.class); job.setCombinerKeyGroupingComparatorClass(BytesWritable.Comparator.class); } // setReducerClass(job); job.setReducerClass(IndexGeneratorReducer.class); job.setOutputKeyClass(BytesWritable.class); job.setOutputValueClass(Text.class); job.setOutputFormatClass(IndexGeneratorOutputFormat.class); FileOutputFormat.setOutputPath(job, config.makeIntermediatePath()); config.addInputPaths(job); config.intoConfiguration(job); JobHelper.setupClasspath( JobHelper.distributedClassPath(config.getWorkingPath()), JobHelper.distributedClassPath(config.makeIntermediatePath()), job ); job.submit();

下面就具体说明Mapper、Combiner、Reducer的逻辑。

Mapper: IndexGeneratorMapper

IndexGeneratorMapper继承自HadoopDruidIndexerMapper<BytesWritable, BytesWritable>,进入HadoopDruidIndexerMapper,我们看到了熟悉的map函数。

protected void map(Object key, Object value, Context context) throws IOException, InterruptedException { try { final List<InputRow> inputRows = parseInputRow(value, parser); for (InputRow inputRow : inputRows) { try { if (inputRow == null) { // Throw away null rows from the parser. log.debug("Throwing away row [%s]", value); context.getCounter(HadoopDruidIndexerConfig.IndexJobCounters.ROWS_THROWN_AWAY_COUNTER).increment(1); continue; } if (!Intervals.ETERNITY.contains(inputRow.getTimestamp())) { final String errorMsg = StringUtils.format( "Encountered row with timestamp that cannot be represented as a long: [%s]", inputRow ); throw new ParseException(errorMsg); } if (!granularitySpec.bucketIntervals().isPresent() || granularitySpec.bucketInterval(DateTimes.utc(inputRow.getTimestampFromEpoch())) .isPresent()) { innerMap(inputRow, context); } else { context.getCounter(HadoopDruidIndexerConfig.IndexJobCounters.ROWS_THROWN_AWAY_COUNTER).increment(1); } } catch (ParseException pe) { handleParseException(pe, context); } } } catch (ParseException pe) { handleParseException(pe, context); } catch (RuntimeException e) { throw new RE(e, "Failure on row[%s]", value); } }

首先parseInputRow,将文本格式输入中的每一行转换为InputRow,这里的parser实例是HadoopyStringInputRowParser,inputRows的实例是MapBasedInputRow。具体调用的ParserSpec会根据提交Json中的spec.dataSchema.parser来实例化,Druid官方文档[3]中说明的数据格式在图一中都能找到对应ParserSpec实现。

图一 Druid ParserSpecs UML

然后对每一行数据做一个过滤,过滤掉空行、没有时间戳的行以及不在任务指定时间范围内的行。这里的判断逻辑是基于提交Json中的spec.dataSchema.granularitySpec.intervals字段,若该字段不存在,则任意时间的数据都可以摄入;若指定了该字段,则需要检查当前行的时间戳是否在需要摄入的时间范围内,以决定是否丢弃该行数据。如果通过了这些条件的校验,函数最终会调用到innerMap函数,否则会对丢弃的行计数,用于日志或监控。

下面来看IndexGeneratorMapper.innerMap函数。

protected void innerMap( InputRow inputRow, Context context ) throws IOException, InterruptedException { // Group by bucket, sort by timestamp final Optional<Bucket> bucket = getConfig().getBucket(inputRow); if (!bucket.isPresent()) { throw new ISE("WTF?! No bucket found for row: %s", inputRow); } final long truncatedTimestamp = granularitySpec.getQueryGranularity() .bucketStart(inputRow.getTimestamp()) .getMillis(); // type SegmentInputRow serves as a marker that these InputRow instances have already been combined // and they contain the columns as they show up in the segment after ingestion, not what you would see in raw // data InputRowSerde.SerializeResult serializeResult = inputRow instanceof SegmentInputRow ? InputRowSerde.toBytes( typeHelperMap, inputRow, aggsForSerializingSegmentInputRow ) : InputRowSerde.toBytes( typeHelperMap, inputRow, aggregators ); final byte[] hashedDimensions = HASH_FUNCTION.hashBytes( HadoopDruidIndexerConfig.JSON_MAPPER.writeValueAsBytes( Rows.toGroupKey( truncatedTimestamp, inputRow ) ) ).asBytes(); context.write( new SortableBytes( bucket.get().toGroupKey(), // sort rows by truncated timestamp and hashed dimensions to help reduce spilling on the reducer side ByteBuffer.allocate(Long.BYTES + hashedDimensions.length) .putLong(truncatedTimestamp) .put(hashedDimensions) .array() ).toBytesWritable(), new BytesWritable(serializeResult.getSerializedRow()) ); ParseException pe = IncrementalIndex.getCombinedParseException( inputRow, serializeResult.getParseExceptionMessages(), null ); if (pe != null) { throw pe; } else { context.getCounter(HadoopDruidIndexerConfig.IndexJobCounters.ROWS_PROCESSED_COUNTER).increment(1); } }

这里第一步是获取该行所在的Bucket,代码中的Bucket的概念与Segment的概念是对应的,即在运算阶段属于同一个Bucket的数据,最终会被写入同一个Segment文件中。Bucket由shardNum、truncatedTimestamp和partitionNum唯一确定(理论上通过shardNum就可以唯一确定,后面会看到shardNum、truncatedTimestamp和partitionNum之间的关系)。因为在进入该函数前已经按时间区间做了过滤,所以这里获取Bucket理论上一定会成功。然后需要对行的时间戳做截取,比如queryGranularity设置为小时,这里就会将时间戳截取至小时粒度,剩余的精度全部补0。时间戳截取的目的是为了方便rollup,即将属于同一个时间区间的数据预聚合起来,这样虽然丢失了部分原始信息,但可以很大程度减少存储的数据量,并提升查询效率。

下一步将InputRow序列化,作为Map输出的Value部分。这里判断了inputRow instanceof SegmentInputRow,即当前inputRow是原始数据还是其他Segment中已经预聚合好的数据,据此会在序列化时使用不同的aggregator。因为如果是Segment中预聚合好的数据,对应的aggregator需要做一些变化。例如指定某一指标需要Count聚合,对于原始数据就是用Count聚合就好,而对于预聚合好的Segment,就要使用LongSum聚合。

最后一步就是是输出Key-Value对,其中key是固定160字节的BytesWritable(包括一个四字节的SortableBytes长度和156字节的SortableBytes),Value就是InputRow序列化后的BytesWritable。SortableBytes的格式如下图二所示,其中groupKey标识了归属的Bucket,其作用是在Reduce阶段将属于同一个Bucket的数据放入同一次reduce函数的调用中,从而保存在同一个Segment文件里。sortKey中hashedDimensions是根据当前行截取后的时间戳以及所有维度的取值计算出的哈希值,sortKey的作用是将所有维度值相同的行排序时排在一起,可以减少Combine阶段和Reduce阶段的spill。

就这个Map输出的Key来讲,我认为还是有优化的空间:1. 图中各个字段均是定长的,最前面4个字节的groupKeySize可以省去;2. sortKey中不需要再写入truncatedTimestamp,时间戳在hashedDimensions中已有体现。

图二 Map Output SortableBytesCombiner: IndexGeneratorCombiner

MapReduce中Combiner的作用是预聚合单节点Key相同的数据,减少Shuffle过程的数据传输量。由于任务设置了setCombinerKeyGroupingComparatorClass(BytesWritable.Comparator.class),在Combine过程中会使用BytesWritable.Comparator作为比较运算符,对Map输出的Key-Value对进行分组预聚合。进入BytesWritable.Comparator,可以看到这个Comparator比较跳过了前4个字符,比较了Map阶段输出的SortableBytes。因此Combine阶段只有所有维度取值都相同的行才会被聚合在一起,输入到reduce函数中。

job.setCombinerKeyGroupingComparatorClass(BytesWritable.Comparator.class) public static class Comparator extends WritableComparator { public Comparator() { super(BytesWritable.class);} public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { return compareBytes(b1, s1 + 4, l1 - 4, b2, s2 + 4, l2 - 4); } }

Combine阶段预聚合逻辑如下。首先,如果只有一行数据,则直接将该行数据输出;如果有多条数据,则需要将其预聚合起来,这也是Druid rollup的核心过程,将所有维度值(包括TruncatedTimestamp)相同的数据压缩成一行,减少数据存储量。

数据的聚合过程就是创建一个index,并不断把数据行加入index中,如果index满了(index.canAppendRow()里检查了Segment行数的配置和Segment大小的配置),就flush当前index,并打开一个新的index。具体index是如何计算的本文不去细究,后续会更新另外的文章解析索引构建的方法。

@Override protected void reduce(final BytesWritable key, Iterable<BytesWritable> values, final Context context) throws IOException, InterruptedException { Iterator<BytesWritable> iter = values.iterator(); BytesWritable first = iter.next(); if (iter.hasNext()) { LinkedHashSet<String> dimOrder = new LinkedHashSet<>(); SortableBytes keyBytes = SortableBytes.fromBytesWritable(key); Bucket bucket = Bucket.fromGroupKey(keyBytes.getGroupKey()).lhs; IncrementalIndex index = makeIncrementalIndex(bucket, combiningAggs, config, null, null); index.add(InputRowSerde.fromBytes(typeHelperMap, first.getBytes(), aggregators)); while (iter.hasNext()) { context.progress(); InputRow value = InputRowSerde.fromBytes(typeHelperMap, iter.next().getBytes(), aggregators); if (!index.canAppendRow()) { dimOrder.addAll(index.getDimensionOrder()); log.info("current index full due to [%s]. creating new index.", index.getOutOfRowsReason()); flushIndexToContextAndClose(key, index, context); index = makeIncrementalIndex(bucket, combiningAggs, config, dimOrder, index.getColumnCapabilities()); } index.add(value); } flushIndexToContextAndClose(key, index, context); } else { context.write(key, first); } }Partitioner: IndexGeneratorPartitioner

Map或Combine阶段输出的Key-Value对会使用指定的Partitioner进行分区,之后Reducer会从每个Map或Combine的结果中读取属于自己的分区数据,完成Shuffle的过程。

int numReducers = Iterables.size(config.getAllBuckets().get()); if (numReducers == 0) { throw new RuntimeException("No buckets?? seems there is no data to index."); } job.setNumReduceTasks(numReducers); //====================================================================================== @Override public int getPartition(BytesWritable bytesWritable, Writable value, int numPartitions) { final ByteBuffer bytes = ByteBuffer.wrap(bytesWritable.getBytes()); bytes.position(4); // Skip length added by SortableBytes int shardNum = bytes.getInt(); if ("local".equals(JobHelper.getJobTrackerAddress(config))) { return shardNum % numPartitions; } else { if (shardNum >= numPartitions) { throw new ISE("Not enough partitions, shard[%,d] >= numPartitions[%,d]", shardNum, numPartitions); } return shardNum; } }

Reduce Task的个数取决于两部分配置。1. granularitySpec中时间段的划分:假设granularitySpec.intervals设置了一个一天的时间区间,granularitySpec.segmentGranularity.period配置了"PT1H",即以小时为粒度划分Segment,这样就会划分出24个Segment,相应的就有24个Reduce Task;2. tuningConfig.partitionsSpec中numShards配置:假设这个numShards配置的值为2,那么上面的每个时间区间又会分为2个分片,这样就有24*2=48个Reduce Task。

"granularitySpec": { "type": "uniform", "segmentGranularity": { "type": "period", "period": "PT1H", "origin": null }, "rollup": true, "intervals": [ "2020-04-15T16:00:00.000Z/2020-04-16T16:00:00.000Z" ] } "tuningConfig": { "type": "hadoop", "partitionsSpec": { "type": "hashed", "numShards": 2 } }

例如对于如上所示配置,在任务执行时会产生48个Reduce Task,其对应的partitionNum和shardNum如下图三所示。可以看到生成任务的shardNum的取值为0-47,而getPartition函数中传入的numPartitions为48(Reduce Task个数),因此可以直接用shardNum作为getPartition返回的结果。这里可以看到,shardNum实际上唯一确定了Bucket,因此相同Bucket中的数据会进入同一个Reduce Task中,最终会存储在同一个Segment中。

图三 partitionNum和shardNum示例Reducer: IndexGeneratorReducer

Reducer最后会将属于自己的分区数据收集到一起,调用reduce函数进一步将预聚合过的数据合并为Segment文件,这里合并的逻辑其实和Combine阶段非常类似。注意SortableBytes.useSortableBytesAsMapOutputKey(job, IndexGeneratorPartitioner.class)中的job.setGroupingComparatorClass(SortableBytesGroupingComparator.class),在Reduce阶段,会根据SortableBytesGroupingComparator将数据分组,调用reduce函数,SortableBytesGroupingComparator只比较了图二中的groupKey部分,即同一个Bucket的数据会被放在一起计算。因此Reduce阶段的reduce函数对于每个Task只会执行一次,生成一个Segment(包括一个descriptor.json文件和一个index.zip文件),并写入指定的HDFS路径下。

public static class SortableBytesGroupingComparator extends WritableComparator { protected SortableBytesGroupingComparator() { super(BytesWritable.class); } @Override public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { int b1Length = ByteBuffer.wrap(b1, s1 + 4, l1 - 4).getInt(); int b2Length = ByteBuffer.wrap(b2, s2 + 4, l2 - 4).getInt(); final int retVal = compareBytes(b1, s1 + 8, b1Length, b2, s2 + 8, b2Length); return retVal; } }

三、执行大图

图四是上述的MapReduce过程中数据转化的示例:假设我们的输入有2个HDFS文件,对应2个Map Task,时间区间只划分为两部分,对应的truncatedTimestamp分别为T1和T2,numShards为2,这样会生成4个Segment。dx和dy表示2个维度,dxi和dyi表示具体维度的不同取值,m表示一个指标,mi表示指标的具体取值,m的聚合方式用agg表示。Map阶段会将HDFS文件读取为行数据,Combine阶段会对同一个Map任务的输出将时间和维度值都相同的行预聚合好。Combine后的行数据会根据行所对应的的分区分发到响应的Reduce任务,并进一步聚合生成Segment文件。

图四 Druid Hadoop-based数据摄入MapReduce执行示例四、总结

通过学习Druid Hadoop-based数据摄入的流程,把“古老”的MapReduce过程又学习了一遍,学习的过程中参考了[4]中的一张MapReduce工作原理图,推荐想要学习MapReduce的同学都去看下;对任务Spec中各个配置字段的含义也有了更深入的了解,这里也给出一些参数设置建议:

建议使用targetRowsPerSegment而不是numShards。在partitionsSpec中targetRowsPerSegment和numShards配置是互斥的,使用targetRowsPerSegment配置可以更合理的控制每个Segment的大小,既不会出现超大Segment,也不会出现很多小Segment,利于historical节点进行加载和缓存。如果使用numShards,建议同时配置partitionDimensions。partitionDimensions可以配置如账号ID之类的信息,这样同一个账号的数据会保存在同一个Segment中,查询时可以减少读取的Segment数目,提升查询性能。tuningConfig.useCombiner置为true。这个值默认是false,一般来讲,对于druid数据摄入的场景,预聚合可以很大程度上减少Shuffle过程中的数据传输量,减少作业运行时间。

参考文献

[1] https://druid.apache.org/docs/0.17.1/ingestion/index.html#batch

[2] Hadoop-based ingestion,https://druid.apache.org/docs/latest/ingestion/hadoop.html

[3] Data formats,https://druid.apache.org/docs/latest/ingestion/data-formats.html

[4] mapreduce二次排序详解,https://www.lagou.com/lgeduarticle/8090.html

---来自腾讯云社区的---2011aad

关于作者: 瞎采新闻

这里可以显示个人介绍!这里可以显示个人介绍!

热门文章

留言与评论(共有 0 条评论)
   
验证码: