/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.core.test;

import com.hazelcast.instance.BuildInfoProvider;
import com.hazelcast.jet.Util;
import com.hazelcast.jet.core.Processor;
import com.hazelcast.jet.core.ProcessorMetaSupplier;
import com.hazelcast.jet.core.ProcessorSupplier;
import com.hazelcast.jet.core.test.JetAssert;
import com.hazelcast.jet.core.test.TestInbox;
import com.hazelcast.jet.core.test.TestOutbox;
import com.hazelcast.jet.core.test.TestProcessorContext;
import com.hazelcast.jet.core.test.TestProcessorMetaSupplierContext;
import com.hazelcast.jet.core.test.TestProcessorSupplierContext;
import com.hazelcast.logging.ILogger;
import com.hazelcast.logging.LoggingServiceImpl;
import com.hazelcast.nio.Address;
import com.hazelcast.util.concurrent.BackoffIdleStrategy;
import com.hazelcast.util.concurrent.IdleStrategy;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.TimeUnit;
import java.util.function.BiPredicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;

/**
 * Fluent test driver for a single {@link Processor}.
 *
 * <p>A test is configured through the builder-style methods ({@link #input},
 * {@link #disableSnapshots()}, ...) and executed by {@link #expectOutput(List)}.
 * The driver feeds the input one item at a time, collects everything emitted to
 * ordinal 0 of a {@link TestOutbox}, optionally exercises snapshot save/restore
 * between items, and finally compares the collected output with the expected
 * output using the configured {@link #outputChecker(BiPredicate) output checker}.
 */
public final class TestSupport {
    private static final Address LOCAL_ADDRESS;
    /** Default upper bound (ms) for a single call into a cooperative processor before the test fails. */
    private static final long COOPERATIVE_TIME_LIMIT_MS_FAIL = 1000L;
    /** A call into a cooperative processor longer than this (ms) prints a warning. */
    private static final long COOPERATIVE_TIME_LIMIT_MS_WARN = 5L;
    /** A call into a non-cooperative processor longer than this (ms) prints a warning. */
    private static final long BLOCKING_TIME_LIMIT_MS_WARN = 10000L;
    private static final LoggingServiceImpl LOGGING_SERVICE;

    private final Supplier<Processor> supplier;
    private List<?> input = Collections.emptyList();
    private List<?> expectedOutput = Collections.emptyList();
    private boolean assertProgress = true;
    private boolean doSnapshots = true;
    private boolean logInputOutput = true;
    private boolean callComplete = true;
    private long cooperativeTimeout = COOPERATIVE_TIME_LIMIT_MS_FAIL;
    private long runUntilCompletedTimeout;
    private BiPredicate<? super List<?>, ? super List<?>> outputChecker = Objects::equals;

    private TestSupport(@Nonnull Supplier<Processor> supplier) {
        this.supplier = supplier;
    }

    /**
     * Creates a test for one already-constructed processor instance.
     *
     * <p>Snapshots are disabled for the returned test: restoring a snapshot
     * requests a fresh processor from the supplier, and the supplier created
     * here can hand out the given instance only once.
     *
     * @param processor the processor under test
     * @return a new test builder
     */
    public static TestSupport verifyProcessor(Processor processor) {
        return new TestSupport(TestSupport.singletonSupplier(processor)).disableSnapshots();
    }

    /**
     * Creates a test that obtains processor instances from {@code supplier}.
     *
     * @param supplier source of processor instances
     * @return a new test builder
     */
    public static TestSupport verifyProcessor(@Nonnull Supplier<Processor> supplier) {
        return new TestSupport(supplier);
    }

    /**
     * Creates a test that obtains processor instances from a {@link ProcessorSupplier}.
     *
     * @param supplier the processor supplier; it is initialized immediately
     * @return a new test builder
     * @see #supplierFrom(ProcessorSupplier)
     */
    public static TestSupport verifyProcessor(@Nonnull ProcessorSupplier supplier) {
        return new TestSupport(TestSupport.supplierFrom(supplier));
    }

