hadoop2.7.3版本FileSystem调用过程(1)

论坛 期权论坛 脚本     
匿名网站用户   2020-12-19 17:01   11   0

之前接触过一段时间的HDFS,但是没有深入了解原理,今天有时间了整理一下。


FileSystem简介

FileSystem是hadoop定义的抽象的文件系统,而hdfs只是其中实现的文件系统之一。FileSystem提供了很多操作文件和文件夹的方法,如创建文件,删除文件,重命名,获取文件夹下面的子文件及判断是否是目录等。此外还提供了读取文件,追加内容的功能,但是hdfs暂时不支持追加功能。

FileSystem使用

使用FileSystem.get方法获取FileSystem对象.

第一个方法是用过Configuration对象获取,会使用conf中设置的fs.defaultFS值作为URI,如果没有指定就会使用本地文件。

public static FileSystem get(Configuration conf) throws IOException {
    return get(getDefaultUri(conf), conf);
}
public static URI getDefaultUri(Configuration conf) {
   return URI.create(fixName(conf.get("fs.defaultFS", "file:///")));
}

第二个重写方法是通过URI和Configuration对象获取,该方法首先获取uri的scheme和authority属性
,如果都为空,调用第一个方法,如果只有authority 为空会检验conf中fs.defaultFS属性的scheme与scheme是否一致,一致的情况下使用fs.defaultFS值。conf中fs.scheme.impl.disable.cache属性是禁用文件系统缓存,默认为false,如果禁用缓存每次都会重新初始化FileSystem对象,否则调用CACHE.get(uri, conf)获取FileSystem对象。

注意:禁用缓存后生成的FileSystem需要手动关闭

    public static FileSystem get(URI uri, Configuration conf) {
    String scheme = uri.getScheme();//协议
    String authority = uri.getAuthority();//host:port

    if (scheme == null && authority == null) {     // use default FS
      return get(conf);
    }

    if (scheme != null && authority == null) {     // no authority
      URI defaultUri = getDefaultUri(conf);
      if (scheme.equals(defaultUri.getScheme())    // if scheme matches default
          && defaultUri.getAuthority() != null) {  // & default has authority
        return get(defaultUri, conf);              // return default
      }
    }

    String disableCacheName = String.format("fs.%s.impl.disable.cache", scheme);
    if (conf.getBoolean(disableCacheName, false)) {
      return createFileSystem(uri, conf);
    }

    return CACHE.get(uri, conf);
  }

createFileSystem方法里面就是根据uri的scheme属性获取需要的class对象,然后通过反射进行初始化。

private static FileSystem createFileSystem(URI uri, Configuration conf
      ) throws IOException {
    Class<?> clazz = getFileSystemClass(uri.getScheme(), conf);
    FileSystem fs = (FileSystem)ReflectionUtils.newInstance(clazz, conf);
    fs.initialize(uri, conf);
    return fs;
}

getFileSystemClass方法首先判断是否已经调用了loadFileSystems方法,如果已经调用过,首先会从conf对象中获取fs.scheme .impl属性,没有获取到就会从SERVICE_FILE_SYSTEMS缓存中查找。

public static Class<? extends FileSystem> getFileSystemClass(String scheme,
      Configuration conf) throws IOException {
    if (!FILE_SYSTEMS_LOADED) {
      loadFileSystems();
    }
    Class<? extends FileSystem> clazz = null;
    if (conf != null) {
      clazz = (Class<? extends FileSystem>) conf.getClass("fs." + scheme + ".impl", null);
    }
    if (clazz == null) {
      clazz = SERVICE_FILE_SYSTEMS.get(scheme);
    }
    if (clazz == null) {
      throw new IOException("No FileSystem for scheme: " + scheme);
    }
    return clazz;
  }

下面看loadFileSystems方法,由于FILE_SYSTEMS_LOADED是静态属性,多线程下修改会有线程安全问题,所以会加锁。这个方法里面会调用ServiceLoader.load(Class service) 方法,该方法是jdk里面提供的查找META-INF/services/org.apache.hadoop.fs.FileSystem 文件里面的内容,并全部加载到SERVICE_FILE_SYSTEMS,这里有一点不清楚的地方,就是FileSystem对象已经初始化了,为什么缓存的只是class对象。

private static void loadFileSystems() {
    synchronized (FileSystem.class) {
      if (!FILE_SYSTEMS_LOADED) {
        ServiceLoader<FileSystem> serviceLoader = ServiceLoader.load(FileSystem.class);
        Iterator<FileSystem> it = serviceLoader.iterator();
        while (it.hasNext()) {
          FileSystem fs = null;
          try {
            fs = it.next();
            try {
              SERVICE_FILE_SYSTEMS.put(fs.getScheme(), fs.getClass());
            } catch (Exception e) {
              LOG.warn("Cannot load: " + fs + " from " +
                  ClassUtil.findContainingJar(fs.getClass()), e);
            }
          } catch (ServiceConfigurationError ee) {
            LOG.warn("Cannot load filesystem", ee);
          }
        }
        FILE_SYSTEMS_LOADED = true;
      }
    }
  }

