/*
 * Decompiled with CFR 0.152.
 */
package org.apache.gobblin.runtime;

import com.codahale.metrics.Counter;
import com.codahale.metrics.Gauge;
import com.codahale.metrics.Meter;
import com.codahale.metrics.Metric;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.MetricSet;
import com.codahale.metrics.Reservoir;
import com.codahale.metrics.SlidingTimeWindowReservoir;
import com.codahale.metrics.Timer;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.AbstractIdleService;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Properties;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.metrics.GobblinMetrics;
import org.apache.gobblin.runtime.Task;
import org.apache.gobblin.runtime.fork.Fork;
import org.apache.gobblin.util.ExecutorsUtils;
import org.apache.hadoop.conf.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TaskExecutor
extends AbstractIdleService {
    private static final Logger LOG = LoggerFactory.getLogger(TaskExecutor.class);
    private final ScheduledExecutorService taskExecutor;
    private final ExecutorService forkExecutor;
    private final long retryIntervalInSeconds;
    private final int queuedTaskTimeMaxSize;
    private final long queuedTaskTimeMaxAge;
    private final Map<String, Long> queuedTasks = Maps.newConcurrentMap();
    private final ConcurrentSkipListMap<Long, Long> queuedTaskTimeHistorical = new ConcurrentSkipListMap();
    private long lastCalculationTime = 0L;
    private AtomicInteger queuedTaskCount = new AtomicInteger();
    private AtomicInteger currentQueuedTaskCount = new AtomicInteger();
    private AtomicInteger historicalQueuedTaskCount = new AtomicInteger();
    private AtomicLong queuedTaskTotalTime = new AtomicLong();
    private AtomicLong currentQueuedTaskTotalTime = new AtomicLong();
    private AtomicLong historicalQueuedTaskTotalTime = new AtomicLong();
    private final Counter runningTaskCount = new Counter();
    private final Meter successfulTaskCount = new Meter();
    private final Meter failedTaskCount = new Meter();
    private final Timer taskCreateAndRunTimer;
    private final TaskExecutorQueueMetricSet metricSet = new TaskExecutorQueueMetricSet();

    private TaskExecutor(int taskExecutorThreadPoolSize, int coreRetryThreadPoolSize, long retryIntervalInSeconds, int queuedTaskTimeMaxSize, long queuedTaskTimeMaxAge, int timerWindowSize) {
        Preconditions.checkArgument((taskExecutorThreadPoolSize > 0 ? 1 : 0) != 0, (Object)"Task executor thread pool size should be positive");
        Preconditions.checkArgument((retryIntervalInSeconds > 0L ? 1 : 0) != 0, (Object)"Task retry interval should be positive");
        Preconditions.checkArgument((queuedTaskTimeMaxSize > 0 ? 1 : 0) != 0, (Object)"Queued task time max size should be positive");
        Preconditions.checkArgument((queuedTaskTimeMaxAge > 0L ? 1 : 0) != 0, (Object)"Queued task time max age should be positive");
        this.taskExecutor = ExecutorsUtils.loggingDecorator((ScheduledExecutorService)Executors.newScheduledThreadPool(taskExecutorThreadPoolSize, ExecutorsUtils.newThreadFactory((Optional)Optional.of((Object)LOG), (Optional)Optional.of((Object)"TaskExecutor-%d"))));
        this.retryIntervalInSeconds = retryIntervalInSeconds;
        this.queuedTaskTimeMaxSize = queuedTaskTimeMaxSize;
        this.queuedTaskTimeMaxAge = queuedTaskTimeMaxAge;
        this.taskCreateAndRunTimer = new Timer((Reservoir)new SlidingTimeWindowReservoir((long)timerWindowSize, TimeUnit.MINUTES));
        this.forkExecutor = ExecutorsUtils.loggingDecorator((ExecutorService)new ThreadPoolExecutor(taskExecutorThreadPoolSize, Integer.MAX_VALUE, 0L, TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>(), ExecutorsUtils.newThreadFactory((Optional)Optional.of((Object)LOG), (Optional)Optional.of((Object)"ForkExecutor-%d"))));
    }

    public TaskExecutor(Properties properties) {
        this(Integer.parseInt(properties.getProperty("taskexecutor.threadpool.size", Integer.toString(2))), Integer.parseInt(properties.getProperty("taskretry.threadpool.coresize", Integer.toString(1))), Long.parseLong(properties.getProperty("task.retry.intervalinsec", Long.toString(300L))), Integer.parseInt(properties.getProperty("taskexecutor.queued_task_time.history.max_size", Integer.toString(2048))), Long.parseLong(properties.getProperty("taskexecutor.queued_task_time.history.max_age", Long.toString(ConfigurationKeys.DEFAULT_QUEUED_TASK_TIME_MAX_AGE))), Integer.parseInt(properties.getProperty("metrics.timer.window.size.in.minutes", Integer.toString(15))));
    }

    public TaskExecutor(Configuration conf) {
        this(conf.getInt("taskexecutor.threadpool.size", 2), conf.getInt("taskretry.threadpool.coresize", 1), conf.getLong("task.retry.intervalinsec", 300L), conf.getInt("taskexecutor.queued_task_time.history.max_size", 2048), conf.getLong("taskexecutor.queued_task_time.history.max_age", ConfigurationKeys.DEFAULT_QUEUED_TASK_TIME_MAX_AGE), conf.getInt("metrics.timer.window.size.in.minutes", 15));
    }

    protected void startUp() throws Exception {
        LOG.info("Starting the task executor");
        if (this.taskExecutor.isShutdown() || this.taskExecutor.isTerminated()) {
            throw new IllegalStateException("Task thread pool executor is shutdown or terminated");
        }
        if (this.forkExecutor.isShutdown() || this.forkExecutor.isTerminated()) {
            throw new IllegalStateException("Fork thread pool executor is shutdown or terminated");
        }
    }

    protected void shutDown() throws Exception {
        LOG.info("Stopping the task executor");
        try {
            ExecutorsUtils.shutdownExecutorService((ExecutorService)this.taskExecutor, (Optional)Optional.of((Object)LOG));
        }
        finally {
            ExecutorsUtils.shutdownExecutorService((ExecutorService)this.forkExecutor, (Optional)Optional.of((Object)LOG));
        }
    }

    public void execute(Task task) {
        LOG.info(String.format("Executing task %s", task.getTaskId()));
        this.taskExecutor.execute(new TrackingTask(task));
    }

    public Future<?> submit(Task task) {
        LOG.info(String.format("Submitting task %s", task.getTaskId()));
        return this.taskExecutor.submit(new TrackingTask(task));
    }

    public void execute(Fork fork) {
        LOG.info(String.format("Executing fork %d of task %s", fork.getIndex(), fork.getTaskId()));
        this.forkExecutor.execute(fork);
    }

    public Future<?> submit(Fork fork) {
        LOG.info(String.format("Submitting fork %d of task %s", fork.getIndex(), fork.getTaskId()));
        return this.forkExecutor.submit(fork);
    }

    public void retry(Task task) {
        if (GobblinMetrics.isEnabled((State)task.getTaskState().getWorkunit()) && task.getTaskState().contains("fork.branches")) {
            task.getTaskState().adjustJobMetricsOnRetry(task.getTaskState().getPropAsInt("fork.branches"));
        }
        long interval = (long)task.getRetryCount() * this.retryIntervalInSeconds;
        this.taskExecutor.schedule(new TrackingTask(task, interval, TimeUnit.SECONDS), interval, TimeUnit.SECONDS);
        LOG.info(String.format("Scheduled retry of failed task %s to run in %d seconds", task.getTaskId(), interval));
        task.incrementRetryCount();
    }

    public MetricSet getTaskExecutorQueueMetricSet() {
        return this.metricSet;
    }

    private synchronized void calculateMetrics() {
        long currentTimeMillis = System.currentTimeMillis();
        if (this.lastCalculationTime < currentTimeMillis - TimeUnit.SECONDS.toMillis(10L)) {
            LOG.debug("Starting metric calculation.");
            int currentQueuedTaskCount = 0;
            int futureQueuedTaskCount = 0;
            long currentQueuedTaskTotalTime = 0L;
            for (Map.Entry<String, Long> queuedTask : this.queuedTasks.entrySet()) {
                if (queuedTask.getValue() <= currentTimeMillis) {
                    ++currentQueuedTaskCount;
                    long currentQueuedTaskTime = currentTimeMillis - queuedTask.getValue();
                    currentQueuedTaskTotalTime += currentQueuedTaskTime;
                    LOG.debug(String.format("Task %s has been waiting in the queue for %d ms.", queuedTask.getKey(), currentQueuedTaskTime));
                    continue;
                }
                ++futureQueuedTaskCount;
            }
            if (futureQueuedTaskCount > 0) {
                LOG.debug(String.format("%d tasks were ignored during metric calculations because they are scheduled to run in the future.", futureQueuedTaskCount));
            }
            this.currentQueuedTaskCount.set(currentQueuedTaskCount);
            this.currentQueuedTaskTotalTime.set(currentQueuedTaskTotalTime);
            LOG.debug(String.format("%d current tasks have been waiting for a total of %d ms.", currentQueuedTaskCount, currentQueuedTaskTotalTime));
            int historicalQueuedTaskCount = 0;
            long historicalQueuedTaskTotalTime = 0L;
            long cutoff = currentTimeMillis - this.queuedTaskTimeMaxAge;
            Iterator iterator = this.queuedTaskTimeHistorical.descendingMap().entrySet().iterator();
            while (iterator.hasNext()) {
                try {
                    Map.Entry historicalQueuedTask = iterator.next();
                    if ((Long)historicalQueuedTask.getKey() < cutoff || historicalQueuedTaskCount >= this.queuedTaskTimeMaxSize) {
                        LOG.debug(String.format("Task started at %d is before the cutoff of %d and is being removed. Queue time %d will be removed from metric calculations.", historicalQueuedTask.getKey(), cutoff, historicalQueuedTask.getValue()));
                        iterator.remove();
                        continue;
                    }
                    ++historicalQueuedTaskCount;
                    historicalQueuedTaskTotalTime += ((Long)historicalQueuedTask.getValue()).longValue();
                    LOG.debug(String.format("Task started at %d is after cutoff. Queue time %d will be used in metric calculations.", historicalQueuedTask.getKey(), historicalQueuedTask.getValue()));
                }
                catch (NoSuchElementException e) {
                    LOG.warn("Ran out of items in historical task queue time set.");
                }
            }
            this.historicalQueuedTaskCount.set(historicalQueuedTaskCount);
            this.historicalQueuedTaskTotalTime.set(historicalQueuedTaskTotalTime);
            LOG.debug(String.format("%d historical tasks have been waiting for a total of %d ms.", historicalQueuedTaskCount, historicalQueuedTaskTotalTime));
            int totalQueuedTaskCount = currentQueuedTaskCount + historicalQueuedTaskCount;
            long totalQueuedTaskTime = currentQueuedTaskTotalTime + historicalQueuedTaskTotalTime;
            this.queuedTaskCount.set(totalQueuedTaskCount);
            this.queuedTaskTotalTime.set(totalQueuedTaskTime);
            LOG.debug(String.format("%d tasks have been waiting for a total of %d ms.", totalQueuedTaskCount, totalQueuedTaskTime));
            this.lastCalculationTime = currentTimeMillis;
            LOG.debug("Finished metric calculation.");
        } else {
            LOG.debug("Skipped metric calculation because not enough time has elapsed since the last calculation.");
        }
    }

    public ExecutorService getForkExecutor() {
        return this.forkExecutor;
    }

    public long getRetryIntervalInSeconds() {
        return this.retryIntervalInSeconds;
    }

    public int getQueuedTaskTimeMaxSize() {
        return this.queuedTaskTimeMaxSize;
    }

    public long getQueuedTaskTimeMaxAge() {
        return this.queuedTaskTimeMaxAge;
    }

    public AtomicInteger getQueuedTaskCount() {
        return this.queuedTaskCount;
    }

    public AtomicInteger getCurrentQueuedTaskCount() {
        return this.currentQueuedTaskCount;
    }

    public AtomicInteger getHistoricalQueuedTaskCount() {
        return this.historicalQueuedTaskCount;
    }

    public AtomicLong getQueuedTaskTotalTime() {
        return this.queuedTaskTotalTime;
    }

    public AtomicLong getCurrentQueuedTaskTotalTime() {
        return this.currentQueuedTaskTotalTime;
    }

    public AtomicLong getHistoricalQueuedTaskTotalTime() {
        return this.historicalQueuedTaskTotalTime;
    }

    public Counter getRunningTaskCount() {
        return this.runningTaskCount;
    }

    public Meter getSuccessfulTaskCount() {
        return this.successfulTaskCount;
    }

    public Meter getFailedTaskCount() {
        return this.failedTaskCount;
    }

    public Timer getTaskCreateAndRunTimer() {
        return this.taskCreateAndRunTimer;
    }

    private class TrackingTask
    implements Runnable {
        private Task underlyingTask;

        public TrackingTask(Task task) {
            this(task, 0L, TimeUnit.SECONDS);
        }

        public TrackingTask(Task task, long interval, TimeUnit timeUnit) {
            long now = System.currentTimeMillis();
            long timeToRun = now + timeUnit.toMillis(interval);
            LOG.debug(String.format("Task %s queued to run %s.", task.getTaskId(), timeToRun <= now ? "now" : "at " + timeToRun));
            TaskExecutor.this.queuedTasks.putIfAbsent(task.getTaskId(), timeToRun);
            this.underlyingTask = task;
        }

        @Override
        public void run() {
            long startTime = System.currentTimeMillis();
            this.onStart(startTime);
            try {
                this.underlyingTask.run();
                TaskExecutor.this.successfulTaskCount.mark();
            }
            catch (Exception e) {
                TaskExecutor.this.failedTaskCount.mark();
                LOG.error(String.format("Task %s failed", this.underlyingTask.getTaskId()), (Throwable)e);
                throw e;
            }
            finally {
                TaskExecutor.this.runningTaskCount.dec();
            }
        }

        private void onStart(long startTime) {
            Long queueTime = (Long)TaskExecutor.this.queuedTasks.remove(this.underlyingTask.getTaskId());
            long workUnitCreationTime = this.underlyingTask.getTaskContext().getTaskState().getPropAsLong("workunit.creation.time.in.millis", 0L);
            long timeInQueue = startTime - queueTime;
            long timeSinceWorkUnitCreation = startTime - workUnitCreationTime;
            TaskExecutor.this.taskCreateAndRunTimer.update(timeSinceWorkUnitCreation, TimeUnit.MILLISECONDS);
            LOG.debug(String.format("Task %s started. Saving queued time of %d ms to history.", this.underlyingTask.getTaskId(), timeInQueue));
            TaskExecutor.this.queuedTaskTimeHistorical.putIfAbsent(System.currentTimeMillis(), timeInQueue);
            TaskExecutor.this.runningTaskCount.inc();
        }
    }

    private class TaskExecutorQueueMetricSet
    implements MetricSet {
        private TaskExecutorQueueMetricSet() {
        }

        public Map<String, Metric> getMetrics() {
            HashMap<String, Object> metrics = new HashMap<String, Object>();
            metrics.put(MetricRegistry.name((String)"queued", (String[])new String[]{"current", "count"}), new Gauge<Integer>(){

                public Integer getValue() {
                    TaskExecutor.this.calculateMetrics();
                    return TaskExecutor.this.currentQueuedTaskCount.intValue();
                }
            });
            metrics.put(MetricRegistry.name((String)"queued", (String[])new String[]{"historical", "count"}), new Gauge<Integer>(){

                public Integer getValue() {
                    TaskExecutor.this.calculateMetrics();
                    return TaskExecutor.this.historicalQueuedTaskCount.intValue();
                }
            });
            metrics.put(MetricRegistry.name((String)"queued", (String[])new String[]{"count"}), new Gauge<Integer>(){

                public Integer getValue() {
                    TaskExecutor.this.calculateMetrics();
                    return TaskExecutor.this.queuedTaskCount.intValue();
                }
            });
            metrics.put(MetricRegistry.name((String)"queued", (String[])new String[]{"current", "time", "total"}), new Gauge<Long>(){

                public Long getValue() {
                    TaskExecutor.this.calculateMetrics();
                    return TaskExecutor.this.currentQueuedTaskTotalTime.longValue();
                }
            });
            metrics.put(MetricRegistry.name((String)"queued", (String[])new String[]{"historical", "time", "total"}), new Gauge<Long>(){

                public Long getValue() {
                    TaskExecutor.this.calculateMetrics();
                    return TaskExecutor.this.historicalQueuedTaskTotalTime.longValue();
                }
            });
            metrics.put(MetricRegistry.name((String)"queued", (String[])new String[]{"time", "total"}), new Gauge<Long>(){

                public Long getValue() {
                    TaskExecutor.this.calculateMetrics();
                    return TaskExecutor.this.queuedTaskTotalTime.longValue();
                }
            });
            metrics.put(MetricRegistry.name((String)"running", (String[])new String[]{"count"}), TaskExecutor.this.runningTaskCount);
            metrics.put(MetricRegistry.name((String)"successful", (String[])new String[]{"count"}), TaskExecutor.this.successfulTaskCount);
            metrics.put(MetricRegistry.name((String)"failed", (String[])new String[]{"count"}), TaskExecutor.this.failedTaskCount);
            return Collections.unmodifiableMap(metrics);
        }
    }
}