    /**
     * Creates a test that obtains processor instances from a {@link ProcessorMetaSupplier}.
     *
     * @param supplier the meta-supplier; it is initialized immediately
     * @return a new test builder
     * @see #supplierFrom(ProcessorMetaSupplier)
     */
    public static TestSupport verifyProcessor(@Nonnull ProcessorMetaSupplier supplier) {
        return new TestSupport(TestSupport.supplierFrom(supplier));
    }

    /**
     * Sets the items that will be fed to the processor, one at a time.
     * Defaults to an empty list.
     *
     * @param input the input items
     * @return {@code this}
     */
    public TestSupport input(@Nonnull List<?> input) {
        this.input = input;
        return this;
    }

    /**
     * Sets the expected output and runs the test.
     *
     * <p>When snapshots are enabled the test is executed three times: without
     * snapshots, with snapshots only, and with snapshots followed by a restore
     * into a fresh processor instance. Otherwise it is executed once.
     *
     * @param expectedOutput the output the processor is expected to produce
     */
    public void expectOutput(@Nonnull List<?> expectedOutput) {
        this.expectedOutput = expectedOutput;
        this.runTest(this.doSnapshots, this.doSnapshots);
    }

    /**
     * Disables the assertions that each call into the processor made progress.
     *
     * @return {@code this}
     */
    public TestSupport disableProgressAssertion() {
        this.assertProgress = false;
        return this;
    }

    /**
     * Limits how long {@code complete()} is called repeatedly.
     *
     * <p>With a positive timeout the completion loop stops once the timeout has
     * elapsed, and the test then asserts that {@code complete()} did not
     * return {@code true}.
     *
     * @param timeoutMillis time limit for the completion loop, in milliseconds
     * @return {@code this}
     */
    public TestSupport disableRunUntilCompleted(long timeoutMillis) {
        this.runUntilCompletedTimeout = timeoutMillis;
        return this;
    }

    /**
     * Disables snapshot saving and restoring during the test.
     *
     * @return {@code this}
     */
    public TestSupport disableSnapshots() {
        this.doSnapshots = false;
        return this;
    }

    /**
     * Disables printing of input and output items to standard output.
     *
     * @return {@code this}
     */
    public TestSupport disableLogging() {
        this.logInputOutput = false;
        return this;
    }

    /**
     * Disables the calls to {@code complete()} after all input was processed.
     *
     * @return {@code this}
     */
    public TestSupport disableCompleteCall() {
        this.callComplete = false;
        return this;
    }

    /**
     * Sets the maximum time a single call into a cooperative processor may
     * take before the test fails. A value {@code <= 0} disables the check.
     * Has no effect on non-cooperative processors.
     *
     * @param timeout the limit in milliseconds
     * @return {@code this}
     */
    public TestSupport cooperativeTimeout(long timeout) {
        this.cooperativeTimeout = timeout;
        return this;
    }

    /**
     * Replaces the predicate used to compare expected and actual output.
     * The predicate receives the expected list first and the actual list
     * second; the default is {@link Objects#equals(Object, Object)}.
     *
     * @param outputChecker the comparison predicate
     * @return {@code this}
     */
    public TestSupport outputChecker(@Nonnull BiPredicate<? super List<?>, ? super List<?>> outputChecker) {
        this.outputChecker = outputChecker;
        return this;
    }