hadoop-common-2.7.3.jar里面包含这些

org.apache.hadoop.fs.LocalFileSystem
org.apache.hadoop.fs.viewfs.ViewFileSystem
org.apache.hadoop.fs.ftp.FTPFileSystem
org.apache.hadoop.fs.HarFileSystem

hadoop-hdfs-2.7.3.jar里面包含这些

org.apache.hadoop.hdfs.DistributedFileSystem
org.apache.hadoop.hdfs.web.HftpFileSystem
org.apache.hadoop.hdfs.web.HsftpFileSystem
org.apache.hadoop.hdfs.web.WebHdfsFileSystem
org.apache.hadoop.hdfs.web.SWebHdfsFileSystem

fs.initialize方法,初始化了两个变量statistics与resolveSymlinks ,statistics对象是用来统计的,统计读写次数以及流量,resolveSymlinks字段没有发现使用的地方。根据上面几个方法可以看出URI 只使用到了scheme与authority属性,所以没有必要写具体要操作的路径。

public void initialize(URI name, Configuration conf) throws IOException {
    statistics = getStatistics(name.getScheme(), getClass());    
    resolveSymlinks = conf.getBoolean(
        CommonConfigurationKeys.FS_CLIENT_RESOLVE_REMOTE_SYMLINKS_KEY,
        CommonConfigurationKeys.FS_CLIENT_RESOLVE_REMOTE_SYMLINKS_DEFAULT);
  }

使用FileSystem.newInstance获取FileSystem对象

与get方法类似返回FileSystem 对象,不同的是这个方法每次都返回新的对象,也就是没有缓存,下面来具体的分析下。方法大致相似,不同点是最后调用的是CACHE.getUnique(uri, conf),顾名思义,这个方法返回的都是独立的对象

public static FileSystem newInstance(URI uri, Configuration conf) throws IOException {
    String scheme = uri.getScheme();
    String authority = uri.getAuthority();

    if (scheme == null) {                       // no scheme: use default FS
      return newInstance(conf);
    }

    if (authority == null) {                       // no authority
      URI defaultUri = getDefaultUri(conf);
      if (scheme.equals(defaultUri.getScheme())    // if scheme matches default
          && defaultUri.getAuthority() != null) {  // & default has authority
        return newInstance(defaultUri, conf);              // return default
      }
    }
    return CACHE.getUnique(uri, conf);
}
public static FileSystem newInstance(Configuration conf) throws IOException {
    return newInstance(getDefaultUri(conf), conf);
}

Cache类简介

Cache类的功能主要是缓存和自动关闭FileSystem对象。
get方法与getUnique最大的区别是生成的Key对象时调用的构造方法不一样,后面分析Key的构造函数有什么不同。最后都是调用的getInternal方法返回FileSystem对象。

FileSystem get(URI uri, Configuration conf) throws IOException{
      Key key = new Key(uri, conf);
      return getInternal(uri, conf, key);
}

FileSystem getUnique(URI uri, Configuration conf) throws IOException{
      Key key = new Key(uri, conf, unique.getAndIncrement());
      return getInternal(uri, conf, key);
}

getInternal方法先根据key查看缓存中是否存在,存在直接返回,不存在就会重新创建并存放入缓存,如果设置了自动关闭属性fs.automatic.close 还会把生成的对象放入自动关闭的集合中。
由于存在多线程访问时的线程安全问题,所以从缓存中获取时有了加锁操作,加上createFileSystem方法创建新的连接是耗时操作,所以拆分成两个加锁代码块。
这里还有一个逻辑,当缓存为空时,会在全局的钩子函数管理对象中添加一个钩子函数,该函数只有一个操作,就是调用closeAll(true)方法,会把自动关闭的集合保存的对象全部关闭。所以如果设置自动关闭的属性为false,需要在最后手动关闭FileSystem对象。

