package org.apache.storm.daemon.supervisor;

import com.codahale.metrics.Meter;
import java.io.File;
import java.io.IOException;
import java.net.BindException;
import java.net.ServerSocket;
import java.net.UnknownHostException;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import org.apache.commons.io.FileUtils;
import org.apache.storm.DaemonConfig;
import org.apache.storm.StormTimer;
import org.apache.storm.cluster.ClusterStateContext;
import org.apache.storm.cluster.ClusterUtils;
import org.apache.storm.cluster.DaemonType;
import org.apache.storm.cluster.IStormClusterState;
import org.apache.storm.daemon.DaemonCommon;
import org.apache.storm.daemon.StormCommon;
import org.apache.storm.daemon.supervisor.timer.ReportWorkerHeartbeats;
import org.apache.storm.daemon.supervisor.timer.SupervisorHealthCheck;
import org.apache.storm.daemon.supervisor.timer.SupervisorHeartbeat;
import org.apache.storm.daemon.supervisor.timer.SynchronizeAssignments;
import org.apache.storm.event.EventManager;
import org.apache.storm.event.EventManagerImp;
import org.apache.storm.generated.Assignment;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.LocalAssignment;
import org.apache.storm.generated.Nimbus;
import org.apache.storm.generated.NotAliveException;
import org.apache.storm.generated.Supervisor;
import org.apache.storm.generated.SupervisorAssignments;
import org.apache.storm.generated.SupervisorWorkerHeartbeat;
import org.apache.storm.localizer.AsyncLocalizer;
import org.apache.storm.logging.ThriftAccessLogger;
import org.apache.storm.messaging.IContext;
import org.apache.storm.metric.StormMetricsRegistry;
import org.apache.storm.scheduler.ISupervisor;
import org.apache.storm.security.auth.IAuthorizer;
import org.apache.storm.security.auth.ReqContext;
import org.apache.storm.security.auth.ThriftConnectionType;
import org.apache.storm.security.auth.ThriftServer;
import org.apache.storm.security.auth.workertoken.WorkerTokenAuthorizer;
import org.apache.storm.shade.com.google.common.annotations.VisibleForTesting;
import org.apache.storm.thrift.TException;
import org.apache.storm.utils.ConfigUtils;
import org.apache.storm.utils.LocalState;
import org.apache.storm.utils.ObjectReader;
import org.apache.storm.utils.ServerConfigUtils;
import org.apache.storm.utils.ShellUtils;
import org.apache.storm.utils.Time;
import org.apache.storm.utils.Utils;
import org.apache.storm.utils.VersionInfo;
import org.apache.storm.utils.WrappedAuthorizationException;
import org.apache.storm.utils.WrappedNotAliveException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/storm/daemon/supervisor/Supervisor.class */
public class Supervisor implements DaemonCommon, AutoCloseable {
    private static final Logger LOG = LoggerFactory.getLogger(Supervisor.class);
    private final Map<String, Object> conf;
    private final IContext sharedContext;
    private final IAuthorizer authorizationHandler;
    private final ISupervisor iSupervisor;
    private final Utils.UptimeComputer upTime;
    private final String stormVersion;
    private final IStormClusterState stormClusterState;
    private final LocalState localState;
    private final String supervisorId;
    private final String assignmentId;
    private final int supervisorPort;
    private final String hostName;
    private final AtomicReference<Map<Long, LocalAssignment>> currAssignment;
    private final StormTimer heartbeatTimer;
    private final StormTimer workerHeartbeatTimer;
    private final StormTimer eventTimer;
    private final ExecutorService heartbeatExecutor;
    private final AsyncLocalizer asyncLocalizer;
    private final StormMetricsRegistry metricsRegistry;
    private Meter killErrorMeter;
    private final ContainerMemoryTracker containerMemoryTracker;
    private final SlotMetrics slotMetrics;
    private volatile boolean active;
    private EventManager eventManager;
    private ReadClusterState readState;
    private ThriftServer thriftServer;
    private Nimbus.Iface localNimbus;
    private Supervisor.Iface supervisorThriftInterface;

