/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.queue;

import java.util.Arrays;
import java.util.List;
import java.util.OptionalLong;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.queue.EventQueue;
import org.apache.kafka.queue.KafkaEventQueue;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

@Timeout(value=60L)
public class KafkaEventQueueTest {
    private static final long ONE_HOUR_NS = TimeUnit.NANOSECONDS.convert(1L, TimeUnit.HOURS);

    @Test
    public void testCreateAndClose() throws Exception {
        KafkaEventQueue queue = new KafkaEventQueue(Time.SYSTEM, new LogContext(), "testCreateAndClose");
        queue.close();
    }

    @Test
    public void testHandleEvents() throws Exception {
        KafkaEventQueue queue = new KafkaEventQueue(Time.SYSTEM, new LogContext(), "testHandleEvents");
        AtomicInteger numEventsExecuted = new AtomicInteger(0);
        CompletableFuture future1 = new CompletableFuture();
        queue.prepend(new FutureEvent<Integer>(future1, () -> {
            Assertions.assertEquals((int)1, (int)numEventsExecuted.incrementAndGet());
            return 1;
        }));
        CompletableFuture future2 = new CompletableFuture();
        queue.appendWithDeadline(Time.SYSTEM.nanoseconds() + TimeUnit.SECONDS.toNanos(60L), new FutureEvent<Integer>(future2, () -> {
            Assertions.assertEquals((int)2, (int)numEventsExecuted.incrementAndGet());
            return 2;
        }));
        CompletableFuture future3 = new CompletableFuture();
        queue.append(new FutureEvent<Integer>(future3, () -> {
            Assertions.assertEquals((int)3, (int)numEventsExecuted.incrementAndGet());
            return 3;
        }));
        Assertions.assertEquals((Integer)1, (Integer)((Integer)future1.get()));
        Assertions.assertEquals((Integer)3, (Integer)((Integer)future3.get()));
        Assertions.assertEquals((Integer)2, (Integer)((Integer)future2.get()));
        CompletableFuture future4 = new CompletableFuture();
        queue.appendWithDeadline(Time.SYSTEM.nanoseconds() + TimeUnit.SECONDS.toNanos(60L), new FutureEvent<Integer>(future4, () -> {
            Assertions.assertEquals((int)4, (int)numEventsExecuted.incrementAndGet());
            return 4;
        }));
        future4.get();
        queue.beginShutdown("testHandleEvents");
        queue.close();
    }

    @Test
    public void testTimeouts() throws Exception {
        KafkaEventQueue queue = new KafkaEventQueue(Time.SYSTEM, new LogContext(), "testTimeouts");
        AtomicInteger numEventsExecuted = new AtomicInteger(0);
        CompletableFuture future1 = new CompletableFuture();
        queue.append(new FutureEvent<Integer>(future1, () -> {
            Assertions.assertEquals((int)1, (int)numEventsExecuted.incrementAndGet());
            return 1;
        }));
        CompletableFuture future2 = new CompletableFuture();
        queue.append(new FutureEvent<Integer>(future2, () -> {
            Assertions.assertEquals((int)2, (int)numEventsExecuted.incrementAndGet());
            Time.SYSTEM.sleep(1L);
            return 2;
        }));
        CompletableFuture future3 = new CompletableFuture();
        queue.appendWithDeadline(Time.SYSTEM.nanoseconds() + 1L, new FutureEvent<Integer>(future3, () -> {
            numEventsExecuted.incrementAndGet();
            return 3;
        }));
        CompletableFuture future4 = new CompletableFuture();
        queue.append(new FutureEvent<Integer>(future4, () -> {
            numEventsExecuted.incrementAndGet();
            return 4;
        }));
        Assertions.assertEquals((Integer)1, (Integer)((Integer)future1.get()));
        Assertions.assertEquals((Integer)2, (Integer)((Integer)future2.get()));
        Assertions.assertEquals((Integer)4, (Integer)((Integer)future4.get()));
        Assertions.assertEquals(TimeoutException.class, ((ExecutionException)Assertions.assertThrows(ExecutionException.class, () -> {
            Integer cfr_ignored_0 = (Integer)future3.get();
        })).getCause().getClass());
        queue.close();
        Assertions.assertEquals((int)3, (int)numEventsExecuted.get());
    }

