In a previous article I briefly covered the effect of bringing Hedged Read into StarRocks; see the post "StarRocks 中关于 Hadoop Hedged Read 性能测试". My conclusion at the time was that, whenever slow nodes exist, hedged read is always a net win. Reality soon slapped me in the face, though. Think about it: if the feature were really that good, why doesn't HDFS enable it by default? And if it were really that great, why has that part of the code gone untouched for seven or eight years?
Hedged read overview
Hedged read exposes two parameters in hdfs-site.xml (a client-side configuration sketch follows the list):
- dfs.client.hedged.read.threadpool.size: the size of the thread pool responsible for hedged reads.
- dfs.client.hedged.read.threshold.millis: the threshold for spawning a hedged read; once a request has been running longer than this, a new read request is issued.
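For illustration, here is a minimal sketch of enabling these two settings programmatically through Hadoop's Configuration API instead of editing hdfs-site.xml; the values and the NameNode URI are placeholders, not recommendations:

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

public class HedgedReadConfigSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Size of the client-side thread pool used for hedged reads;
    // a value greater than 0 is what turns the feature on.
    conf.setInt("dfs.client.hedged.read.threadpool.size", 5);
    // How long (in ms) the first read may run before a hedged read is spawned.
    conf.setLong("dfs.client.hedged.read.threshold.millis", 500);

    // Placeholder NameNode address; any FileSystem created from this conf
    // will use hedged reads.
    FileSystem fs = FileSystem.get(URI.create("hdfs://namenode:8020/"), conf);
    System.out.println("Hedged read enabled for " + fs.getUri());
    fs.close();
  }
}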
As everyone knows, HDFS splits a file into multiple blocks, and each block has 3 replicas by default, stored on different DataNodes. Every time the HDFS client reads a block, it picks one of the DataNodes holding that block and reads from it.
The idea behind hedged read is simple: once the first request has been running longer than the dfs.client.hedged.read.threshold.millis threshold, the client puts that DataNode on an ignore list and issues a new request to a different DataNode. Whichever of the two requests returns first wins.
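Before looking at the real implementation, here is a minimal, self-contained sketch of that general hedged-request pattern. It is illustrative only, not HDFS code (the class and method names are made up), but it uses the same ExecutorCompletionService idea the DFS client uses:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

public class HedgedRequestSketch {
  // Illustrative only: issue a read against the first replica, and if it has
  // not finished within thresholdMillis, issue a second one against another
  // replica. Whichever completes first wins; the loser is cancelled.
  // Error handling (e.g. retrying when the winner failed) is omitted.
  static byte[] hedgedRead(ExecutorService pool,
                           List<Callable<byte[]>> replicas,
                           long thresholdMillis) throws Exception {
    CompletionService<byte[]> service = new ExecutorCompletionService<>(pool);
    List<Future<byte[]>> futures = new ArrayList<>();
    futures.add(service.submit(replicas.get(0)));      // first request

    Future<byte[]> winner = service.poll(thresholdMillis, TimeUnit.MILLISECONDS);
    if (winner == null && replicas.size() > 1) {
      futures.add(service.submit(replicas.get(1)));    // spawn the hedged request
      winner = service.take();                         // first of the two to finish
    } else if (winner == null) {
      winner = service.take();                         // only one replica, keep waiting
    }
    byte[] result = winner.get();
    for (Future<byte[]> f : futures) {
      f.cancel(true);                                  // cancel whatever is still running
    }
    return result;
  }
}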
Advantage: solving slow nodes
This mechanism sounds wonderful: don't put all your eggs in one basket, use whichever DataNode is fastest, and the HDFS slow-node problem is effectively solved. The implementation is fairly crude, though: it only ever tries two DataNodes, so if both of them are acting up, you are out of luck.
Disadvantage: extra memory consumption
So if this trick is so great, why isn't it on by default? That story starts with a production incident. One of our customers heard about this amazing feature and immediately turned hedged read on. Their own quick tests showed genuinely impressive results, and the HDFS slow-node problem improved a lot. But once it hit production, the HDFS client started throwing frequent Out Of Memory errors, causing queries to fail. At first the assumption was that traffic had increased, but on closer inspection traffic had barely changed. Once the customer turned hedged read off, everything went back to normal.
To figure out what was going on, I read through the hedged read source code. The core of it looks like this:
/**
 * Like {@link #fetchBlockByteRange} except we start up a second, parallel,
 * 'hedged' read if the first read is taking longer than configured amount of
 * time. We then wait on which ever read returns first.
 */
private void hedgedFetchBlockByteRange(LocatedBlock block, long start,
    long end, ByteBuffer buf, CorruptedBlocks corruptedBlocks)
    throws IOException {
  final DfsClientConf conf = dfsClient.getConf();
  ArrayList<Future<ByteBuffer>> futures = new ArrayList<>();
  CompletionService<ByteBuffer> hedgedService =
      new ExecutorCompletionService<>(dfsClient.getHedgedReadsThreadPool());
  ArrayList<DatanodeInfo> ignored = new ArrayList<>();
  ByteBuffer bb;
  int len = (int) (end - start + 1);
  int hedgedReadId = 0;
  while (true) {
    // see HDFS-6591, this metric is used to verify/catch unnecessary loops
    hedgedReadOpsLoopNumForTesting++;
    DNAddrPair chosenNode = null;
    // there is no request already executing.
    if (futures.isEmpty()) {
      // chooseDataNode is a commitment. If no node, we go to
      // the NN to reget block locations. Only go here on first read.
      chosenNode = chooseDataNode(block, ignored);
      // Latest block, if refreshed internally
      block = chosenNode.block;
      bb = ByteBuffer.allocate(len);
      Callable<ByteBuffer> getFromDataNodeCallable = getFromOneDataNode(
          chosenNode, block, start, end, bb,
          corruptedBlocks, hedgedReadId++);
      Future<ByteBuffer> firstRequest = hedgedService
          .submit(getFromDataNodeCallable);
      futures.add(firstRequest);
      Future<ByteBuffer> future = null;
      try {
        future = hedgedService.poll(
            conf.getHedgedReadThresholdMillis(), TimeUnit.MILLISECONDS);
        if (future != null) {
          ByteBuffer result = future.get();
          result.flip();
          buf.put(result);
          return;
        }
        DFSClient.LOG.debug("Waited {}ms to read from {}; spawning hedged "
            + "read", conf.getHedgedReadThresholdMillis(), chosenNode.info);
        dfsClient.getHedgedReadMetrics().incHedgedReadOps();
        // continue; no need to refresh block locations
      } catch (ExecutionException e) {
        futures.remove(future);
      } catch (InterruptedException e) {
        throw new InterruptedIOException(
            "Interrupted while waiting for reading task");
      }
      // Ignore this node on next go around.
      // If poll timeout and the request still ongoing, don't consider it
      // again. If read data failed, don't consider it either.
      ignored.add(chosenNode.info);
    } else {
      // We are starting up a 'hedged' read. We have a read already
      // ongoing. Call getBestNodeDNAddrPair instead of chooseDataNode.
      // If no nodes to do hedged reads against, pass.
      boolean refetch = false;
      try {
        chosenNode = chooseDataNode(block, ignored, false);
        if (chosenNode != null) {
          // Latest block, if refreshed internally
          block = chosenNode.block;
          bb = ByteBuffer.allocate(len);
          Callable<ByteBuffer> getFromDataNodeCallable =
              getFromOneDataNode(chosenNode, block, start, end, bb,
                  corruptedBlocks, hedgedReadId++);
          Future<ByteBuffer> oneMoreRequest =
              hedgedService.submit(getFromDataNodeCallable);
          futures.add(oneMoreRequest);
        } else {
          refetch = true;
        }
      } catch (IOException ioe) {
        DFSClient.LOG.debug("Failed getting node for hedged read: {}",
            ioe.getMessage());
      }
      // if not succeeded. Submit callables for each datanode in a loop, wait
      // for a fixed interval and get the result from the fastest one.
      try {
        ByteBuffer result = getFirstToComplete(hedgedService, futures);
        // cancel the rest.
        cancelAll(futures);
        dfsClient.getHedgedReadMetrics().incHedgedReadWins();
        result.flip();
        buf.put(result);
        return;
      } catch (InterruptedException ie) {
        // Ignore and retry
      }
      // If refetch is true, then all nodes are in deadNodes or ignoredNodes.
      // We should loop through all futures and remove them, so we do not
      // have concurrent requests to the same node.
      // Once all futures are cleared, we can clear the ignoredNodes and retry.
      if (refetch && futures.isEmpty()) {
        block = refetchLocations(block, ignored);
      }
      // We got here if exception. Ignore this node on next go around IFF
      // we found a chosenNode to hedge read against.
      if (chosenNode != null && chosenNode.info != null) {
        ignored.add(chosenNode.info);
      }
    }
  }
}
You can see that for the first request the client allocates a buffer with bb = ByteBuffer.allocate(len) (in the futures.isEmpty() branch), and that once the wait exceeds the threshold it allocates memory again with another bb = ByteBuffer.allocate(len) call when it submits the hedged request.
Keep one thing in mind: when we read a file through the HDFS client, we normally allocate a buffer ourselves up front and let the HDFS client fill it. With hedged read enabled, the HDFS client allocates at least one extra ByteBuffer for the first request, and one more ByteBuffer if a slow node forces a hedged request.
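To make that concrete, here is a minimal sketch of the usual calling pattern on the positional-read path; the file path, the 30MB size, and the class name are made up for illustration. The caller owns the destination buffer, and with hedged reads on, the DFS client allocates one or two extra ByteBuffers of the same length internally on top of it:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class PreadBufferExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);

    // The caller allocates the destination buffer up front...
    byte[] dest = new byte[30 * 1024 * 1024];          // 30MB, illustrative

    // ...and the client fills it via a positional read.
    try (FSDataInputStream in = fs.open(new Path("/tmp/some-file"))) {  // placeholder path
      in.readFully(0, dest, 0, dest.length);
      // With hedged reads enabled, the code shown above additionally does
      // ByteBuffer.allocate(len) once for the first attempt and once more for a
      // hedged attempt, then copies the winning buffer back into 'dest'.
    }
    fs.close();
  }
}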
Let's tally the memory cost, assuming we read a 30MB file (the same arithmetic is shown in code right after this list):
- Hedged read off: the 30MB we allocate ourselves
- Hedged read on: our own 30MB + 30MB allocated for the first hedged-path request = 60MB
- Hedged read on with a slow node: our own 30MB + 30MB for the first request + 30MB for the second (hedged) request = 90MB
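Purely as an illustration, here is the same arithmetic in code form (the 30MB figure is just the example above):

public class HedgedReadMemoryTally {
  public static void main(String[] args) {
    long callerBufferBytes = 30L * 1024 * 1024;  // the 30MB buffer we allocate ourselves

    // Extra ByteBuffer.allocate(len) copies made inside the DFS client:
    // 0 = hedged read off, 1 = hedged read on (no slow node), 2 = hedged read spawned.
    for (int extraCopies = 0; extraCopies <= 2; extraCopies++) {
      long totalBytes = callerBufferBytes * (1 + extraCopies);
      System.out.printf("extra internal copies: %d -> ~%d MB total%n",
          extraCopies, totalBytes / (1024 * 1024));
    }
  }
}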
Summary
So as soon as hedged read is enabled, memory consumption at least doubles, and with slow nodes it triples; no wonder it OOMs.
Unless you have memory to spare and can live with 2x~3x extra overhead, you are better off leaving it disabled, which also explains why HDFS turns it off by default.
Original article by Smith. If you repost it, please credit the source: https://www.inlighting.org/archives/hdfs-hedged-read