/*
 * Decompiled with CFR 0.152.
 */
package com.github.myzhan.locust4j.runtime;

import com.github.myzhan.locust4j.AbstractTask;
import com.github.myzhan.locust4j.Locust;
import com.github.myzhan.locust4j.Log;
import com.github.myzhan.locust4j.message.Message;
import com.github.myzhan.locust4j.rpc.Client;
import com.github.myzhan.locust4j.runtime.RunnerState;
import com.github.myzhan.locust4j.stats.Stats;
import com.github.myzhan.locust4j.utils.Utils;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Drives a locust4j worker node: it listens for control messages from the master
 * ("hatch", "stop", "quit"), spawns or stops the task threads accordingly, and
 * reports state, statistics and heartbeats back through the RPC client.
 *
 * <p>Threading: {@link #getReady()} starts three control threads (receiver, sender,
 * heartbeat). State transitions are performed on the receiver thread; the sender and
 * heartbeat threads only read {@code state} and {@code numClients}, which is why both
 * fields are {@code volatile}.
 */
public class Runner {
    protected String nodeID;
    /** Number of task instances submitted by the most recent hatch; written only while hatching. */
    protected volatile int numClients = 0;
    /** Current lifecycle state; written on the receiver thread, read by sender and heartbeat. */
    private volatile RunnerState state;
    private List<AbstractTask> tasks;
    private Client rpcClient;
    private int hatchRate = 0;
    /** Pool running the user tasks; {@code null} whenever no hatch is active. */
    private ExecutorService taskExecutor;
    /** Pool running the receiver, sender and heartbeat loops; created by {@link #getReady()}. */
    private ExecutorService executor;
    private Stats stats;
    private final AtomicInteger threadNumber = new AtomicInteger();

    /** Creates a runner whose node id is obtained from {@link Utils#getNodeID()}. */
    public Runner() {
        this.nodeID = Utils.getNodeID();
    }

    /** @return the current runner state, or {@code null} before {@link #getReady()} is called */
    public RunnerState getState() {
        return this.state;
    }

    /** @return the node id this runner reports in every message */
    public String getNodeID() {
        return this.nodeID;
    }

    /** @param client the RPC client used for all communication with the master */
    public void setRPCClient(Client client) {
        this.rpcClient = client;
    }

    /** @param stats the stats instance whose queues this runner reads from and writes to */
    public void setStats(Stats stats) {
        this.stats = stats;
    }

    /** @param tasks the tasks to distribute worker threads across when hatching */
    public void setTasks(List<AbstractTask> tasks) {
        this.tasks = tasks;
    }

