当前位置: 首页 > article >正文

深入HDFS——数据上传源码

引入

就如RPC篇章里提到的观点一样,任何一种能广为传播的技术,都是通过抽象和封装的思想,屏蔽底层底层复杂实现,提供简单且强大的工具,来降低使用门槛的。

HDFS的风靡自然也是如此。

通过前面深入了NameNode和DataNode的启动源码,我们已经是略有体会,但重启毕竟属于工作时几乎遇不到的场景,所以今天我们从HDFS最常用的上传功能入手,去看看HDFS是如何实现的。

数据上传过程

既然是我们要使用的功能,自然需要我们自己动手编写向HDFS中写入数据的代码啦。

下面我写了一个简单的写入数据代码:

public class WriteDataToHDFS {
    public static void main(String[] args) throws IOException, InterruptedException{
        Configuration conf = new Configuration();
  
        //创建FileSystem对象
        FileSystem fs= FileSystem.get(URI.create("hdfs://hadoop1:8020/"),conf,"root");

        //创建HDFS文件路径
        Path path = new Path("/chaos.txt");
        FSDataOutputStream out = fs.create(path);

        //向HDFS中写出数据
        out.write("hello chaos".getBytes());
    }
}

可以看到,实现的代码是很简单的。

使用确实是很容易的,那么底层实现是怎样的呢?

我们先通过前面的了解,先来梳理一下,客户端向 HDFS写入数据的实现流程:

  1. 客户端与NameNode进行通信,获取数据写入HDFS中对应哪些DataNode节点;
  2. 在客户端将数据划分成packet传输到HDFS各个DataNode节点上。

看起来好像也不难的样子?

但实际上底层实现可没那么简单。下面我们会从以下几个模块去深入源码,一起看看HDFS是如何实现数据上传的,里面又有哪些有意思的细节。

  1. 创建文件系统并初始化DFSClient
  2. 连接NameNode创建目录
  3. 启动DataStreamer线程
  4. 向dataQueue队列中写入packet
  5. 设置副本写入策略源码
  6. 客户端与DataNode建立socket通信
  7. 向Datanode中上传数据

1.创建文件系统并初始化DFSClient

操作HDFS前需要创建文件系统,并初始化DFSClient对象,该对象中持有与NameNode通信的NameNode Rpc Proxy。

而DFSClient对象的创建代码如下:

FileSystem fileSystem = FileSystem.get(URI.create("hdfs://hadoop1:8020/"),conf,"root");

这里也可以通过FileSystem.newInstance(conf)创建,不过殊途同归,底层实现是类似的。

FileSystem.get()具体源码如下:

public static FileSystem get(final URI uri, final Configuration conf,
      final String user) throws IOException, InterruptedException {
  String ticketCachePath =
    conf.get(CommonConfigurationKeys.KERBEROS_TICKET_CACHE_PATH);
  UserGroupInformation ugi =
      UserGroupInformation.getBestUGI(ticketCachePath, user);
  return ugi.doAs(new PrivilegedExceptionAction() {
    @Override
    public FileSystem run() throws IOException {
      //创建分布式文件系统及初始化DFSClient
      return get(uri, conf);
    }
  });
}

继续往里走,来看get(uri, conf)方法源码如下:

public static FileSystem get(URI uri, Configuration conf) throws IOException {
  ... ...
  //创建分布式文件系统及初始化DFSClient
  return CACHE.get(uri, conf);
}

CACHE.get(uri,conf)又调用到如下源码:

FileSystem get(URI uri, Configuration conf) throws IOException{
  Key key = new Key(uri, conf);
  //创建分布式文件系统及初始化DFSClient
  return getInternal(uri, conf, key);
}

getInternal方法源码如下:

private FileSystem getInternal(URI uri, Configuration conf, Key key) throws IOException{
  FileSystem fs;
  synchronized (this) {
    fs = map.get(key);
  }
  if (fs != null) {
    return fs;
  }
  // 创建分布式文件系统及初始化DFSClient
  fs = createFileSystem(uri, conf);
  ... ...
  return fs;
}

从上面源码可以看到,getInternal方法中,核心就在createFileSystem方法,这个方法会创建分布式文件系统及初始化DFSClient。

关于这个文件系统对应的类是什么,其实直接跟着代码找还是很难找到的,但是熟悉面向对象知识的小伙伴,肯定一看就知道,这个类一定是FileSystem的实现类。

那我们就先来看看FileSystem的注释:

An abstract base class for a fairly generic filesystem. It may be implemented as a distributed filesystem, or as a "local" one that reflects the locally-connected disk. The local version exists for small Hadoop instances and for testing.

All user code that may potentially use the Hadoop Distributed File System should be written to use a FileSystem object. The Hadoop DFS is a multi-machine system that appears as a single disk. It's useful because of its fault tolerance and potentially very large capacity.

The local implementation is {@link LocalFileSystem} and distributed implementation is DistributedFileSystem.

翻译:

这是一个相当通用的文件系统的抽象基类。它可能被实现为分布式文件系统,或者是反映本地连接磁盘的 “本地” 文件系统。本地版本适用于小型 Hadoop 实例和测试。

 

所有可能潜在使用 Hadoop 分布式文件系统的用户代码,都应编写为使用文件系统对象。Hadoop DFS 是一个多机系统,表现得像单个磁盘。它很有用,因为其具有容错性和潜在的非常大的容量。

 

本地实现是 {@link LocalFileSystem} ,分布式实现是 DistributedFileSystem 。