    @Test
    public void testScheduleDeferred() throws Exception {
        CompletableFuture future1;
        KafkaEventQueue queue = new KafkaEventQueue(Time.SYSTEM, new LogContext(), "testAppendDeferred");
        AtomicLong counter = new AtomicLong(0L);
        do {
            counter.addAndGet(1L);
            future1 = new CompletableFuture();
            queue.scheduleDeferred(null, __ -> OptionalLong.of(Time.SYSTEM.nanoseconds() + 1000000L), new FutureEvent<Boolean>(future1, () -> counter.get() % 2L == 0L));
            CompletableFuture future2 = new CompletableFuture();
            queue.append(new FutureEvent<Long>(future2, () -> counter.addAndGet(1L)));
            future2.get();
        } while (!((Boolean)future1.get()).booleanValue());
        queue.close();
    }

    @Test
    public void testScheduleDeferredWithTagReplacement() throws Exception {
        KafkaEventQueue queue = new KafkaEventQueue(Time.SYSTEM, new LogContext(), "testScheduleDeferredWithTagReplacement");
        AtomicInteger ai = new AtomicInteger(0);
        CompletableFuture future1 = new CompletableFuture();
        queue.scheduleDeferred("foo", __ -> OptionalLong.of(Time.SYSTEM.nanoseconds() + ONE_HOUR_NS), new FutureEvent<Integer>(future1, () -> ai.addAndGet(1000)));
        CompletableFuture future2 = new CompletableFuture();
        queue.scheduleDeferred("foo", prev -> OptionalLong.of(prev.orElse(0L) - ONE_HOUR_NS), new FutureEvent<Integer>(future2, () -> ai.addAndGet(1)));
        Assertions.assertFalse((boolean)future1.isDone());
        Assertions.assertEquals((Integer)1, (Integer)((Integer)future2.get()));
        Assertions.assertEquals((int)1, (int)ai.get());
        queue.close();
    }

    @Test
    public void testDeferredIsQueuedAfterTriggering() throws Exception {
        MockTime time = new MockTime(0L, 100000L, 1L);
        KafkaEventQueue queue = new KafkaEventQueue((Time)time, new LogContext(), "testDeferredIsQueuedAfterTriggering");
        AtomicInteger count = new AtomicInteger(0);
        List<CompletableFuture> futures = Arrays.asList(new CompletableFuture(), new CompletableFuture(), new CompletableFuture());
        queue.scheduleDeferred("foo", __ -> OptionalLong.of(2L), new FutureEvent<Integer>(futures.get(0), () -> count.getAndIncrement()));
        queue.append(new FutureEvent<Integer>(futures.get(1), () -> count.getAndAdd(1)));
        Assertions.assertEquals((Integer)0, (Integer)((Integer)futures.get(1).get()));
        time.sleep(1L);
        queue.append(new FutureEvent<Integer>(futures.get(2), () -> count.getAndAdd(1)));
        Assertions.assertEquals((Integer)1, (Integer)((Integer)futures.get(0).get()));
        Assertions.assertEquals((Integer)2, (Integer)((Integer)futures.get(2).get()));
        queue.close();
    }

    @Test
    public void testShutdownBeforeDeferred() throws Exception {
        KafkaEventQueue queue = new KafkaEventQueue(Time.SYSTEM, new LogContext(), "testShutdownBeforeDeferred");
        AtomicInteger count = new AtomicInteger(0);
        CompletableFuture future = new CompletableFuture();
        queue.scheduleDeferred("myDeferred", __ -> OptionalLong.of(Time.SYSTEM.nanoseconds() + TimeUnit.HOURS.toNanos(1L)), new FutureEvent<Integer>(future, () -> count.getAndAdd(1)));
        queue.beginShutdown("testShutdownBeforeDeferred");
        Assertions.assertEquals(RejectedExecutionException.class, ((ExecutionException)Assertions.assertThrows(ExecutionException.class, () -> {
            Integer cfr_ignored_0 = (Integer)future.get();
        })).getCause().getClass());
        Assertions.assertEquals((int)0, (int)count.get());
        queue.close();
    }

    @Test
    public void testRejectedExecutionException() throws Exception {
        KafkaEventQueue queue = new KafkaEventQueue(Time.SYSTEM, new LogContext(), "testRejectedExecutionException");
        queue.close();
        final CompletableFuture future = new CompletableFuture();
        queue.append(new EventQueue.Event(){

            public void run() throws Exception {
                future.complete(null);
            }

            public void handleException(Throwable e) {
                future.completeExceptionally(e);
            }
        });
        Assertions.assertEquals(RejectedExecutionException.class, ((ExecutionException)Assertions.assertThrows(ExecutionException.class, () -> {
            Void cfr_ignored_0 = (Void)future.get();
        })).getCause().getClass());
    }

