The screenshot above shows a debugging session in IDEA 2018.2. The red dot at line 84 in the upper-left corner is a breakpoint; the three buttons labeled 1, 2, and 3 are the most commonly used, and they behave as follows:
Button 1: Step Over, which runs to the next line, stepping over any method call it encounters
Button 2: Step Into, which runs to the next statement, stepping into any method call it encounters
Button 3: Resume Program, which runs to the next breakpoint, or to completion if no breakpoint remains
The entire ES startup flow is recorded as an XMind mind map.
Based on the diagram above, the startup flow can be roughly divided into four phases:
Elasticsearch parses the command and loads the configuration
Bootstrap initializes and runs resource checks
Node is created
Bootstrap starts the node and the keep-alive thread
Elasticsearch parses the command and loads the configuration
Let's start with the entry point, Elasticsearch.main:
public static void main(final String[] args) throws Exception {
System.setSecurityManager(new SecurityManager() {
@Override
public void checkPermission(Permission perm) {
// grant all permissions so that we can later set the security manager to the one that we want
}
});
LogConfigurator.registerErrorListener();
final Elasticsearch elasticsearch = new Elasticsearch();
int status = main(args, elasticsearch, Terminal.DEFAULT);
if (status != ExitCodes.OK) {
exit(status);
}
}
EnvironmentAwareCommand: A cli command which requires an org.elasticsearch.env.Environment to use current paths and settings
Command: An action to execute within a cli.
As you can see, one important responsibility of Elasticsearch is parsing command-line arguments.
Run the Elasticsearch startup command with the -h flag to list the supported options.
You will find that these options correspond to private fields of the Command class and of Elasticsearch.
The Elasticsearch constructor is as follows:
Elasticsearch() {
super("starts elasticsearch", () -> {}); // weconfigurelogginglatersoweoverridethebaseclassfromconfiguringloggingversionOption = parser.acceptsAll(Arrays.asList("V", "version"), "Prints elasticsearch version information and exits");
daemonizeOption = parser.acceptsAll(Arrays.asList("d", "daemonize"), "Starts Elasticsearch in the background")
.availableUnless(versionOption);
pidfileOption = parser.acceptsAll(Arrays.asList("p", "pidfile"), "Creates a pid file in the specified path on start")
.availableUnless(versionOption).withRequiredArg().withValuesConvertedBy(new PathConverter());
quietOption = parser.acceptsAll(Arrays.asList("q", "quiet"), "Turns off standard output/error streams logging in console")
.availableUnless(versionOption).availableUnless(daemonizeOption);
}
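These option specs are built with the jopt-simple library. To make the parse-time behavior concrete, here is a minimal self-contained sketch of the same pattern; the class name CliDemo and its printed output are illustrative, not part of the ES source:

import java.util.Arrays;
import joptsimple.OptionParser;
import joptsimple.OptionSet;
import joptsimple.OptionSpecBuilder;

// Hypothetical stand-alone demo of the jopt-simple pattern used by the Elasticsearch constructor above.
public class CliDemo {
    public static void main(String[] args) {
        OptionParser parser = new OptionParser();
        OptionSpecBuilder version = parser.acceptsAll(
                Arrays.asList("V", "version"), "Prints version information and exits");
        // availableUnless makes -d/--daemonize invalid alongside -V/--version, as in Elasticsearch
        parser.acceptsAll(Arrays.asList("d", "daemonize"), "Starts in the background")
                .availableUnless(version);
        OptionSet options = parser.parse(args); // e.g. java CliDemo -d
        if (options.has(version)) {
            System.out.println("demo version 1.0");
        } else {
            System.out.println("daemonize = " + options.has("d"));
        }
    }
}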
// install SM after natives, shutdown hooks, etc.
Security.configure(environment, BootstrapSettings.SECURITY_FILTER_BAD_DEFAULTS_SETTING.get(settings));
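This snippet is from the Bootstrap phase: the SecurityManager is installed only after native libraries and shutdown hooks are set up. What Security.configure ultimately does is install the JDK SecurityManager backed by a Policy computed from the environment. As a rough illustration of just that final step, using only plain JDK APIs (this demo class is hypothetical, not the ES Security implementation):

import java.security.Permission;

// Rough sketch of installing a SecurityManager whose checks delegate to the
// installed java.security.Policy (not the real ES Security.configure).
public class InstallSecurityManagerDemo {
    public static void main(String[] args) {
        System.setSecurityManager(new SecurityManager() {
            @Override
            public void checkPermission(Permission perm) {
                super.checkPermission(perm); // deny anything the Policy does not grant
            }
        });
        System.out.println("installed: " + System.getSecurityManager());
    }
}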
Creating the Node
node = new Node(environment) {
@Override
protected void validateNodeBeforeAcceptingRequests(
final BootstrapContext context,
final BoundTransportAddress boundTransportAddress, List<BootstrapCheck> checks) throws NodeValidationException {
BootstrapChecks.check(context, boundTransportAddress, checks);
}
};
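BootstrapChecks.check runs every configured check and aggregates the failures into a single NodeValidationException. Here is a sketch of that aggregate-then-throw pattern using simplified stand-in types (SimpleCheck and SimpleChecksDemo are hypothetical, not the real BootstrapCheck API):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Simplified stand-ins for BootstrapCheck / BootstrapChecks.check.
public class SimpleChecksDemo {
    interface SimpleCheck {
        String failureMessage(); // null means the check passed
    }

    static void check(List<SimpleCheck> checks) {
        final List<String> errors = new ArrayList<>();
        for (SimpleCheck check : checks) {
            final String message = check.failureMessage();
            if (message != null) {
                errors.add(message);
            }
        }
        if (!errors.isEmpty()) {
            // ES throws NodeValidationException; this sketch uses IllegalStateException
            throw new IllegalStateException("bootstrap checks failed: " + errors);
        }
    }

    public static void main(String[] args) {
        check(Arrays.asList(
                () -> null,                             // a passing check
                () -> "max file descriptors too low")); // a failing check
    }
}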
4.1 Here is the code, pasted directly (the first half):
protected Node(final Environment environment, Collection<Class<? extends Plugin>> classpathPlugins) {
final List<Closeable> resourcesToClose = new ArrayList<>(); // register everything we need to release in the case of an error
boolean success = false;
{
// use temp logger just to say we are starting. we can't use it later on because the node name might not be set
Logger logger = Loggers.getLogger(Node.class, NODE_NAME_SETTING.get(environment.settings()));
logger.info("initializing ...");
}
try {
originalSettings = environment.settings();
Settings tmpSettings = Settings.builder().put(environment.settings())
.put(Client.CLIENT_TYPE_SETTING_S.getKey(), CLIENT_TYPE).build();
// create the node environment as soon as possible, to recover the node id and enable logging
try {
nodeEnvironment = new NodeEnvironment(tmpSettings, environment);
resourcesToClose.add(nodeEnvironment);
} catch (IOException ex) {
thrownew IllegalStateException("Failed to create node environment", ex);
}
final boolean hadPredefinedNodeName = NODE_NAME_SETTING.exists(tmpSettings);
final String nodeId = nodeEnvironment.nodeId();
tmpSettings = addNodeNameIfNeeded(tmpSettings, nodeId);
final Logger logger = Loggers.getLogger(Node.class, tmpSettings);
// this must be captured after the node name is possibly added to the settings
final String nodeName = NODE_NAME_SETTING.get(tmpSettings);
if (hadPredefinedNodeName == false) {
logger.info("node name derived from node ID [{}]; set [{}] to override", nodeId, NODE_NAME_SETTING.getKey());
} else {
logger.info("node name [{}], node ID [{}]", nodeName, nodeId);
}
final JvmInfo jvmInfo = JvmInfo.jvmInfo();
logger.info(
"version[{}], pid[{}], build[{}/{}/{}/{}], OS[{}/{}/{}], JVM[{}/{}/{}/{}]",
Version.displayVersion(Version.CURRENT, Build.CURRENT.isSnapshot()),
jvmInfo.pid(),
Build.CURRENT.flavor().displayName(),
Build.CURRENT.type().displayName(),
Build.CURRENT.shortHash(),
Build.CURRENT.date(),
Constants.OS_NAME,
Constants.OS_VERSION,
Constants.OS_ARCH,
Constants.JVM_VENDOR,
Constants.JVM_NAME,
Constants.JAVA_VERSION,
Constants.JVM_VERSION);
logger.info("JVM arguments {}", Arrays.toString(jvmInfo.getInputArguments()));
warnIfPreRelease(Version.CURRENT, Build.CURRENT.isSnapshot(), logger);
if (logger.isDebugEnabled()) {
logger.debug("using config [{}], data [{}], logs [{}], plugins [{}]",
environment.configFile(), Arrays.toString(environment.dataFiles()), environment.logsFile(), environment.pluginsFile());
}
this.pluginsService = new PluginsService(tmpSettings, environment.configFile(), environment.modulesFile(), environment.pluginsFile(), classpathPlugins);
this.settings = pluginsService.updatedSettings();
localNodeFactory = new LocalNodeFactory(settings, nodeEnvironment.nodeId());
// create the environment based on the finalized (processed) view of the settings
// this is just to make sure that people get the same settings, no matter where they ask them from
this.environment = new Environment(this.settings, environment.configFile());
Environment.assertEquivalent(environment, this.environment);
final List<ExecutorBuilder<?>> executorBuilders = pluginsService.getExecutorBuilders(settings);
final ThreadPool threadPool = new ThreadPool(settings, executorBuilders.toArray(new ExecutorBuilder[0]));
resourcesToClose.add(() -> ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS));
// adds the context to the DeprecationLogger so that it does not need to be injected everywhere
DeprecationLogger.setThreadContext(threadPool.getThreadContext());
resourcesToClose.add(() -> DeprecationLogger.removeThreadContext(threadPool.getThreadContext()));
final List<Setting<?>> additionalSettings = new ArrayList<>(pluginsService.getPluginSettings());
final List<String> additionalSettingsFilter = new ArrayList<>(pluginsService.getPluginSettingsFilter());
for (final ExecutorBuilder<?> builder : threadPool.builders()) {
additionalSettings.addAll(builder.getRegisteredSettings());
}
client = new NodeClient(settings, threadPool);
...
Set<Bundle> seenBundles = new LinkedHashSet<>();
List<PluginInfo> modulesList = new ArrayList<>();
Set<Bundle> modules = getModuleBundles(modulesDirectory);
for (Bundle bundle : modules) {
modulesList.add(bundle.plugin);
}
seenBundles.addAll(modules);
/** Get bundles for plugins installed in the given modules directory. */
static Set<Bundle> getModuleBundles(Path modulesDirectory) throws IOException {
return findBundles(modulesDirectory, "module").stream().flatMap(b -> b.bundles().stream()).collect(Collectors.toSet());
}
Bundle here is an inner class (a "bundle" is a group of plugins in a single classloader).
PluginInfo is "an in-memory representation of the plugin descriptor", i.e. a class held in memory that describes a plugin.
The actual plugin-loading code is as follows:
/**
* Reads the plugin descriptor file.
*
* @param path the path to the root directory for the plugin
* @return the plugin info
* @throws IOException if an I/O exception occurred reading the plugin descriptor
 */
public static PluginInfo readFromProperties(final Path path) throws IOException {
final Path descriptor = path.resolve(ES_PLUGIN_PROPERTIES);
final Map<String, String> propsMap;
{
final Properties props = new Properties();
try (InputStream stream = Files.newInputStream(descriptor)) {
props.load(stream);
}
propsMap = props.stringPropertyNames().stream().collect(Collectors.toMap(Function.identity(), props::getProperty));
}
final String name = propsMap.remove("name");
if (name == null || name.isEmpty()) {
throw new IllegalArgumentException(
"property [name] is missing in [" + descriptor + "]");
}
final String description = propsMap.remove("description");
if (description == null) {
throw new IllegalArgumentException(
"property [description] is missing for plugin [" + name + "]");
}
final String version = propsMap.remove("version");
if (version == null) {
throw new IllegalArgumentException(
"property [version] is missing for plugin [" + name + "]");
}
final String esVersionString = propsMap.remove("elasticsearch.version");
if (esVersionString == null) {
throw new IllegalArgumentException(
"property [elasticsearch.version] is missing for plugin [" + name + "]");
}
final Version esVersion = Version.fromString(esVersionString);
final String javaVersionString = propsMap.remove("java.version");
if (javaVersionString == null) {
throw new IllegalArgumentException(
"property [java.version] is missing for plugin [" + name + "]");
}
JarHell.checkVersionFormat(javaVersionString);
final String classname = propsMap.remove("classname");
if (classname == null) {
throw new IllegalArgumentException(
"property [classname] is missing for plugin [" + name + "]");
}
final String extendedString = propsMap.remove("extended.plugins");
final List<String> extendedPlugins;
if (extendedString == null) {
extendedPlugins = Collections.emptyList();
} else {
extendedPlugins = Arrays.asList(Strings.delimitedListToStringArray(extendedString, ","));
}
final String hasNativeControllerValue = propsMap.remove("has.native.controller");
final boolean hasNativeController;
if (hasNativeControllerValue == null) {
hasNativeController = false;
} else {
switch (hasNativeControllerValue) {
case"true":
hasNativeController = true;
break;
case"false":
hasNativeController = false;
break;
default:
final String message = String.format(
Locale.ROOT,
"property [%s] must be [%s], [%s], or unspecified but was [%s]",
"has_native_controller",
"true",
"false",
hasNativeControllerValue);
throw new IllegalArgumentException(message);
}
}
if (esVersion.before(Version.V_6_3_0) && esVersion.onOrAfter(Version.V_6_0_0_beta2)) {
propsMap.remove("requires.keystore");
}
if (propsMap.isEmpty() == false) {
thrownew IllegalArgumentException("Unknown properties in plugin descriptor: " + propsMap.keySet());
}
return new PluginInfo(name, description, version, esVersion, javaVersionString,
classname, extendedPlugins, hasNativeController);
}
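Judging from the keys readFromProperties consumes, a well-formed plugin-descriptor.properties must supply at least name, description, version, elasticsearch.version, java.version, and classname. Below is a stand-alone sketch of the same java.util.Properties-based parsing; the descriptor values and the DescriptorDemo class are made up for illustration:

import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

// Stand-alone sketch of the Properties-based descriptor parsing shown above.
public class DescriptorDemo {
    public static void main(String[] args) throws IOException {
        final String descriptor =
                "name=demo-plugin\n" +
                "description=a demo plugin\n" +
                "version=1.0\n" +
                "elasticsearch.version=6.3.2\n" +
                "java.version=1.8\n" +
                "classname=org.example.DemoPlugin\n";
        final Properties props = new Properties();
        props.load(new StringReader(descriptor));
        final String name = props.getProperty("name");
        if (name == null || name.isEmpty()) {
            throw new IllegalArgumentException("property [name] is missing");
        }
        System.out.println("loaded plugin descriptor for [" + name + "]");
    }
}

Next comes the ThreadPool, which is created early in the Node constructor (the new ThreadPool(settings, ...) call above); its constructor registers a builder for every built-in executor: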
public ThreadPool(final Settings settings, final ExecutorBuilder<?>... customBuilders) {
super(settings);
assert Node.NODE_NAME_SETTING.exists(settings);
final Map<String, ExecutorBuilder> builders = new HashMap<>();
final int availableProcessors = EsExecutors.numberOfProcessors(settings);
final int halfProcMaxAt5 = halfNumberOfProcessorsMaxFive(availableProcessors);
final int halfProcMaxAt10 = halfNumberOfProcessorsMaxTen(availableProcessors);
final int genericThreadPoolMax = boundedBy(4 * availableProcessors, 128, 512);
builders.put(Names.GENERIC, new ScalingExecutorBuilder(Names.GENERIC, 4, genericThreadPoolMax, TimeValue.timeValueSeconds(30)));
builders.put(Names.INDEX, new FixedExecutorBuilder(settings, Names.INDEX, availableProcessors, 200, true));
builders.put(Names.WRITE, new FixedExecutorBuilder(settings, Names.WRITE, "bulk", availableProcessors, 200));
builders.put(Names.GET, new FixedExecutorBuilder(settings, Names.GET, availableProcessors, 1000));
builders.put(Names.ANALYZE, new FixedExecutorBuilder(settings, Names.ANALYZE, 1, 16));
builders.put(Names.SEARCH, new AutoQueueAdjustingExecutorBuilder(settings,
Names.SEARCH, searchThreadPoolSize(availableProcessors), 1000, 1000, 1000, 2000));
builders.put(Names.MANAGEMENT, new ScalingExecutorBuilder(Names.MANAGEMENT, 1, 5, TimeValue.timeValueMinutes(5)));
// no queue as this means clients will need to handle rejections on listener queue even if the operation succeeded
// the assumption here is that the listeners should be very lightweight on the listeners side
builders.put(Names.LISTENER, new FixedExecutorBuilder(settings, Names.LISTENER, halfProcMaxAt10, -1));
builders.put(Names.FLUSH, new ScalingExecutorBuilder(Names.FLUSH, 1, halfProcMaxAt5, TimeValue.timeValueMinutes(5)));
builders.put(Names.REFRESH, new ScalingExecutorBuilder(Names.REFRESH, 1, halfProcMaxAt10, TimeValue.timeValueMinutes(5)));
builders.put(Names.WARMER, new ScalingExecutorBuilder(Names.WARMER, 1, halfProcMaxAt5, TimeValue.timeValueMinutes(5)));
builders.put(Names.SNAPSHOT, new ScalingExecutorBuilder(Names.SNAPSHOT, 1, halfProcMaxAt5, TimeValue.timeValueMinutes(5)));
builders.put(Names.FETCH_SHARD_STARTED, new ScalingExecutorBuilder(Names.FETCH_SHARD_STARTED, 1, 2 * availableProcessors, TimeValue.timeValueMinutes(5)));
builders.put(Names.FORCE_MERGE, new FixedExecutorBuilder(settings, Names.FORCE_MERGE, 1, -1));
builders.put(Names.FETCH_SHARD_STORE, new ScalingExecutorBuilder(Names.FETCH_SHARD_STORE, 1, 2 * availableProcessors, TimeValue.timeValueMinutes(5)));
for (final ExecutorBuilder<?> builder : customBuilders) {
if (builders.containsKey(builder.name())) {
throw new IllegalArgumentException("builder with name [" + builder.name() + "] already exists");
}
builders.put(builder.name(), builder);
}
this.builders = Collections.unmodifiableMap(builders);
threadContext = new ThreadContext(settings);
final Map<String, ExecutorHolder> executors = new HashMap<>();
for (@SuppressWarnings("unchecked") final Map.Entry<String, ExecutorBuilder> entry : builders.entrySet()) {
final ExecutorBuilder.ExecutorSettings executorSettings = entry.getValue().getSettings(settings);
final ExecutorHolder executorHolder = entry.getValue().build(executorSettings, threadContext);
if (executors.containsKey(executorHolder.info.getName())) {
throw new IllegalStateException("duplicate executors with name [" + executorHolder.info.getName() + "] registered");
}
logger.debug("created thread pool: {}", entry.getValue().formatInfo(executorHolder.info));
executors.put(entry.getKey(), executorHolder);
}
executors.put(Names.SAME, new ExecutorHolder(DIRECT_EXECUTOR, new Info(Names.SAME, ThreadPoolType.DIRECT)));
this.executors = unmodifiableMap(executors);
this.scheduler = Scheduler.initScheduler(settings);
TimeValue estimatedTimeInterval = ESTIMATED_TIME_INTERVAL_SETTING.get(settings);
this.cachedTimeThread = new CachedTimeThread(EsExecutors.threadName(settings, "[timer]"), estimatedTimeInterval.millis());
this.cachedTimeThread.start();
}
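All of the pool sizes above derive from the processor count. Assuming the helper formulas from the same class (boundedBy clamps a value into [min, max], the half-of-processors helpers cap at five or ten, and the search pool size is ((procs * 3) / 2) + 1), the sizes for a given machine can be reproduced like this:

// Sketch of the sizing helpers referenced above; the formulas are assumed from the ES ThreadPool class.
public class PoolSizeDemo {
    static int boundedBy(int value, int min, int max) {
        return Math.min(max, Math.max(min, value));
    }

    static int halfProcMaxAt(int processors, int cap) {
        return boundedBy((processors + 1) / 2, 1, cap);
    }

    static int searchThreadPoolSize(int processors) {
        return ((processors * 3) / 2) + 1;
    }

    public static void main(String[] args) {
        final int procs = Runtime.getRuntime().availableProcessors(); // e.g. 8
        System.out.println("generic max = " + boundedBy(4 * procs, 128, 512)); // 128 on 8 cores
        System.out.println("flush max   = " + halfProcMaxAt(procs, 5));        // 4 on 8 cores
        System.out.println("refresh max = " + halfProcMaxAt(procs, 10));       // 4 on 8 cores
        System.out.println("search size = " + searchThreadPoolSize(procs));    // 13 on 8 cores
    }
}

With the thread pool and the other services wired up, Node.start() runs the bootstrap checks and then brings up discovery and the cluster service: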
validateNodeBeforeAcceptingRequests(new BootstrapContext(settings, onDiskMetadata), transportService.boundAddress(),
pluginsService.filterPlugins(Plugin.class).stream()
.flatMap(p -> p.getBootstrapChecks().stream()).collect(Collectors.toList()));
clusterService.addStateApplier(transportService.getTaskManager());
// start after transport service so the local disco is known
discovery.start(); // start before cluster service so that it can set initial state on ClusterApplierService
clusterService.start();
assert clusterService.localNode().equals(localNodeFactory.getNode())
: "clusterService has a different local node than the factory provided";
transportService.acceptIncomingRequests();
discovery.startInitialJoin();
// tribe nodes don't have a master so we shouldn't register an observer
final TimeValue initialStateTimeout = DiscoverySettings.INITIAL_STATE_TIMEOUT_SETTING.get(settings);
if (initialStateTimeout.millis() > 0) {
final ThreadPool thread = injector.getInstance(ThreadPool.class);
ClusterState clusterState = clusterService.state();
ClusterStateObserver observer = new ClusterStateObserver(clusterState, clusterService, null, logger, thread.getThreadContext());
if (clusterState.nodes().getMasterNodeId() == null) {
logger.debug("waiting to join the cluster. timeout [{}]", initialStateTimeout);
final CountDownLatch latch = new CountDownLatch(1);
observer.waitForNextChange(new ClusterStateObserver.Listener() {
@Override
public void onNewClusterState(ClusterState state) {
latch.countDown();
}
@Override
public void onClusterServiceClose() {
latch.countDown();
}
@Override
public void onTimeout(TimeValue timeout) {
logger.warn("timed out while waiting for initial discovery state - timeout: {}",
initialStateTimeout);
latch.countDown();
}
}, state -> state.nodes().getMasterNodeId() != null, initialStateTimeout);
try {
latch.await();
} catch (InterruptedException e) {
thrownew ElasticsearchTimeoutException("Interrupted while waiting for initial discovery state");
}
}
}
if (NetworkModule.HTTP_ENABLED.get(settings)) {
injector.getInstance(HttpServerTransport.class).start();
}
if (WRITE_PORTS_FILE_SETTING.get(settings)) {
if (NetworkModule.HTTP_ENABLED.get(settings)) {
HttpServerTransport http = injector.getInstance(HttpServerTransport.class);
writePortsFile("http", http.boundAddress());
}
TransportService transport = injector.getInstance(TransportService.class);
writePortsFile("transport", transport.boundAddress());
}