    /**
     * Executes one pass of the test.
     *
     * <p>When both flags are {@code true}, the two weaker variants
     * (no snapshots; snapshots without restore) are executed first.
     */
    private void runTest(boolean doSnapshots, boolean doRestore) {
        assert doSnapshots || !doRestore : "Illegal combination: don't do snapshots, but do restore";
        IdleStrategy idler = new BackoffIdleStrategy(
                0L, 0L, TimeUnit.MICROSECONDS.toNanos(1L), TimeUnit.MILLISECONDS.toNanos(1L));
        int idleCount = 0;
        if (doSnapshots && doRestore) {
            System.out.println("### Running the test with doSnapshots=false");
            this.runTest(false, false);
            System.out.println("### Running the test with doSnapshots=true, doRestore=false");
            this.runTest(true, false);
            System.out.println("### Running the test with doSnapshots=true, doRestore=true");
        }

        TestInbox inbox = new TestInbox();
        // One-element array so that snapshotAndRestore() can swap in a fresh
        // instance which the lambdas below will then see.
        Processor[] processor = {this.supplier.get()};
        boolean isCooperative = processor[0].isCooperative();
        TestOutbox outbox = new TestOutbox(new int[]{1}, 1);
        List<Object> actualOutput = new ArrayList<>();

        this.initProcessor(processor[0], outbox);
        // Snapshot (and possibly restore) before any item was processed.
        this.snapshotAndRestore(processor, outbox, actualOutput, doSnapshots, doRestore);

        Iterator<?> inputIterator = this.input.iterator();
        while (inputIterator.hasNext() || !inbox.isEmpty()) {
            if (inbox.isEmpty()) {
                inbox.add(inputIterator.next());
                if (this.logInputOutput) {
                    System.out.println("Input: " + inbox.peek());
                }
            }
            this.checkTime("process", isCooperative, () -> processor[0].process(0, inbox));
            boolean madeProgress = inbox.isEmpty() || !outbox.queueWithOrdinal(0).isEmpty();
            JetAssert.assertTrue("process() call without progress", !this.assertProgress || madeProgress);
            idleCount = this.idle(idler, idleCount, madeProgress);
            // The outbox bucket already holds one item and the inbox is not yet
            // consumed: call process() once more before draining.
            if (outbox.queueWithOrdinal(0).size() == 1 && !inbox.isEmpty()) {
                this.checkTime("process", isCooperative, () -> processor[0].process(0, inbox));
            }
            TestSupport.drainOutbox(outbox.queueWithOrdinal(0), actualOutput, this.logInputOutput);
            if (inbox.isEmpty()) {
                // The current item is fully consumed: snapshot between items.
                this.snapshotAndRestore(processor, outbox, actualOutput, doSnapshots, doRestore);
            }
        }

        if (this.callComplete) {
            long completeStart = System.nanoTime();
            boolean[] done = {false};
            boolean timedOut;
            do {
                this.checkTime("complete", isCooperative, () -> {
                    done[0] = processor[0].complete();
                });
                boolean madeProgress = done[0] || !outbox.queueWithOrdinal(0).isEmpty();
                JetAssert.assertTrue("complete() call without progress", !this.assertProgress || madeProgress);
                TestSupport.drainOutbox(outbox.queueWithOrdinal(0), actualOutput, this.logInputOutput);
                // Snapshot only after a productive call that did not finish the processor.
                this.snapshotAndRestore(processor, outbox, actualOutput,
                        madeProgress && doSnapshots && !done[0], doRestore);
                idleCount = this.idle(idler, idleCount, madeProgress);
                timedOut = this.runUntilCompletedTimeout > 0L
                        && TestSupport.toMillis(System.nanoTime() - completeStart) > (double) this.runUntilCompletedTimeout;
            } while (!timedOut && !done[0]);
            // With a run-until timeout configured, the processor is expected not to complete.
            JetAssert.assertTrue("complete returned true", !done[0] || this.runUntilCompletedTimeout <= 0L);
        }

        if (!this.outputChecker.test(this.expectedOutput, actualOutput)) {
            JetAssert.assertEquals("processor output with doSnapshots=" + doSnapshots + " doesn't match",
                    TestSupport.listToString(this.expectedOutput), TestSupport.listToString(actualOutput));
        }
    }

    /**
     * Idles using {@code idler} when no progress was made.
     *
     * @return the new count of consecutive calls without progress; 0 after progress
     */
    private int idle(IdleStrategy idler, int idleCount, boolean madeProgress) {
        if (madeProgress) {
            return 0;
        }
        int newIdleCount = idleCount + 1;
        idler.idle((long) newIdleCount);
        return newIdleCount;
    }