    @Test
    public void testSize() throws Exception {
        KafkaEventQueue queue = new KafkaEventQueue(Time.SYSTEM, new LogContext(), "testEmpty");
        Assertions.assertTrue((boolean)queue.isEmpty());
        CompletableFuture<Object> future = new CompletableFuture<Object>();
        queue.append(() -> {
            Void cfr_ignored_0 = (Void)future.get();
        });
        Assertions.assertFalse((boolean)queue.isEmpty());
        Assertions.assertEquals((int)1, (int)queue.size());
        queue.append(() -> {
            Void cfr_ignored_0 = (Void)future.get();
        });
        Assertions.assertEquals((int)2, (int)queue.size());
        future.complete(null);
        TestUtils.waitForCondition(() -> queue.isEmpty(), (String)"Failed to see the queue become empty.");
        queue.scheduleDeferred("later", __ -> OptionalLong.of(Time.SYSTEM.nanoseconds() + TimeUnit.HOURS.toNanos(1L)), () -> {});
        Assertions.assertFalse((boolean)queue.isEmpty());
        queue.scheduleDeferred("soon", __ -> OptionalLong.of(Time.SYSTEM.nanoseconds() + TimeUnit.MILLISECONDS.toNanos(1L)), () -> {});
        Assertions.assertFalse((boolean)queue.isEmpty());
        queue.cancelDeferred("later");
        queue.cancelDeferred("soon");
        TestUtils.waitForCondition(() -> queue.isEmpty(), (String)"Failed to see the queue become empty.");
        queue.close();
        Assertions.assertTrue((boolean)queue.isEmpty());
    }

    @Test
    public void testHandleExceptionThrowingAnException() throws Exception {
        KafkaEventQueue queue = new KafkaEventQueue(Time.SYSTEM, new LogContext(), "testHandleExceptionThrowingAnException");
        CompletableFuture<Object> initialFuture = new CompletableFuture<Object>();
        queue.append(() -> {
            Void cfr_ignored_0 = (Void)initialFuture.get();
        });
        final AtomicInteger counter = new AtomicInteger(0);
        queue.append(new EventQueue.Event(){

            public void run() throws Exception {
                counter.incrementAndGet();
                throw new IllegalStateException("First exception");
            }

            public void handleException(Throwable e) {
                if (e instanceof IllegalStateException) {
                    counter.incrementAndGet();
                    throw new RuntimeException("Second exception");
                }
            }
        });
        queue.append(() -> counter.incrementAndGet());
        Assertions.assertEquals((int)3, (int)queue.size());
        initialFuture.complete(null);
        TestUtils.waitForCondition(() -> counter.get() == 3, (String)"Failed to see all events execute as planned.");
        queue.close();
    }

    @Test
    public void testInterruptedExceptionHandling() throws Exception {
        KafkaEventQueue queue = new KafkaEventQueue(Time.SYSTEM, new LogContext(), "testInterruptedExceptionHandling");
        CompletableFuture<Thread> queueThread = new CompletableFuture<Thread>();
        AtomicInteger numCallsToRun = new AtomicInteger(0);
        AtomicInteger numInterruptedExceptionsSeen = new AtomicInteger(0);
        queue.append((EventQueue.Event)new InterruptibleEvent(queueThread, numCallsToRun, numInterruptedExceptionsSeen));
        queue.append((EventQueue.Event)new InterruptibleEvent(queueThread, numCallsToRun, numInterruptedExceptionsSeen));
        queue.append((EventQueue.Event)new InterruptibleEvent(queueThread, numCallsToRun, numInterruptedExceptionsSeen));
        queue.append((EventQueue.Event)new InterruptibleEvent(queueThread, numCallsToRun, numInterruptedExceptionsSeen));
        queueThread.get().interrupt();
        TestUtils.retryOnExceptionWithTimeout((long)30000L, () -> Assertions.assertEquals((int)1, (int)numCallsToRun.get()));
        TestUtils.retryOnExceptionWithTimeout((long)30000L, () -> Assertions.assertEquals((int)3, (int)numInterruptedExceptionsSeen.get()));
        queue.close();
    }