    private Supervisor(ISupervisor iSupervisor, StormMetricsRegistry stormMetricsRegistry) throws IOException, IllegalAccessException, InstantiationException, ClassNotFoundException {
        this(ConfigUtils.readStormConfig(), null, iSupervisor, stormMetricsRegistry);
    }

    public Supervisor(Map<String, Object> map, IContext iContext, ISupervisor iSupervisor, StormMetricsRegistry stormMetricsRegistry) throws IOException, IllegalAccessException, ClassNotFoundException, InstantiationException {
        this.conf = map;
        this.metricsRegistry = stormMetricsRegistry;
        this.containerMemoryTracker = new ContainerMemoryTracker(stormMetricsRegistry);
        this.slotMetrics = new SlotMetrics(stormMetricsRegistry);
        this.iSupervisor = iSupervisor;
        this.active = true;
        this.upTime = Utils.makeUptimeComputer();
        this.stormVersion = VersionInfo.getVersion();
        this.sharedContext = iContext;
        this.heartbeatExecutor = Executors.newFixedThreadPool(1);
        this.authorizationHandler = StormCommon.mkAuthorizationHandler((String) map.get(DaemonConfig.SUPERVISOR_AUTHORIZER), map);
        if (this.authorizationHandler == null && map.get(DaemonConfig.NIMBUS_AUTHORIZER) != null) {
            throw new IllegalStateException("It looks like authorization is turned on for nimbus but not for the supervisor. ( supervisor.authorizer is not set)");
        }
        iSupervisor.prepare(map, ServerConfigUtils.supervisorIsupervisorDir(map));
        try {
            this.stormClusterState = ClusterUtils.mkStormClusterState(map, new ClusterStateContext(DaemonType.SUPERVISOR, map));
            this.currAssignment = new AtomicReference<>(new HashMap());
            try {
                this.localState = ServerConfigUtils.supervisorState(map);
                this.asyncLocalizer = new AsyncLocalizer(map, stormMetricsRegistry);
                this.supervisorId = iSupervisor.getSupervisorId();
                this.assignmentId = iSupervisor.getAssignmentId();
                this.supervisorPort = ObjectReader.getInt(map.get("supervisor.thrift.port")).intValue();
                try {
                    this.hostName = Utils.hostname();
                    this.heartbeatTimer = new StormTimer("HBTimer", new DefaultUncaughtExceptionHandler());
                    this.workerHeartbeatTimer = new StormTimer("WorkerHBTimer", new DefaultUncaughtExceptionHandler());
                    this.eventTimer = new StormTimer("EventTimer", new DefaultUncaughtExceptionHandler());
                    this.supervisorThriftInterface = createSupervisorIface();
                } catch (UnknownHostException e) {
                    throw Utils.wrapInRuntime(e);
                }
            } catch (IOException e2) {
                throw Utils.wrapInRuntime(e2);
            }
        } catch (Exception e3) {
            LOG.error("supervisor can't create stormClusterState");
            throw Utils.wrapInRuntime(e3);
        }
    }

    public static void main(String[] strArr) throws Exception {
        Utils.setupDefaultUncaughtExceptionHandler();
        new Supervisor(new StandaloneSupervisor(), new StormMetricsRegistry()).launchDaemon();
    }

    public ExecutorService getHeartbeatExecutor() {
        return this.heartbeatExecutor;
    }

