/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.core.testutils;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import org.apache.flink.core.testutils.ScheduledTask;

public class ManuallyTriggeredScheduledExecutorService
implements ScheduledExecutorService {
    private final BlockingQueue<Runnable> queuedRunnables = new LinkedBlockingQueue<Runnable>();
    private final ConcurrentLinkedQueue<ScheduledTask<?>> nonPeriodicScheduledTasks = new ConcurrentLinkedQueue();
    private final ConcurrentLinkedQueue<ScheduledTask<?>> periodicScheduledTasks = new ConcurrentLinkedQueue();
    private boolean shutdown;

    @Override
    public void execute(@Nonnull Runnable command) {
        this.queuedRunnables.add(command);
    }

    @Override
    public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
        return this.insertNonPeriodicTask(command, delay, unit);
    }

    @Override
    public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
        return this.insertNonPeriodicTask(callable, delay, unit);
    }

    @Override
    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
        return this.insertPeriodicRunnable(command, initialDelay, period, unit);
    }

    @Override
    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
        return this.insertPeriodicRunnable(command, initialDelay, delay, unit);
    }

    @Override
    public void shutdown() {
        this.shutdown = true;
    }

    @Override
    public List<Runnable> shutdownNow() {
        this.shutdown();
        return Collections.emptyList();
    }

    @Override
    public boolean isShutdown() {
        return false;
    }

    @Override
    public boolean isTerminated() {
        return this.shutdown;
    }

    @Override
    public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
        return true;
    }

    @Override
    public <T> Future<T> submit(Callable<T> task) {
        throw new UnsupportedOperationException();
    }

    @Override
    public <T> Future<T> submit(Runnable task, T result) {
        throw new UnsupportedOperationException();
    }

    @Override
    public Future<?> submit(Runnable task) {
        throw new UnsupportedOperationException();
    }

    @Override
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) {
        throw new UnsupportedOperationException();
    }

    @Override
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) {
        throw new UnsupportedOperationException();
    }

    @Override
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks) {
        throw new UnsupportedOperationException();
    }

    @Override
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) {
        throw new UnsupportedOperationException();
    }

    public void triggerAllNonPeriodicTasks() {
        while (this.numQueuedRunnables() > 0 || !this.nonPeriodicScheduledTasks.isEmpty()) {
            this.triggerAll();
            this.triggerNonPeriodicScheduledTasks();
        }
    }

    public void triggerAll() {
        while (this.numQueuedRunnables() > 0) {
            this.trigger();
        }
    }

    public void trigger(Duration timeout) {
        try {
            Runnable task = this.queuedRunnables.poll(timeout.toMillis(), TimeUnit.MILLISECONDS);
            if (task == null) {
                throw new IllegalStateException("No task was scheduled.");
            }
            task.run();
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public void trigger() {
        this.trigger(Duration.ZERO);
    }

    public int numQueuedRunnables() {
        return this.queuedRunnables.size();
    }

    public Collection<ScheduledFuture<?>> getActiveScheduledTasks() {
        ArrayList scheduledTasks = new ArrayList(this.nonPeriodicScheduledTasks.size() + this.periodicScheduledTasks.size());
        scheduledTasks.addAll(this.getActiveNonPeriodicScheduledTask());
        scheduledTasks.addAll(this.getActivePeriodicScheduledTask());
        return scheduledTasks;
    }

    public Collection<ScheduledFuture<?>> getActivePeriodicScheduledTask() {
        return this.periodicScheduledTasks.stream().filter(scheduledTask -> !scheduledTask.isCancelled()).collect(Collectors.toList());
    }

    public Collection<ScheduledFuture<?>> getActiveNonPeriodicScheduledTask() {
        return this.nonPeriodicScheduledTasks.stream().filter(scheduledTask -> !scheduledTask.isCancelled()).collect(Collectors.toList());
    }

    public List<ScheduledFuture<?>> getAllScheduledTasks() {
        ArrayList scheduledTasks = new ArrayList(this.nonPeriodicScheduledTasks.size() + this.periodicScheduledTasks.size());
        scheduledTasks.addAll(this.getAllNonPeriodicScheduledTask());
        scheduledTasks.addAll(this.getAllPeriodicScheduledTask());
        return scheduledTasks;
    }

    public List<ScheduledFuture<?>> getAllPeriodicScheduledTask() {
        return new ArrayList(this.periodicScheduledTasks);
    }

    public List<ScheduledFuture<?>> getAllNonPeriodicScheduledTask() {
        return new ArrayList(this.nonPeriodicScheduledTasks);
    }

    public void triggerScheduledTasks() {
        this.triggerPeriodicScheduledTasks();
        this.triggerNonPeriodicScheduledTasks();
    }

    public void triggerNonPeriodicScheduledTask() {
        ScheduledTask poll = (ScheduledTask)this.nonPeriodicScheduledTasks.remove();
        if (poll != null) {
            poll.execute();
        }
    }

    public void triggerNonPeriodicScheduledTasksWithRecursion() {
        while (!this.nonPeriodicScheduledTasks.isEmpty()) {
            ScheduledTask<?> scheduledTask = this.nonPeriodicScheduledTasks.poll();
            if (scheduledTask.isCancelled()) continue;
            scheduledTask.execute();
        }
    }

    public void triggerNonPeriodicScheduledTasks() {
        Iterator<ScheduledTask<?>> iterator = this.nonPeriodicScheduledTasks.iterator();
        while (iterator.hasNext()) {
            ScheduledTask<?> scheduledTask = iterator.next();
            if (!scheduledTask.isCancelled()) {
                scheduledTask.execute();
            }
            iterator.remove();
        }
    }

    public void triggerNonPeriodicScheduledTasks(Class<?> taskClazz) {
        int numTasksBeforeTrigger = this.nonPeriodicScheduledTasks.size();
        Iterator<ScheduledTask<?>> iterator = this.nonPeriodicScheduledTasks.iterator();
        for (int i = 0; i < numTasksBeforeTrigger; ++i) {
            ScheduledTask<?> scheduledTask = iterator.next();
            Callable<?> callable = scheduledTask.getCallable();
            if (!(callable instanceof RunnableCaller) || !((RunnableCaller)callable).command.getClass().equals(taskClazz)) continue;
            if (!scheduledTask.isCancelled()) {
                scheduledTask.execute();
            }
            iterator.remove();
        }
    }

    public void triggerPeriodicScheduledTasks() {
        for (ScheduledTask<?> scheduledTask : this.periodicScheduledTasks) {
            if (scheduledTask.isCancelled()) continue;
            scheduledTask.execute();
        }
    }

    private ScheduledFuture<?> insertPeriodicRunnable(Runnable command, long delay, long period, TimeUnit unit) {
        ScheduledTask<Object> scheduledTask = new ScheduledTask<Object>(() -> {
            command.run();
            return null;
        }, unit.convert(delay, TimeUnit.MILLISECONDS), unit.convert(period, TimeUnit.MILLISECONDS));
        this.periodicScheduledTasks.offer(scheduledTask);
        return scheduledTask;
    }

    private ScheduledFuture<?> insertNonPeriodicTask(Runnable command, long delay, TimeUnit unit) {
        return this.insertNonPeriodicTask(new RunnableCaller(command), delay, unit);
    }

    private <V> ScheduledFuture<V> insertNonPeriodicTask(Callable<V> callable, long delay, TimeUnit unit) {
        ScheduledTask<V> scheduledTask = new ScheduledTask<V>(callable, unit.convert(delay, TimeUnit.MILLISECONDS));
        if (this.shutdown) {
            throw new RejectedExecutionException();
        }
        this.nonPeriodicScheduledTasks.offer(scheduledTask);
        return scheduledTask;
    }

    public static class RunnableCaller<T>
    implements Callable<T> {
        public final Runnable command;

        private RunnableCaller(Runnable command) {
            this.command = command;
        }

        @Override
        public T call() {
            this.command.run();
            return null;
        }
    }
}

