public
NameNodeRpcServer(Configuration conf, NameNode nn)
throws
IOException {
this
.nn = nn;
this
.namesystem = nn.getNamesystem();
this
.metrics = NameNode.getNameNodeMetrics();
int
handlerCount =
conf.getInt(DFS_NAMENODE_HANDLER_COUNT_KEY,
DFS_NAMENODE_HANDLER_COUNT_DEFAULT);
InetSocketAddress socAddr = nn.getRpcServerAddress(conf);
RPC.setProtocolEngine(conf,ClientNamenodeProtocolPB.
class
,ProtobufRpcEngine.
class
);
ClientNamenodeProtocolServerSideTranslatorPB
clientProtocolServerTranslator =
new
ClientNamenodeProtocolServerSideTranslatorPB(
this
);
BlockingService clientNNPbService = ClientNamenodeProtocol.
newReflectiveBlockingService(clientProtocolServerTranslator);
DatanodeProtocolServerSideTranslatorPB dnProtoPbTranslator =
new
DatanodeProtocolServerSideTranslatorPB(
this
);
BlockingService dnProtoPbService = DatanodeProtocolService
.newReflectiveBlockingService(dnProtoPbTranslator);
NamenodeProtocolServerSideTranslatorPB namenodeProtocolXlator =
new
NamenodeProtocolServerSideTranslatorPB(
this
);
BlockingService NNPbService = NamenodeProtocolService
.newReflectiveBlockingService(namenodeProtocolXlator);
RefreshAuthorizationPolicyProtocolServerSideTranslatorPB refreshAuthPolicyXlator =
new
RefreshAuthorizationPolicyProtocolServerSideTranslatorPB(
this
);
BlockingService refreshAuthService = RefreshAuthorizationPolicyProtocolService
.newReflectiveBlockingService(refreshAuthPolicyXlator);
RefreshUserMappingsProtocolServerSideTranslatorPB refreshUserMappingXlator =
new
RefreshUserMappingsProtocolServerSideTranslatorPB(
this
);
BlockingService refreshUserMappingService = RefreshUserMappingsProtocolService
.newReflectiveBlockingService(refreshUserMappingXlator);
GetUserMappingsProtocolServerSideTranslatorPB getUserMappingXlator =
new
GetUserMappingsProtocolServerSideTranslatorPB(
this
);
BlockingService getUserMappingService = GetUserMappingsProtocolService
.newReflectiveBlockingService(getUserMappingXlator);
HAServiceProtocolServerSideTranslatorPB haServiceProtocolXlator =
new
HAServiceProtocolServerSideTranslatorPB(
this
);
BlockingService haPbService = HAServiceProtocolService
.newReflectiveBlockingService(haServiceProtocolXlator);
WritableRpcEngine.ensureInitialized();
InetSocketAddress dnSocketAddr = nn.getServiceRpcServerAddress(conf);
if
(dnSocketAddr !=
null
) {
int
serviceHandlerCount =
conf.getInt(DFS_NAMENODE_SERVICE_HANDLER_COUNT_KEY,
DFS_NAMENODE_SERVICE_HANDLER_COUNT_DEFAULT);
this
.serviceRpcServer =
RPC.getServer(org.apache.hadoop.hdfs.protocolPB.
ClientNamenodeProtocolPB.
class
, clientNNPbService,
dnSocketAddr.getHostName(), dnSocketAddr.getPort(),
serviceHandlerCount,
false
, conf, namesystem.getDelegationTokenSecretManager());
DFSUtil.addPBProtocol(conf, HAServiceProtocolPB.
class
, haPbService,
serviceRpcServer);
DFSUtil.addPBProtocol(conf, NamenodeProtocolPB.
class
, NNPbService,
serviceRpcServer);
DFSUtil.addPBProtocol(conf, DatanodeProtocolPB.
class
, dnProtoPbService,
serviceRpcServer);
DFSUtil.addPBProtocol(conf, RefreshAuthorizationPolicyProtocolPB.
class
,
refreshAuthService, serviceRpcServer);
DFSUtil.addPBProtocol(conf, RefreshUserMappingsProtocolPB.
class
,
refreshUserMappingService, serviceRpcServer);
DFSUtil.addPBProtocol(conf, GetUserMappingsProtocolPB.
class
,
getUserMappingService, serviceRpcServer);
this
.serviceRPCAddress =
this
.serviceRpcServer.getListenerAddress();
nn.setRpcServiceServerAddress(conf, serviceRPCAddress);
}
else
{
serviceRpcServer =
null
;
serviceRPCAddress =
null
;
}
this
.clientRpcServer = RPC.getServer(
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB.
class
,
clientNNPbService, socAddr.getHostName(),
socAddr.getPort(), handlerCount,
false
, conf,
namesystem.getDelegationTokenSecretManager());
DFSUtil.addPBProtocol(conf, HAServiceProtocolPB.
class
, haPbService,
clientRpcServer);
DFSUtil.addPBProtocol(conf, NamenodeProtocolPB.
class
, NNPbService,
clientRpcServer);
DFSUtil.addPBProtocol(conf, DatanodeProtocolPB.
class
, dnProtoPbService,
clientRpcServer);
DFSUtil.addPBProtocol(conf, RefreshAuthorizationPolicyProtocolPB.
class
,
refreshAuthService, clientRpcServer);
DFSUtil.addPBProtocol(conf, RefreshUserMappingsProtocolPB.
class
,
refreshUserMappingService, clientRpcServer);
DFSUtil.addPBProtocol(conf, GetUserMappingsProtocolPB.
class
,
getUserMappingService, clientRpcServer);
if
(serviceAuthEnabled =
conf.getBoolean(
CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION,
false
)) {
this
.clientRpcServer.refreshServiceAcl(conf,
new
HDFSPolicyProvider());
if
(
this
.serviceRpcServer !=
null
) {
this
.serviceRpcServer.refreshServiceAcl(conf,
new
HDFSPolicyProvider());
}
}
this
.clientRpcAddress =
this
.clientRpcServer.getListenerAddress();
nn.setRpcServerAddress(conf, clientRpcAddress);
this
.minimumDataNodeVersion = conf.get(
DFSConfigKeys.DFS_NAMENODE_MIN_SUPPORTED_DATANODE_VERSION_KEY,
DFSConfigKeys.DFS_NAMENODE_MIN_SUPPORTED_DATANODE_VERSION_DEFAULT);
}