    public String getId() {
        return this.supervisorId;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public IContext getSharedContext() {
        return this.sharedContext;
    }

    public StormMetricsRegistry getMetricsRegistry() {
        return this.metricsRegistry;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public ContainerMemoryTracker getContainerMemoryTracker() {
        return this.containerMemoryTracker;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public SlotMetrics getSlotMetrics() {
        return this.slotMetrics;
    }

    public Map<String, Object> getConf() {
        return this.conf;
    }

    public ISupervisor getiSupervisor() {
        return this.iSupervisor;
    }

    public Utils.UptimeComputer getUpTime() {
        return this.upTime;
    }

    public String getStormVersion() {
        return this.stormVersion;
    }

    public IStormClusterState getStormClusterState() {
        return this.stormClusterState;
    }

    public ReadClusterState getReadClusterState() {
        return this.readState;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public LocalState getLocalState() {
        return this.localState;
    }

    public String getAssignmentId() {
        return this.assignmentId;
    }

    public int getThriftServerPort() {
        return this.supervisorPort;
    }

    public String getHostName() {
        return this.hostName;
    }

    public AtomicReference<Map<Long, LocalAssignment>> getCurrAssignment() {
        return this.currAssignment;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public AsyncLocalizer getAsyncLocalizer() {
        return this.asyncLocalizer;
    }

    EventManager getEventManger() {
        return this.eventManager;
    }

    Supervisor getSupervisor() {
        return this;
    }

    public Nimbus.Iface getLocalNimbus() {
        return this.localNimbus;
    }

    public void setLocalNimbus(Nimbus.Iface iface) {
        this.localNimbus = iface;
    }

    public void launch() throws Exception {
        LOG.info("Starting Supervisor with conf {}", ConfigUtils.maskPasswords(this.conf));
        FileUtils.cleanDirectory(new File(ServerConfigUtils.supervisorTmpDir(this.conf)));
        SupervisorHeartbeat supervisorHeartbeat = new SupervisorHeartbeat(this.conf, this);
        supervisorHeartbeat.run();
        this.heartbeatTimer.scheduleRecurring(0, ObjectReader.getInt(this.conf.get(DaemonConfig.SUPERVISOR_HEARTBEAT_FREQUENCY_SECS)).intValue(), supervisorHeartbeat);
        this.eventManager = new EventManagerImp(false);
        this.readState = new ReadClusterState(this);
        this.asyncLocalizer.start();
        if (((Boolean) this.conf.get(DaemonConfig.SUPERVISOR_ENABLE)).booleanValue()) {
            this.eventTimer.scheduleRecurring(0, 10, new EventManagerPushCallback(new SynchronizeAssignments(this, null, this.readState), this.eventManager));
            this.eventTimer.scheduleRecurring(30, 30, new SupervisorHealthCheck(this));
        }
        this.workerHeartbeatTimer.scheduleRecurring(0, ObjectReader.getInt(this.conf.get("worker.heartbeat.frequency.secs")).intValue(), new ReportWorkerHeartbeats(this.conf, this));
        LOG.info("Starting supervisor with id {} at host {}.", getId(), getHostName());
    }

    public void launchDaemon() {
        LOG.info("Starting supervisor for storm version '{}'.", VersionInfo.getVersion());
        try {
            Map<String, Object> conf = getConf();
            if (ConfigUtils.isLocalMode(conf)) {
                throw new IllegalArgumentException("Cannot start server in local mode!");
            }
            launch();
            this.metricsRegistry.registerGauge("supervisor:num-slots-used-gauge", () -> {
                return Integer.valueOf(SupervisorUtils.supervisorWorkerIds(conf).size());
            });
            this.metricsRegistry.registerMeter("supervisor:num-launched").mark();
            this.metricsRegistry.registerMeter("supervisor:num-shell-exceptions", ShellUtils.numShellExceptions);
            this.metricsRegistry.registerMeter("supervisor:health-check-timeouts");
            this.killErrorMeter = this.metricsRegistry.registerMeter("supervisor:num-kill-worker-errors");
            this.metricsRegistry.registerMeter("supervisor:workerTokenAuthorizer-get-password-failures", WorkerTokenAuthorizer.getPasswordFailuresMeter());
            this.metricsRegistry.startMetricsReporters(conf);
            Utils.addShutdownHookWithForceKillIn1Sec(() -> {
                this.metricsRegistry.stopMetricsReporters();
                close();
            });
            launchSupervisorThriftServer(conf);
        } catch (Exception e) {
            LOG.error("Failed to start supervisor\n", e);
            System.exit(1);
        }
    }

    @VisibleForTesting
    public void checkAuthorization(String str) throws AuthorizationException {
        checkAuthorization(null, null, str, null);
    }

    @VisibleForTesting
    public void checkAuthorization(String str, Map<String, Object> map, String str2) throws AuthorizationException {
        checkAuthorization(str, map, str2, null);
    }

    @VisibleForTesting
    public void checkAuthorization(String str, Map<String, Object> map, String str2, ReqContext reqContext) throws AuthorizationException {
        if (reqContext == null) {
            reqContext = ReqContext.context();
        }
        HashMap hashMap = new HashMap();
        if (map != null) {
            hashMap.putAll(map);
        } else if (str != null) {
            hashMap.put("topology.name", str);
        }
        if (reqContext.isImpersonating()) {
            LOG.info("principal: {} is trying to impersonate principal: {}", reqContext.realPrincipal(), reqContext.principal());
            throw new WrappedAuthorizationException("Supervisor does not support impersonation");
        }
        IAuthorizer iAuthorizer = this.authorizationHandler;
        if (iAuthorizer != null) {
            if (iAuthorizer.permit(reqContext, str2, hashMap)) {
                ThriftAccessLogger.logAccess(Integer.valueOf(reqContext.requestID()), reqContext.remoteAddress(), reqContext.principal(), str2, str, "access-granted");
            } else {
                ThriftAccessLogger.logAccess(Integer.valueOf(reqContext.requestID()), reqContext.remoteAddress(), reqContext.principal(), str2, str, "access-denied");
                throw new WrappedAuthorizationException(str2 + (str != null ? " on topology " + str : "") + " is not authorized");
            }
        }
    }

    private Supervisor.Iface createSupervisorIface() {
        return new Supervisor.Iface() { // from class: org.apache.storm.daemon.supervisor.Supervisor.1
            public void sendSupervisorAssignments(SupervisorAssignments supervisorAssignments) throws AuthorizationException, TException {
                Supervisor.this.checkAuthorization("sendSupervisorAssignments");
                Supervisor.LOG.info("Got an assignments from master, will start to sync with assignments: {}", supervisorAssignments);
                Supervisor.this.getEventManger().add(new SynchronizeAssignments(Supervisor.this.getSupervisor(), supervisorAssignments, Supervisor.this.getReadClusterState()));
            }

            public Assignment getLocalAssignmentForStorm(String str) throws NotAliveException, AuthorizationException, TException {
                Map<String, Object> map = null;
                try {
                    map = ConfigUtils.readSupervisorStormConf(Supervisor.this.conf, str);
                } catch (IOException e) {
                    Supervisor.LOG.warn("Topology config is not localized yet...");
                }
                Supervisor.this.checkAuthorization(str, map, "getLocalAssignmentForStorm");
                Assignment assignmentInfo = Supervisor.this.getStormClusterState().assignmentInfo(str, (Runnable) null);
                if (null == assignmentInfo) {
                    throw new WrappedNotAliveException("No local assignment assigned for storm: " + str + " for node: " + Supervisor.this.getHostName());
                }
                return assignmentInfo;
            }

            public void sendSupervisorWorkerHeartbeat(SupervisorWorkerHeartbeat supervisorWorkerHeartbeat) throws AuthorizationException, NotAliveException, TException {
                String str = supervisorWorkerHeartbeat.get_storm_id();
                try {
                    Supervisor.this.checkAuthorization(str, ConfigUtils.readSupervisorStormConf(Supervisor.this.conf, str), "sendSupervisorWorkerHeartbeat");
                } catch (IOException e) {
                    Supervisor.LOG.warn("Topology config is not localized yet...");
                    throw new WrappedNotAliveException(str + " does not appear to be alive, you should probably exit");
                }
            }
        };
    }

    public Supervisor.Iface getSupervisorThriftInterface() {
        return this.supervisorThriftInterface;
    }

    private void launchSupervisorThriftServer(Map<String, Object> map) throws IOException {
        int thriftServerPort = getThriftServerPort();
        try {
            new ServerSocket(thriftServerPort).close();
            this.thriftServer = new ThriftServer(map, new Supervisor.Processor(this.supervisorThriftInterface), ThriftConnectionType.SUPERVISOR);
            this.thriftServer.serve();
        } catch (BindException e) {
            LOG.error("{} is not available. Check if another process is already listening on {}", Integer.valueOf(thriftServerPort), Integer.valueOf(thriftServerPort));
            throw new RuntimeException(e);
        }
    }

    public void sendSupervisorAssignments(SupervisorAssignments supervisorAssignments) {
        if (!Time.isSimulating() || ((Boolean) this.conf.get(DaemonConfig.SUPERVISOR_ENABLE)).booleanValue()) {
            this.eventManager.add(new SynchronizeAssignments(this, supervisorAssignments, this.readState));
        }
    }

    @Override // java.lang.AutoCloseable
    public void close() {
        try {
            LOG.info("Shutting down supervisor {}", getId());
            this.active = false;
            this.heartbeatTimer.close();
            this.workerHeartbeatTimer.close();
            this.eventTimer.close();
            if (this.eventManager != null) {
                this.eventManager.close();
            }
            if (this.readState != null) {
                this.readState.close();
            }
            this.asyncLocalizer.close();
            getStormClusterState().disconnect();
            if (this.thriftServer != null) {
                this.thriftServer.stop();
            }
        } catch (Exception e) {
            LOG.error("Error Shutting down", e);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void killWorkers(Collection<String> collection, ContainerLauncher containerLauncher) throws InterruptedException, IOException {
        HashSet hashSet = new HashSet();
        for (String str : collection) {
            try {
                Killable recoverContainer = containerLauncher.recoverContainer(str, this.localState);
                if (recoverContainer.areAllProcessesDead()) {
                    recoverContainer.cleanUp();
                } else {
                    recoverContainer.kill();
                    hashSet.add(recoverContainer);
                }
            } catch (Exception e) {
                LOG.error("Error trying to kill {}", str, e);
            }
        }
        int intValue = ObjectReader.getInt(this.conf.get("supervisor.worker.shutdown.sleep.secs")).intValue();
        if (!hashSet.isEmpty()) {
            Time.sleepSecs(intValue);
        }
        Iterator it = hashSet.iterator();
        while (it.hasNext()) {
            Killable killable = (Killable) it.next();
            try {
                long currentTimeMillis = Time.currentTimeMillis();
                while (!killable.areAllProcessesDead()) {
                    if (Time.currentTimeMillis() - currentTimeMillis > 10000) {
                        if (this.killErrorMeter != null) {
                            this.killErrorMeter.mark();
                        }
                        throw new RuntimeException("Giving up on killing " + killable + " after " + (Time.currentTimeMillis() - currentTimeMillis) + " ms");
                        break;
                    }
                    killable.forceKill();
                    Time.sleep(100L);
                }
                killable.cleanUp();
            } catch (Exception e2) {
                LOG.error("Error trying to clean up {}", killable, e2);
            }
        }
    }

    public void shutdownAllWorkers(BiConsumer<Slot, Long> biConsumer, UniFunc<Slot> uniFunc) {
        if (this.readState != null) {
            this.readState.shutdownAllWorkers(biConsumer, uniFunc);
            return;
        }
        try {
            killWorkers(SupervisorUtils.supervisorWorkerIds(this.conf), ContainerLauncher.make(getConf(), getId(), getThriftServerPort(), getSharedContext(), getMetricsRegistry(), getContainerMemoryTracker(), this.supervisorThriftInterface));
        } catch (Exception e) {
            throw Utils.wrapInRuntime(e);
        }
    }

    public boolean isWaiting() {
        if (this.active) {
            return this.heartbeatTimer.isTimerWaiting() && this.workerHeartbeatTimer.isTimerWaiting() && this.eventTimer.isTimerWaiting() && this.eventManager.waiting();
        }
        return true;
    }
}