这下子就清晰了,我们这里创建的分布式文件系统类自然就是DistributedFileSystem类。(org.apache.hadoop.hdfs.DistributedFileSystem

我们回来接着看createFileSystem的源码如下:

private static FileSystem createFileSystem(URI uri, Configuration conf
    ) throws IOException {
  //创建的 class 为 org.apache.hadoop.hdfs.DistributedFileSystem
  Class<?> clazz = getFileSystemClass(uri.getScheme(), conf);
  //初始化分布式文件系统
  FileSystem fs = (FileSystem)ReflectionUtils.newInstance(clazz, conf);
  //调用到 DistributedFileSystem 中的 initialize方法,初始化创建DFSClient
  fs.initialize(uri, conf);
  return fs;
}

可以看到,是通过ReflectionUtils.newInstance(clazz, conf) 获取的这个文件系统类。

但在执行ReflectionUtils.newInstance(clazz, conf) 前,通过getFileSystemClass(uri.getScheme(), conf) 获取clazz才是重点,其对应源码如下:

public static Class<? extends FileSystem> getFileSystemClass(String scheme,
    Configuration conf) throws IOException {
  if (!FILE_SYSTEMS_LOADED) {
    //将 FileSystem 抽象类的所有实现类中的 schema,和对应实现类,放入 SERVICE_FILE_SYSTEMS 对象中
    loadFileSystems();
  }
  Class<? extends FileSystem> clazz = null;
  if (conf != null) {
    // 从配置中获取 fs.hdfs.impl
    // 如果配置文件中没有配置 fs.hdfs.impl 那么获取的clazz 为 null
    // 3.x这里会有些小区别
    clazz = (Class<? extends FileSystem>) conf.getClass("fs." + scheme + ".impl", null);
  }
  if (clazz == null) {
    //获取的clazz 为 org.apache.hadoop.hdfs.DistributedFileSystem
    clazz = SERVICE_FILE_SYSTEMS.get(scheme);
  }
  if (clazz == null) {
    throw new IOException("No FileSystem for scheme: " + scheme);
  }
  return clazz;
}

以上代码中loadFileSystems(),会将 FileSystem 抽象类的所有实现类中的 schema,和对应实现类,放入 SERVICE_FILE_SYSTEMS 对象中。

loadFileSystems()实现源码如下:

private static void loadFileSystems() {
  synchronized (FileSystem.class) {
    if (!FILE_SYSTEMS_LOADED) {
      //ServiceLoader.load(FileSystem.class)会加载所有 FileSystem 实现类中的schema信息
      ServiceLoader<FileSystem> serviceLoader = ServiceLoader.load(FileSystem.class);
      for (FileSystem fs : serviceLoader) {
        //将所有文件系统的 schema信息存入 SERVICE_FILE_SYSTEMS Map中
        SERVICE_FILE_SYSTEMS.put(fs.getScheme(), fs.getClass());
      }
      FILE_SYSTEMS_LOADED = true;
    }
  }
}

getFileSystemClass 方法中首先从配置文件中获取 fs.hdfs.impl 配置的HDFS类,默认在HDFS中没有配置该属性,该属性也没有默认值,所以得到clazz为null,进而执行 SERVICE_FILE_SYSTEMS.get(scheme) 得到的clazz为org.apache.hadoop.hdfs.DistributedFileSystem。

FileSystem.createFileSystem() 中执行 fs.initialize(uri, conf) 时,这里的fs就是org.apache.hadoop.hdfs.DistributedFileSystem类,所以相当于执行的是DistributedFileSystem.initialize

对应实现与源码如下:

public void initialize(URI uri, Configuration conf) throws IOException {
  super.initialize(uri, conf);
  setConf(conf);

  String host = uri.getHost();
  if (host == null) {
    throw new IOException("Incomplete HDFS URI, no host: "+ uri);
  }
  homeDirPrefix = conf.get(
      DFSConfigKeys.DFS_USER_HOME_DIR_PREFIX_KEY,
      DFSConfigKeys.DFS_USER_HOME_DIR_PREFIX_DEFAULT);
  //创建DFSClient,传入的URI 为NameNode URI
  this.dfs = new DFSClient(uri, conf, statistics);
  this.uri = URI.create(uri.getScheme()+"://"+uri.getAuthority());
  this.workingDir = getHomeDirectory();
}

通过以上代码,会创建DFSClient对象,并在创建DFSClient对象时创建NameNode Rpc Proxy对象,并赋值给其属性namenode,方便后续客户端和NameNode进行通信。

具体的 new DFSClient构造如下:

public DFSClient(URI nameNodeUri, Configuration conf, FileSystem.Statistics stats) throws IOException {
	//创建DFSClient ,传入了 NameNode的URI
    this(nameNodeUri, null, conf, stats);
}

this调用到DFSClient实现,其中创建了NameNode Rpc Proxy 并赋值给了namenode属性。

public DFSClient(URI nameNodeUri, ClientProtocol rpcNamenode,
    Configuration conf, FileSystem.Statistics stats) throws IOException {
    ... ...
    //获取NameNode Rpc Proxy
    proxyInfo = NameNodeProxiesClient.createProxyWithLossyRetryHandler(conf,
        nameNodeUri, ClientProtocol.class, numResponseToDrop,
        nnFallbackToSimpleAuth);
    ... ...
    //给DFSClient中的namenode 赋值 NameNode的Rpc Proxy对象
    this.namenode = proxyInfo.getProxy();
    ... ...
}

后续客户端可以通过DFSClient.namenode获取到NameNode的RPC Proxy对象与NameNode进行通信。

2.连接NameNode创建目录

在我们编写的代码执行到 fs.create(path) 时,会在HDFS中创建目录并准备dataQueue,dataQueue用于客户端数据传输队列,并最后返回 FSDataOutputStream 对象,该对象用于向HDFS中写数据。

跟进fs.create() 源码一层层对象包装,会发现该create方法最终实际调到 DistributedFileSystem.create()方法,其源码如下:

@Override
public FSDataOutputStream create(Path f, FsPermission permission,
    boolean overwrite, int bufferSize, short replication, long blockSize,
    Progressable progress) throws IOException {
  //返回 FSDataOutputStream 对象
  return this.create(f, permission,
      overwrite ? EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE)
          : EnumSet.of(CreateFlag.CREATE), bufferSize, replication,
      blockSize, progress, null);
}

以上create方法会继续调用到DistributedFileSystem.create()方法,只是参数不同,源码如下:

