/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pinot.server.starter.helix;

import com.google.common.collect.ImmutableList;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import javax.annotation.Nullable;
import org.apache.commons.lang3.StringUtils;
import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixManagerFactory;
import org.apache.helix.InstanceType;
import org.apache.helix.ZNRecord;
import org.apache.helix.manager.zk.ZKHelixAdmin;
import org.apache.helix.messaging.handling.MessageHandlerFactory;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.HelixConfigScope;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.Message;
import org.apache.helix.model.builder.HelixConfigScopeBuilder;
import org.apache.helix.participant.statemachine.StateModelFactory;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.pinot.common.Utils;
import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.common.metrics.AbstractMetrics;
import org.apache.pinot.common.metrics.ServerMeter;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.restlet.resources.SystemResourceInfo;
import org.apache.pinot.common.utils.ServiceStartableUtils;
import org.apache.pinot.common.utils.ServiceStatus;
import org.apache.pinot.common.utils.config.TagNameUtils;
import org.apache.pinot.common.utils.fetcher.SegmentFetcherFactory;
import org.apache.pinot.common.utils.helix.HelixHelper;
import org.apache.pinot.core.common.datatable.DataTableBuilder;
import org.apache.pinot.core.data.manager.InstanceDataManager;
import org.apache.pinot.core.query.request.context.ThreadTimer;
import org.apache.pinot.core.transport.ListenerConfig;
import org.apache.pinot.core.transport.TlsConfig;
import org.apache.pinot.core.util.ListenerConfigUtil;
import org.apache.pinot.core.util.TlsUtils;
import org.apache.pinot.segment.local.realtime.impl.invertedindex.RealtimeLuceneIndexRefreshState;
import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
import org.apache.pinot.server.api.access.AccessControlFactory;
import org.apache.pinot.server.conf.ServerConf;
import org.apache.pinot.server.realtime.ControllerLeaderLocator;
import org.apache.pinot.server.realtime.ServerSegmentCompletionProtocolHandler;
import org.apache.pinot.server.starter.ServerInstance;
import org.apache.pinot.server.starter.ServerQueriesDisabledTracker;
import org.apache.pinot.server.starter.helix.AdminApiApplication;
import org.apache.pinot.server.starter.helix.DefaultHelixStarterServerConfig;
import org.apache.pinot.server.starter.helix.OffsetBasedConsumptionStatusChecker;
import org.apache.pinot.server.starter.helix.SegmentMessageHandlerFactory;
import org.apache.pinot.server.starter.helix.SegmentOnlineOfflineStateModelFactory;
import org.apache.pinot.spi.crypt.PinotCrypterFactory;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.environmentprovider.PinotEnvironmentProvider;
import org.apache.pinot.spi.environmentprovider.PinotEnvironmentProviderFactory;
import org.apache.pinot.spi.filesystem.PinotFSFactory;
import org.apache.pinot.spi.plugin.PluginManager;
import org.apache.pinot.spi.services.ServiceRole;
import org.apache.pinot.spi.services.ServiceStartable;
import org.apache.pinot.spi.utils.NetUtils;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.apache.pinot.sql.parsers.rewriter.QueryRewriterFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class BaseServerStarter
implements ServiceStartable {
    // Class-wide SLF4J logger.
    private static final Logger LOGGER = LoggerFactory.getLogger(BaseServerStarter.class);
    // Helix cluster name, read from "pinot.cluster.name" in init().
    protected String _helixClusterName;
    // ZooKeeper address, read from "pinot.zk.server" in init().
    protected String _zkAddress;
    // Clone of the configuration passed to init(); init() may add the instance id to it.
    protected PinotConfiguration _serverConf;
    // Admin listener configs built in init() and handed to the admin API application in start().
    protected List<ListenerConfig> _listenerConfigs;
    // Value of "pinot.server.netty.host", or a NetUtils-derived default when unset.
    protected String _hostname;
    // Value of "pinot.server.netty.port" (default 8098).
    protected int _port;
    // Value of "pinot.server.instance.id", or "Server_<hostname>_<port>" when unset.
    protected String _instanceId;
    // Participant-scoped Helix config scope for this instance, built in init().
    protected HelixConfigScope _instanceConfigScope;
    // Helix manager created in init() (PARTICIPANT type) and connected in start().
    protected HelixManager _helixManager;
    // Obtained from the Helix manager in start() after connecting; null before that.
    protected HelixAdmin _helixAdmin;
    // Created in start(); null before that.
    protected ServerInstance _serverInstance;
    // Created and started in start(); null before that.
    protected AdminApiApplication _adminApiApplication;
    // Created and started near the end of start(); null before that.
    protected ServerQueriesDisabledTracker _serverQueriesDisabledTracker;
    // Obtained via RealtimeLuceneIndexRefreshState.getInstance() near the end of start(); null before that.
    protected RealtimeLuceneIndexRefreshState _realtimeLuceneIndexRefreshState;
    // Set in init(); null when no environment provider is configured.
    protected PinotEnvironmentProvider _pinotEnvironmentProvider;

    /**
     * Reads the server configuration, derives host/port/instance id, and creates (but does not
     * connect) the Helix manager.
     *
     * @param serverConf configuration to initialize from; a clone is stored, the argument itself
     *                   is not retained
     * @throws Exception if any of the initialization steps fails
     */
    public void init(PinotConfiguration serverConf) throws Exception {
        // Keep a clone of the supplied configuration rather than the caller's object.
        _serverConf = serverConf.clone();
        _zkAddress = _serverConf.getProperty("pinot.zk.server");
        _helixClusterName = _serverConf.getProperty("pinot.cluster.name");
        ServiceStartableUtils.applyClusterConfig(_serverConf, _zkAddress, _helixClusterName, ServiceRole.SERVER);

        setupHelixSystemProperties();
        _listenerConfigs = ListenerConfigUtil.buildServerAdminConfigs(_serverConf);

        // The fallback host is resolved up front and only used when no explicit netty host is set.
        boolean preferHostname = _serverConf.getProperty("pinot.set.instance.id.to.hostname", false);
        String fallbackHost = preferHostname ? NetUtils.getHostnameOrAddress() : NetUtils.getHostAddress();
        _hostname = _serverConf.getProperty("pinot.server.netty.host", fallbackHost);
        _port = _serverConf.getProperty("pinot.server.netty.port", 8098);

        String configuredInstanceId = _serverConf.getProperty("pinot.server.instance.id");
        if (configuredInstanceId == null) {
            // No explicit id: derive one and write it back into the configuration.
            _instanceId = "Server_" + _hostname + "_" + _port;
            _serverConf.addProperty("pinot.server.instance.id", _instanceId);
        } else {
            _instanceId = configuredInstanceId;
            if (!configuredInstanceId.startsWith("Server_")) {
                LOGGER.warn("Instance id '{}' does not have prefix '{}'", _instanceId, "Server_");
            }
        }

        _instanceConfigScope = new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.PARTICIPANT,
            new String[]{_helixClusterName}).forParticipant(_instanceId).build();
        _pinotEnvironmentProvider = initializePinotEnvironmentProvider();

        boolean cpuTimeMeasurementEnabled =
            _serverConf.getProperty("pinot.server.instance.enableThreadCpuTimeMeasurement", false);
        ThreadTimer.setThreadCpuTimeMeasurementEnabled(cpuTimeMeasurementEnabled);
        int dataTableVersion = _serverConf.getProperty("pinot.server.instance.currentDataTableVersion", 3);
        DataTableBuilder.setCurrentDataTableVersion(dataTableVersion);

        LOGGER.info("Initializing Helix manager with zkAddress: {}, clusterName: {}, instanceId: {}", _zkAddress,
            _helixClusterName, _instanceId);
        _helixManager =
            HelixManagerFactory.getZKHelixManager(_helixClusterName, _instanceId, InstanceType.PARTICIPANT, _zkAddress);
    }

    /**
     * Initializes the environment provider factory from the
     * "pinot.server.environmentProvider.factory" config subset and looks up the provider named by
     * "pinot.server.environmentProvider.className" (lower-cased).
     *
     * @return the environment provider, or {@code null} when either the factory config subset is
     *         empty or no class name is configured
     */
    @Nullable
    private PinotEnvironmentProvider initializePinotEnvironmentProvider() {
        PinotConfiguration factoryConfig = _serverConf.subset("pinot.server.environmentProvider.factory");
        if (factoryConfig.toMap().isEmpty()) {
            LOGGER.info("No environment provider config values provided for server property: {}",
                "pinot.server.environmentProvider.factory");
            return null;
        }

        PinotEnvironmentProviderFactory.init(factoryConfig);
        String providerClassName = _serverConf.getProperty("pinot.server.environmentProvider.className");
        if (providerClassName != null) {
            return PinotEnvironmentProviderFactory.getEnvironmentProvider(providerClassName.toLowerCase());
        }

        LOGGER.info("No className value provided for property: {}", "pinot.server.environmentProvider.className");
        return null;
    }

    /**
     * Collects the table resources (and, optionally, the CONSUMING segments) assigned to this
     * instance in their ideal state and registers a composite service status callback for them.
     *
     * <p>Realtime consumption catch-up is only checked when
     * "pinot.server.starter.realtimeConsumptionCatchupWaitMs" is positive and at least one
     * CONSUMING segment is assigned to this instance.
     */
    private void registerServiceStatusHandler() {
        double minResourcePercentForStartup = _serverConf.getProperty("pinot.server.startup.minResourcePercent", 100.0);
        int realtimeConsumptionCatchupWaitMs =
            _serverConf.getProperty("pinot.server.starter.realtimeConsumptionCatchupWaitMs", 0);
        boolean checkRealtime = realtimeConsumptionCatchupWaitMs > 0;

        List<String> resourcesToMonitor = new ArrayList<>();
        HashSet<String> consumingSegments = new HashSet<>();
        for (String resourceName : _helixAdmin.getResourcesInCluster(_helixClusterName)) {
            // Only table resources are monitored.
            if (!TableNameBuilder.isTableResource(resourceName)) {
                continue;
            }

            // The ideal state can be null if the resource is removed between listing and fetching;
            // skip it instead of failing with a NullPointerException (same guard as in
            // shutdownResourceCheck).
            IdealState idealState = _helixAdmin.getResourceIdealState(_helixClusterName, resourceName);
            if (idealState == null || !idealState.isEnabled()) {
                continue;
            }

            // A resource is monitored as soon as one of its partitions is assigned to this instance.
            for (String partitionName : idealState.getPartitionSet()) {
                if (idealState.getInstanceSet(partitionName).contains(_instanceId)) {
                    resourcesToMonitor.add(resourceName);
                    break;
                }
            }

            if (!checkRealtime || !TableNameBuilder.isRealtimeTableResource(resourceName)) {
                continue;
            }
            for (String partitionName : idealState.getPartitionSet()) {
                if ("CONSUMING".equals(idealState.getInstanceStateMap(partitionName).get(_instanceId))) {
                    consumingSegments.add(partitionName);
                }
            }
        }

        ImmutableList.Builder<ServiceStatus.ServiceStatusCallback> callbacks = new ImmutableList.Builder<>();
        callbacks.add(new ServiceStatus.IdealStateAndCurrentStateMatchServiceStatusCallback(_helixManager,
            _helixClusterName, _instanceId, resourcesToMonitor, minResourcePercentForStartup));
        callbacks.add(new ServiceStatus.IdealStateAndExternalViewMatchServiceStatusCallback(_helixManager,
            _helixClusterName, _instanceId, resourcesToMonitor, minResourcePercentForStartup));
        if (checkRealtime && !consumingSegments.isEmpty()) {
            OffsetBasedConsumptionStatusChecker consumptionStatusChecker =
                new OffsetBasedConsumptionStatusChecker(_serverInstance.getInstanceDataManager(), consumingSegments);
            callbacks.add(new ServiceStatus.RealtimeConsumptionCatchupServiceStatusCallback(_helixManager,
                _helixClusterName, _instanceId, realtimeConsumptionCatchupWaitMs,
                consumptionStatusChecker::getNumConsumingSegmentsNotReachedTheirLatestOffset));
        }

        LOGGER.info("Registering service status handler");
        ServiceStatus.setServiceStatusCallback(_instanceId,
            new ServiceStatus.MultipleCallbackServiceStatusCallback(callbacks.build()));
    }

    /**
     * Brings the Helix instance config in line with this server: host/port, default tags, and
     * (when an environment provider is configured) the "environment" map field holding the
     * failure domain. The config is written back only if something changed.
     */
    private void updateInstanceConfigIfNeeded() {
        InstanceConfig instanceConfig = HelixHelper.getInstanceConfig(_helixManager, _instanceId);
        boolean updated = HelixHelper.updateHostnamePort(instanceConfig, _hostname, _port);
        updated |= HelixHelper.addDefaultTags(instanceConfig, () -> {
            if (ZKMetadataProvider.getClusterTenantIsolationEnabled(_helixManager.getHelixPropertyStore())) {
                return Arrays.asList(TagNameUtils.getOfflineTagForTenant(null),
                    TagNameUtils.getRealtimeTagForTenant(null));
            }
            return Collections.singletonList("server_untagged");
        });

        if (_pinotEnvironmentProvider != null) {
            String failureDomain = _pinotEnvironmentProvider.getFailureDomain();
            Map<String, String> environmentProperties = Collections.singletonMap("failureDomain", failureDomain);
            ZNRecord znRecord = instanceConfig.getRecord();
            Map<String, String> existingEnvironmentConfigs = znRecord.getMapField("environment");
            if (!environmentProperties.equals(existingEnvironmentConfigs)) {
                // Argument order matches the placeholders: instance id first, then the properties.
                LOGGER.info("Updating instance: {} with environment properties: {}", _instanceId,
                    environmentProperties);
                znRecord.setMapField("environment", environmentProperties);
                updated = true;
            }
        }

        if (updated) {
            HelixHelper.updateInstanceConfig(_helixManager, instanceConfig);
        }
    }

    /**
     * Copies "pinot.server.flapping.timeWindowMs" (default "1") into the
     * "helixmanager.flappingTimeWindow" JVM system property.
     */
    private void setupHelixSystemProperties() {
        String flappingTimeWindowMs = _serverConf.getProperty("pinot.server.flapping.timeWindowMs", "1");
        System.setProperty("helixmanager.flappingTimeWindow", flappingTimeWindowMs);
    }

    /**
     * Polls the service status until it turns GOOD, the deadline passes, or the thread is
     * interrupted.
     *
     * @param endTimeMs deadline as epoch milliseconds
     * @throws IllegalStateException if the service status is BAD
     */
    private void startupServiceStatusCheck(long endTimeMs) {
        LOGGER.info("Starting startup service status check");
        long startTimeMs = System.currentTimeMillis();
        long checkIntervalMs = _serverConf.getProperty("pinot.server.startup.serviceStatusCheckIntervalMs", 10000L);

        while (System.currentTimeMillis() < endTimeMs) {
            ServiceStatus.Status status = ServiceStatus.getServiceStatus();
            long nowMs = System.currentTimeMillis();
            if (status == ServiceStatus.Status.BAD) {
                throw new IllegalStateException("Service status is BAD");
            }
            if (status == ServiceStatus.Status.GOOD) {
                LOGGER.info("Service status is GOOD after {}ms", nowMs - startTimeMs);
                return;
            }

            // Never sleep past the deadline.
            long napMs = Math.min(checkIntervalMs, endTimeMs - nowMs);
            if (napMs > 0L) {
                LOGGER.info("Sleep for {}ms as service status has not turned GOOD: {}", napMs,
                    ServiceStatus.getStatusDescription());
                try {
                    Thread.sleep(napMs);
                } catch (InterruptedException e) {
                    LOGGER.warn("Got interrupted while checking service status", e);
                    // Restore the interrupt flag and stop waiting.
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }

        LOGGER.warn("Service status has not turned GOOD within {}ms: {}", System.currentTimeMillis() - startTimeMs,
            ServiceStatus.getStatusDescription());
    }

    /**
     * Returns the role of this service.
     *
     * @return always {@link ServiceRole#SERVER}
     */
    public ServiceRole getServiceRole() {
        return ServiceRole.SERVER;
    }

    /**
     * Starts the server: creates the server instance, connects to Helix, starts the admin API,
     * publishes port information to the instance config, registers handlers and gauges, and
     * optionally waits for the service status to turn GOOD.
     *
     * @throws Exception if any startup step fails
     */
    public void start() throws Exception {
        LOGGER.info("Starting Pinot server");
        long startTimeMs = System.currentTimeMillis();

        // Install a default SSL context when a key store or trust store is configured.
        TlsConfig defaultTlsConfig = TlsUtils.extractTlsConfig(_serverConf, "pinot.server.tls");
        if (StringUtils.isNotBlank(defaultTlsConfig.getKeyStorePath())
            || StringUtils.isNotBlank(defaultTlsConfig.getTrustStorePath())) {
            LOGGER.info("Installing default SSL context for any client requests");
            TlsUtils.installDefaultSSLSocketFactory(defaultTlsConfig);
        }

        LOGGER.info("Initializing server instance and registering state model factory");
        Utils.logVersions();
        ControllerLeaderLocator.create(_helixManager);
        ServerSegmentCompletionProtocolHandler.init(_serverConf.subset("pinot.server.segment.uploader"));
        ServerConf serverInstanceConfig = DefaultHelixStarterServerConfig.getDefaultHelixServerConfig(_serverConf);
        _serverInstance = new ServerInstance(serverInstanceConfig, _helixManager);
        ServerMetrics serverMetrics = _serverInstance.getServerMetrics();
        InstanceDataManager instanceDataManager = _serverInstance.getInstanceDataManager();
        initSegmentFetcher(_serverConf);
        SegmentOnlineOfflineStateModelFactory stateModelFactory =
            new SegmentOnlineOfflineStateModelFactory(_instanceId, instanceDataManager);
        _helixManager.getStateMachineEngine()
            .registerStateModelFactory(SegmentOnlineOfflineStateModelFactory.getStateModelName(), stateModelFactory);
        // The server instance is started from the Helix pre-connect callback.
        _helixManager.addPreConnectCallback(_serverInstance::start);

        LOGGER.info("Connecting Helix manager");
        _helixManager.connect();
        _helixAdmin = _helixManager.getClusterManagmentTool();
        updateInstanceConfigIfNeeded();

        String accessControlFactoryClass = _serverConf.getProperty("pinot.server.admin.access.control.factory.class",
            "org.apache.pinot.server.api.access.AllowAllAccessFactory");
        LOGGER.info("Using class: {} as the AccessControlFactory", accessControlFactoryClass);
        AccessControlFactory accessControlFactory;
        try {
            accessControlFactory = (AccessControlFactory) PluginManager.get().createInstance(accessControlFactoryClass);
        } catch (Exception e) {
            throw new RuntimeException(
                "Caught exception while creating new AccessControlFactory instance using class '"
                    + accessControlFactoryClass + "'", e);
        }

        LOGGER.info("Starting server admin application on: {}", ListenerConfigUtil.toString(_listenerConfigs));
        _adminApiApplication = new AdminApiApplication(_serverInstance, accessControlFactory, _serverConf);
        _adminApiApplication.start(_listenerConfigs);

        // Publish each optional port to the instance config, or remove the key when it does not apply.
        Optional<ListenerConfig> httpListener =
            _listenerConfigs.stream().filter(listener -> "http".equals(listener.getProtocol())).findFirst();
        if (httpListener.isPresent()) {
            _helixAdmin.setConfig(_instanceConfigScope,
                Collections.singletonMap("adminPort", String.valueOf(httpListener.get().getPort())));
        } else {
            _helixAdmin.removeConfig(_instanceConfigScope, Collections.singletonList("adminPort"));
        }

        Optional<ListenerConfig> httpsListener =
            _listenerConfigs.stream().filter(listener -> "https".equals(listener.getProtocol())).findFirst();
        if (httpsListener.isPresent()) {
            _helixAdmin.setConfig(_instanceConfigScope,
                Collections.singletonMap("adminHttpsPort", String.valueOf(httpsListener.get().getPort())));
        } else {
            _helixAdmin.removeConfig(_instanceConfigScope, Collections.singletonList("adminHttpsPort"));
        }

        if (serverInstanceConfig.isNettyTlsServerEnabled()) {
            _helixAdmin.setConfig(_instanceConfigScope,
                Collections.singletonMap("nettyTlsPort", String.valueOf(serverInstanceConfig.getNettyTlsPort())));
        } else {
            _helixAdmin.removeConfig(_instanceConfigScope, Collections.singletonList("nettyTlsPort"));
        }

        if (serverInstanceConfig.isEnableGrpcServer()) {
            _helixAdmin.setConfig(_instanceConfigScope,
                Collections.singletonMap("grpcPort", String.valueOf(serverInstanceConfig.getGrpcPort())));
        } else {
            _helixAdmin.removeConfig(_instanceConfigScope, Collections.singletonList("grpcPort"));
        }

        LOGGER.info("Initializing QueryRewriterFactory");
        QueryRewriterFactory.init(_serverConf.getProperty("pinot.server.query.rewriter.class.names"));

        SegmentMessageHandlerFactory messageHandlerFactory =
            new SegmentMessageHandlerFactory(instanceDataManager, serverMetrics);
        _helixManager.getMessagingService()
            .registerMessageHandlerFactory(Message.MessageType.USER_DEFINE_MSG.toString(), messageHandlerFactory);

        serverMetrics.addCallbackGauge("helix.connected", () -> _helixManager.isConnected() ? 1L : 0L);
        // Registered after the initial connect() call above.
        _helixManager.addPreConnectCallback(
            () -> serverMetrics.addMeteredGlobalValue(ServerMeter.HELIX_ZOOKEEPER_RECONNECTS, 1L));

        registerServiceStatusHandler();
        if (_serverConf.getProperty("pinot.server.startup.enableServiceStatusCheck", true)) {
            long startupTimeoutMs = _serverConf.getProperty("pinot.server.startup.timeoutMs", 600000L);
            startupServiceStatusCheck(startTimeMs + startupTimeoutMs);
        }

        _helixAdmin.setConfig(_instanceConfigScope,
            Collections.singletonMap("shutdownInProgress", Boolean.toString(false)));
        setInstanceResourceInfo(_helixAdmin, _helixClusterName, _instanceId, new SystemResourceInfo().toMap());
        LOGGER.info("Pinot server ready");

        serverMetrics.addCallbackGauge("memory.directBufferCount", PinotDataBuffer::getDirectBufferCount);
        serverMetrics.addCallbackGauge("memory.directBufferUsage", PinotDataBuffer::getDirectBufferUsage);
        serverMetrics.addCallbackGauge("memory.mmapBufferCount", PinotDataBuffer::getMmapBufferCount);
        serverMetrics.addCallbackGauge("memory.mmapBufferUsage", PinotDataBuffer::getMmapBufferUsage);
        serverMetrics.addCallbackGauge("memory.allocationFailureCount", PinotDataBuffer::getAllocationFailureCount);

        _serverQueriesDisabledTracker =
            new ServerQueriesDisabledTracker(_helixClusterName, _instanceId, _helixManager, serverMetrics);
        _serverQueriesDisabledTracker.start();

        _realtimeLuceneIndexRefreshState = RealtimeLuceneIndexRefreshState.getInstance();
        _realtimeLuceneIndexRefreshState.start();
    }

    /**
     * Shuts the server down: closes PinotFS, stops the admin API, flags shutdown-in-progress in
     * the instance config, optionally drains queries, disconnects from Helix, shuts down the
     * server instance, optionally waits for resources to go OFFLINE, and deregisters the service
     * status callback.
     *
     * <p>Components created in {@link #start()} are null-guarded: {@code start()} can throw
     * part-way through (for example when the startup status check reports BAD), and a following
     * {@code stop()} must still reach the remaining cleanup steps instead of failing with a
     * {@link NullPointerException}.
     */
    public void stop() {
        LOGGER.info("Shutting down Pinot server");
        long startTimeMs = System.currentTimeMillis();
        try {
            LOGGER.info("Closing PinotFS classes");
            PinotFSFactory.shutdown();
        } catch (IOException e) {
            LOGGER.warn("Caught exception closing PinotFS classes", e);
        }

        if (_adminApiApplication != null) {
            _adminApiApplication.stop();
        }
        if (_helixAdmin != null) {
            _helixAdmin.setConfig(_instanceConfigScope,
                Collections.singletonMap("shutdownInProgress", Boolean.toString(true)));
        }

        long endTimeMs = startTimeMs + _serverConf.getProperty("pinot.server.shutdown.timeoutMs", 600000L);
        // The query check reads the latest query time from the server instance, so it needs one.
        if (_serverInstance != null && _serverConf.getProperty("pinot.server.shutdown.enableQueryCheck", true)) {
            shutdownQueryCheck(endTimeMs);
        }

        _helixManager.disconnect();
        if (_serverInstance != null) {
            _serverInstance.shutDown();
        }
        if (_serverConf.getProperty("pinot.server.shutdown.enableResourceCheck", false)) {
            shutdownResourceCheck(endTimeMs);
        }

        if (_serverQueriesDisabledTracker != null) {
            _serverQueriesDisabledTracker.stop();
        }
        if (_realtimeLuceneIndexRefreshState != null) {
            _realtimeLuceneIndexRefreshState.stop();
        }

        LOGGER.info("Deregistering service status handler");
        ServiceStatus.removeServiceStatusCallback(_instanceId);
        LOGGER.info("Finish shutting down Pinot server for {}", _instanceId);
    }

    /**
     * Waits until no query has been received for the configured threshold (or the deadline
     * passes), then waits long enough for queries already received to hit the query timeout.
     *
     * @param endTimeMs deadline for detecting "no incoming queries", as epoch milliseconds
     */
    private void shutdownQueryCheck(long endTimeMs) {
        LOGGER.info("Starting shutdown query check");
        long startTimeMs = System.currentTimeMillis();
        long maxQueryTimeMs = _serverConf.getProperty("pinot.server.query.executor.timeout", 15000L);
        long noQueryThresholdMs = _serverConf.getProperty("pinot.server.shutdown.noQueryThresholdMs", maxQueryTimeMs);

        boolean queriesStopped = false;
        long nowMs = System.currentTimeMillis();
        while (nowMs < endTimeMs) {
            long idleMs = nowMs - _serverInstance.getLatestQueryTime();
            if (idleMs >= noQueryThresholdMs) {
                LOGGER.info(
                    "No query received within {}ms (larger than the threshold: {}ms), mark it as no incoming queries",
                    idleMs, noQueryThresholdMs);
                queriesStopped = true;
                break;
            }

            // Sleep until the threshold could be met, but never past the deadline.
            long napMs = Math.min(noQueryThresholdMs - idleMs, endTimeMs - nowMs);
            LOGGER.info(
                "Sleep for {}ms as there are still incoming queries (no query time: {}ms is smaller than the threshold: {}ms)",
                napMs, idleMs, noQueryThresholdMs);
            try {
                Thread.sleep(napMs);
            } catch (InterruptedException e) {
                LOGGER.warn("Got interrupted while waiting for no incoming queries", e);
                Thread.currentThread().interrupt();
                break;
            }
            nowMs = System.currentTimeMillis();
        }

        if (!queriesStopped) {
            LOGGER.warn("Failed to drain queries within {}ms", System.currentTimeMillis() - startTimeMs);
            return;
        }

        // nowMs is the timestamp at which the idle threshold was observed to be met.
        long latestQueryFinishTimeMs = _serverInstance.getLatestQueryTime() + maxQueryTimeMs;
        if (latestQueryFinishTimeMs > nowMs) {
            long drainMs = latestQueryFinishTimeMs - nowMs;
            LOGGER.info("Sleep for {}ms to ensure all the existing queries are finished", drainMs);
            try {
                Thread.sleep(drainMs);
            } catch (InterruptedException e) {
                LOGGER.warn("Got interrupted while waiting for all the existing queries to be finished", e);
                Thread.currentThread().interrupt();
            }
        }
        LOGGER.info("Finished draining queries after {}ms", System.currentTimeMillis() - startTimeMs);
    }

    /**
     * Waits until every enabled table resource that has a partition assigned to this instance is
     * OFFLINE in its external view, the deadline passes, or the thread is interrupted.
     *
     * <p>A dedicated {@link ZKHelixAdmin} is created for the check and always closed afterwards.
     *
     * @param endTimeMs deadline as epoch milliseconds
     */
    private void shutdownResourceCheck(long endTimeMs) {
        LOGGER.info("Starting shutdown resource check");
        long startTimeMs = System.currentTimeMillis();
        if (startTimeMs >= endTimeMs) {
            LOGGER.warn("Skipping shutdown resource check because shutdown timeout is already reached");
            return;
        }

        // Explicit try/finally: the admin is assigned once and closed on every exit path.
        HelixAdmin helixAdmin = new ZKHelixAdmin(_zkAddress);
        try {
            // Monitor enabled table resources with at least one partition assigned to this instance.
            HashSet<String> resourcesToMonitor = new HashSet<>();
            for (String resourceName : helixAdmin.getResourcesInCluster(_helixClusterName)) {
                if (!TableNameBuilder.isTableResource(resourceName)) {
                    continue;
                }
                IdealState idealState = helixAdmin.getResourceIdealState(_helixClusterName, resourceName);
                if (idealState == null || !idealState.isEnabled()) {
                    continue;
                }
                for (String partition : idealState.getPartitionSet()) {
                    if (idealState.getInstanceSet(partition).contains(_instanceId)) {
                        resourcesToMonitor.add(resourceName);
                        break;
                    }
                }
            }

            long checkIntervalMs = _serverConf.getProperty("pinot.server.shutdown.resourceCheckIntervalMs", 10000L);
            while (System.currentTimeMillis() < endTimeMs) {
                Iterator<String> iterator = resourcesToMonitor.iterator();
                String currentResource = null;
                while (iterator.hasNext()) {
                    currentResource = iterator.next();
                    if (!isResourceOffline(helixAdmin, currentResource)) {
                        // Stop at the first resource that is not OFFLINE; it is reported in the log below.
                        break;
                    }
                    iterator.remove();
                }

                long currentTimeMs = System.currentTimeMillis();
                if (resourcesToMonitor.isEmpty()) {
                    LOGGER.info("All resources are OFFLINE after {}ms", currentTimeMs - startTimeMs);
                    return;
                }

                // Never sleep past the deadline.
                long sleepTimeMs = Math.min(checkIntervalMs, endTimeMs - currentTimeMs);
                if (sleepTimeMs <= 0L) {
                    continue;
                }
                LOGGER.info("Sleep for {}ms as some resources [{}, ...] are still ONLINE", sleepTimeMs,
                    currentResource);
                try {
                    Thread.sleep(sleepTimeMs);
                } catch (InterruptedException e) {
                    LOGGER.warn("Got interrupted while waiting for all resources OFFLINE", e);
                    Thread.currentThread().interrupt();
                    break;
                }
            }

            // Final pass over all remaining resources after the deadline or an interrupt.
            resourcesToMonitor.removeIf(resourceName -> isResourceOffline(helixAdmin, resourceName));
            long currentTimeMs = System.currentTimeMillis();
            if (resourcesToMonitor.isEmpty()) {
                LOGGER.info("All resources are OFFLINE after {}ms", currentTimeMs - startTimeMs);
            } else {
                LOGGER.warn("There are still {} resources ONLINE within {}ms: {}", resourcesToMonitor.size(),
                    currentTimeMs - startTimeMs, resourcesToMonitor);
            }
        } finally {
            helixAdmin.close();
        }
    }

    /**
     * Checks whether this instance serves no partition of the given resource.
     *
     * @param helixAdmin admin used to fetch the external view
     * @param resource   resource name
     * @return {@code true} when the external view is missing or no partition lists this instance
     *         as ONLINE or CONSUMING; {@code false} otherwise
     */
    private boolean isResourceOffline(HelixAdmin helixAdmin, String resource) {
        ExternalView view = helixAdmin.getResourceExternalView(_helixClusterName, resource);
        if (view != null) {
            for (String partition : view.getPartitionSet()) {
                String state = view.getStateMap(partition).get(_instanceId);
                if ("ONLINE".equals(state) || "CONSUMING".equals(state)) {
                    return false;
                }
            }
        }
        return true;
    }

    /**
     * Returns the instance id determined in {@code init}.
     *
     * @return the instance id, or {@code null} if {@code init} has not assigned it yet
     */
    public String getInstanceId() {
        return _instanceId;
    }

    /**
     * Returns the server configuration held by this starter (the clone made in {@code init}).
     *
     * @return the server configuration, or {@code null} if {@code init} has not assigned it yet
     */
    public PinotConfiguration getConfig() {
        return _serverConf;
    }

    /**
     * Stores the given system resource map under the "SYSTEM_RESOURCE_INFO" map field of the
     * instance config and writes the config back through the Helix admin.
     *
     * @param helixAdmin        admin used to read and write the instance config
     * @param helixClusterName  cluster the instance belongs to
     * @param instanceId        instance whose config is updated
     * @param systemResourceMap resource information to store
     */
    protected void setInstanceResourceInfo(HelixAdmin helixAdmin, String helixClusterName, String instanceId, Map<String, String> systemResourceMap) {
        InstanceConfig currentConfig = helixAdmin.getInstanceConfig(helixClusterName, instanceId);
        ZNRecord record = currentConfig.getRecord();
        record.setMapField("SYSTEM_RESOURCE_INFO", systemResourceMap);
        helixAdmin.setInstanceConfig(helixClusterName, instanceId, currentConfig);
    }

    /**
     * Initializes, in this order, the PinotFS factory, the segment fetcher factory and the
     * crypter factory, each from its own subset of the given configuration.
     *
     * @param config configuration to take the subsets from
     * @throws Exception if any factory fails to initialize
     */
    private void initSegmentFetcher(PinotConfiguration config) throws Exception {
        PinotFSFactory.init(config.subset("pinot.server.storage.factory"));
        SegmentFetcherFactory.init(config.subset("pinot.server.segment.fetcher"));
        PinotCrypterFactory.init(config.subset("pinot.server.crypter"));
    }
}

