HDFS Hedged Read 的利弊分析

HDFS Hedged read 是一种优化 HDFS 客户端读取文件性能的方法。它会在存在慢节点的情况下,通过申请多个内存来提高读取性能。但是,由于 Hedged read 会频繁申请内存,可能会导致内存消耗过大,从而影响系统性能。因此,HDFS 并没有默认开启 Hedged read 功能。在使用 Hedged read 时,需要注意内存消耗的问题,以避免对系统性能造成负面影响。

鄙人在之前一篇文章中,简单的介绍了将 Hedged Read 引入 StarRocks 的效果,见 StarRocks 中关于 Hadoop Hedged Read 性能测试 这篇文章。当时得出来的结论是在存在慢节点的情况下,hedged read 一定是正优化的。但是,后面我就被啪啪打脸了,你想一想,真有那么好的功能,HDFS 为什么不默认开启?真那么牛逼,这部分代码为什么7,8年不改进了?

Hedged read 简介

Hedged read 在 hdfs-site.xml 里面提供了两个参数,分别是:

  • dfs.client.hedged.read.threadpool.size:负责hedged read 线程池大小
  • dfs.client.hedged.read.threshold.millis 发起 hedged read 的阀值时间,当一个请求耗时超过这个阀值,就会新发起一个读请求。

总所周知,HDFS 会把一个文件切分成多个 block,每个 block 默认都有 3 副本,这 3 个副本会存储在不同的 DataNode 上面。HDFS 客户端每次读取一个 block 时,都会选择一个存储有该 block 的 DataNode 进行读取。

而 Hedged Read 的思想很简单,当第一次发起的请求耗时超过dfs.client.hedged.read.threshold.millis 这个阀值后,它会把这个 DataNode 加入黑名单,然后重新发起一个新请求,这个请求会发向新的 DataNode。最后看两个请求,谁返回的快,就用谁的。

优点-解决慢节点

这个机制听起来很美好,不把鸡蛋放在一个篮子里面,哪个 DataNode快就用哪个,有效的解决了 HDFS 慢节点的问题。不过它的实现也很粗糙,它只会试两个DataNode,如果两个DataNode都抽风了,那也没办法了。

缺点-费内存

那么这么牛的方法,为什么不默认开启呢?这就要从线上的一个事故说起了。线上的一位客户一听这么牛逼的功能,就马上开启了 hedged read 功能。当时他们自己简单的测试了下,效果确实杠杠的,HDFS 慢节点的问题得到了很大的改善。但是一道生产环境,发现 HDFS 客户端频繁的报 Out Of Memory 问题,导致查询失败。起初以为流量变大了,但是后面经过检查,发现流量其实并没有什么变化。后面客户关闭了 hedged read,就一切正常了。

为了搞明白问题,我就看了下 hedged read 的源码,核心代码如下:

/**
 * Like {@link #fetchBlockByteRange}except we start up a second, parallel,
 * 'hedged' read if the first read is taking longer than configured amount of
 * time. We then wait on which ever read returns first.
 */