@Override
public FSDataOutputStream create(final Path f, final FsPermission permission,
  final EnumSet<CreateFlag> cflags, final int bufferSize,
  final short replication, final long blockSize, final Progressable progress,
  final ChecksumOpt checksumOpt) throws IOException {
  ... ...
  return new FileSystemLinkResolver<FSDataOutputStream>() {
    @Override
    public FSDataOutputStream doCall(final Path p)
        throws IOException, UnresolvedLinkException {
        //创建了一个DFSOutputStream,做了很多初始化操作
      /**
       *  1.往文件目录树里面添加了INodeFile
       *  2.添加了契约管理
       *  3.启动了DataStreamer(写数据流程的关键服务)
       */
      //执行dfs.create方法,最终调用到 DFSClient.create方法
      final DFSOutputStream dfsos = dfs.create(getPathName(p), permission,
              cflags, replication, blockSize, progress, bufferSize,
              checksumOpt);
      //FSDataOutputStream 是DFSOutputStream 进行了再一次的封装。【装饰模式】
      return dfs.createWrappedOutputStream(dfsos, statistics);
    }
   ... ...
}

3.x版本,在上面代码最后会 return safelyCreateWrappedOutputStream(dfsos),感兴趣的小伙伴可以深入看看它们的区别。

以上代码执行dfs.create方法,最终调用到 DFSClient.create方法:

public DFSOutputStream create(String src, FsPermission permission, EnumSet<CreateFlag> flag, short replication,
		long blockSize, Progressable progress, int buffersize, ChecksumOpt checksumOpt) throws IOException {
	return create(src, permission, flag, true, replication, blockSize, progress, buffersize, checksumOpt, null);
}

DFSClient.create方法又经过一些列参数包装,最终调用到如下源码:

public DFSOutputStream create(String src, FsPermission permission, EnumSet<CreateFlag> flag, boolean createParent,
		short replication, long blockSize, Progressable progress, int buffersize, ChecksumOpt checksumOpt,
		InetSocketAddress[] favoredNodes) throws IOException {
	checkOpen();
	if (permission == null) {
		permission = FsPermission.getFileDefault();
	}
	FsPermission masked = permission.applyUMask(dfsClientConf.uMask);
	if (LOG.isDebugEnabled()) {
		LOG.debug(src + ": masked=" + masked);
	}
	// newStreamForCreate中获取到NameNoae Rpc Proxy 代理对象并连接创建目录,然后启动DataStreamer 线程用于接收客户端上传的packet
	/**
	 * 总结:
	 * 1.往文件目录树里面添加了文件
	 * 2.添加了契约
	 * 3.启动了DataStreamer
	 */
	final DFSOutputStream result = DFSOutputStream.newStreamForCreate(this, src, masked, flag, createParent,
			replication, blockSize, progress, buffersize, dfsClientConf.createChecksum(checksumOpt),
			getFavoredNodesStr(favoredNodes));


	// 开启续约(契约)
	beginFileLease(result.getFileId(), result);
	return result;
}

以上代码我补充了很多注释,这里就不赘述了,下面接着看newStreamForCreate实现源码如下:

static DFSOutputStream newStreamForCreate(DFSClient dfsClient, String src, FsPermission masked,
		EnumSet<CreateFlag> flag, boolean createParent, short replication, long blockSize, Progressable progress,
		int buffersize, DataChecksum checksum, String[] favoredNodes) throws IOException {
    ... ...
    //重试的代码结构
    while (shouldRetry) {
        shouldRetry = false;
        try {
            /**
             * HDFS原理总结:
             * 创建目录:就是在 目录树(元数据)上面添加一个子Node (INodeDirectory)
             * 上传文件:
             *     1.在目录树里面添加一个字Node(InodeFile)
             *     2.再往文件里面写数据
             *     更新了元数据
             *     添加了契约
             *  往目录树里添加InodeFile,记录元数据日志和添加契约
             *  这儿都是需要跟Namenode的服务端进行交互的
             */
            // dfsClient.namenode 就是 NameNode Rpc Proxy 对象,调用的create方法,调用到NameNodeRpcServer.create方法
            stat = dfsClient.namenode.create(src, masked, dfsClient.clientName,
                    new EnumSetWritable<CreateFlag>(flag), createParent, replication, blockSize,
                    SUPPORTED_CRYPTO_VERSIONS);
            break;
        } catch (RemoteException re) {
            IOException e = re.unwrapRemoteException(AccessControlException.class,
                    DSQuotaExceededException.class, FileAlreadyExistsException.class,
                    FileNotFoundException.class, ParentNotDirectoryException.class,
                    NSQuotaExceededException.class, RetryStartFileException.class, SafeModeException.class,
                    UnresolvedPathException.class, SnapshotAccessControlException.class,
                    UnknownCryptoProtocolVersionException.class);
            if (e instanceof RetryStartFileException) {
                //重试
                if (retryCount > 0) {
                    shouldRetry = true;
                    retryCount--;
                } else {
                    throw new IOException("Too many retries because of encryption" + " zone operations", e);
                }
            } else {
                throw e;
            }
        }
    }
    Preconditions.checkNotNull(stat, "HdfsFileStatus should not be null!");

    //普通写文件策略,out对象是DFSOutputStream
    //该DFSOutputStream构造中会创建DataStreamer 线程,负责向HDFS中写数据
    final DFSOutputStream out = new DFSOutputStream(dfsClient, src, stat, flag, progress, checksum,
            favoredNodes);
    //启动DataStreamer 线程 ,运行run方法
    out.start();
    return out;
	... ...
}

以上代码dfsClient.namenode.create()方法会通过NameNode Rpc Proxy 对象调用到NameNodeRpcServer.create方法,然后在HDFS中经过一些目录和权限判断来创建对应目录。NameNodeRpcServer.create源码如下:

@Override // ClientProtocol 客户端创建文件
public HdfsFileStatus create(String src, FsPermission masked,
    String clientName, EnumSetWritable<CreateFlag> flag,
    boolean createParent, short replication, long blockSize, 
    CryptoProtocolVersion[] supportedVersions)
    throws IOException {
    //检查namenoe启动状态
    checkNNStartup();
    ... ...
    //创建文件核心代码
    status = namesystem.startFile(src, perm, clientName, clientMachine,
        flag.get(), createParent, replication, blockSize, supportedVersions,
        cacheEntry != null);
    ... ...
    return status;
}

以上startFile源码如下:

HdfsFileStatus startFile(String src, PermissionStatus permissions,
    String holder, String clientMachine, EnumSet<CreateFlag> flag,
    boolean createParent, short replication, long blockSize, 
    CryptoProtocolVersion[] supportedVersions, boolean logRetryCache)
    throws AccessControlException, SafeModeException,
    FileAlreadyExistsException, UnresolvedLinkException,
    FileNotFoundException, ParentNotDirectoryException, IOException {
    ... ...
    //创建文件目录
    status = startFileInt(src, permissions, holder, clientMachine, flag,
        createParent, replication, blockSize, supportedVersions,
        logRetryCache);
    ... ...
    return status;
}

startFileInt实现源码如下:

private HdfsFileStatus startFileInt(String src,
    PermissionStatus permissions, String holder, String clientMachine,
    EnumSet flag, boolean createParent, short replication,
    long blockSize, CryptoProtocolVersion[] supportedVersions,
    String ecPolicyName, String storagePolicy, boolean logRetryCache)
    throws IOException {
    ... ...
    //创建文件目录
    stat = FSDirWriteFileOp.startFile(this, iip, permissions, holder,
        clientMachine, flag, createParent, replication, blockSize, feInfo,
        toRemoveBlocks, shouldReplicate, ecPolicyName, storagePolicy,
        logRetryCache);
    ... ...
}

以上代码中startFile实现如下:

static HdfsFileStatus startFile(
    FSNamesystem fsn, INodesInPath iip,
    PermissionStatus permissions, String holder, String clientMachine,
    EnumSet flag, boolean createParent,
    short replication, long blockSize,
    FileEncryptionInfo feInfo, INode.BlocksMapUpdateInfo toRemoveBlocks,
    boolean shouldReplicate, String ecPolicyName, String storagePolicy,
    boolean logRetryEntry)
    throws IOException {
    ... ...
    //创建文件
    iip = addFile(fsd, parent, iip.getLastLocalName(), permissions,
        replication, blockSize, holder, clientMachine, shouldReplicate,
        ecPolicyName, storagePolicy);
    ... ...
}

addFile中最终会执行 fsd.addINode(existing, newNode, permissions.getPermission()) 向HDFS中添加目录信息。

3.启动DataStreamer线程

顺着流程就走到 DFSOutputStream.newStreamForCreate() 源码的下半部分:

static DFSOutputStream newStreamForCreate(DFSClient dfsClient, String src,
    FsPermission masked, EnumSet flag, boolean createParent,
    short replication, long blockSize, Progressable progress,
    DataChecksum checksum, String[] favoredNodes, String ecPolicyName,
    String storagePolicy)
    throws IOException {
    ... ...
    // 普通写文件策略,out对象是DFSOutputStream
    // 该DFSOutputStream构造中会创建DataStreamer 线程,负责向HDFS中写数据
    out = new DFSOutputStream(dfsClient, src, stat,
        flag, progress, checksum, favoredNodes, true);
    ... ...
    //启动DataStreamer 线程 ,运行run方法
    out.start();
    return out;
}

当向NameNode连接创建目录后,会执行new DFSOutputStream()创建DFSOutputStream对象并最终返回,在创建该对象的构造中同时创建了DataStreamer对象并赋值给streamer属性,DataStreamer对象负责后续接收客户端上传数据并将数据发送pipeline方式发送到DataNode上,该对象为一个线程,创建DFSOutputStream对象完成后会执行out.start()方法进行启动。

new DFSOutputStream实现源码如下:

private DFSOutputStream(DFSClient dfsClient, String src, HdfsFileStatus stat, EnumSet<CreateFlag> flag,
		Progressable progress, DataChecksum checksum, String[] favoredNodes) throws IOException {
	this(dfsClient, src, progress, stat, checksum);
	this.shouldSyncBlock = flag.contains(CreateFlag.SYNC_BLOCK);
	
    //计算写入数据包的大小,默认每个packetSize大小为64kb
	computePacketChunkSize(dfsClient.getConf().writePacketSize, bytesPerChecksum);
	 
	//创建 DataStreamer 对象负责 向HDFS中写入数据
	streamer = new DataStreamer(stat, null);
	if (favoredNodes != null && favoredNodes.length != 0) {
		streamer.setFavoredNodes(favoredNodes);
	}
}

3.x对应源码有一些小区别

protected DFSOutputStream(DFSClient dfsClient, String src,
    HdfsFileStatus stat, EnumSet flag, Progressable progress,
    DataChecksum checksum, String[] favoredNodes, boolean createStreamer) {
    ... ...
    computePacketChunkSize(dfsClient.getConf().getWritePacketSize(),
        bytesPerChecksum);
    ... ...
    streamer = new DataStreamer(stat, null, dfsClient, src, progress,
        checksum, cachingStrategy, byteArrayManager, favoredNodes,
        addBlockFlags);
    ... ...
}

可以看到,在DFSOutputStream创建同时,获取了后续写入数据时的packet大小(默认为64K),并给其streamer属性初始化了DataStreamer值(DataStreamer是一个线程)。

当创建好 DFSOutputStream对象后赋值给out对象,当执行out.start()方法时,实际上执行的就是streamer.start,由于DataStreamer是一个线程,所以最终调用到其中的run方法。

我们可以先看一下源码注释:

DFSOutputStream creates files from a stream of bytes.

The client application writes data that is cached internally by this stream.Data is broken up into packets, each packet is typically 64K in size. A packet comprises of chunks. Each chunk is typically 512 bytes and has an associated checksum with it.

 
When a client application fills up the currentPacket, it is enqueued into dataQueue. The DataStreamer thread picks up packets from the dataQueue, sends it to the first datanode in the pipeline and moves it from the dataQueue to the ackQueue. The ResponseProcessor receives acks from the datanodes. When an successful ack for a packet is received from all datanodes, the ResponseProcessor removes the corresponding packet from the ackQueue.

In case of error, all outstanding packets and moved from ackQueue. A new pipeline is setup by eliminating the bad datanode from the original pipeline.The DataStreamer now starts sending packets from the dataQueue.

翻译:

DFSOutputStream 从字节流创建文件。

 

客户端应用程序写入的数据由该流在内部进行缓存。数据被分解为数据包,每个数据包通常大小为 64K。一个数据包由数据块组成。每个数据块通常为 512 字节,并带有相关的校验和。

 

当客户端应用程序填满当前数据包时,它会被排入数据队列。数据流式处理线程从数据队列中取出数据包,将其发送到管道中的第一个数据节点,并将其从数据队列移动到确认队列。响应处理器从数据节点接收确认。当从所有数据节点收到一个数据包的成功确认时,响应处理器从确认队列中删除相应的数据包。

 

在出现错误的情况下,所有未完成的数据包都会从确认队列中移出。通过从原始管道中排除有问题的数据节点来设置新的管道。数据流式处理线程现在开始从数据队列发送数据包。

DataStreamer.run方法的源码如下:

/*
 * streamer thread is the only thread that opens streams to datanode, and closes
 * them. Any error recovery is also done by this thread.
 * 数据流式处理线程是唯一打开与数据节点的流并关闭它们的线程。任何错误恢复也由这个线程完成。
 */
@Override
public void run() {
	... ...
    synchronized (dataQueue) {
    // wait for a packet to be sent.
    // 等待packet 放入到  dataQueue,packet当客户端写入数据时才会放入到dataQueue
    long now = Time.monotonicNow();
    //第一次进来的时候,因为没有数据所以代码走的是这儿
    // dataQueue.size() == 0
    while ((!streamerClosed && !hasError && dfsClient.clientRunning && dataQueue.size() == 0
            && (stage != BlockConstructionStage.DATA_STREAMING
                    || stage == BlockConstructionStage.DATA_STREAMING
                            && now - lastPacket < dfsClient.getConf().socketTimeout / 2))
            || doSleep) {
        long timeout = dfsClient.getConf().socketTimeout / 2 - (now - lastPacket);
        timeout = timeout <= 0 ? 1000 : timeout;
        timeout = (stage == BlockConstructionStage.DATA_STREAMING) ? timeout : 1000;
        try {
            //如果dataQueue里面没有数据,代码就会阻塞在这儿。
            dataQueue.wait(timeout);//notify
        } catch (InterruptedException e) {
            DFSClient.LOG.warn("Caught exception ", e);
        }
        doSleep = false;
        now = Time.monotonicNow();
    }
    ... ...
    ... ...
        //获取待发送的数据包
        one = dataQueue.getFirst(); // regular data packet
        ... ...
        // get new block from namenode.
        /**
         * 建立数据管道
         * 向NameNode申请Block
         */
        if (stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) {
            if (DFSClient.LOG.isDebugEnabled()) {
                DFSClient.LOG.debug("Allocating new block");
            }
            //步骤一:建立数据管道
            /**
             * nextBlockOutputStream 这个方法里面完成了两个事:
             * 1.向Namenode申请block
             * 2.建立数据管道
             */
            setPipeline(nextBlockOutputStream());
            //步骤二:启动了ResponseProcessor 用来监听我们一个packet发送是否成功
            initDataStreaming();
        } else if (stage == BlockConstructionStage.PIPELINE_SETUP_APPEND) {
            if (DFSClient.LOG.isDebugEnabled()) {
                DFSClient.LOG.debug("Append to block " + block);
            }
            setupPipelineForAppendOrRecovery();
            initDataStreaming();
        }

        long lastByteOffsetInBlock = one.getLastByteOffsetBlock();
        if (lastByteOffsetInBlock > blockSize) {
            throw new IOException("BlockSize " + blockSize + " is smaller than data size. "
                    + " Offset of packet in block " + lastByteOffsetInBlock + " Aborting file " + src);
        }

        if (one.isLastPacketInBlock()) {
            // wait for all data packets have been successfully acked
            synchronized (dataQueue) {
                while (!streamerClosed && !hasError && ackQueue.size() != 0 && dfsClient.clientRunning) {
                    try {
                        // wait for acks to arrive from datanodes
                        // dataQueue 中目前没有数据,进入等待状态
                        dataQueue.wait(1000);
                    } catch (InterruptedException e) {
                        DFSClient.LOG.warn("Caught exception ", e);
                    }
                }
            }
            if (streamerClosed || hasError || !dfsClient.clientRunning) {
                continue;
            }
            stage = BlockConstructionStage.PIPELINE_CLOSE;
        }

        // send the packet
        Span span = null;
        synchronized (dataQueue) {
            // move packet from dataQueue to ackQueue
            if (!one.isHeartbeatPacket()) {
                span = scope.detach();
                one.setTraceSpan(span);
                //步骤三:从dataQueue把要发送的这个packet移除出去
                dataQueue.removeFirst();
                //步骤四:然后往ackQueue里面添加这个packet
                ackQueue.addLast(one);
                dataQueue.notifyAll();
            }
        }
        ... ...
        //这个就是我们写数据代码
        one.writeTo(blockStream);
        blockStream.flush();
        ... ...
}

以上代码中 dataQueue是一个Linkedlist对象,该对象会一直处于while循环中等待客户端上传文件的packet,当有数据放入该LinkedList后,会从该对象中获取一个个的packet写出到DN中。

4.向dataQueue队列中写入packet

向HDFS写入数据是通过执行自己编写代码out.write("hello chaos".getBytes())实现的。out对象为DFSOutputStream对象,所以write方法优先找该对象中的write方法,但是发现DFSOutputStream对象中没有write方法,所以找到DFSOutputStream对象的父类FSOutputSummer.write方法,因此最终执行到FSOutputSummer.write方法实现,其源码如下:

@Override
public synchronized void write(int b) throws IOException {
  buf[count++] = (byte)b;
  if(count == buf.length) {
    //刷新缓冲区,写出数据
    flushBuffer();
  }
}

以上代码中flushBuffer()实现源码如下:

protected synchronized void flushBuffer() throws IOException {
  //向packet中写入数据
  flushBuffer(false, true);
}

flushBuffer方法实现源码如下:

protected synchronized int flushBuffer(boolean keep,
    boolean flushPartial) throws IOException {
    ... ...
    // 调用writeChecksumChunks方法将缓冲区的数据写入到输出流,并进行校验和
    writeChecksumChunks(buf, 0, lenToFlush);
    ... ...
}

以上writeChecksumChunks()方法主要就是对写入buffer数据进行校验和生成并与数据一并写入packet。

writeChecksumChunks实现源码如下:

// 为给定的数据块生成校验和,并将输出块和校验和写入底层输出流
private void writeChecksumChunks(byte b[], int off, int len)
throws IOException {
    ... ...
    //根据数据块的大小,计算数据块的校验和
    sum.calculateChunkedSums(b, off, len, checksum, 0);
    .. ...
    //将当前数据块和对应的校验块写入到底层输出流中
    writeChunk(b, off + i, chunkLen, checksum, ckOffset,
        getChecksumSize());
    ... ...
}

以上代码中writeChunk()方法最终会调用到DFSOutputStream.writeChunk()实现,其源码如下:

protected synchronized void writeChunk(byte[] b, int offset, int len,
    byte[] checksum, int ckoff, int cklen) throws IOException {
    ... ...
    // 将校验和写入当前数据包
    currentPacket.writeChecksum(checksum, ckoff, cklen);
    // 将数据块写入当前数据包
    currentPacket.writeData(b, offset, len);
    ... ...
    // 如果数据包已满,则将其排队等待传输
    enqueueCurrentPacketFull();
    ... ...
}

以上代码中,随着数据写入到packet中数据量达到默认64K时,会将packet写入到对应的dataQueue中。

enqueueCurrentPacketFull()方法实现源码如下:

synchronized void enqueueCurrentPacketFull() throws IOException {
  LOG.debug("enqueue full {}, src={}, bytesCurBlock={}, blockSize={},"
          + " appendChunk={}, {}", currentPacket, src, getStreamer()
          .getBytesCurBlock(), blockSize, getStreamer().getAppendChunk(),
      getStreamer());
  //当前数据包排队等待传输
  enqueueCurrentPacket();
  adjustChunkBoundary();
  endBlock();
}

以上enqueueCurrentPacket()方法实现原理如下:

void enqueueCurrentPacket() throws IOException {
    //当前数据包排队等待传输
    getStreamer().waitAndQueuePacket(currentPacket);
    currentPacket = null;
}

waitAndQueuePacket()方法实现如下:

void waitAndQueuePacket(DFSPacket packet) throws IOException {
  synchronized (dataQueue) {
    ... ...
    //将当前packet 放入 dataQueue 中
    queuePacket(packet);
    ... ...
}

queuePacket()实现代码如下:

void queuePacket(DFSPacket packet) {
  synchronized (dataQueue) {
    if (packet == null) return;
    packet.addTraceParent(Tracer.getCurrentSpan());
    //将packet 加入到dataQueue LinkedList 中
    dataQueue.addLast(packet);
    lastQueuedSeqno = packet.getSeqno();
    LOG.debug("Queued {}, {}", packet, this);
    //notifyAll()方法通知所有正在等待dataQueue对象锁的线程,告诉它们数据队列已经有数据包放入,可以继续执行
    dataQueue.notifyAll();
  }
}

以上代码中dataQueue.addLast(packet)就是将packet 加入到dataQueue LinkedList 中,当执行到dataQueue.notifyAll()时,会通知所有正在等待dataQueue对象锁的线程,告诉它们数据队列已经有数据包放入,可以继续执行。

5.副本放置策略源码

下面我们回到DataStreamer.run方法源码,该部分代码3.x和2.x有很多区别,上面已经贴过2.x实现,下面我们看看3.x版本是如何实现的:

public void run() {
    ... ...
    synchronized (dataQueue) {
      // wait for a packet to be sent.
      //等待packet 放入到  dataQueue,packet当客户端写入数据时才会放入到dataQueue
      while ((!shouldStop() && dataQueue.isEmpty()) || doSleep) {
    ... ...
    //dataQueue 中目前没有数据,进入等待状态
    dataQueue.wait(timeout);
    ... ...
    }
    ... ...
    //获取待发送的数据包
    one = dataQueue.getFirst(); // regular data packet
    ... ...
    //构建写数据管道,通过管道连接到第一个DataNode,该DN将数据发送到管道的第二个DN,以此类推
    //nextBlockOutputStream 方法中连接NameNode申请写入数据的DataNode节点及副本分布策略,并设置客户端与第一个Block块所在的节点的socket连接
    setPipeline(nextBlockOutputStream());
    ... ...
    //将packet 以流的方式写入到DataNode节点
    sendPacket(one);
    ... ...
    //等待所有ack
    waitForAllAcks();
    ... ...
}

以上代码大体逻辑为:当dataQueue中有packet后,会执行one = dataQueue.getFirst()获取packet包并通过sendPacket(one)将packet数据写出到DataNode节点。

客户端向HDFS DataNode写入数据时,默认有3个副本,并且各个DataNode节点之间写出数据都是以pipeline方式依次传递到各个DataNode节点,所以在执行sendPacket(one)写出数据前,会执行setPipeline(nextBlockOutputStream())方法构建写数据管道,通过管道连接到第一个DataNode,将packet数据写入该节点,然后由第二个DataNode依次再将packet传递到第三个DataNode节点,副本多的依次类推。其中nextBlockOutputStream()方法中会连接NameNode申请写入数据的DataNode节点及副本分布策略,并设置客户端与第一个Block块所在的节点的socket连接,方便后续将数据写入到对应的DataNode节点。

nextBlockOutputStream()方法源码如下:

protected LocatedBlock nextBlockOutputStream() throws IOException {
    ... ...
    //locateFollowingBlock方法中向NameNode申请副本写入的DN节点信息并设置副本策略
    lb = locateFollowingBlock(
        excluded.length ]]> 0 ? excluded : null, oldBlock);
    block.setCurrentBlock(lb.getBlock());
    ... ...
    //获取Block块所在的所有节点信息
    nodes = lb.getLocations();
    ... ...
    //连接到节点列表中的第一个 DataNode 节点并建立客户端与DataNode节点的socket连接,方便后续将数据写入到DataNode
    success = createBlockOutputStream(nodes, nextStorageTypes, nextStorageIDs,
        0L, false);
    ... ...
}

以上代码中locateFollowingBlock( excluded.length > 0 ? excluded : null, oldBlock)代码中会向NameNode申请副本写入DN节点的信息并设置副本分布策略。

locateFollowingBlock()源码如下:

private LocatedBlock locateFollowingBlock(DatanodeInfo[] excluded,
    ExtendedBlock oldBlock) throws IOException {
  //向NameNode 添加block 块信息
  return DFSOutputStream.addBlock(excluded, dfsClient, src, oldBlock,
      stat.getFileId(), favoredNodes, addBlockFlags);
}

以上代码中addBlock方法中会向NameNode申请block分布策略及写入DN节点信息。DFSOutputStream.addBlock()实现源码如下:

static LocatedBlock addBlock(DatanodeInfo[] excludedNodes,
    DFSClient dfsClient, String src, ExtendedBlock prevBlock, long fileId,
    String[] favoredNodes, EnumSet allocFlags)
    throws IOException {
    ... ...
    //向NameNode申请block分布策略及写入DN节点信息
    return dfsClient.namenode.addBlock(src, dfsClient.clientName, prevBlock,
        excludedNodes, fileId, favoredNodes, allocFlags);
    ... ...
}

以上代码中dfsClient.namenode获取到NameNode Rpc Proxy,所以addBlock方法最终会调用到NameNodeRpcServer.addBlock()方法。NameNodeRpcServer.addBlock()源码如下:

//客户端写入数据向NameNode 申请block位置
@Override
public LocatedBlock addBlock(String src, String clientName,
    ExtendedBlock previous, DatanodeInfo[] excludedNodes, long fileId,
    String[] favoredNodes, EnumSet addBlockFlags)
    throws IOException {
  //检查NameNode是否启动
  checkNNStartup();
  //getAdditionalBlock方法设置副本存储节点策略,返回的 LocatedBlock 对象中包含 block写入数据的DN节点
  LocatedBlock locatedBlock = namesystem.getAdditionalBlock(src, fileId,
      clientName, previous, excludedNodes, favoredNodes, addBlockFlags);
  if (locatedBlock != null) {
    metrics.incrAddBlockOps();
  }
  return locatedBlock;
}

以上代码namesystem.getAdditionalBlock()源码如下:

LocatedBlock getAdditionalBlock(
    String src, long fileId, String clientName, ExtendedBlock previous,
    DatanodeInfo[] excludedNodes, String[] favoredNodes,
    EnumSet flags) throws IOException {
    ... ...
    //为新数据块选择DataNode 节点,有几个副本选择几个节点
    DatanodeStorageInfo[] targets = FSDirWriteFileOp.chooseTargetForNewBlock(
        blockManager, src, excludedNodes, favoredNodes, flags, r);
    ... ...
}

以上代码中chooseTargetForNewBlock()会为block找到存储DN节点,源码如下:

static DatanodeStorageInfo[] chooseTargetForNewBlock(
    BlockManager bm, String src, DatanodeInfo[] excludedNodes,
    String[] favoredNodes, EnumSet flags,
    ValidateAddBlockResult r) throws IOException {
    ... ...
    // 为新数据块选择目标数据节点
    return bm.chooseTarget4NewBlock(src, r.numTargets, clientNode,
                                  excludedNodesSet, r.blockSize,
                                  favoredNodesList, r.storagePolicyID,
                                  r.blockType, r.ecPolicy, flags);
}

chooseTarget4NewBlock()中会为block选择目标数据节点:

public DatanodeStorageInfo[] chooseTarget4NewBlock(final String src,
    final int numOfReplicas, final Node client,
    final Set excludedNodes,
    final long blocksize,
    final List favoredNodes,
    final byte storagePolicyID,
    final BlockType blockType,
    final ErasureCodingPolicy ecPolicy,
    final EnumSet flags) throws IOException {
    ... ...
    //存放数据副本的节点数组
    final DatanodeStorageInfo[] targets = blockplacement.chooseTarget(src,
        numOfReplicas, client, excludedNodes, blocksize, 
        favoredDatanodeDescriptors, storagePolicy, flags);
    ... ...
    //返回数据存放节点数组
    return targets;
}

以上代码blockplacement.chooseTarget()方法经过一层层对象封装,最终调用到BlockPlacementPolicyDefault.chooseTarget方法,该方法实现源码如下:

private DatanodeStorageInfo[] chooseTarget(int numOfReplicas,
                                  Node writer,
                                  List chosenStorage,
                                  boolean returnChosenNodes,
                                  Set excludedNodes,
                                  long blocksize,
                                  final BlockStoragePolicy storagePolicy,
                                  EnumSet addBlockFlags,
                                  EnumMap sTypes) {
    ... ...
    // 获取每个机架上的最大节点数
    int[] result = getMaxNodesPerRack(chosenStorage.size(), numOfReplicas);
    ... ...
    List results = null;
    ... ...
    //这里的results 与  chosenStorage 完全相同,但是目前没有数据
    results = new ArrayList<]]>(chosenStorage);
    //设置副本分布并返回第一个副本要写入的DN节点
    localNode = chooseTarget(numOfReplicas, writer, excludedNodes,
        blocksize, maxNodesPerRack, results, avoidStaleNodes,
        storagePolicy, EnumSet.noneOf(StorageType.class), results.isEmpty(),
        sTypes);
    ... ...
    return getPipeline(
        (writer != null && writer instanceof DatanodeDescriptor) ? writer
            : localNode,
        results.toArray(new DatanodeStorageInfo[results.size()]));
}