    /**
     * Saves a snapshot of {@code processor[0]} and, when {@code doRestore} is
     * set, replaces {@code processor[0]} with a fresh instance restored from
     * that snapshot. Does nothing when {@code doSnapshot} is {@code false}.
     *
     * <p>Items emitted to ordinal 0 during any of these calls are appended to
     * {@code actualOutput}.
     */
    private void snapshotAndRestore(Processor[] processor, TestOutbox outbox, List<Object> actualOutput,
                                    boolean doSnapshot, boolean doRestore) {
        if (!doSnapshot) {
            return;
        }
        TestInbox snapshotInbox = new TestInbox();
        boolean[] done = {false};
        boolean isCooperative = processor[0].isCooperative();
        Collection<Object> keys = new HashSet<>();
        do {
            this.checkTime("saveSnapshot", isCooperative, () -> {
                done[0] = processor[0].saveToSnapshot();
            });
            for (Map.Entry<?, ?> entry : outbox.snapshotQueue()) {
                Object key = ((TestOutbox.MockData) entry.getKey()).getObject();
                JetAssert.assertTrue("Duplicate key produced in saveToSnapshot()\n  Duplicate: " + key
                        + "\n  Keys so far: " + keys, keys.add(key));
                snapshotInbox.add(Util.entry(key, ((TestOutbox.MockData) entry.getValue()).getObject()));
            }
            JetAssert.assertTrue("saveToSnapshot() call without progress",
                    !this.assertProgress || done[0] || !outbox.snapshotQueue().isEmpty()
                            || !outbox.queueWithOrdinal(0).isEmpty());
            TestSupport.drainOutbox(outbox.queueWithOrdinal(0), actualOutput, this.logInputOutput);
            outbox.snapshotQueue().clear();
        } while (!done[0]);

        if (!doRestore) {
            return;
        }

        // Restore into a brand-new processor instance.
        processor[0] = this.supplier.get();
        this.initProcessor(processor[0], outbox);
        int lastInboxSize = snapshotInbox.size();
        while (!snapshotInbox.isEmpty()) {
            this.checkTime("restoreSnapshot", isCooperative, () -> processor[0].restoreFromSnapshot(snapshotInbox));
            JetAssert.assertTrue("restoreFromSnapshot() call without progress",
                    !this.assertProgress || lastInboxSize > snapshotInbox.size()
                            || !outbox.queueWithOrdinal(0).isEmpty());
            TestSupport.drainOutbox(outbox.queueWithOrdinal(0), actualOutput, this.logInputOutput);
            lastInboxSize = snapshotInbox.size();
        }
        do {
            this.checkTime("finishSnapshotRestore", isCooperative, () -> {
                done[0] = processor[0].finishSnapshotRestore();
            });
            JetAssert.assertTrue("finishSnapshotRestore() call without progress",
                    !this.assertProgress || done[0] || !outbox.queueWithOrdinal(0).isEmpty());
            TestSupport.drainOutbox(outbox.queueWithOrdinal(0), actualOutput, this.logInputOutput);
        } while (!done[0]);
    }

    /**
     * Runs {@code r} and checks how long it took.
     *
     * <p>For a cooperative processor the test fails when the call takes at
     * least {@link #cooperativeTimeout(long) cooperativeTimeout} milliseconds
     * (check skipped when the timeout is {@code <= 0}), and a warning is
     * printed above {@link #COOPERATIVE_TIME_LIMIT_MS_WARN} ms. For a
     * non-cooperative processor only a warning is printed, above
     * {@link #BLOCKING_TIME_LIMIT_MS_WARN} ms.
     */
    private void checkTime(String methodName, boolean isCooperative, Runnable r) {
        long start = System.nanoTime();
        r.run();
        long elapsed = System.nanoTime() - start;
        if (isCooperative) {
            if (this.cooperativeTimeout > 0L) {
                // Use the configured limit, not the default constant, so that
                // cooperativeTimeout(long) actually changes the threshold.
                JetAssert.assertTrue(
                        String.format("call to %s() took %.1fms, it should be <%dms",
                                methodName, TestSupport.toMillis(elapsed), this.cooperativeTimeout),
                        elapsed < TimeUnit.MILLISECONDS.toNanos(this.cooperativeTimeout));
            }
            if (elapsed > TimeUnit.MILLISECONDS.toNanos(COOPERATIVE_TIME_LIMIT_MS_WARN)) {
                System.out.println(String.format("Warning: call to %s() took %.2fms, it should be <%dms normally",
                        methodName, TestSupport.toMillis(elapsed), COOPERATIVE_TIME_LIMIT_MS_WARN));
            }
        } else if (elapsed > TimeUnit.MILLISECONDS.toNanos(BLOCKING_TIME_LIMIT_MS_WARN)) {
            System.out.println(String.format(
                    "Warning: call to %s() took %.2fms in non-cooperative processor. Is this to be expected?",
                    methodName, TestSupport.toMillis(elapsed)));
        }
    }

