/*
 * Decompiled with CFR 0.152.
 */
package org.apache.storm.executor;

import java.util.ArrayList;
import java.util.List;
import org.apache.storm.daemon.Shutdownable;
import org.apache.storm.daemon.Task;
import org.apache.storm.executor.Executor;
import org.apache.storm.executor.IRunningExecutor;
import org.apache.storm.generated.Credentials;
import org.apache.storm.generated.ExecutorStats;
import org.apache.storm.grouping.LoadMapping;
import org.apache.storm.hooks.ITaskHook;
import org.apache.storm.spout.ISpout;
import org.apache.storm.task.IBolt;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.utils.JCQueue;
import org.apache.storm.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ExecutorShutdown
implements Shutdownable,
IRunningExecutor {
    private static final Logger LOG = LoggerFactory.getLogger(ExecutorShutdown.class);
    private final Executor executor;
    private final List<Utils.SmartThread> threads;
    private final ArrayList<Task> taskDatas;
    private final JCQueue receiveQueue;

    public ExecutorShutdown(Executor executor, List<Utils.SmartThread> threads, ArrayList<Task> taskDatas, JCQueue recvQueue) {
        this.executor = executor;
        this.threads = threads;
        this.taskDatas = taskDatas;
        this.receiveQueue = recvQueue;
    }

    @Override
    public ExecutorStats renderStats() {
        return this.executor.getStats().renderStats();
    }

    @Override
    public List<Long> getExecutorId() {
        return this.executor.getExecutorId();
    }

    @Override
    public void credentialsChanged(Credentials credentials) {
        this.executor.setNeedToRefreshCreds();
    }

    @Override
    public void loadChanged(LoadMapping loadMapping) {
        this.executor.reflectNewLoadMapping(loadMapping);
    }

    @Override
    public JCQueue getReceiveQueue() {
        return this.receiveQueue;
    }

    @Override
    public boolean publishFlushTuple() {
        return this.executor.publishFlushTuple();
    }

    @Override
    public void shutdown() {
        try {
            LOG.info("Shutting down executor " + this.executor.getComponentId() + ":" + this.executor.getExecutorId());
            this.executor.getReceiveQueue().close();
            for (Utils.SmartThread t : this.threads) {
                t.interrupt();
            }
            for (Utils.SmartThread t : this.threads) {
                LOG.debug("Executor " + this.executor.getComponentId() + ":" + this.executor.getExecutorId() + " joining thread " + t.getName());
                long waitMs = 100L;
                t.join(waitMs);
                if (!t.isAlive()) continue;
                LOG.warn("Thread {} is still alive ({} ms after interruption). Stop waiting for it.", (Object)t.getName(), (Object)waitMs);
            }
            this.executor.getStats().cleanupStats();
            for (Task task : this.taskDatas) {
                if (task == null) continue;
                TopologyContext userContext = task.getUserContext();
                for (ITaskHook hook : userContext.getHooks()) {
                    hook.cleanup();
                }
            }
            this.executor.getStormClusterState().disconnect();
            if (this.executor.getOpenOrPrepareWasCalled().get()) {
                for (Task task : this.taskDatas) {
                    if (task == null) continue;
                    Object object = task.getTaskObject();
                    if (object instanceof ISpout) {
                        ((ISpout)object).close();
                        continue;
                    }
                    if (object instanceof IBolt) {
                        ((IBolt)object).cleanup();
                        continue;
                    }
                    LOG.error("unknown component object");
                }
            }
            LOG.info("Shut down executor " + this.executor.getComponentId() + ":" + this.executor.getExecutorId());
        }
        catch (Exception e) {
            throw Utils.wrapInRuntime(e);
        }
    }
}