进入以上代码中chooseTarget方法,源码如下:

private Node chooseTarget(final int numOfReplicas,
                          Node writer,
                          final Set excludedNodes,
                          final long blocksize,
                          final int maxNodesPerRack,
                          final List results,
                          final boolean avoidStaleNodes,
                          final BlockStoragePolicy storagePolicy,
                          final EnumSet unavailableStorages,
                          final boolean newBlock,
                          EnumMap storageTypes) {
    ... ...
    //准备多副本写入的DN节点分布,返回的writer为第一个副本要写入的DN节点
    writer = chooseTargetInOrder(numOfReplicas, writer, excludedNodes, blocksize,
        maxNodesPerRack, results, avoidStaleNodes, newBlock, storageTypes);
    ... ...
    return writer;
}

以上代码中chooseTargetInOrder中实现副本分布并返回第一个副本要写入的DN节点。chooseTargetInOrder源码如下:

protected Node chooseTargetInOrder(int numOfReplicas, 
                               Node writer,
                               final Set excludedNodes,
                               final long blocksize,
                               final int maxNodesPerRack,
                               final List results,
                               final boolean avoidStaleNodes,
                               final boolean newBlock,
                               EnumMap storageTypes)
                               throws NotEnoughReplicasException {
  // 计算结果列表的大小,默认初始 results 为0,result集合表示副本所在的节点
  final int numOfResults = results.size();
  // 如果结果列表为空
  if (numOfResults == 0) {
    // 选择本地节点作为第一个副本存储位置,并向result中加入节点
    DatanodeStorageInfo storageInfo = chooseLocalStorage(writer,
        excludedNodes, blocksize, maxNodesPerRack, results, avoidStaleNodes,
        storageTypes, true);

    //writer第一个副本要写出的DataNode节点
    writer = (storageInfo != null) ? storageInfo.getDatanodeDescriptor()
                                   : null;

    //减去一个副本后,如果为0则返回,writer,否则不返回,继续
    if (--numOfReplicas == 0) {
      return writer;
    }
  }
  //第一个副本所在DN节点
  final DatanodeDescriptor dn0 = results.get(0).getDatanodeDescriptor();

  if (numOfResults <= 1) {
    //选择远程机架存放第二个副本
    chooseRemoteRack(1, dn0, excludedNodes, blocksize, maxNodesPerRack,
        results, avoidStaleNodes, storageTypes);
    if (--numOfReplicas == 0) {
      //writer第一个副本要写出的DataNode节点
      return writer;
    }
  }

  if (numOfResults <= 2) {
    //第二个副本所在DN节点
    final DatanodeDescriptor dn1 = results.get(1).getDatanodeDescriptor();
    if (clusterMap.isOnSameRack(dn0, dn1)) {//如果dn0与dn1是同一机架,第三个副本选择不同机架
      chooseRemoteRack(1, dn0, excludedNodes, blocksize, maxNodesPerRack,
          results, avoidStaleNodes, storageTypes);
    } else if (newBlock){//如果是新块,选择与dn1 第二个副本所在节点相同的机架上放第三个副本
      chooseLocalRack(dn1, excludedNodes, blocksize, maxNodesPerRack,
          results, avoidStaleNodes, storageTypes);
    } else {//随机选择一台节点存储第3个副本
      chooseLocalRack(writer, excludedNodes, blocksize, maxNodesPerRack,
          results, avoidStaleNodes, storageTypes);
    }
    if (--numOfReplicas == 0) {
      //writer第一个副本要写出的DataNode节点
      return writer;
    }
  }
  //大于3个副本,随机选择节点存放副本
  chooseRandom(numOfReplicas, NodeBase.ROOT, excludedNodes, blocksize,
      maxNodesPerRack, results, avoidStaleNodes, storageTypes);
  //writer第一个副本要写出的DataNode节点
  return writer;
}