    /** Initializes {@code processor} with the outbox and a test context whose logger is named after the processor class. */
    private void initProcessor(Processor processor, TestOutbox outbox) {
        TestProcessorContext context = new TestProcessorContext()
                .setLogger(TestSupport.getLogger(processor.getClass().getName()));
        processor.init(outbox, context);
    }

    /** Converts nanoseconds to (fractional) milliseconds. */
    private static double toMillis(long nanos) {
        return (double) nanos / (double) TimeUnit.MILLISECONDS.toNanos(1L);
    }

    /**
     * Moves all items from {@code outboxBucket} to {@code target}, polling the
     * queue until it returns {@code null}.
     *
     * @param outboxBucket the queue to drain
     * @param target       the collection receiving the drained items
     * @param logItems     whether to print each item to standard output
     * @param <T>          the item type
     */
    public static <T> void drainOutbox(Queue<T> outboxBucket, Collection<? super T> target, boolean logItems) {
        for (T item = outboxBucket.poll(); item != null; item = outboxBucket.poll()) {
            target.add(item);
            if (logItems) {
                System.out.println("Output: " + item);
            }
        }
    }

    /**
     * Initializes {@code supplier} with a {@link TestProcessorSupplierContext}
     * and returns a supplier which, on every call, requests one processor
     * from it.
     *
     * @param supplier the processor supplier to wrap
     * @return a supplier of single processor instances
     */
    public static Supplier<Processor> supplierFrom(ProcessorSupplier supplier) {
        supplier.init(new TestProcessorSupplierContext());
        return () -> supplier.get(1).iterator().next();
    }

    /**
     * Initializes {@code supplier} with a {@link TestProcessorMetaSupplierContext},
     * obtains the {@link ProcessorSupplier} for the single local address and
     * wraps it using {@link #supplierFrom(ProcessorSupplier)}.
     *
     * @param supplier the meta-supplier to wrap
     * @return a supplier of single processor instances
     */
    public static Supplier<Processor> supplierFrom(ProcessorMetaSupplier supplier) {
        supplier.init(new TestProcessorMetaSupplierContext());
        return TestSupport.supplierFrom(
                supplier.get(Collections.singletonList(LOCAL_ADDRESS)).apply(LOCAL_ADDRESS));
    }

    /** Returns a logger with the given name from the shared test logging service. */
    static ILogger getLogger(String name) {
        return LOGGING_SERVICE.getLogger(name);
    }

    /** Returns a logger for the given class from the shared test logging service. */
    static ILogger getLogger(Class<?> clazz) {
        return LOGGING_SERVICE.getLogger(clazz);
    }

    /** Renders the list one item per line, for use in failure messages. */
    private static String listToString(List<?> list) {
        return list.stream().map(String::valueOf).collect(Collectors.joining("\n"));
    }

    /**
     * Returns a supplier that hands out {@code processor} exactly once and
     * throws a {@link RuntimeException} on any later request.
     */
    private static Supplier<Processor> singletonSupplier(Processor processor) {
        Processor[] holder = {processor};
        return () -> {
            Processor instance = holder[0];
            if (instance == null) {
                throw new RuntimeException("More than one instance requested");
            }
            holder[0] = null;
            return instance;
        };
    }

    static {
        LOGGING_SERVICE = new LoggingServiceImpl("test-group", null, BuildInfoProvider.getBuildInfo());
        try {
            LOCAL_ADDRESS = new Address("localhost", 5701);
        }
        catch (UnknownHostException e) {
            throw new RuntimeException(e);
        }
    }
}