    /**
     * Submits task instances to the task executor, split across the configured tasks
     * in proportion to their weights (or evenly when every weight is zero).
     *
     * <p>Within each task, one second of sleep is inserted before every
     * {@code hatchRate}-th submission. If the calling thread is interrupted while
     * sleeping, the interrupt flag is restored and hatching stops early.
     *
     * <p>NOTE(review): per-task amounts are rounded independently, so their sum may
     * differ from {@code spawnCount}; confirm whether that is intended.
     *
     * @param spawnCount total number of clients requested
     */
    private void spawnWorkers(int spawnCount) {
        Log.debug(String.format("Hatching and swarming %d clients at the rate %d clients/s...", spawnCount, this.hatchRate));
        float weightSum = 0.0f;
        for (AbstractTask task : this.tasks) {
            weightSum += (float)task.getWeight();
        }
        for (AbstractTask task : this.tasks) {
            int amount;
            if (0.0f == weightSum) {
                amount = spawnCount / this.tasks.size();
            } else {
                float percent = (float)task.getWeight() / weightSum;
                amount = Math.round((float)spawnCount * percent);
            }
            Log.debug(String.format("Allocating %d threads to task, which name is %s", amount, task.getName()));
            for (int i = 1; i <= amount; ++i) {
                // Guard the modulo: a zero hatch rate would otherwise throw ArithmeticException.
                if (this.hatchRate != 0 && i % this.hatchRate == 0) {
                    try {
                        Thread.sleep(1000L);
                    }
                    catch (InterruptedException ex) {
                        Log.error(ex.getMessage());
                        // Keep the interrupt visible to callers and stop hatching.
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
                ++this.numClients;
                this.taskExecutor.submit(task);
            }
        }
    }

    /**
     * Resets the stats and counters, creates a fresh fixed-size task pool and spawns
     * the workers.
     *
     * @param spawnCount number of clients to spawn; also the size of the task pool
     * @param hatchRate  number of submissions between one-second pauses
     */
    protected void startHatching(int spawnCount, int hatchRate) {
        this.stats.getClearStatsQueue().offer(true);
        Stats.getInstance().wakeMeUp();
        this.hatchRate = hatchRate;
        this.numClients = 0;
        this.threadNumber.set(0);
        this.taskExecutor = new ThreadPoolExecutor(spawnCount, spawnCount, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory(){

            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                thread.setName("locust4j-worker#" + Runner.this.threadNumber.getAndIncrement());
                return thread;
            }
        });
        this.spawnWorkers(spawnCount);
    }

    /** Sends a "hatch_complete" message carrying the number of spawned clients. */
    protected void hatchComplete() {
        Map<String, Object> data = new HashMap<String, Object>(1);
        data.put("count", this.numClients);
        try {
            this.rpcClient.send(new Message("hatch_complete", data, this.nodeID));
        }
        catch (IOException ex) {
            Log.error(ex);
        }
    }

    /**
     * Sends a "quit" message to the master and shuts down the control threads.
     * The executor is shut down even when sending the message fails.
     */
    public void quit() {
        try {
            this.rpcClient.send(new Message("quit", null, this.nodeID));
        }
        catch (IOException ex) {
            Log.error(ex);
        }
        finally {
            // executor is null when getReady() was never called.
            if (this.executor != null) {
                this.executor.shutdownNow();
            }
        }
    }

    /**
     * Interrupts all task threads, waits up to one second for them to finish, and
     * drops the pool. Does nothing when no pool exists.
     */
    private void shutdownThreadPool() {
        ExecutorService pool = this.taskExecutor;
        if (pool == null) {
            return;
        }
        pool.shutdownNow();
        try {
            pool.awaitTermination(1L, TimeUnit.SECONDS);
        }
        catch (InterruptedException ex) {
            Log.error(ex.getMessage());
            Thread.currentThread().interrupt();
        }
        this.taskExecutor = null;
    }

    /** Stops all running task threads. */
    protected void stop() {
        this.shutdownThreadPool();
    }

    /** Reads "hatch_rate" from the message data as a float and truncates it to an int. */
    private int parseHatchRate(Message message) {
        return (int)Float.parseFloat(message.getData().get("hatch_rate").toString());
    }

    /** Reads "num_clients" from the message data as an int. */
    private int parseNumClients(Message message) {
        return Integer.parseInt(message.getData().get("num_clients").toString());
    }

    /**
     * Checks that a hatch message carries a usable hatch rate and client count.
     *
     * @return {@code false} (after logging) when either value is zero or negative
     */
    private boolean hatchMessageIsValid(Message message) {
        int requestedRate = this.parseHatchRate(message);
        int requestedClients = this.parseNumClients(message);
        // Negative values are rejected too: a negative pool size makes ThreadPoolExecutor throw.
        if (requestedRate <= 0 || requestedClients <= 0) {
            Log.debug(String.format("Invalid message (hatch_rate: %d, num_clients: %d) from master, ignored.", requestedRate, requestedClients));
            return false;
        }
        return true;
    }

    /**
     * Handles a validated hatch message: notifies the master that hatching started,
     * spawns the workers, then reports completion.
     */
    private void onHatchMessage(Message message) {
        int requestedRate = this.parseHatchRate(message);
        int requestedClients = this.parseNumClients(message);
        try {
            this.rpcClient.send(new Message("hatching", null, this.nodeID));
        }
        catch (IOException ex) {
            Log.error(ex);
        }
        this.startHatching(requestedClients, requestedRate);
        this.hatchComplete();
    }

    /**
     * Dispatches a message from the master according to its type and the current state.
     *
     * <ul>
     *   <li>Unknown types are logged and ignored.</li>
     *   <li>"quit" terminates the JVM.</li>
     *   <li>Ready/Stopped + valid "hatch": hatch, start the rate limiter (if any), go to Running.</li>
     *   <li>Hatching/Running + valid "hatch": stop current workers, hatch again, go to Running.</li>
     *   <li>Hatching/Running + "stop": stop workers and the rate limiter, report
     *       "client_stopped" and "client_ready", go to Ready (stays Stopped if sending fails).</li>
     * </ul>
     */
    private void onMessage(Message message) {
        String type = message.getType();
        boolean isHatch = "hatch".equals(type);
        boolean isStop = "stop".equals(type);
        boolean isQuit = "quit".equals(type);
        if (!(isHatch || isStop || isQuit)) {
            Log.error(String.format("Got %s message from master, which is not supported, please report an issue to locust4j.", type));
            return;
        }
        if (isQuit) {
            Log.debug("Got quit message from master, shutting down...");
            System.exit(0);
            return;
        }
        RunnerState current = this.state;
        if (current == RunnerState.Ready || current == RunnerState.Stopped) {
            if (isHatch && this.hatchMessageIsValid(message)) {
                this.state = RunnerState.Hatching;
                this.onHatchMessage(message);
                if (null != Locust.getInstance().getRateLimiter()) {
                    Locust.getInstance().getRateLimiter().start();
                }
                this.state = RunnerState.Running;
            }
        } else if (current == RunnerState.Hatching || current == RunnerState.Running) {
            if (isHatch && this.hatchMessageIsValid(message)) {
                this.stop();
                this.state = RunnerState.Hatching;
                this.onHatchMessage(message);
                this.state = RunnerState.Running;
            } else if (isStop) {
                this.stop();
                if (null != Locust.getInstance().getRateLimiter()) {
                    Locust.getInstance().getRateLimiter().stop();
                }
                this.state = RunnerState.Stopped;
                Log.debug("Recv stop message from master, all the workers are stopped");
                try {
                    this.rpcClient.send(new Message("client_stopped", null, this.nodeID));
                    this.rpcClient.send(new Message("client_ready", null, this.nodeID));
                    this.state = RunnerState.Ready;
                }
                catch (IOException ex) {
                    Log.error(ex);
                }
            }
        }
    }

    /**
     * Starts the control threads and announces this node to the master.
     * The receiver is started before "client_ready" is sent; the sender and heartbeat
     * are started afterwards.
     */
    public void getReady() {
        this.executor = new ThreadPoolExecutor(3, 3, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory(){

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r);
            }
        });
        this.state = RunnerState.Ready;
        this.executor.submit(new Receiver(this));
        try {
            this.rpcClient.send(new Message("client_ready", null, this.nodeID));
        }
        catch (IOException ex) {
            Log.error(ex);
        }
        this.executor.submit(new Sender(this));
        this.executor.submit(new Heartbeat(this));
    }

    /** Sends a "heartbeat" message with the lower-cased state name once per interval. */
    private class Heartbeat
    implements Runnable {
        /** Pause between heartbeats, in milliseconds. */
        private static final int HEARTBEAT_INTERVAL = 1000;
        private final Runner runner;

        private Heartbeat(Runner runner2) {
            this.runner = runner2;
        }

        @Override
        public void run() {
            String name = Thread.currentThread().getName();
            Thread.currentThread().setName(name + "heartbeat");
            while (true) {
                try {
                    Thread.sleep(HEARTBEAT_INTERVAL);
                    Map<String, Object> data = new HashMap<String, Object>(1);
                    data.put("state", this.runner.state.name().toLowerCase());
                    this.runner.rpcClient.send(new Message("heartbeat", data, this.runner.nodeID));
                }
                catch (InterruptedException ex) {
                    // Interrupted by executor shutdown: end the loop.
                    return;
                }
                catch (Exception ex) {
                    // Any other failure is logged and the heartbeat keeps going.
                    Log.error(ex);
                }
            }
        }
    }

    /**
     * Forwards stats reports taken from the stats queue to the master as "stats"
     * messages; reports taken while the runner is Ready or Stopped are discarded.
     */
    private class Sender
    implements Runnable {
        private final Runner runner;

        private Sender(Runner runner2) {
            this.runner = runner2;
        }

        @Override
        public void run() {
            String name = Thread.currentThread().getName();
            Thread.currentThread().setName(name + "send-to-client");
            while (true) {
                try {
                    Map<String, Object> data = this.runner.stats.getMessageToRunnerQueue().take();
                    RunnerState current = this.runner.state;
                    if (current == RunnerState.Ready || current == RunnerState.Stopped) {
                        continue;
                    }
                    data.put("user_count", this.runner.numClients);
                    this.runner.rpcClient.send(new Message("stats", data, this.runner.nodeID));
                }
                catch (InterruptedException ex) {
                    // Interrupted by executor shutdown: end the loop.
                    return;
                }
                catch (Exception ex) {
                    Log.error(ex);
                }
            }
        }
    }

    /**
     * Receives messages from the RPC client and hands them to
     * {@link Runner#onMessage(Message)} until the thread is interrupted.
     */
    private class Receiver
    implements Runnable {
        private final Runner runner;

        private Receiver(Runner runner2) {
            this.runner = runner2;
        }

        @Override
        public void run() {
            String name = Thread.currentThread().getName();
            Thread.currentThread().setName(name + "receive-from-client");
            // Exit once interrupted (e.g. by quit() -> shutdownNow()) instead of looping forever.
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    Message message = this.runner.rpcClient.recv();
                    this.runner.onMessage(message);
                }
                catch (Exception ex) {
                    Log.error(ex);
                }
            }
        }
    }
}