chooseTargetInOrder方法代码逻辑为block 副本找到存储节点的策略,然后返回block所在的第一个节点,首先第一个block存储在本机,第二个block存储在远程机架,第三个副本存储时先判断是否第一个副本和第二个副本是否在同一机架,如果在同一机架,那么第三个副本选择不同机架进行存储,否则选择与第二个副本相同机架的随机节点进行存储。最终该方法返回存储第一个副本的DataNode节点。

6.客户端与DataNode建立socket通信

在DataNode启动源码部分,DataNode.initDataXceiver()方法进行初始化DataXceiver服务,该服务是 DataNode 接收客户端请求的核心组件,其核心实现源码如下:

private void initDataXceiver() throws IOException {
    ... ...
    //TcpPeerServer 对象用于接收来自客户端的传输流量
    TcpPeerServer tcpPeerServer;
    ... ...
    //DataXceiverServer 是一个线程
    xserver = new DataXceiverServer(tcpPeerServer, getConf(), this);
    //创建DataXceiverServer的后台线程,创建好DataNode后会启动
    this.dataXceiverServer = new Daemon(threadGroup, xserver);
    ... ...
}

DataNode.crateDataNode()方法中,当DataNode对象创建完成后,当执行dn.runDatanodeDaemon()时会运行DataXceiverServer对象的run方法。

