/*
 * Decompiled with CFR 0.152.
 */
package org.apache.storm.daemon.supervisor;

import com.codahale.metrics.Meter;
import java.io.File;
import java.io.IOException;
import java.net.BindException;
import java.net.ServerSocket;
import java.net.UnknownHostException;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import org.apache.commons.io.FileUtils;
import org.apache.storm.StormTimer;
import org.apache.storm.cluster.ClusterStateContext;
import org.apache.storm.cluster.ClusterUtils;
import org.apache.storm.cluster.DaemonType;
import org.apache.storm.cluster.IStormClusterState;
import org.apache.storm.daemon.DaemonCommon;
import org.apache.storm.daemon.StormCommon;
import org.apache.storm.daemon.supervisor.ContainerLauncher;
import org.apache.storm.daemon.supervisor.ContainerMemoryTracker;
import org.apache.storm.daemon.supervisor.DefaultUncaughtExceptionHandler;
import org.apache.storm.daemon.supervisor.EventManagerPushCallback;
import org.apache.storm.daemon.supervisor.Killable;
import org.apache.storm.daemon.supervisor.ReadClusterState;
import org.apache.storm.daemon.supervisor.Slot;
import org.apache.storm.daemon.supervisor.SlotMetrics;
import org.apache.storm.daemon.supervisor.StandaloneSupervisor;
import org.apache.storm.daemon.supervisor.SupervisorUtils;
import org.apache.storm.daemon.supervisor.UniFunc;
import org.apache.storm.daemon.supervisor.timer.ReportWorkerHeartbeats;
import org.apache.storm.daemon.supervisor.timer.SupervisorHealthCheck;
import org.apache.storm.daemon.supervisor.timer.SupervisorHeartbeat;
import org.apache.storm.daemon.supervisor.timer.SynchronizeAssignments;
import org.apache.storm.event.EventManager;
import org.apache.storm.event.EventManagerImp;
import org.apache.storm.generated.Assignment;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.LocalAssignment;
import org.apache.storm.generated.Nimbus;
import org.apache.storm.generated.NotAliveException;
import org.apache.storm.generated.Supervisor;
import org.apache.storm.generated.SupervisorAssignments;
import org.apache.storm.generated.SupervisorWorkerHeartbeat;
import org.apache.storm.localizer.AsyncLocalizer;
import org.apache.storm.logging.ThriftAccessLogger;
import org.apache.storm.messaging.IContext;
import org.apache.storm.metric.StormMetricsRegistry;
import org.apache.storm.scheduler.ISupervisor;
import org.apache.storm.security.auth.IAuthorizer;
import org.apache.storm.security.auth.ReqContext;
import org.apache.storm.security.auth.ThriftConnectionType;
import org.apache.storm.security.auth.ThriftServer;
import org.apache.storm.security.auth.workertoken.WorkerTokenAuthorizer;
import org.apache.storm.shade.com.google.common.annotations.VisibleForTesting;
import org.apache.storm.thrift.TException;
import org.apache.storm.thrift.TProcessor;
import org.apache.storm.utils.ConfigUtils;
import org.apache.storm.utils.LocalState;
import org.apache.storm.utils.ObjectReader;
import org.apache.storm.utils.ServerConfigUtils;
import org.apache.storm.utils.ShellUtils;
import org.apache.storm.utils.Time;
import org.apache.storm.utils.Utils;
import org.apache.storm.utils.VersionInfo;
import org.apache.storm.utils.WrappedAuthorizationException;
import org.apache.storm.utils.WrappedNotAliveException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Supervisor
implements DaemonCommon,
AutoCloseable {
    private static final Logger LOG = LoggerFactory.getLogger(Supervisor.class);
    private final Map<String, Object> conf;
    private final IContext sharedContext;
    private final IAuthorizer authorizationHandler;
    private final ISupervisor iSupervisor;
    private final Utils.UptimeComputer upTime;
    private final String stormVersion;
    private final IStormClusterState stormClusterState;
    private final LocalState localState;
    private final String supervisorId;
    private final String assignmentId;
    private final int supervisorPort;
    private final String hostName;
    private final AtomicReference<Map<Long, LocalAssignment>> currAssignment;
    private final StormTimer heartbeatTimer;
    private final StormTimer workerHeartbeatTimer;
    private final StormTimer eventTimer;
    private final ExecutorService heartbeatExecutor;
    private final AsyncLocalizer asyncLocalizer;
    private final StormMetricsRegistry metricsRegistry;
    private Meter killErrorMeter;
    private final ContainerMemoryTracker containerMemoryTracker;
    private final SlotMetrics slotMetrics;
    private volatile boolean active;
    private EventManager eventManager;
    private ReadClusterState readState;
    private ThriftServer thriftServer;
    private Nimbus.Iface localNimbus;
    private Supervisor.Iface supervisorThriftInterface;

    private Supervisor(ISupervisor iSupervisor, StormMetricsRegistry metricsRegistry) throws IOException, IllegalAccessException, InstantiationException, ClassNotFoundException {
        this(ConfigUtils.readStormConfig(), null, iSupervisor, metricsRegistry);
    }

    public Supervisor(Map<String, Object> conf, IContext sharedContext, ISupervisor iSupervisor, StormMetricsRegistry metricsRegistry) throws IOException, IllegalAccessException, ClassNotFoundException, InstantiationException {
        this.conf = conf;
        this.metricsRegistry = metricsRegistry;
        this.containerMemoryTracker = new ContainerMemoryTracker(metricsRegistry);
        this.slotMetrics = new SlotMetrics(metricsRegistry);
        this.iSupervisor = iSupervisor;
        this.active = true;
        this.upTime = Utils.makeUptimeComputer();
        this.stormVersion = VersionInfo.getVersion();
        this.sharedContext = sharedContext;
        this.heartbeatExecutor = Executors.newFixedThreadPool(1);
        this.authorizationHandler = StormCommon.mkAuthorizationHandler((String)((String)conf.get("supervisor.authorizer")), conf);
        if (this.authorizationHandler == null && conf.get("nimbus.authorizer") != null) {
            throw new IllegalStateException("It looks like authorization is turned on for nimbus but not for the supervisor. ( supervisor.authorizer is not set)");
        }
        iSupervisor.prepare(conf, ServerConfigUtils.supervisorIsupervisorDir(conf));
        try {
            this.stormClusterState = ClusterUtils.mkStormClusterState(conf, (ClusterStateContext)new ClusterStateContext(DaemonType.SUPERVISOR, conf));
        }
        catch (Exception e) {
            LOG.error("supervisor can't create stormClusterState");
            throw Utils.wrapInRuntime((Exception)e);
        }
        this.currAssignment = new AtomicReference(new HashMap());
        try {
            this.localState = ServerConfigUtils.supervisorState(conf);
            this.asyncLocalizer = new AsyncLocalizer(conf, metricsRegistry);
        }
        catch (IOException e) {
            throw Utils.wrapInRuntime((Exception)e);
        }
        this.supervisorId = iSupervisor.getSupervisorId();
        this.assignmentId = iSupervisor.getAssignmentId();
        this.supervisorPort = ObjectReader.getInt((Object)conf.get("supervisor.thrift.port"));
        try {
            this.hostName = Utils.hostname();
        }
        catch (UnknownHostException e) {
            throw Utils.wrapInRuntime((Exception)e);
        }
        this.heartbeatTimer = new StormTimer("HBTimer", (Thread.UncaughtExceptionHandler)new DefaultUncaughtExceptionHandler());
        this.workerHeartbeatTimer = new StormTimer("WorkerHBTimer", (Thread.UncaughtExceptionHandler)new DefaultUncaughtExceptionHandler());
        this.eventTimer = new StormTimer("EventTimer", (Thread.UncaughtExceptionHandler)new DefaultUncaughtExceptionHandler());
        this.supervisorThriftInterface = this.createSupervisorIface();
    }

    public static void main(String[] args) throws Exception {
        Utils.setupDefaultUncaughtExceptionHandler();
        StormMetricsRegistry metricsRegistry = new StormMetricsRegistry();
        Supervisor instance = new Supervisor(new StandaloneSupervisor(), metricsRegistry);
        instance.launchDaemon();
    }

    public ExecutorService getHeartbeatExecutor() {
        return this.heartbeatExecutor;
    }

    public String getId() {
        return this.supervisorId;
    }

    IContext getSharedContext() {
        return this.sharedContext;
    }

    public StormMetricsRegistry getMetricsRegistry() {
        return this.metricsRegistry;
    }

    ContainerMemoryTracker getContainerMemoryTracker() {
        return this.containerMemoryTracker;
    }

    SlotMetrics getSlotMetrics() {
        return this.slotMetrics;
    }

    public Map<String, Object> getConf() {
        return this.conf;
    }

    public ISupervisor getiSupervisor() {
        return this.iSupervisor;
    }

    public Utils.UptimeComputer getUpTime() {
        return this.upTime;
    }

    public String getStormVersion() {
        return this.stormVersion;
    }

    public IStormClusterState getStormClusterState() {
        return this.stormClusterState;
    }

    public ReadClusterState getReadClusterState() {
        return this.readState;
    }

    LocalState getLocalState() {
        return this.localState;
    }

    public String getAssignmentId() {
        return this.assignmentId;
    }

    public int getThriftServerPort() {
        return this.supervisorPort;
    }

    public String getHostName() {
        return this.hostName;
    }

    public AtomicReference<Map<Long, LocalAssignment>> getCurrAssignment() {
        return this.currAssignment;
    }

    AsyncLocalizer getAsyncLocalizer() {
        return this.asyncLocalizer;
    }

    EventManager getEventManger() {
        return this.eventManager;
    }

    Supervisor getSupervisor() {
        return this;
    }

    public Nimbus.Iface getLocalNimbus() {
        return this.localNimbus;
    }

    public void setLocalNimbus(Nimbus.Iface nimbus) {
        this.localNimbus = nimbus;
    }

    public void launch() throws Exception {
        LOG.info("Starting Supervisor with conf {}", (Object)ConfigUtils.maskPasswords(this.conf));
        String path = ServerConfigUtils.supervisorTmpDir(this.conf);
        FileUtils.cleanDirectory((File)new File(path));
        SupervisorHeartbeat hb = new SupervisorHeartbeat(this.conf, this);
        hb.run();
        Integer heartbeatFrequency = ObjectReader.getInt((Object)this.conf.get("supervisor.heartbeat.frequency.secs"));
        this.heartbeatTimer.scheduleRecurring(0, heartbeatFrequency.intValue(), (Runnable)hb);
        this.eventManager = new EventManagerImp(false);
        this.readState = new ReadClusterState(this);
        this.asyncLocalizer.start();
        if (((Boolean)this.conf.get("supervisor.enable")).booleanValue()) {
            this.eventTimer.scheduleRecurring(0, 10, (Runnable)new EventManagerPushCallback(new SynchronizeAssignments(this, null, this.readState), this.eventManager));
            this.eventTimer.scheduleRecurring(30, 30, (Runnable)new SupervisorHealthCheck(this));
        }
        ReportWorkerHeartbeats reportWorkerHeartbeats = new ReportWorkerHeartbeats(this.conf, this);
        Integer workerHeartbeatFrequency = ObjectReader.getInt((Object)this.conf.get("worker.heartbeat.frequency.secs"));
        this.workerHeartbeatTimer.scheduleRecurring(0, workerHeartbeatFrequency.intValue(), (Runnable)reportWorkerHeartbeats);
        LOG.info("Starting supervisor with id {} at host {}.", (Object)this.getId(), (Object)this.getHostName());
    }

    public void launchDaemon() {
        LOG.info("Starting supervisor for storm version '{}'.", (Object)VersionInfo.getVersion());
        try {
            Map<String, Object> conf = this.getConf();
            if (ConfigUtils.isLocalMode(conf)) {
                throw new IllegalArgumentException("Cannot start server in local mode!");
            }
            this.launch();
            this.metricsRegistry.registerGauge("supervisor:num-slots-used-gauge", () -> SupervisorUtils.supervisorWorkerIds(conf).size());
            this.metricsRegistry.registerMeter("supervisor:num-launched").mark();
            this.metricsRegistry.registerMeter("supervisor:num-shell-exceptions", ShellUtils.numShellExceptions);
            this.metricsRegistry.registerMeter("supervisor:health-check-timeouts");
            this.killErrorMeter = this.metricsRegistry.registerMeter("supervisor:num-kill-worker-errors");
            this.metricsRegistry.registerMeter("supervisor:workerTokenAuthorizer-get-password-failures", WorkerTokenAuthorizer.getPasswordFailuresMeter());
            this.metricsRegistry.startMetricsReporters(conf);
            Utils.addShutdownHookWithForceKillIn1Sec(() -> {
                this.metricsRegistry.stopMetricsReporters();
                this.close();
            });
            this.launchSupervisorThriftServer(conf);
        }
        catch (Exception e) {
            LOG.error("Failed to start supervisor\n", (Throwable)e);
            System.exit(1);
        }
    }

    @VisibleForTesting
    public void checkAuthorization(String operation) throws AuthorizationException {
        this.checkAuthorization(null, null, operation, null);
    }

    @VisibleForTesting
    public void checkAuthorization(String topoName, Map<String, Object> topoConf, String operation) throws AuthorizationException {
        this.checkAuthorization(topoName, topoConf, operation, null);
    }

    @VisibleForTesting
    public void checkAuthorization(String topoName, Map<String, Object> topoConf, String operation, ReqContext context) throws AuthorizationException {
        if (context == null) {
            context = ReqContext.context();
        }
        HashMap<String, Object> checkConf = new HashMap<String, Object>();
        if (topoConf != null) {
            checkConf.putAll(topoConf);
        } else if (topoName != null) {
            checkConf.put("topology.name", topoName);
        }
        if (context.isImpersonating()) {
            LOG.info("principal: {} is trying to impersonate principal: {}", (Object)context.realPrincipal(), (Object)context.principal());
            throw new WrappedAuthorizationException("Supervisor does not support impersonation");
        }
        IAuthorizer aclHandler = this.authorizationHandler;
        if (aclHandler != null) {
            if (!aclHandler.permit(context, operation, checkConf)) {
                ThriftAccessLogger.logAccess(context.requestID(), context.remoteAddress(), context.principal(), operation, topoName, "access-denied");
                throw new WrappedAuthorizationException(operation + (String)(topoName != null ? " on topology " + topoName : "") + " is not authorized");
            }
            ThriftAccessLogger.logAccess(context.requestID(), context.remoteAddress(), context.principal(), operation, topoName, "access-granted");
        }
    }

    private Supervisor.Iface createSupervisorIface() {
        return new Supervisor.Iface(){

            public void sendSupervisorAssignments(SupervisorAssignments assignments) throws AuthorizationException, TException {
                Supervisor.this.checkAuthorization("sendSupervisorAssignments");
                LOG.info("Got an assignments from master, will start to sync with assignments: {}", (Object)assignments);
                SynchronizeAssignments syn = new SynchronizeAssignments(Supervisor.this.getSupervisor(), assignments, Supervisor.this.getReadClusterState());
                Supervisor.this.getEventManger().add(syn);
            }

            public Assignment getLocalAssignmentForStorm(String id) throws NotAliveException, AuthorizationException, TException {
                Map topoConf = null;
                try {
                    topoConf = ConfigUtils.readSupervisorStormConf(Supervisor.this.conf, (String)id);
                }
                catch (IOException e) {
                    LOG.warn("Topology config is not localized yet...");
                }
                Supervisor.this.checkAuthorization(id, topoConf, "getLocalAssignmentForStorm");
                Assignment assignment = Supervisor.this.getStormClusterState().assignmentInfo(id, null);
                if (null == assignment) {
                    throw new WrappedNotAliveException("No local assignment assigned for storm: " + id + " for node: " + Supervisor.this.getHostName());
                }
                return assignment;
            }

            public void sendSupervisorWorkerHeartbeat(SupervisorWorkerHeartbeat heartbeat) throws AuthorizationException, NotAliveException, TException {
                String id = heartbeat.get_storm_id();
                Map topoConf = null;
                try {
                    topoConf = ConfigUtils.readSupervisorStormConf(Supervisor.this.conf, (String)id);
                }
                catch (IOException e) {
                    LOG.warn("Topology config is not localized yet...");
                    throw new WrappedNotAliveException(id + " does not appear to be alive, you should probably exit");
                }
                Supervisor.this.checkAuthorization(id, topoConf, "sendSupervisorWorkerHeartbeat");
            }
        };
    }

    public Supervisor.Iface getSupervisorThriftInterface() {
        return this.supervisorThriftInterface;
    }

    private void launchSupervisorThriftServer(Map<String, Object> conf) throws IOException {
        int port = this.getThriftServerPort();
        try {
            ServerSocket socket = new ServerSocket(port);
            socket.close();
        }
        catch (BindException e) {
            LOG.error("{} is not available. Check if another process is already listening on {}", (Object)port, (Object)port);
            throw new RuntimeException(e);
        }
        Supervisor.Processor processor = new Supervisor.Processor(this.supervisorThriftInterface);
        this.thriftServer = new ThriftServer(conf, (TProcessor)processor, ThriftConnectionType.SUPERVISOR);
        this.thriftServer.serve();
    }

    public void sendSupervisorAssignments(SupervisorAssignments assignments) {
        if (Time.isSimulating() && !((Boolean)this.conf.get("supervisor.enable")).booleanValue()) {
            return;
        }
        SynchronizeAssignments syn = new SynchronizeAssignments(this, assignments, this.readState);
        this.eventManager.add(syn);
    }

    @Override
    public void close() {
        try {
            LOG.info("Shutting down supervisor {}", (Object)this.getId());
            this.active = false;
            this.heartbeatTimer.close();
            this.workerHeartbeatTimer.close();
            this.eventTimer.close();
            if (this.eventManager != null) {
                this.eventManager.close();
            }
            if (this.readState != null) {
                this.readState.close();
            }
            this.asyncLocalizer.close();
            this.getStormClusterState().disconnect();
            if (this.thriftServer != null) {
                this.thriftServer.stop();
            }
        }
        catch (Exception e) {
            LOG.error("Error Shutting down", (Throwable)e);
        }
    }

    void killWorkers(Collection<String> workerIds, ContainerLauncher launcher) throws InterruptedException, IOException {
        HashSet<Killable> containers = new HashSet<Killable>();
        for (String workerId : workerIds) {
            try {
                Killable k = launcher.recoverContainer(workerId, this.localState);
                if (!k.areAllProcessesDead()) {
                    k.kill();
                    containers.add(k);
                    continue;
                }
                k.cleanUp();
            }
            catch (Exception e) {
                LOG.error("Error trying to kill {}", (Object)workerId, (Object)e);
            }
        }
        int shutdownSleepSecs = ObjectReader.getInt((Object)this.conf.get("supervisor.worker.shutdown.sleep.secs"));
        if (!containers.isEmpty()) {
            Time.sleepSecs((long)shutdownSleepSecs);
        }
        for (Killable k : containers) {
            try {
                long start = Time.currentTimeMillis();
                while (!k.areAllProcessesDead()) {
                    if (Time.currentTimeMillis() - start > 10000L) {
                        if (this.killErrorMeter != null) {
                            this.killErrorMeter.mark();
                        }
                        throw new RuntimeException("Giving up on killing " + k + " after " + (Time.currentTimeMillis() - start) + " ms");
                    }
                    k.forceKill();
                    Time.sleep((long)100L);
                }
                k.cleanUp();
            }
            catch (Exception e) {
                LOG.error("Error trying to clean up {}", (Object)k, (Object)e);
            }
        }
    }

    public void shutdownAllWorkers(BiConsumer<Slot, Long> onWarnTimeout, UniFunc<Slot> onErrorTimeout) {
        if (this.readState != null) {
            this.readState.shutdownAllWorkers(onWarnTimeout, onErrorTimeout);
        } else {
            try {
                ContainerLauncher launcher = ContainerLauncher.make(this.getConf(), this.getId(), this.getThriftServerPort(), this.getSharedContext(), this.getMetricsRegistry(), this.getContainerMemoryTracker(), this.supervisorThriftInterface);
                this.killWorkers(SupervisorUtils.supervisorWorkerIds(this.conf), launcher);
            }
            catch (Exception e) {
                throw Utils.wrapInRuntime((Exception)e);
            }
        }
    }

    public boolean isWaiting() {
        if (!this.active) {
            return true;
        }
        return this.heartbeatTimer.isTimerWaiting() && this.workerHeartbeatTimer.isTimerWaiting() && this.eventTimer.isTimerWaiting() && this.eventManager.waiting();
    }
}

