/*
 * Decompiled with CFR 0.152.
 */
package com.github.myzhan.locust4j;

import com.github.myzhan.locust4j.AbstractTask;
import com.github.myzhan.locust4j.Locust;
import com.github.myzhan.locust4j.Log;
import com.github.myzhan.locust4j.Message;
import com.github.myzhan.locust4j.Queues;
import com.github.myzhan.locust4j.State;
import com.github.myzhan.locust4j.Stats;
import com.github.myzhan.locust4j.Utils;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Singleton that manages the worker thread pool of this node and exchanges
 * control messages with the master through the queues in {@link Queues}.
 *
 * <p>Messages are taken from {@code Queues.MESSAGE_FROM_MASTER} by a
 * {@link Receiver} and stats reports are forwarded to
 * {@code Queues.MESSAGE_TO_MASTER} by a {@link Sender}; both are started by
 * {@link #getReady()}.
 */
public class Runner {
    /** Identifier of this node, obtained from {@link Utils#getNodeID()} and attached to every outgoing message. */
    protected String nodeID;

    /**
     * Number of tasks submitted during the most recent hatch. Written while
     * hatching and read by the {@link Sender} thread, hence {@code volatile}.
     */
    protected volatile int numClients = 0;

    /** Current lifecycle state; {@code volatile} because it is exposed through {@link #getState()}. */
    private volatile State state;

    /** Tasks that clients are distributed over, set through {@link #setTasks(List)}. */
    private List<AbstractTask> tasks;

    /** Clients per second requested by the most recent hatch. */
    private int hatchRate = 0;

    /** Pool executing the tasks; replaced on every hatch and {@code null} after a shutdown. */
    private ExecutorService executor;

    /** Sequence used to number worker threads; reset on every hatch. */
    private final AtomicInteger threadNumber = new AtomicInteger();

    private Runner() {
        this.nodeID = Utils.getNodeID();
    }

    /**
     * Returns the process-wide instance.
     *
     * @return the single {@code Runner}
     */
    public static Runner getInstance() {
        return RunnerInstanceHolder.RUNNER;
    }

    /**
     * Returns the current lifecycle state.
     *
     * @return the state, or {@code null} before {@link #getReady()} or a hatch has set one
     */
    protected State getState() {
        return this.state;
    }

    /**
     * Sets the tasks that subsequent hatches distribute clients over.
     *
     * @param tasks the tasks to run; the list is stored as-is, not copied
     */
    protected void setTasks(List<AbstractTask> tasks) {
        this.tasks = tasks;
    }

    /**
     * Submits up to {@code spawnCount} task instances to the executor, split
     * between the tasks by weight, then reports completion through
     * {@link #hatchComplete()}.
     *
     * <p>If every weight is zero the clients are split evenly (integer
     * division). Within each task, a one second pause is taken before every
     * {@code hatchRate}-th submission.
     *
     * @param spawnCount total number of clients requested
     */
    private void spawnWorkers(int spawnCount) {
        Log.debug(String.format("Hatching and swarming %d clients at the rate %d clients/s...", spawnCount, this.hatchRate));
        float weightSum = 0.0f;
        for (AbstractTask task : this.tasks) {
            weightSum += (float)task.getWeight();
        }
        int submitted = 0;
        hatching:
        for (AbstractTask task : this.tasks) {
            int amount;
            if (weightSum == 0.0f) {
                amount = spawnCount / this.tasks.size();
            } else {
                float percent = (float)task.getWeight() / weightSum;
                amount = Math.round((float)spawnCount * percent);
            }
            // Per-task rounding can add up to more than spawnCount (e.g. 1:1
            // weights with 3 clients gives 2 + 2). The pool holds at most
            // spawnCount threads, so surplus submissions would sit in the queue
            // forever while still being counted as clients.
            amount = Math.min(amount, spawnCount - submitted);
            Log.debug(String.format("Allocating %d threads to task, which name is %s", amount, task.getName()));
            for (int i = 1; i <= amount; ++i) {
                // A non-positive rate disables pacing instead of dividing by zero.
                if (this.hatchRate > 0 && i % this.hatchRate == 0 && !this.pauseOneSecond()) {
                    // Interrupted: stop hatching and report what was started so far.
                    break hatching;
                }
                ++submitted;
                this.numClients = submitted;
                this.executor.submit(task);
            }
        }
        this.hatchComplete();
    }

    /**
     * Sleeps for one second to pace hatching.
     *
     * @return {@code true} if the full pause elapsed, {@code false} if the
     *         thread was interrupted (the interrupt flag is restored)
     */
    private boolean pauseOneSecond() {
        try {
            Thread.sleep(1000L);
            return true;
        }
        catch (InterruptedException ex) {
            Log.error(ex.getMessage());
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /**
     * Starts a new hatch: shuts down the pool of a running test, creates a
     * fresh pool and spawns the workers. Blocks until hatching has finished.
     *
     * <p>When the runner is neither running nor hatching, a stats reset is
     * requested first via {@code Queues.CLEAR_STATS}.
     *
     * @param spawnCount number of clients to start; must be positive, otherwise
     *                   {@link ThreadPoolExecutor} rejects it with an
     *                   {@link IllegalArgumentException}
     * @param hatchRate  clients to start per second; non-positive disables pacing
     */
    protected void startHatching(int spawnCount, int hatchRate) {
        if (this.state != State.Running && this.state != State.Hatching) {
            Queues.CLEAR_STATS.offer(true);
            Stats.getInstance().wakeMeUp();
        }
        if (this.state == State.Running) {
            this.shutdownThreadPool();
        }
        this.state = State.Hatching;
        this.hatchRate = hatchRate;
        this.numClients = 0;
        this.threadNumber.set(0);
        // Core size must equal the maximum: with an unbounded work queue a
        // ThreadPoolExecutor never grows beyond its core size, so a core size
        // of 0 would run every task on a single thread.
        this.executor = new ThreadPoolExecutor(spawnCount, spawnCount, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory(){

            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                thread.setName("locust4j-worker#" + Runner.this.threadNumber.getAndIncrement());
                return thread;
            }
        });
        this.spawnWorkers(spawnCount);
    }

    /**
     * Queues a {@code hatch_complete} message carrying the current client count
     * and switches the state to {@code Running}.
     */
    protected void hatchComplete() {
        HashMap<String, Integer> data = new HashMap<String, Integer>(1);
        data.put("count", this.numClients);
        Queues.MESSAGE_TO_MASTER.add(new Message("hatch_complete", data, this.nodeID));
        this.state = State.Running;
    }

    /** Queues a {@code quit} message for the master. */
    protected void quit() {
        Queues.MESSAGE_TO_MASTER.add(new Message("quit", null, this.nodeID));
    }

    /**
     * Interrupts all workers, waits up to one second for them to finish, and
     * drops the executor. Sets the state to {@code Stopped}.
     */
    private void shutdownThreadPool() {
        this.executor.shutdownNow();
        this.state = State.Stopped;
        try {
            this.executor.awaitTermination(1L, TimeUnit.SECONDS);
        }
        catch (InterruptedException ex) {
            Log.error(ex.getMessage());
            // Keep the interruption visible to the caller's own loop.
            Thread.currentThread().interrupt();
        }
        this.executor = null;
    }

    /** Shuts the worker pool down if a test is running; otherwise does nothing. */
    protected void stop() {
        if (this.state == State.Running) {
            this.shutdownThreadPool();
            Log.debug("Recv stop message from master, all the workers are stopped");
        }
    }

    /**
     * Marks the runner as ready, starts the {@link Receiver} and {@link Sender}
     * on the core thread pool of {@link Locust}, and queues a
     * {@code client_ready} message for the master.
     */
    public void getReady() {
        this.state = State.Ready;
        Locust.getInstance().submitToCoreThreadPool(new Receiver(this));
        Queues.MESSAGE_TO_MASTER.add(new Message("client_ready", null, this.nodeID));
        Locust.getInstance().submitToCoreThreadPool(new Sender(this));
    }

    /**
     * Forwards every report taken from {@code Queues.REPORT_TO_RUNNER} to the
     * master as a {@code stats} message, adding the current client count.
     * Runs until its thread is interrupted.
     */
    private static final class Sender
    implements Runnable {
        private final Runner runner;

        Sender(Runner runner) {
            this.runner = runner;
        }

        @Override
        public void run() {
            Thread current = Thread.currentThread();
            current.setName(current.getName() + "send-to-client");
            while (true) {
                try {
                    Map data = Queues.REPORT_TO_RUNNER.take();
                    data.put("user_count", this.runner.numClients);
                    Queues.MESSAGE_TO_MASTER.add(new Message("stats", data, this.runner.nodeID));
                }
                catch (Exception ex) {
                    Log.error(ex);
                    if (ex instanceof InterruptedException) {
                        // Interruption is a request to stop, not an error to retry.
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
            }
        }
    }

    /**
     * Handles messages taken from {@code Queues.MESSAGE_FROM_MASTER}:
     * {@code hatch}, {@code stop} and {@code quit}; any other type is ignored.
     * Runs until its thread is interrupted or a {@code quit} exits the JVM.
     */
    private static final class Receiver
    implements Runnable {
        private final Runner runner;

        Receiver(Runner runner) {
            this.runner = runner;
        }

        @Override
        public void run() {
            Thread current = Thread.currentThread();
            current.setName(current.getName() + "receive-from-client");
            while (true) {
                try {
                    Message message = Queues.MESSAGE_FROM_MASTER.take();
                    String type = message.getType();
                    if ("hatch".equals(type)) {
                        this.onHatch(message);
                    } else if ("stop".equals(type)) {
                        this.runner.stop();
                        Queues.MESSAGE_TO_MASTER.add(new Message("client_stopped", null, this.runner.nodeID));
                        Queues.MESSAGE_TO_MASTER.add(new Message("client_ready", null, this.runner.nodeID));
                    } else if ("quit".equals(type)) {
                        Log.debug("Got quit message from master, shutting down...");
                        System.exit(0);
                    }
                }
                catch (Exception ex) {
                    Log.error(ex);
                    if (ex instanceof InterruptedException) {
                        // Interruption is a request to stop, not an error to retry.
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
                if (Thread.currentThread().isInterrupted()) {
                    // A hatch or stop handled above was interrupted; honour it.
                    return;
                }
            }
        }

        /**
         * Acknowledges a {@code hatch} message and starts hatching with the
         * {@code hatch_rate} and {@code num_clients} it carries. A message
         * whose rate truncates to zero or whose client count is zero is
         * logged and ignored.
         */
        private void onHatch(Message message) {
            Queues.MESSAGE_TO_MASTER.add(new Message("hatching", null, this.runner.nodeID));
            // The rate is parsed as a float and truncated toward zero.
            int hatchRate = (int)Float.parseFloat(message.getData().get("hatch_rate").toString());
            int numClients = Integer.parseInt(message.getData().get("num_clients").toString());
            if (hatchRate == 0 || numClients == 0) {
                Log.error(String.format("Invalid message (hatch_rate: %d, num_clients: %d) from master, ignored.", hatchRate, numClients));
                return;
            }
            this.runner.startHatching(numClients, hatchRate);
        }
    }

    /** Holder whose class initialisation creates the single {@link Runner} on first use. */
    private static final class RunnerInstanceHolder {
        private static final Runner RUNNER = new Runner();

        private RunnerInstanceHolder() {
        }
    }
}