DataXceiverServer.run方法实现源码如下:

public void run() {
    ... ...
    // 接受客户端的连接请求
    peer = peerServer.accept();
    ... ...
    //创建线程并传入peer参数,然后并启动,会调用到DataXceiver.run 方法
    new Daemon(datanode.threadGroup,
        DataXceiver.create(peer, datanode, this))
        .start();
    ... ...
}

以上代码中我们可以看到peerServer.accept()一直接受来自客户端传输数据socket通信,并且new Daemon(datanode.threadGroup,DataXceiver.create(peer, datanode, this)).start()代码中创建了DataXceiver线程并启动,该线程主要从DataXceiverServer中读取socket传入数据并将数据写入到DataNode节点磁盘。

下面继续回到DataStreamer.nextBlockOutputStream()源码中,查看客户端与DataNode节点建立的连接。

DataStreamer.nextBlockOutputStream()方法源码如下:

protected LocatedBlock nextBlockOutputStream() throws IOException {
    ... ...
    //locateFollowingBlock方法中向NameNode申请副本写入的DN节点信息并设置副本策略
    lb = locateFollowingBlock(
        excluded.length ]]> 0 ? excluded : null, oldBlock);
    block.setCurrentBlock(lb.getBlock());
    ... ...
    //获取Block块所在的所有节点信息
    nodes = lb.getLocations();
    ... ...
    //连接到节点列表中的第一个 DataNode 节点并建立客户端与DataNode节点的socket连接,方便后续将数据写入到DataNode
    success = createBlockOutputStream(nodes, nextStorageTypes, nextStorageIDs,
        0L, false);
    ... ...
}