private FileSystem getInternal(URI uri, Configuration conf, Key key) throws IOException{
      FileSystem fs;
      synchronized (this) {
        fs = map.get(key);
      }
      if (fs != null) {
        return fs;
      }

      fs = createFileSystem(uri, conf);
      synchronized (this) { // refetch the lock again
        FileSystem oldfs = map.get(key);
        if (oldfs != null) { // a file system is created while lock is releasing
          fs.close(); // close the new file system
          return oldfs;  // return the old file system
        }

        // now insert the new file system into the map
        if (map.isEmpty()
                && !ShutdownHookManager.get().isShutdownInProgress()) {
          ShutdownHookManager.get().addShutdownHook(clientFinalizer, SHUTDOWN_HOOK_PRIORITY);
        }
        fs.key = key;
        map.put(key, fs);
        if (conf.getBoolean("fs.automatic.close", true)) {
          toAutoClose.add(key);
        }
        return fs;
      }
    }

Key类简介

该类作为缓存中的key对象,逻辑很简单,有两个构造函数,不同点是第一个构造函数中unique值默认给的0,而第二个调用时传的是自动ID。

final String scheme;
final String authority;
final UserGroupInformation ugi;
final long unique;   // an artificial way to make a key unique

Key(URI uri, Configuration conf) throws IOException {
        this(uri, conf, 0);
}
Key(URI uri, Configuration conf, long unique) throws IOException {
        scheme = uri.getScheme()==null ?
            "" : StringUtils.toLowerCase(uri.getScheme());
        authority = uri.getAuthority()==null ?
            "" : StringUtils.toLowerCase(uri.getAuthority());
        this.unique = unique;
        this.ugi = UserGroupInformation.getCurrentUser();
}

看一下equals和hashCode方法,可以看的equals方法中判断了unique是否一致,也就是说unique不一致,两个key就不会相同,所以FileSystem.newInstance 方法每次都产生新的对象。

public int hashCode() {
   return (scheme + authority).hashCode() + ugi.hashCode() + (int)unique;
}
public boolean equals(Object obj) {
   if (obj == this) {
      return true;
   }
   if (obj != null && obj instanceof Key) {
      Key that = (Key)obj;
      return isEqual(this.scheme, that.scheme)
             && isEqual(this.authority, that.authority)
             && isEqual(this.ugi, that.ugi)
             && (this.unique == that.unique);
        }
    return false;        
}

使用FileSystem获取流

open方法用于获取文件流,FileSystem对象只是提供一个抽象方法,具体获取流的方法需要子类去实现。

public abstract FSDataInputStream open(Path f, int bufferSize)
 throws IOException;

public FSDataInputStream open(Path f) throws IOException {
  return open(f, getConf().getInt("io.file.buffer.size", 4096));
}

下面是读取hdfs文件流并打印到控制台的两个demo
demo1是利用jdk里面提供的URL类读取流的,使用前需要指定URLStreamHandlerFactory为hdfs里面提供的FsUrlStreamHandlerFactory方法。
demo2是直接使用FileSystem对象获取流。

public static void readMethod1(){
    URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory());
    String url = "hdfs://master:9000/test/map/input/temperature.txt";
    InputStream is = null;
    try {
        is = new URL(url).openStream();
        IOUtils.copyBytes(is, System.out, 4096, false);
    } catch (Exception e) {
        e.printStackTrace();
    }finally{
        IOUtils.closeStream(is);
    }
}
public static void readMethod2() throws IOException{
        FileSystem fs = FileSystem.get(URI.create("hdfs://master:9000"), new Configuration());
        FSDataInputStream is = null;
        try {
            is = fs.open(new Path("/test/map/input/temperature.txt"));
            IOUtils.copyBytes(is, System.out, 4096, false);
        } catch (IllegalArgumentException e) {
            e.printStackTrace();
        }finally{
            if(is != null){
                is.close();
            }
            if(fs != null){
                fs.close();
            }
        }
    }

可以看到URL类里面的openStream方法最后调用handler.openConnection.getInputStream 方法

public final InputStream openStream() throws java.io.IOException {
        return openConnection().getInputStream();
}
public URLConnection openConnection() throws java.io.IOException {
        return handler.openConnection(this);
}

openConnection返回FsUrlConnection对象

protected FsUrlConnection openConnection(URL url) throws IOException {
  return new FsUrlConnection(conf, url);
}

FsUrlConnection对象的getInputStream 方法调用connect 方法,可以看到最后获取FileSystem对象,使用FileSystem对象的open(Path path)方法获取流

 public void connect() throws IOException {
    try {
      FileSystem fs = FileSystem.get(url.toURI(), conf);
      is = fs.open(new Path(url.getPath()));
    } catch (URISyntaxException e) {
      throw new IOException(e.toString());
    }
  }

  @Override
  public InputStream getInputStream() throws IOException {
    if (is == null) {
      connect();
    }
    return is;
  }

参考资料
《Hadoop权威指南》

分享到 :
0 人收藏
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

积分:1136255
帖子:227251
精华:0
期权论坛 期权论坛
发布
内容

下载期权论坛手机APP