之前接触过一段时间的HDFS,但是没有深入了解原理,今天有时间了整理一下。
FileSystem简介
FileSystem是hadoop定义的抽象的文件系统,而hdfs只是其中实现的文件系统之一。FileSystem提供了很多操作文件和文件夹的方法,如创建文件,删除文件,重命名,获取文件夹下面的子文件及判断是否是目录等。此外还提供了读取文件,追加内容的功能,但是hdfs暂时不支持追加功能。
FileSystem使用
使用FileSystem.get方法获取FileSystem对象.
第一个方法是用过Configuration对象获取,会使用conf中设置的fs.defaultFS值作为URI,如果没有指定就会使用本地文件。
public static FileSystem get (Configuration conf) throws IOException {
return get (getDefaultUri(conf), conf);
}
public static URI getDefaultUri (Configuration conf) {
return URI.create(fixName(conf.get ("fs.defaultFS" , "file:///" )));
}
第二个重写方法是通过URI和Configuration对象获取,该方法首先获取uri的scheme和authority属性
,如果都为空,调用第一个方法,如果只有authority 为空会检验conf中fs.defaultFS属性的scheme与scheme是否一致,一致的情况下使用fs.defaultFS值。conf中fs.scheme.impl.disable.cache属性是禁用文件系统缓存,默认为false,如果禁用缓存每次都会重新初始化FileSystem对象,否则调用CACHE.get(uri, conf)获取FileSystem对象。
注意: 禁用缓存后生成的FileSystem需要手动关闭 。
public static FileSystem get (URI uri, Configuration conf) {
String scheme = uri.getScheme();
String authority = uri.getAuthority();
if (scheme == null && authority == null ) {
return get (conf);
}
if (scheme != null && authority == null ) {
URI defaultUri = getDefaultUri(conf);
if (scheme.equals(defaultUri.getScheme())
&& defaultUri.getAuthority() != null ) {
return get (defaultUri, conf);
}
}
String disableCacheName = String.format("fs.%s.impl.disable.cache" , scheme);
if (conf.getBoolean(disableCacheName, false )) {
return createFileSystem(uri, conf);
}
return CACHE.get (uri, conf);
}
createFileSystem方法里面就是根据uri的scheme属性获取需要的class对象,然后通过反射进行初始化。
private static FileSystem createFileSystem (URI uri, Configuration conf
) throws IOException {
Class<?> clazz = getFileSystemClass(uri.getScheme(), conf);
FileSystem fs = (FileSystem)ReflectionUtils.newInstance(clazz, conf);
fs.initialize(uri, conf);
return fs;
}
getFileSystemClass方法首先判断是否已经调用了loadFileSystems方法,如果已经调用过,首先会从conf对象中获取fs.scheme .impl属性,没有获取到就会从SERVICE_FILE_SYSTEMS缓存中查找。
public static Class <? extends FileSystem > getFileSystemClass (String scheme ,
Configuration conf ) throws IOException {
if (!FILE_SYSTEMS_LOADED) {
loadFileSystems();
}
Class <? extends FileSystem > clazz = null ;
if (conf != null ) {
clazz = (Class <? extends FileSystem >) conf .getClass ("fs ." + scheme + ".impl ", null );
}
if (clazz == null ) {
clazz = SERVICE_FILE_SYSTEMS.get(scheme);
}
if (clazz == null ) {
throw new IOException("No FileSystem for scheme: " + scheme);
}
return clazz;
}
下面看loadFileSystems方法,由于FILE_SYSTEMS_LOADED是静态属性,多线程下修改会有线程安全问题,所以会加锁。这个方法里面会调用ServiceLoader.load(Class service) 方法,该方法是jdk里面提供的查找META-INF/services/org.apache.hadoop.fs.FileSystem 文件里面的内容,并全部加载到SERVICE_FILE_SYSTEMS,这里有一点不清楚的地方,就是FileSystem对象已经初始化了,为什么缓存的只是class对象。
private static void loadFileSystems () {
synchronized (FileSystem.class) {
if (!FILE_SYSTEMS_LOADED) {
ServiceLoader<FileSystem> serviceLoader = ServiceLoader.load(FileSystem.class);
Iterator<FileSystem> it = serviceLoader.iterator();
while (it.hasNext()) {
FileSystem fs = null ;
try {
fs = it.next();
try {
SERVICE_FILE_SYSTEMS.put(fs.getScheme(), fs.getClass());
} catch (Exception e) {
LOG.warn("Cannot load: " + fs + " from " +
ClassUtil.findContainingJar(fs.getClass()), e);
}
} catch (ServiceConfigurationError ee) {
LOG.warn("Cannot load filesystem" , ee);
}
}
FILE_SYSTEMS_LOADED = true ;
}
}
}
hadoop-common-2.7.3.jar里面包含这些
org.apache .hadoop .fs .LocalFileSystem
org.apache .hadoop .fs .viewfs .ViewFileSystem
org.apache .hadoop .fs .ftp .FTPFileSystem
org.apache .hadoop .fs .HarFileSystem
hadoop-hdfs-2.7.3.jar里面包含这些
org.apache .hadoop .hdfs .DistributedFileSystem
org.apache .hadoop .hdfs .web .HftpFileSystem
org.apache .hadoop .hdfs .web .HsftpFileSystem
org.apache .hadoop .hdfs .web .WebHdfsFileSystem
org.apache .hadoop .hdfs .web .SWebHdfsFileSystem
fs.initialize方法,初始化了两个变量statistics与resolveSymlinks ,statistics对象是用来统计的,统计读写次数以及流量,resolveSymlinks字段没有发现使用的地方。根据上面几个方法可以看出URI 只使用到了scheme与authority属性,所以没有必要写具体要操作的路径。
public void initialize (URI name, Configuration conf) throws IOException {
statistics = getStatistics(name.getScheme(), getClass());
resolveSymlinks = conf.getBoolean(
CommonConfigurationKeys.FS_CLIENT_RESOLVE_REMOTE_SYMLINKS_KEY,
CommonConfigurationKeys.FS_CLIENT_RESOLVE_REMOTE_SYMLINKS_DEFAULT);
}
使用FileSystem.newInstance获取FileSystem对象
与get方法类似返回FileSystem 对象,不同的是这个方法每次都返回新的对象,也就是没有缓存,下面来具体的分析下。方法大致相似,不同点是最后调用的是CACHE.getUnique(uri, conf),顾名思义,这个方法返回的都是独立的对象
public static FileSystem newInstance (URI uri, Configuration conf) throws IOException {
String scheme = uri.getScheme();
String authority = uri.getAuthority();
if (scheme == null ) {
return newInstance(conf);
}
if (authority == null ) {
URI defaultUri = getDefaultUri(conf);
if (scheme.equals(defaultUri.getScheme())
&& defaultUri.getAuthority() != null ) {
return newInstance(defaultUri, conf);
}
}
return CACHE.getUnique(uri, conf);
}
public static FileSystem newInstance (Configuration conf) throws IOException {
return newInstance(getDefaultUri(conf), conf);
}
Cache类简介
Cache类的功能主要是缓存和自动关闭FileSystem对象。
get方法与getUnique最大的区别是生成的Key对象时调用的构造方法不一样,后面分析Key的构造函数有什么不同。最后都是调用的getInternal方法返回FileSystem对象。
FileSystem get(URI uri, Configuration conf) throws IOException{
Key key = new Key(uri, conf);
return getInternal(uri, conf, key);
}
FileSystem getUnique(URI uri, Configuration conf) throws IOException{
Key key = new Key(uri, conf, unique.getAndIncrement());
return getInternal(uri, conf, key);
}
getInternal方法先根据key查看缓存中是否存在,存在直接返回,不存在就会重新创建并存放入缓存,如果设置了自动关闭属性fs.automatic.close 还会把生成的对象放入自动关闭的集合中。
由于存在多线程访问时的线程安全问题,所以从缓存中获取时有了加锁操作,加上createFileSystem方法创建新的连接是耗时操作,所以拆分成两个加锁代码块。
这里还有一个逻辑,当缓存为空时,会在全局的钩子函数管理对象中添加一个钩子函数,该函数只有一个操作,就是调用closeAll(true)方法,会把自动关闭的集合保存的对象全部关闭。所以如果设置自动关闭的属性为false,需要在最后手动关闭FileSystem对象。
private FileSystem getInternal (URI uri, Configuration conf, Key key) throws IOException{
FileSystem fs;
synchronized (this ) {
fs = map.get(key);
}
if (fs != null ) {
return fs;
}
fs = createFileSystem(uri, conf);
synchronized (this ) {
FileSystem oldfs = map.get(key);
if (oldfs != null ) {
fs.close();
return oldfs;
}
if (map.isEmpty()
&& !ShutdownHookManager.get().isShutdownInProgress()) {
ShutdownHookManager.get().addShutdownHook(clientFinalizer, SHUTDOWN_HOOK_PRIORITY);
}
fs.key = key;
map.put(key, fs);
if (conf.getBoolean("fs.automatic.close" , true )) {
toAutoClose.add(key);
}
return fs;
}
}
Key类简介
该类作为缓存中的key对象,逻辑很简单,有两个构造函数,不同点是第一个构造函数中unique值默认给的0,而第二个调用时传的是自动ID。
final String scheme;
final String authority;
final UserGroupInformation ugi;
final long unique;
Key(URI uri, Configuration conf) throws IOException {
this (uri, conf, 0 );
}
Key(URI uri, Configuration conf, long unique) throws IOException {
scheme = uri.getScheme()==null ?
"" : StringUtils.toLowerCase(uri.getScheme());
authority = uri.getAuthority()==null ?
"" : StringUtils.toLowerCase(uri.getAuthority());
this .unique = unique;
this .ugi = UserGroupInformation.getCurrentUser();
}
看一下equals和hashCode方法,可以看的equals方法中判断了unique是否一致,也就是说unique不一致,两个key就不会相同,所以FileSystem.newInstance 方法每次都产生新的对象。
public int hashCode () {
return (scheme + authority).hashCode() + ugi.hashCode() + (int )unique;
}
public boolean equals (Object obj) {
if (obj == this ) {
return true ;
}
if (obj != null && obj instanceof Key) {
Key that = (Key)obj;
return isEqual(this .scheme, that.scheme)
&& isEqual(this .authority, that.authority)
&& isEqual(this .ugi, that.ugi)
&& (this .unique == that.unique);
}
return false ;
}
使用FileSystem获取流
open方法用于获取文件流,FileSystem对象只是提供一个抽象方法,具体获取流的方法需要子类去实现。
public abstract FSDataInputStream open (Path f, int bufferSize)
throws IOException;
public FSDataInputStream open (Path f) throws IOException {
return open(f, getConf().getInt("io.file.buffer.size" , 4096 ));
}
下面是读取hdfs文件流并打印到控制台的两个demo
demo1是利用jdk里面提供的URL类读取流的,使用前需要指定URLStreamHandlerFactory为hdfs里面提供的FsUrlStreamHandlerFactory方法。
demo2是直接使用FileSystem对象获取流。
public static void readMethod1 (){
URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory());
String url = "hdfs://master:9000/test/map/input/temperature.txt" ;
InputStream is = null ;
try {
is = new URL(url).openStream();
IOUtils.copyBytes(is , System.out , 4096 , false );
} catch (Exception e) {
e.printStackTrace();
}finally {
IOUtils.closeStream(is );
}
}
public static void readMethod2 () throws IOException{
FileSystem fs = FileSystem.get (URI.create("hdfs://master:9000" ), new Configuration());
FSDataInputStream is = null ;
try {
is = fs.open(new Path("/test/map/input/temperature.txt" ));
IOUtils.copyBytes(is , System.out , 4096 , false );
} catch (IllegalArgumentException e) {
e.printStackTrace();
}finally {
if (is != null ){
is .close();
}
if (fs != null ){
fs.close();
}
}
}
可以看到URL类里面的openStream方法最后调用handler.openConnection.getInputStream 方法
public final InputStream openStream () throws java.io.IOException {
return openConnection().getInputStream();
}
public URLConnection openConnection () throws java.io.IOException {
return handler.openConnection(this );
}
openConnection返回FsUrlConnection对象
protected FsUrlConnection openConnection (URL url) throws IOException {
return new FsUrlConnection(conf, url);
}
FsUrlConnection对象的getInputStream 方法调用connect 方法,可以看到最后获取FileSystem对象,使用FileSystem对象的open(Path path)方法获取流
public void connect () throws IOException {
try {
FileSystem fs = FileSystem.get (url.toURI(), conf);
is = fs.open(new Path(url.getPath()));
} catch (URISyntaxException e) {
throw new IOException(e.toString());
}
}
@Override
public InputStream getInputStream () throws IOException {
if (is == null ) {
connect();
}
return is ;
}
参考资料
《Hadoop权威指南》