前面执行完locateFollowingBlock()方法,获取到了数据应该写往的DataNode节点后,后续会执行createBlockOutputStream(nodes, nextStorageTypes, nextStorageIDs,0L, false)方法与第一个写出的DataNode节点建立连接。

createBlockOutputStream()实现部分源码如下:

boolean createBlockOutputStream(DatanodeInfo[] nodes,
    StorageType[] nodeStorageTypes, String[] nodeStorageIDs,
    long newGS, boolean recoveryFlag) {
    ... ...
    // 创建客户端用于数据传输管道的Socket,这里传入的nodes[0]就是第一个DataNode节点
    s = createSocketForPipeline(nodes[0], nodes.length, dfsClient);
    ... ...
    //当输出流有数据时,通过socket将数据写出到DataNode中
    ... ... 
}

以上createSocketForPipeline(nodes[0], nodes.length, dfsClient)代码就是获取第一个写出数据的block所在的DataNode节点,并建立socket连接。

createSocketForPipeline源码如下:

static Socket createSocketForPipeline(final DatanodeInfo first,
    final int length, final DFSClient client) throws IOException {
    ... ...
    //获取第一个 DataNode节点 socket地址
    final InetSocketAddress isa = NetUtils.createSocketAddr(dnAddr);
    ... ...
    //客户端连接上 DataNode,DataNode 启动着DataXceiverServer 服务,该服务启动后一直会接收客户端scoket 通信
    NetUtils.connect(sock, isa, client.getRandomLocalInterfaceAddr(),
        conf.getSocketTimeout());
    ... ...
     return sock;
}

7.向Datanode中上传数据

回到 DataStreamr.createBlockOutputStream()方法中,核心源码如下:

boolean createBlockOutputStream(DatanodeInfo[] nodes,
    StorageType[] nodeStorageTypes, String[] nodeStorageIDs,
    long newGS, boolean recoveryFlag) {
    ... ...
    // 创建客户端用于数据传输管道的Socket,这里传入的nodes[0]就是第一个DataNode节点
    s = createSocketForPipeline(nodes[0], nodes.length, dfsClient);
    ... ...
    // 获取未缓冲的输出流和输入流
    OutputStream unbufOut = NetUtils.getOutputStream(s, writeTimeout);
    InputStream unbufIn = NetUtils.getInputStream(s, readTimeout);
    ... ...
    //包装 输出流 unbufOut 到 out 对象中
    out = new DataOutputStream(new BufferedOutputStream(unbufOut,
        DFSUtilClient.getSmallBufferSize(dfsClient.getConfiguration())));
    ... ...
    //DataNode 启动着DataXceiverServer 服务,该服务启动后一直会接收客户端scoket 通信
    new Sender(out).writeBlock(blockCopy, nodeStorageTypes[0], accessToken,
        dfsClient.clientName, nodes, nodeStorageTypes, null, bcs,
        nodes.length, block.getNumBytes(), bytesSent, newGS,
        checksum4WriteBlock, cachingStrategy.get(), isLazyPersistFile,
        (targetPinnings != null && targetPinnings[0]), targetPinnings,
        nodeStorageIDs[0], nodeStorageIDs);
    ... ...
}

当写出数据的输出流out中有数据时,会通过new Sender(out).writeBlock()方法将数据发送到DataNode节点,writeBlock()实现具体源码如下:

public void writeBlock(final ExtendedBlock blk,
    final StorageType storageType,
    final Token blockToken,
    final String clientName,
    final DatanodeInfo[] targets,
    final StorageType[] targetStorageTypes,
    final DatanodeInfo source,
    final BlockConstructionStage stage,
    final int pipelineSize,
    final long minBytesRcvd,
    final long maxBytesRcvd,
    final long latestGenerationStamp,
    DataChecksum requestedChecksum,
    final CachingStrategy cachingStrategy,
    final boolean allowLazyPersist,
    final boolean pinning,
    final boolean[] targetPinnings,
    final String storageId,
    final String[] targetStorageIds) throws IOException {
    ... ...
    //包装socket 流和 操作类型 “WRITE_BLOCK” ,通过socket 发送到DataNode 节点
    send(out, Op.WRITE_BLOCK, proto.build());
}

以上send方法会将数据发送到DataNode 中,DataNode启动的DataXceiverServer 服务会接收客户端socket通信。

再次回到DataXceiverServer.run()方法源码中:

