/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.trogdor.coordinator;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import net.sourceforge.argparse4j.ArgumentParsers;
import net.sourceforge.argparse4j.impl.Arguments;
import net.sourceforge.argparse4j.inf.ArgumentAction;
import net.sourceforge.argparse4j.inf.ArgumentParser;
import net.sourceforge.argparse4j.inf.ArgumentParserException;
import net.sourceforge.argparse4j.inf.Namespace;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.KafkaThread;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.trogdor.common.Node;
import org.apache.kafka.trogdor.common.Platform;
import org.apache.kafka.trogdor.coordinator.CoordinatorRestResource;
import org.apache.kafka.trogdor.coordinator.NodeManager;
import org.apache.kafka.trogdor.fault.DoneState;
import org.apache.kafka.trogdor.fault.Fault;
import org.apache.kafka.trogdor.fault.FaultSet;
import org.apache.kafka.trogdor.fault.FaultSpec;
import org.apache.kafka.trogdor.fault.SendingState;
import org.apache.kafka.trogdor.rest.CoordinatorFaultsResponse;
import org.apache.kafka.trogdor.rest.CreateCoordinatorFaultRequest;
import org.apache.kafka.trogdor.rest.FaultDataMap;
import org.apache.kafka.trogdor.rest.JsonRestServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class Coordinator {
    private static final Logger log = LoggerFactory.getLogger(Coordinator.class);
    private final Time time;
    private final long startTimeMs;
    private final Platform platform;
    private final Map<String, NodeManager> nodeManagers;
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition cond = this.lock.newCondition();
    private final CoordinatorRunnable runnable;
    private final JsonRestServer restServer;
    private final KafkaThread thread;
    private boolean shutdown = false;
    private final FaultSet pendingFaults = new FaultSet();
    private final FaultSet processedFaults = new FaultSet();

    public Coordinator(Platform platform, Time time, JsonRestServer restServer, CoordinatorRestResource resource) {
        this.platform = platform;
        this.time = time;
        this.startTimeMs = time.milliseconds();
        this.runnable = new CoordinatorRunnable();
        this.restServer = restServer;
        this.nodeManagers = new HashMap<String, NodeManager>();
        for (Node node : platform.topology().nodes().values()) {
            if (Node.Util.getTrogdorAgentPort(node) <= 0) continue;
            this.nodeManagers.put(node.name(), new NodeManager(time, node));
        }
        if (this.nodeManagers.isEmpty()) {
            log.warn("No agent nodes configured.");
        }
        this.thread = new KafkaThread("TrogdorCoordinatorThread", (Runnable)this.runnable, false);
        this.thread.start();
        resource.setCoordinator(this);
    }

    public int port() {
        return this.restServer.port();
    }

    private void startFault(long now, Fault fault) {
        Set<String> affectedNodes = fault.targetNodes(this.platform.topology());
        HashSet<NodeManager> affectedManagers = new HashSet<NodeManager>();
        HashSet<String> nonexistentNodes = new HashSet<String>();
        HashSet<String> nodeNames = new HashSet<String>();
        for (String affectedNode : affectedNodes) {
            NodeManager nodeManager = this.nodeManagers.get(affectedNode);
            if (nodeManager == null) {
                nonexistentNodes.add(affectedNode);
                continue;
            }
            affectedManagers.add(nodeManager);
            nodeNames.add(affectedNode);
        }
        if (!nonexistentNodes.isEmpty()) {
            log.warn("Fault {} refers to {} non-existent node(s): {}", new Object[]{fault.id(), nonexistentNodes.size(), Utils.join(nonexistentNodes, (String)", ")});
        }
        log.info("Applying fault {} on {} node(s): {}", new Object[]{fault.id(), nodeNames.size(), Utils.join(nodeNames, (String)", ")});
        if (nodeNames.isEmpty()) {
            fault.setState(new DoneState(now, ""));
        } else {
            fault.setState(new SendingState(nodeNames));
        }
        for (NodeManager nodeManager : affectedManagers) {
            nodeManager.enqueueFault(fault);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void beginShutdown() {
        this.lock.lock();
        try {
            this.shutdown = true;
            this.cond.signalAll();
        }
        finally {
            this.lock.unlock();
        }
    }

    public void waitForShutdown() {
        try {
            this.thread.join();
        }
        catch (InterruptedException e) {
            log.error("Interrupted while waiting for thread shutdown", (Throwable)e);
            Thread.currentThread().interrupt();
        }
    }

    public long startTimeMs() {
        return this.startTimeMs;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public CoordinatorFaultsResponse getFaults() {
        TreeMap<String, FaultDataMap.FaultData> faultData = new TreeMap<String, FaultDataMap.FaultData>();
        this.lock.lock();
        try {
            this.getFaultsImpl(faultData, this.pendingFaults);
            this.getFaultsImpl(faultData, this.processedFaults);
        }
        finally {
            this.lock.unlock();
        }
        return new CoordinatorFaultsResponse(faultData);
    }

    private void getFaultsImpl(Map<String, FaultDataMap.FaultData> faultData, FaultSet faultSet) {
        FaultSet.FaultSetIterator iter = faultSet.iterateByStart();
        while (iter.hasNext()) {
            Fault fault = (Fault)iter.next();
            FaultDataMap.FaultData data = new FaultDataMap.FaultData(fault.spec(), fault.state());
            faultData.put(fault.id(), data);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void createFault(CreateCoordinatorFaultRequest request) throws ClassNotFoundException {
        this.lock.lock();
        try {
            Fault fault = FaultSpec.Util.createFault(request.id(), request.spec());
            this.pendingFaults.add(fault);
            this.cond.signalAll();
        }
        finally {
            this.lock.unlock();
        }
    }

    public static void main(String[] args) throws Exception {
        ArgumentParser parser = ArgumentParsers.newArgumentParser((String)"trogdor-coordinator").defaultHelp(true).description("The Trogdor fault injection coordinator");
        parser.addArgument(new String[]{"--coordinator.config"}).action((ArgumentAction)Arguments.store()).required(true).type(String.class).dest("config").metavar(new String[]{"CONFIG"}).help("The configuration file to use.");
        parser.addArgument(new String[]{"--node-name"}).action((ArgumentAction)Arguments.store()).required(true).type(String.class).dest("node_name").metavar(new String[]{"NODE_NAME"}).help("The name of this node.");
        Namespace res = null;
        try {
            res = parser.parseArgs(args);
        }
        catch (ArgumentParserException e) {
            if (args.length == 0) {
                parser.printHelp();
                Exit.exit((int)0);
            }
            parser.handleError(e);
            Exit.exit((int)1);
        }
        String configPath = res.getString("config");
        String nodeName = res.getString("node_name");
        Platform platform = Platform.Config.parse(nodeName, configPath);
        JsonRestServer restServer = new JsonRestServer(Node.Util.getTrogdorCoordinatorPort(platform.curNode()));
        CoordinatorRestResource resource = new CoordinatorRestResource();
        final Coordinator coordinator = new Coordinator(platform, Time.SYSTEM, restServer, resource);
        restServer.start(resource);
        Runtime.getRuntime().addShutdownHook(new Thread(){

            @Override
            public void run() {
                log.error("Running shutdown hook...");
                coordinator.beginShutdown();
                coordinator.waitForShutdown();
            }
        });
        coordinator.waitForShutdown();
    }

    class CoordinatorRunnable
    implements Runnable {
        CoordinatorRunnable() {
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            log.info("Starting main service thread.");
            try {
                long nextWakeMs = 0L;
                block10: while (true) {
                    Fault fault;
                    long now = Coordinator.this.time.milliseconds();
                    ArrayList<Fault> toStart = new ArrayList<Fault>();
                    Coordinator.this.lock.lock();
                    try {
                        if (Coordinator.this.shutdown) break;
                        if (nextWakeMs > now) {
                            if (Coordinator.this.cond.await(nextWakeMs - now, TimeUnit.MILLISECONDS)) {
                                log.trace("CoordinatorRunnable woke up early.");
                            }
                            now = Coordinator.this.time.milliseconds();
                            if (Coordinator.this.shutdown) break;
                        }
                        nextWakeMs = now + 3600000L;
                        FaultSet.FaultSetIterator iter = Coordinator.this.pendingFaults.iterateByStart();
                        while (iter.hasNext()) {
                            fault = (Fault)iter.next();
                            if (now < fault.spec().startMs()) {
                                nextWakeMs = Math.min(nextWakeMs, fault.spec().startMs());
                                break;
                            }
                            toStart.add(fault);
                            iter.remove();
                            Coordinator.this.processedFaults.add(fault);
                        }
                    }
                    finally {
                        Coordinator.this.lock.unlock();
                    }
                    Iterator i$ = toStart.iterator();
                    while (true) {
                        if (!i$.hasNext()) continue block10;
                        fault = (Fault)i$.next();
                        Coordinator.this.startFault(now, fault);
                    }
                    break;
                }
            }
            catch (Throwable t) {
                log.error("CoordinatorRunnable shutting down with exception", t);
            }
            finally {
                log.info("CoordinatorRunnable shutting down.");
                Coordinator.this.restServer.stop();
                for (NodeManager nodeManager : Coordinator.this.nodeManagers.values()) {
                    nodeManager.beginShutdown();
                }
                for (NodeManager nodeManager : Coordinator.this.nodeManagers.values()) {
                    nodeManager.waitForShutdown();
                }
            }
        }
    }
}