    @Test
    public void testInterruptedWithEmptyQueue() throws Exception {
        CompletableFuture cleanupFuture = new CompletableFuture();
        KafkaEventQueue queue = new KafkaEventQueue(Time.SYSTEM, new LogContext(), "testInterruptedWithEmptyQueue", () -> cleanupFuture.complete(null));
        CompletableFuture queueThread = new CompletableFuture();
        queue.append(() -> queueThread.complete(Thread.currentThread()));
        TestUtils.retryOnExceptionWithTimeout((long)30000L, () -> Assertions.assertEquals((int)0, (int)queue.size()));
        ((Thread)queueThread.get()).interrupt();
        cleanupFuture.get();
        ExceptionTrapperEvent ieTrapper = new ExceptionTrapperEvent();
        queue.append((EventQueue.Event)ieTrapper);
        Assertions.assertEquals(InterruptedException.class, ieTrapper.exception.get().getClass());
        queue.close();
        ExceptionTrapperEvent reTrapper = new ExceptionTrapperEvent();
        queue.append((EventQueue.Event)reTrapper);
        Assertions.assertEquals(RejectedExecutionException.class, reTrapper.exception.get().getClass());
    }

    @Test
    public void testInterruptedWithDeferredEvents() throws Exception {
        CompletableFuture cleanupFuture = new CompletableFuture();
        KafkaEventQueue queue = new KafkaEventQueue(Time.SYSTEM, new LogContext(), "testInterruptedWithDeferredEvents", () -> cleanupFuture.complete(null));
        CompletableFuture queueThread = new CompletableFuture();
        queue.append(() -> queueThread.complete(Thread.currentThread()));
        ExceptionTrapperEvent ieTrapper1 = new ExceptionTrapperEvent();
        ExceptionTrapperEvent ieTrapper2 = new ExceptionTrapperEvent();
        queue.scheduleDeferred("ie2", __ -> OptionalLong.of(Time.SYSTEM.nanoseconds() + TimeUnit.HOURS.toNanos(2L)), (EventQueue.Event)ieTrapper2);
        queue.scheduleDeferred("ie1", __ -> OptionalLong.of(Time.SYSTEM.nanoseconds() + TimeUnit.HOURS.toNanos(1L)), (EventQueue.Event)ieTrapper1);
        TestUtils.retryOnExceptionWithTimeout((long)30000L, () -> Assertions.assertEquals((int)2, (int)queue.size()));
        ((Thread)queueThread.get()).interrupt();
        cleanupFuture.get();
        Assertions.assertEquals(InterruptedException.class, ieTrapper1.exception.get().getClass());
        Assertions.assertEquals(InterruptedException.class, ieTrapper2.exception.get().getClass());
        queue.close();
    }

    static class ExceptionTrapperEvent
    implements EventQueue.Event {
        final CompletableFuture<Throwable> exception = new CompletableFuture();

        ExceptionTrapperEvent() {
        }

        public void run() throws Exception {
            this.exception.complete(null);
        }

        public void handleException(Throwable e) {
            this.exception.complete(e);
        }
    }

    private static class InterruptibleEvent
    implements EventQueue.Event {
        private final CompletableFuture<Void> runFuture = new CompletableFuture();
        private final CompletableFuture<Thread> queueThread;
        private final AtomicInteger numCallsToRun;
        private final AtomicInteger numInterruptedExceptionsSeen;

        InterruptibleEvent(CompletableFuture<Thread> queueThread, AtomicInteger numCallsToRun, AtomicInteger numInterruptedExceptionsSeen) {
            this.queueThread = queueThread;
            this.numCallsToRun = numCallsToRun;
            this.numInterruptedExceptionsSeen = numInterruptedExceptionsSeen;
        }

        public void run() throws Exception {
            this.numCallsToRun.incrementAndGet();
            this.queueThread.complete(Thread.currentThread());
            this.runFuture.get();
        }

        public void handleException(Throwable e) {
            if (e instanceof InterruptedException) {
                this.numInterruptedExceptionsSeen.incrementAndGet();
                Thread.currentThread().interrupt();
            }
        }
    }

    private static class FutureEvent<T>
    implements EventQueue.Event {
        private final CompletableFuture<T> future;
        private final Supplier<T> supplier;

        FutureEvent(CompletableFuture<T> future, Supplier<T> supplier) {
            this.future = future;
            this.supplier = supplier;
        }

        public void run() throws Exception {
            T value = this.supplier.get();
            this.future.complete(value);
        }

        public void handleException(Throwable e) {
            this.future.completeExceptionally(e);
        }
    }
}