public void run() {
    ... ...
    // 接受客户端的连接请求
    peer = peerServer.accept();
    ... ...
    //创建线程并传入peer参数,然后并启动,会调用到DataXceiver.run 方法
    new Daemon(datanode.threadGroup,
        DataXceiver.create(peer, datanode, this))
        .start();
    ... ...
}

new Daemon(datanode.threadGroup,DataXceiver.create(peer, datanode, this)).start()代码中会将接受到客户端的连接包装到DataXceiver线程对象中并启动,在DataXceiver.run方法中会对从客户端接收到的数据进行写出到DataNode磁盘处理。

DataXceiver.run方法源码如下:

public void run() {
    ... ...
    // 初始化操作对象
    Op op = null;
    ... ...
    // 初始化输入流
    InputStream input = socketIn;
    ... ...
    //读取客户端传入的数据给输入流赋值
    input = new BufferedInputStream(saslStreams.in,
        smallBufferSize);
    ... ...
    // 初始化DataXceiver的输入流 ,就是将 input 流赋值给了Receiver 中的 in 属性,后续使用
    super.initialize(new DataInputStream(input));
    ... ...
    //读取输入数据
    op = readOp();
    ... ...
    //处理读取过来的数据流
    processOp(op);
    ... ...
}

以上代码会将从客户端中接收过来的数据包装成数据输入流,最终执行processOp(op)写出到DataNode节点磁盘上。

processOp(op)实现源码如下,op默认从客户端传入类型值为 WRITE_BLOCK:

protected final void processOp(Op op) throws IOException {
    ... ...
    //从客户端获取过来的操作属性为 “WRITE_BLOCK”
    case WRITE_BLOCK:
      //向DataNode中写入Block块操作
      opWriteBlock(in);
      break;
    ... ...
}

opWriteBlock(in)实现代码如下:

private void opWriteBlock(DataInputStream in) throws IOException {
    ... ...
    // 调用writeBlock方法处理写入块操作
    writeBlock(PBHelperClient.convert(proto.getHeader().getBaseHeader().getBlock()),
        PBHelperClient.convertStorageType(proto.getStorageType()),
        PBHelperClient.convert(proto.getHeader().getBaseHeader().getToken()),
        proto.getHeader().getClientName(),
        targets,
        PBHelperClient.convertStorageTypes(proto.getTargetStorageTypesList(), targets.length),
        PBHelperClient.convert(proto.getSource()),
        fromProto(proto.getStage()),
        proto.getPipelineSize(),
        proto.getMinBytesRcvd(), proto.getMaxBytesRcvd(),
        proto.getLatestGenerationStamp(),
        fromProto(proto.getRequestedChecksum()),
        (proto.hasCachingStrategy() ?
            getCachingStrategy(proto.getCachingStrategy()) :
          CachingStrategy.newDefaultStrategy()),
        (proto.hasAllowLazyPersist() ? proto.getAllowLazyPersist() : false),
        (proto.hasPinning() ? proto.getPinning(): false),
        (PBHelperClient.convertBooleanList(proto.getTargetPinningsList())),
        proto.getStorageId(),
        proto.getTargetStorageIdsList().toArray(new String[0]));
    ... ...
}

以上 writeBlock最终调用到DataXceiver.writeBlock()方法,其源码实现如下:

public void writeBlock(...){
    ... ...
    // 创建blockReceiver 并赋值给 DataXceiver.blockReceiver,后续使用到该对象写出数据到磁盘
    setCurrentBlockReceiver(getBlockReceiver(block, storageType, in,
        peer.getRemoteAddressString(),
        peer.getLocalAddressString(),
        stage, latestGenerationStamp, minBytesRcvd, maxBytesRcvd,
        clientname, srcDataNode, datanode, requestedChecksum,
        cachingStrategy, allowLazyPersist, pinning, storageId));
    ... ...
    //发送数据到下游DN节点,对于下游DataNode节点,仍然要走一遍当前节点的流程,形成DataNode 依次向后写出数据
    new Sender(mirrorOut).writeBlock(originalBlock, targetStorageTypes[0],
        blockToken, clientname, targets, targetStorageTypes,
        srcDataNode, stage, pipelineSize, minBytesRcvd, maxBytesRcvd,
        latestGenerationStamp, requestedChecksum, cachingStrategy,
        allowLazyPersist, targetPinnings[0], targetPinnings,
        targetStorageId, targetStorageIds);
    ... ...
    //receiveBlock 会接收packets 将数据写出到磁盘
    blockReceiver.receiveBlock(mirrorOut, mirrorIn, replyOut, mirrorAddr,
        dataXceiverServer.getWriteThrottler(), targets, false);
    ... ...
}

以上代码中new Sender(mirrorOut).writeBlock()这部分代码是将写入到该DataNode节点的packet数据继续写往下个DataNode节点,如果block有多个副本,都是在下一个DataNode节点向后续DN节点发送写出数据。

最终执行blockReceiver.receiveBlock()代码将数据写出到磁盘中,receiverBlock()实现关键源码如下:

void receiveBlock(
    DataOutputStream mirrOut, // output to next datanode
    DataInputStream mirrIn,   // input from next datanode
    DataOutputStream replyOut,  // output to previous datanode
    String mirrAddr, DataTransferThrottler throttlerArg,
    DatanodeInfo[] downstreams,
    boolean isReplaceBlock) throws IOException {
    ... ...
    //receivePacket负责接收上游的packet
    while (receivePacket() ]]>= 0) { /* Receive until the last packet */ }
    ... ...
}

以上代码中receivePacket()会一直接受从客户端发送过来的packet并写入到DataNode节点磁盘,直到客户端数据传输完毕。

reveiverPacket()关键源码实现如下:

private int receivePacket() throws IOException {
    ... ...
    //将数据写出到DataNode节点磁盘
    streams.writeDataToDisk(dataBuf.array(),
        startByteToDisk, numBytesToDisk);
    ... ...
}

总结

我们一步步把数据上传源码梳理了一边,可以看到,HDFS确实帮我们屏蔽了很多底层的复杂实现逻辑。

今天的内容很多,有很多细节我没有去进一步拓展了,感兴趣的小伙伴可以跟着我的思路再深入拓展看看。


http://www.kler.cn/a/509005.html

相关文章:

  • Flink (九):DataStream API (六) Process Function
  • 前端小案例——网页井字棋
  • PTA L1-039 古风排版
  • 算法(蓝桥杯)贪心算法7——过河的最短时间问题解析
  • 【C++基础】enum,union,uint8_t,static
  • 构建一个简单的深度学习模型
  • wireshark上没有显示出来rtp协议如何处理
  • 群论学习笔记
  • Windows图形界面(GUI)-QT-C/C++ - Qt Table Widget详解教程
  • 【深度学习】Pytorch:在 ResNet 中加入注意力机制
  • 架构思考与实践:从通用到场景的转变
  • AI的出现,是否能替代IT从业者?
  • 如何使用Python将长图片分隔为若干张小图片
  • 数仓建模(五)选择数仓技术栈:Hive ClickHouse 其它
  • 函数(函数的概念、库函数、自定义函数、形参和实参、return语句、数组做函数参数、嵌套调用和链式访问、函数的声明和定义、static和extern)
  • Java中GUI编程和内部类的学习
  • 【SQL 中的分组查询与联合查询详解】
  • 面试经验分享-回忆版某小公司
  • Spring经典面试题
  • unfold函数
  • 什么是长连接?Netty如何设置进行长连接?
  • Docker详解与部署微服务实战
  • Ansible深度解析:如何精准区分并选用command与shell模块
  • Ruby语言的数据库交互
  • Redis 设计与实现:深入理解高性能缓存数据库
  • 【逆境中绽放:万字回顾2024我在挑战中突破自我】