private void hedgedFetchBlockByteRange(LocatedBlock block, long start,
    long end, ByteBuffer buf, CorruptedBlocks corruptedBlocks)
    throws IOException {
  final DfsClientConf conf = dfsClient.getConf();
  ArrayList<Future<ByteBuffer>> futures = new ArrayList<>();
  CompletionService<ByteBuffer> hedgedService =
      new ExecutorCompletionService<>(dfsClient.getHedgedReadsThreadPool());
  ArrayList<DatanodeInfo> ignored = new ArrayList<>();
  ByteBuffer bb;
  int len = (int) (end - start + 1);
  int hedgedReadId = 0;
  while (true) {
    // see HDFS-6591, this metric is used to verify/catch unnecessary loops
    hedgedReadOpsLoopNumForTesting++;
    DNAddrPair chosenNode = null;
    // there is no request already executing.
    if (futures.isEmpty()) {
      // chooseDataNode is a commitment. If no node, we go to
      // the NN to reget block locations. Only go here on first read.
      chosenNode = chooseDataNode(block, ignored);
      // Latest block, if refreshed internally
      block = chosenNode.block;
      bb = ByteBuffer.allocate(len);
      Callable<ByteBuffer> getFromDataNodeCallable = getFromOneDataNode(
          chosenNode, block, start, end, bb,
          corruptedBlocks, hedgedReadId++);
      Future<ByteBuffer> firstRequest = hedgedService
          .submit(getFromDataNodeCallable);
      futures.add(firstRequest);
      Future<ByteBuffer> future = null;
      try {
        future = hedgedService.poll(
            conf.getHedgedReadThresholdMillis(), TimeUnit.MILLISECONDS);
        if (future != null) {
          ByteBuffer result = future.get();
          result.flip();
          buf.put(result);
          return;
        }
        DFSClient.LOG.debug("Waited {}ms to read from {}; spawning hedged "
            + "read", conf.getHedgedReadThresholdMillis(), chosenNode.info);
        dfsClient.getHedgedReadMetrics().incHedgedReadOps();
        // continue; no need to refresh block locations
      } catch (ExecutionException e) {
        futures.remove(future);
      } catch (InterruptedException e) {
        throw new InterruptedIOException(
            "Interrupted while waiting for reading task");
      }
      // Ignore this node on next go around.
      // If poll timeout and the request still ongoing, don't consider it
      // again. If read data failed, don't consider it either.
      ignored.add(chosenNode.info);
    } else {
      // We are starting up a 'hedged' read. We have a read already
      // ongoing. Call getBestNodeDNAddrPair instead of chooseDataNode.
      // If no nodes to do hedged reads against, pass.
      boolean refetch = false;
      try {
        chosenNode = chooseDataNode(block, ignored, false);
        if (chosenNode != null) {
          // Latest block, if refreshed internally
          block = chosenNode.block;
          bb = ByteBuffer.allocate(len);
          Callable<ByteBuffer> getFromDataNodeCallable =
              getFromOneDataNode(chosenNode, block, start, end, bb,
                  corruptedBlocks, hedgedReadId++);
          Future<ByteBuffer> oneMoreRequest =
              hedgedService.submit(getFromDataNodeCallable);
          futures.add(oneMoreRequest);
        } else {
          refetch = true;
        }
      } catch (IOException ioe) {
        DFSClient.LOG.debug("Failed getting node for hedged read: {}",
            ioe.getMessage());
      }
      // if not succeeded. Submit callables for each datanode in a loop, wait
      // for a fixed interval and get the result from the fastest one.
      try {
        ByteBuffer result = getFirstToComplete(hedgedService, futures);
        // cancel the rest.
        cancelAll(futures);
        dfsClient.getHedgedReadMetrics().incHedgedReadWins();
        result.flip();
        buf.put(result);
        return;
      } catch (InterruptedException ie) {
        // Ignore and retry
      }
      // If refetch is true, then all nodes are in deadNodes or ignoredNodes.
      // We should loop through all futures and remove them, so we do not
      // have concurrent requests to the same node.
      // Once all futures are cleared, we can clear the ignoredNodes and retry.
      if (refetch && futures.isEmpty()) {
        block = refetchLocations(block, ignored);
      }
      // We got here if exception. Ignore this node on next go around IFF
      // we found a chosenNode to hedge read against.
      if (chosenNode != null && chosenNode.info != null) {
        ignored.add(chosenNode.info);
      }
    }
  }
}

可以看到第一次请求,在 28 行申请了一个内存 bb = ByteBuffer.allocate(len);,如果等待超过阀值,在 69 行再一次申请了内存 bb = ByteBuffer.allocate(len);

这里要注意一个问题,在使用 HDFS 客户端读取文件的时候,我们都会事先在外部申请一块内存,然后让 HDFS 客户端往这块内存填东西。而开启了 hedged read 后,HDFS 客户端保底就会为第一次请求申请一个 ByteBuffer,如果存在慢节点情况,再申请一个 ByteBuffer。

这里拉个清单,算下内存消耗,假设我们要读取一个 30MB 的文件:

  • 不开启 hedged read:自己申请的30MB
  • 开启 hedged read:自己申请的30MB + hedged read 第一次申请的 30MB = 60MB
  • 开启 hedged read 且存在慢节点情况:自己申请的30MB + hedged read 第一次申请的 30MB + hedged read 第二次申请的30MB = 90MB

总结

所以只要开启了 hedged read,内存消耗保底就会 x2,如果存在慢节点,就 x3了,那能不OOM才怪了。

所以除非内存富裕,能接受2倍~3倍的额外开销,不然还是别开了,这也能理解为什么 HDFS 默认关闭了。

原创文章,作者:Smith,如若转载,请注明出处:https://www.inlighting.org/archives/hdfs-hedged-read

打赏 微信扫一扫 微信扫一扫
SmithSmith
上一篇 2023年8月21日 下午1:17
下一篇 2023年11月12日 下午7:59

相关推荐

  • Spark-SQL 有用的 SQL

    我发现自己每次用 Spark 造 Iceberg 表都要耗费老大的劲,官方文档总是没有一个现成的 Demo,网上也搜索不到,全靠自己琢磨。故在这里记录一下,顺带帮助一下可能需要的人…

    2023年11月12日
    1.3K3
  • 浅谈 HDFS 慢节点的解决方案

    在优化 HDFS 查询性能时,慢节点问题会显著影响 SQL 的查询效率。本文浅谈了目前解决 HDFS 慢节点的几种思路。

    2024年3月25日
    1.6K3
  • Trino / StarRocks 阿里云 EMR Kerberos 认证指南

    Kerberos 是最为头疼的鉴权配置,但是 Hadoop 全家桶绕不开,只能硬着头皮干了。本文以 Trino 和 StarRocks 为例,讲述如何在非 EMR 的节点上,通过一…

    2023年8月21日
    1.1K0

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

评论列表(2条)

  • […] 我在之前的文章里面《HDFS Hedged Read 的利弊分析》分析过,hedged read 因为实现上的挫,它保底要消耗多一倍的内存。期间如果超过阈值发起了 hedged read,那就是两倍。 […]

  • […] 我在之前的文章里面《HDFS Hedged Read 的利弊分析》分析过,hedged read 因为实现上的挫,它保底要消耗多一倍的内存。期间如果超过阈值发起了 hedged read,那就是两倍。 […]