/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.core.test;

import com.hazelcast.instance.BuildInfoProvider;
import com.hazelcast.jet.JetInstance;
import com.hazelcast.jet.core.Processor;
import com.hazelcast.jet.core.ProcessorMetaSupplier;
import com.hazelcast.jet.core.ProcessorSupplier;
import com.hazelcast.jet.core.Watermark;
import com.hazelcast.jet.core.test.JetAssert;
import com.hazelcast.jet.core.test.TestInbox;
import com.hazelcast.jet.core.test.TestOutbox;
import com.hazelcast.jet.core.test.TestProcessorContext;
import com.hazelcast.jet.core.test.TestProcessorMetaSupplierContext;
import com.hazelcast.jet.core.test.TestProcessorSupplierContext;
import com.hazelcast.jet.function.DistributedFunction;
import com.hazelcast.jet.function.DistributedSupplier;
import com.hazelcast.jet.impl.util.ExceptionUtil;
import com.hazelcast.logging.ILogger;
import com.hazelcast.logging.LoggingServiceImpl;
import com.hazelcast.nio.Address;
import com.hazelcast.util.concurrent.BackoffIdleStrategy;
import com.hazelcast.util.concurrent.IdleStrategy;
import java.net.UnknownHostException;
import java.time.LocalTime;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
import java.util.function.BiPredicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import javax.annotation.Nonnull;

public final class TestSupport {
    /**
     * Output checker that treats both lists as multisets: it returns
     * {@code false} when the sizes differ, otherwise it compares per-item
     * occurrence counts, so ordering is ignored but duplicates still matter.
     */
    public static final BiPredicate<List<?>, List<?>> SAME_ITEMS_ANY_ORDER = (expected, actual) -> {
        if (expected.size() != actual.size()) {
            return false;
        }
        Map expectedMap = expected.stream().collect(Collectors.toMap(DistributedFunction.identity(), e -> 1, Integer::sum));
        Map actualMap = actual.stream().collect(Collectors.toMap(DistributedFunction.identity(), e -> 1, Integer::sum));
        return expectedMap.equals(actualMap);
    };
    /** Address (localhost:5701, set in the static initializer) handed to meta-suppliers. */
    private static final Address LOCAL_ADDRESS;
    /** Hard time limit, in milliseconds, for a single call on a cooperative processor. */
    private static final long COOPERATIVE_TIME_LIMIT_MS_FAIL = 1000L;
    /** Duration, in milliseconds, above which a cooperative call is worth a warning. */
    private static final long COOPERATIVE_TIME_LIMIT_MS_WARN = 5L;
    /** Duration, in milliseconds, above which a non-cooperative call is worth a warning. */
    private static final long BLOCKING_TIME_LIMIT_MS_WARN = 10000L;
    /** Logging service backing {@link #getLogger(String)} and {@link #getLogger(Class)}. */
    private static final LoggingServiceImpl LOGGING_SERVICE;
    /** Supplier that creates the processor(s) under test. */
    private ProcessorSupplier supplier;
    /** One list of input items per input ordinal. */
    private List<List<?>> inputs = Collections.emptyList();
    /** One list of expected items per output ordinal. */
    private List<List<?>> expectedOutputs = Collections.emptyList();
    /** Priority of each input ordinal; same length as {@link #inputs}. */
    private int[] priorities = new int[0];
    /** Whether every processor call is asserted to make progress. */
    private boolean assertProgress = true;
    /** Whether snapshot save/restore is exercised during the test. */
    private boolean doSnapshots = true;
    /** Whether input and output items are printed to standard output. */
    private boolean logInputOutput = true;
    /** Whether {@code complete()} is called after all input has been processed. */
    private boolean callComplete = true;
    /** Optional instance passed to the processor context; may be {@code null}. */
    private JetInstance jetInstance;
    /** Cooperative time limit in ms; a non-positive value disables the hard-limit assertion. */
    private long cooperativeTimeout = 1000L;
    /** If positive, {@code complete()} is polled for this many ms and must not return {@code true}. */
    private long runUntilCompletedTimeout;
    /** Predicate comparing (expected, actual) output lists; defaults to {@code Objects::equals}. */
    private BiPredicate<? super List<?>, ? super List<?>> outputChecker = Objects::equals;

    /**
     * Creates a test harness around the given supplier. Instances are
     * obtained through the static {@code verifyProcessor(...)} factories.
     *
     * @param supplier supplier used to create the processor(s) under test
     */
    private TestSupport(@Nonnull ProcessorSupplier supplier) {
        this.supplier = supplier;
    }

    /**
     * Starts a test of a single, already constructed processor instance.
     * Snapshots are disabled for the returned harness: the instance is wrapped
     * in a one-shot supplier that refuses to hand out a second processor.
     *
     * @param processor the processor instance to test
     * @return a new harness with snapshots disabled
     */
    public static TestSupport verifyProcessor(Processor processor) {
        DistributedSupplier<Processor> oneShot = TestSupport.singletonSupplier(processor);
        TestSupport support = new TestSupport(ProcessorSupplier.of(oneShot));
        return support.disableSnapshots();
    }

    /**
     * Starts a test of processors created by the given simple supplier.
     *
     * @param supplier supplier of processor instances
     * @return a new harness
     */
    public static TestSupport verifyProcessor(@Nonnull DistributedSupplier<Processor> supplier) {
        ProcessorSupplier wrapped = ProcessorSupplier.of(supplier);
        return new TestSupport(wrapped);
    }

    /**
     * Starts a test of processors created by the given processor supplier.
     *
     * @param supplier the processor supplier under test
     * @return a new harness
     */
    public static TestSupport verifyProcessor(@Nonnull ProcessorSupplier supplier) {
        return new TestSupport(supplier);
    }

    /**
     * Starts a test of processors created through the given meta-supplier.
     * The meta-supplier is initialised with a test context and then asked for
     * the processor supplier of the single local address.
     *
     * @param supplier the meta-supplier under test
     * @return a new harness
     */
    public static TestSupport verifyProcessor(@Nonnull ProcessorMetaSupplier supplier) {
        supplier.init(new TestProcessorMetaSupplierContext());
        List<Address> addresses = Collections.singletonList(LOCAL_ADDRESS);
        ProcessorSupplier localSupplier = supplier.get(addresses).apply(LOCAL_ADDRESS);
        return new TestSupport(localSupplier);
    }

    /**
     * Sets a single input (ordinal 0, priority 0), replacing any previously
     * configured inputs.
     *
     * @param input items to feed to the processor
     * @return {@code this}, for chaining
     */
    public TestSupport input(@Nonnull List<?> input) {
        this.inputs = Collections.singletonList(input);
        this.priorities = new int[]{0};
        return this;
    }

    /**
     * Sets multiple inputs, one list per ordinal, all with priority 0.
     *
     * @param inputs one list of items per input ordinal
     * @return {@code this}, for chaining
     */
    public TestSupport inputs(@Nonnull List<List<?>> inputs) {
        return this.inputs(inputs, new int[inputs.size()]);
    }

    /**
     * Sets multiple inputs together with the priority of each ordinal.
     *
     * @param inputs     one list of items per input ordinal
     * @param priorities priority of each ordinal; must have one entry per input
     * @return {@code this}, for chaining
     * @throws IllegalArgumentException if the two arguments differ in length
     */
    public TestSupport inputs(@Nonnull List<List<?>> inputs, int[] priorities) {
        boolean sameLength = inputs.size() == priorities.length;
        if (!sameLength) {
            throw new IllegalArgumentException("Number of inputs must be equal to number of priorities");
        }
        this.priorities = priorities;
        this.inputs = inputs;
        return this;
    }

    /**
     * Runs the test expecting a single output ordinal with the given items.
     * Delegates to {@link #expectOutputs(List)}.
     *
     * @param expectedOutput items expected on output ordinal 0
     */
    public void expectOutput(@Nonnull List<?> expectedOutput) {
        this.expectOutputs(Collections.singletonList(expectedOutput));
    }

    /**
     * Runs the test: initialises the supplier, drives the processor through
     * the configured inputs and checks each output ordinal against
     * {@code expectedOutputs}.
     *
     * <p>The supplier is always closed, even when initialisation or the test
     * itself fails (including assertion failures). The failure, if any, is
     * passed to {@code close} and then rethrown unchanged. A failure thrown by
     * {@code close} itself is rethrown if the test otherwise succeeded, or
     * attached as a suppressed exception if it did not.
     *
     * @param expectedOutputs one list of expected items per output ordinal
     */
    public void expectOutputs(@Nonnull List<List<?>> expectedOutputs) {
        Throwable failure = null;
        try {
            this.supplier.init(new TestProcessorSupplierContext());
            this.expectedOutputs = expectedOutputs;
            this.runTest(this.doSnapshots, this.doSnapshots ? 1 : 0);
        }
        catch (Throwable t) {
            // Remember the failure so that close() can see it; rethrown below.
            failure = t;
        }
        try {
            this.supplier.close(failure);
        }
        catch (Throwable closeFailure) {
            if (failure == null) {
                failure = closeFailure;
            } else {
                // Keep the primary failure visible; don't let close() mask it.
                failure.addSuppressed(closeFailure);
            }
        }
        if (failure != null) {
            throw ExceptionUtil.sneakyThrow(failure);
        }
    }

    /**
     * Disables the assertions that each processor call makes progress.
     *
     * @return {@code this}, for chaining
     */
    public TestSupport disableProgressAssertion() {
        this.assertProgress = false;
        return this;
    }

    /**
     * Stops polling {@code complete()} once the given time has elapsed instead
     * of waiting for it to return {@code true}. With a positive timeout the
     * test also asserts that {@code complete()} did not return {@code true}.
     *
     * @param timeoutMillis how long to keep calling {@code complete()}, in milliseconds
     * @return {@code this}, for chaining
     */
    public TestSupport disableRunUntilCompleted(long timeoutMillis) {
        this.runUntilCompletedTimeout = timeoutMillis;
        return this;
    }

    /**
     * Disables snapshot save/restore; the test then runs in a single mode.
     *
     * @return {@code this}, for chaining
     */
    public TestSupport disableSnapshots() {
        this.doSnapshots = false;
        return this;
    }

    /**
     * Disables printing of input and output items to standard output.
     *
     * @return {@code this}, for chaining
     */
    public TestSupport disableLogging() {
        this.logInputOutput = false;
        return this;
    }

    /**
     * Skips the {@code complete()} phase after the input has been processed.
     *
     * @return {@code this}, for chaining
     */
    public TestSupport disableCompleteCall() {
        this.callComplete = false;
        return this;
    }

    /**
     * Sets the cooperative timeout. A value of zero or less disables the
     * hard time-limit assertion for cooperative processors.
     *
     * @param timeout the timeout in milliseconds
     * @return {@code this}, for chaining
     */
    public TestSupport cooperativeTimeout(long timeout) {
        this.cooperativeTimeout = timeout;
        return this;
    }

    /**
     * Replaces the predicate used to compare expected and actual output of
     * each ordinal. It is called with (expected, actual).
     *
     * @param outputChecker predicate returning {@code true} when the output is acceptable
     * @return {@code this}, for chaining
     */
    public TestSupport outputChecker(@Nonnull BiPredicate<? super List<?>, ? super List<?>> outputChecker) {
        this.outputChecker = outputChecker;
        return this;
    }

    /**
     * Sets the instance that will be placed into the processor context.
     *
     * @param jetInstance the instance to expose to the processor
     * @return {@code this}, for chaining
     */
    public TestSupport jetInstance(@Nonnull JetInstance jetInstance) {
        this.jetInstance = jetInstance;
        return this;
    }

    /**
     * Returns a human-readable description of a test mode.
     *
     * @param doSnapshots    whether snapshots are taken
     * @param doRestoreEvery restore frequency: 0 (no snapshots), 1, 2 or
     *                       {@code Integer.MAX_VALUE} (never restore)
     * @return the description of the mode
     * @throws IllegalArgumentException for any other combination
     */
    private static String modeDescription(boolean doSnapshots, int doRestoreEvery) {
        if (doSnapshots) {
            switch (doRestoreEvery) {
                case 1:
                    return "snapshots enabled, restoring every snapshot";
                case 2:
                    return "snapshots enabled, restoring every other snapshot";
                case Integer.MAX_VALUE:
                    return "snapshots enabled, never restoring them";
                default:
                    break;
            }
        } else if (doRestoreEvery == 0) {
            return "snapshots disabled";
        }
        throw new IllegalArgumentException("Unknown mode, doSnapshots=" + doSnapshots + ", doRestoreEvery=" + doRestoreEvery);
    }

    /**
     * Runs one pass of the test in the given mode: creates and initialises a
     * processor, feeds it the mixed inputs one item at a time, optionally
     * calls {@code complete()}, closes the processor and finally compares the
     * collected output of each ordinal with the expected output.
     *
     * <p>When called with {@code doSnapshots == true} and
     * {@code doRestoreEvery == 1}, it first recursively runs the three other
     * modes (no snapshots; snapshots never restored; restore every other
     * snapshot) and only then runs its own mode.
     *
     * @param doSnapshots    whether to save a snapshot between items
     * @param doRestoreEvery restore the processor from every n-th snapshot;
     *                       must be 0 when {@code doSnapshots} is false
     * @throws Exception if a processor or snapshot call throws
     */
    private void runTest(boolean doSnapshots, int doRestoreEvery) throws Exception {
        assert (doSnapshots || doRestoreEvery == 0) : "Illegal combination: don't do snapshots, but do restore";
        BackoffIdleStrategy idler = new BackoffIdleStrategy(0L, 0L, TimeUnit.MICROSECONDS.toNanos(1L), TimeUnit.MILLISECONDS.toNanos(1L));
        int idleCount = 0;
        if (doSnapshots && doRestoreEvery == 1) {
            // Cover the remaining modes before running the requested one.
            this.runTest(false, 0);
            this.runTest(true, Integer.MAX_VALUE);
            this.runTest(true, 2);
        }
        System.out.println("### Running the test, mode=" + TestSupport.modeDescription(doSnapshots, doRestoreEvery));
        TestInbox inbox = new TestInbox();
        int inboxOrdinal = -1;
        // Single-element arrays act as mutable holders: snapshotAndRestore()
        // may replace the processor and outbox, and lambdas need to see them.
        Processor[] processor = new Processor[]{this.newProcessorFromSupplier()};
        boolean isCooperative = processor[0].isCooperative();
        TestOutbox[] outbox = new TestOutbox[]{this.createOutbox()};
        // One list of collected items per expected output ordinal.
        ArrayList<List<Object>> actualOutputs = new ArrayList<List<Object>>(this.expectedOutputs.size());
        for (int i = 0; i < this.expectedOutputs.size(); ++i) {
            actualOutputs.add(new ArrayList());
        }
        this.initProcessor(processor[0], outbox[0]);
        int[] restoreCount = new int[]{0};
        // Snapshot (and possibly restore) once before any item is processed.
        this.snapshotAndRestore(processor, outbox, actualOutputs, doSnapshots, doRestoreEvery, restoreCount);
        List<ObjectWithOrdinal> input = TestSupport.mixInputs(this.inputs, this.priorities);
        Iterator<ObjectWithOrdinal> inputIterator = input.iterator();
        // Holds a watermark the processor accepted but that still has to be
        // offered to the outbox.
        Watermark[] wmToProcess = new Watermark[]{null};
        while (inputIterator.hasNext() || !inbox.isEmpty() || wmToProcess[0] != null) {
            String methodName;
            if (inbox.isEmpty() && wmToProcess[0] == null && inputIterator.hasNext()) {
                // Feed the next item; the inbox holds at most one item at a time.
                ObjectWithOrdinal objectWithOrdinal = inputIterator.next();
                inbox.queue().add(objectWithOrdinal.item);
                inboxOrdinal = objectWithOrdinal.ordinal;
                if (this.logInputOutput) {
                    System.out.println(LocalTime.now() + " Input-" + objectWithOrdinal.ordinal + ": " + inbox.peek());
                }
            }
            if (wmToProcess[0] != null) {
                // Forward the pending watermark to the outbox, retrying on the
                // next iteration if the offer is rejected.
                methodName = "offer";
                if (outbox[0].offer(wmToProcess[0])) {
                    wmToProcess[0] = null;
                }
            } else {
                methodName = this.processInbox(inbox, inboxOrdinal, isCooperative, processor, wmToProcess);
            }
            // Progress means the item was consumed or something was emitted to ordinal 0.
            boolean madeProgress = inbox.isEmpty() || !outbox[0].queue(0).isEmpty();
            JetAssert.assertTrue(methodName + "() call without progress", !this.assertProgress || madeProgress);
            idleCount = this.idle(idler, idleCount, madeProgress);
            if (outbox[0].queue(0).size() == 1 && !inbox.isEmpty()) {
                // One item was emitted but the input item is not consumed yet:
                // reset the outbox and call the processor once more before draining.
                // NOTE(review): presumably exercises the processor against a full outbox - confirm.
                outbox[0].reset();
                this.processInbox(inbox, inboxOrdinal, isCooperative, processor, wmToProcess);
            }
            outbox[0].drainQueuesAndReset(actualOutputs, this.logInputOutput);
            // Only snapshot at a clean point: item consumed and no pending watermark.
            if (!inbox.isEmpty() || wmToProcess[0] != null) continue;
            this.snapshotAndRestore(processor, outbox, actualOutputs, doSnapshots, doRestoreEvery, restoreCount);
        }
        if (this.logInputOutput && !this.inputs.isEmpty()) {
            System.out.println(LocalTime.now() + " Input processed, calling complete()");
        }
        if (this.callComplete) {
            double elapsed;
            long completeStart = System.nanoTime();
            boolean[] done = new boolean[]{false};
            do {
                this.checkTime("complete", isCooperative, () -> {
                    done[0] = processor[0].complete();
                });
                boolean madeProgress = done[0] || !outbox[0].queue(0).isEmpty();
                JetAssert.assertTrue("complete() call without progress", !this.assertProgress || madeProgress);
                outbox[0].drainQueuesAndReset(actualOutputs, this.logInputOutput);
                // Snapshot between complete() calls only while the processor is
                // still running and the last call made progress.
                this.snapshotAndRestore(processor, outbox, actualOutputs, madeProgress && doSnapshots && !done[0], doRestoreEvery, restoreCount);
                idleCount = this.idle(idler, idleCount, madeProgress);
                // Loop until complete() returns true, or - when a timeout is
                // configured - until that many milliseconds have elapsed.
            } while (!(this.runUntilCompletedTimeout > 0L && (elapsed = TestSupport.toMillis(System.nanoTime() - completeStart)) > (double)this.runUntilCompletedTimeout) && !done[0]);
            // With a timeout configured, the processor is expected not to complete.
            JetAssert.assertTrue("complete returned true", !done[0] || this.runUntilCompletedTimeout <= 0L);
        }
        processor[0].close(null);
        // Compare each output ordinal; on mismatch fail with a textual diff of both lists.
        for (int i = 0; i < this.expectedOutputs.size(); ++i) {
            List actualOutput;
            List<?> expectedOutput = this.expectedOutputs.get(i);
            if (this.outputChecker.test(expectedOutput, actualOutput = (List)actualOutputs.get(i))) continue;
            JetAssert.assertEquals("processor output in mode \"" + TestSupport.modeDescription(doSnapshots, doRestoreEvery) + "\" doesn't match", TestSupport.listToString(expectedOutput), TestSupport.listToString(actualOutput));
        }
    }

    /**
     * Requests one processor from the supplier and returns it.
     *
     * @return the first processor of the collection returned by {@code supplier.get(1)}
     */
    private Processor newProcessorFromSupplier() {
        return this.supplier.get(1).iterator().next();
    }

    /**
     * Merges the per-ordinal inputs into a single sequence.
     *
     * <p>Ordinals are grouped by priority and the groups are emitted in
     * ascending order of the priority value. Within one group the items are
     * interleaved round-robin: first item of each ordinal, then the second
     * item of each ordinal, and so on, skipping ordinals that are exhausted.
     *
     * @param inputs     one list of items per input ordinal
     * @param priorities priority of each ordinal, indexed by ordinal
     * @return all items in the order they should be fed to the processor,
     *         each tagged with its ordinal
     */
    private static List<ObjectWithOrdinal> mixInputs(List<List<?>> inputs, int[] priorities) {
        // TreeMap keeps the priority groups sorted by priority value.
        TreeMap<Integer, List<Integer>> ordinalsByPriority = new TreeMap<>();
        for (int i = 0; i < priorities.length; ++i) {
            ordinalsByPriority.computeIfAbsent(priorities[i], k -> new ArrayList<>()).add(i);
        }
        List<ObjectWithOrdinal> result = new ArrayList<>();
        for (List<Integer> ordinals : ordinalsByPriority.values()) {
            boolean allDone;
            int index = 0;
            do {
                allDone = true;
                for (int ordinal : ordinals) {
                    List<?> input = inputs.get(ordinal);
                    if (index < input.size()) {
                        result.add(new ObjectWithOrdinal(ordinal, input.get(index)));
                        allDone = false;
                    }
                }
                ++index;
            } while (!allDone);
        }
        return result;
    }

    /**
     * Creates a fresh outbox for the processor under test. The first
     * constructor argument is an array holding the value 1 for every expected
     * output ordinal; the second argument is 1 as well.
     *
     * @return the new outbox
     */
    private TestOutbox createOutbox() {
        int[] perOrdinal = new int[this.expectedOutputs.size()];
        for (int i = 0; i < perOrdinal.length; ++i) {
            perOrdinal[i] = 1;
        }
        return new TestOutbox(perOrdinal, 1);
    }

    /**
     * Passes the head of the inbox to the processor, timing the call. A
     * {@link Watermark} goes to {@code tryProcessWatermark}; if the processor
     * accepts it, it is removed from the inbox and stored in
     * {@code wmToEmit[0]}. Anything else goes to {@code process}.
     *
     * @param inbox         inbox holding the current item
     * @param inboxOrdinal  ordinal the current item arrived on
     * @param isCooperative whether cooperative time limits apply
     * @param processor     holder of the processor under test
     * @param wmToEmit      holder that receives an accepted watermark
     * @return the name of the processor method that was called
     */
    private String processInbox(TestInbox inbox, int inboxOrdinal, boolean isCooperative, Processor[] processor, Watermark[] wmToEmit) {
        Object head = inbox.peek();
        if (!(head instanceof Watermark)) {
            this.checkTime("process", isCooperative, () -> processor[0].process(inboxOrdinal, inbox));
            return "process";
        }
        Watermark watermark = (Watermark)head;
        this.checkTime("tryProcessWatermark", isCooperative, () -> {
            boolean accepted = processor[0].tryProcessWatermark(watermark);
            if (accepted) {
                inbox.remove();
                wmToEmit[0] = watermark;
            }
        });
        return "tryProcessWatermark";
    }

    /**
     * Backs off when the last call made no progress.
     *
     * @param idler        strategy used to idle
     * @param idleCount    number of consecutive calls without progress so far
     * @param madeProgress whether the last call made progress
     * @return the new idle count: 0 after progress, otherwise the incremented count
     */
    private int idle(IdleStrategy idler, int idleCount, boolean madeProgress) {
        if (madeProgress) {
            return 0;
        }
        int nextCount = idleCount + 1;
        idler.idle(nextCount);
        return nextCount;
    }

    /**
     * Saves a snapshot of the current processor and, on every
     * {@code doRestoreEvery}-th call, replaces the processor and outbox with
     * fresh ones and restores the new processor from that snapshot.
     *
     * @param processor      holder of the processor; replaced on restore
     * @param outbox         holder of the outbox; replaced on restore
     * @param actualOutput   collected output, appended to while draining
     * @param doSnapshot     when {@code false} the method does nothing
     * @param doRestoreEvery restore from every n-th snapshot
     * @param restoreCount   holder of the number of snapshots taken so far
     * @throws Exception if closing the old processor throws
     */
    private void snapshotAndRestore(Processor[] processor, TestOutbox[] outbox, List<List<Object>> actualOutput, boolean doSnapshot, int doRestoreEvery, int[] restoreCount) throws Exception {
        boolean willRestore;
        if (!doSnapshot) {
            return;
        }
        restoreCount[0] = restoreCount[0] + 1;
        boolean bl = willRestore = restoreCount[0] % doRestoreEvery == 0;
        if (this.logInputOutput) {
            System.out.println(LocalTime.now() + (willRestore ? " Saving & restoring snapshot" : " Saving snapshot without restoring it"));
        }
        // Collects everything the processor writes to the snapshot queue.
        TestInbox snapshotInbox = new TestInbox();
        boolean[] done = new boolean[]{false};
        boolean isCooperative = processor[0].isCooperative();
        HashSet keys = new HashSet();
        // Call saveToSnapshot() until it reports completion, draining both the
        // snapshot queue and the regular output after every call.
        do {
            this.checkTime("saveSnapshot", isCooperative, () -> {
                done[0] = processor[0].saveToSnapshot();
            });
            JetAssert.assertTrue("saveToSnapshot() call without progress", !this.assertProgress || done[0] || !outbox[0].snapshotQueue().isEmpty() || !outbox[0].queue(0).isEmpty());
            outbox[0].drainSnapshotQueueAndReset(snapshotInbox.queue(), false);
            outbox[0].drainQueuesAndReset(actualOutput, this.logInputOutput);
        } while (!done[0]);
        // Snapshot entries must have unique keys.
        for (Object item : snapshotInbox.queue()) {
            Map.Entry item2 = (Map.Entry)item;
            JetAssert.assertTrue("Duplicate key produced in saveToSnapshot()\n  Duplicate: " + item2.getKey() + "\n  Keys so far: " + keys, keys.add(item2.getKey()));
        }
        if (!willRestore) {
            return;
        }
        assert (outbox[0].queue(0).isEmpty());
        assert (outbox[0].snapshotQueue().isEmpty());
        // Replace the processor and outbox with fresh instances.
        processor[0].close(null);
        processor[0] = this.newProcessorFromSupplier();
        outbox[0] = this.createOutbox();
        this.initProcessor(processor[0], outbox[0]);
        // Feed the saved entries back until the snapshot inbox is empty; each
        // call must consume an entry or emit output.
        int lastInboxSize = snapshotInbox.queue().size();
        while (!snapshotInbox.isEmpty()) {
            this.checkTime("restoreSnapshot", isCooperative, () -> processor[0].restoreFromSnapshot(snapshotInbox));
            JetAssert.assertTrue("restoreFromSnapshot() call without progress", !this.assertProgress || lastInboxSize > snapshotInbox.queue().size() || !outbox[0].queue(0).isEmpty());
            outbox[0].drainQueuesAndReset(actualOutput, this.logInputOutput);
            lastInboxSize = snapshotInbox.queue().size();
        }
        // Finish the restore; called at least once and repeated until it returns true.
        do {
            this.checkTime("finishSnapshotRestore", isCooperative, () -> {
                done[0] = processor[0].finishSnapshotRestore();
            });
            JetAssert.assertTrue("finishSnapshotRestore() call without progress", !this.assertProgress || done[0] || !outbox[0].queue(0).isEmpty());
            outbox[0].drainQueuesAndReset(actualOutput, this.logInputOutput);
        } while (!done[0]);
    }

    /**
     * Runs {@code r} and checks how long it took.
     *
     * <p>For a cooperative processor the call must finish within the
     * configured {@code cooperativeTimeout} (milliseconds), otherwise the
     * test fails. A timeout of zero or less disables that assertion. A
     * warning is printed when the call exceeds
     * {@code COOPERATIVE_TIME_LIMIT_MS_WARN}. For a non-cooperative processor
     * only a warning is printed, when the call exceeds
     * {@code BLOCKING_TIME_LIMIT_MS_WARN}.
     *
     * @param methodName    name of the processor method, used in messages
     * @param isCooperative whether the processor is cooperative
     * @param r             the call to time
     */
    private void checkTime(String methodName, boolean isCooperative, Runnable r) {
        long start = System.nanoTime();
        r.run();
        long elapsed = System.nanoTime() - start;
        if (isCooperative) {
            if (this.cooperativeTimeout > 0L) {
                // Enforce the configured limit rather than a fixed 1000 ms, so that
                // cooperativeTimeout(long) actually changes the threshold.
                JetAssert.assertTrue(String.format("call to %s() took %.1fms, it should be <%dms", methodName, TestSupport.toMillis(elapsed), this.cooperativeTimeout), elapsed < TimeUnit.MILLISECONDS.toNanos(this.cooperativeTimeout));
            }
            if (elapsed > TimeUnit.MILLISECONDS.toNanos(COOPERATIVE_TIME_LIMIT_MS_WARN)) {
                System.out.println(String.format("Warning: call to %s() took %.2fms, it should be <%dms normally", methodName, TestSupport.toMillis(elapsed), COOPERATIVE_TIME_LIMIT_MS_WARN));
            }
        } else if (elapsed > TimeUnit.MILLISECONDS.toNanos(BLOCKING_TIME_LIMIT_MS_WARN)) {
            System.out.println(String.format("Warning: call to %s() took %.2fms in non-cooperative processor. Is this expected?", methodName, TestSupport.toMillis(elapsed)));
        }
    }

    /**
     * Initialises the processor with the given outbox and a test context. The
     * context gets a logger named after the processor's class and, when one
     * was configured, the Jet instance.
     *
     * @param processor the processor to initialise
     * @param outbox    the outbox the processor will write to
     */
    private void initProcessor(Processor processor, TestOutbox outbox) {
        ILogger logger = TestSupport.getLogger(processor.getClass().getName());
        TestProcessorContext context = new TestProcessorContext().setLogger(logger);
        JetInstance instance = this.jetInstance;
        if (instance != null) {
            context.setJetInstance(instance);
        }
        processor.init(outbox, context);
    }

    /**
     * Converts a duration in nanoseconds to fractional milliseconds.
     *
     * @param nanos duration in nanoseconds
     * @return the same duration in milliseconds
     */
    private static double toMillis(long nanos) {
        double nanosPerMilli = (double)TimeUnit.MILLISECONDS.toNanos(1L);
        return (double)nanos / nanosPerMilli;
    }

    /**
     * Initialises the given processor supplier with a test context and wraps
     * it as a plain {@link Supplier}. Each call to the returned supplier asks
     * the processor supplier for one processor and returns it.
     *
     * @param supplier the processor supplier to wrap
     * @return a supplier of processors
     */
    public static Supplier<Processor> supplierFrom(ProcessorSupplier supplier) {
        supplier.init(new TestProcessorSupplierContext());
        return () -> supplier.get(1).iterator().next();
    }

    /**
     * Initialises the given meta-supplier with a test context, obtains the
     * processor supplier for the single local address and wraps it through
     * {@link #supplierFrom(ProcessorSupplier)}.
     *
     * @param supplier the meta-supplier to wrap
     * @return a supplier of processors
     */
    public static Supplier<Processor> supplierFrom(ProcessorMetaSupplier supplier) {
        supplier.init(new TestProcessorMetaSupplierContext());
        List<Address> addresses = Collections.singletonList(LOCAL_ADDRESS);
        ProcessorSupplier localSupplier = supplier.get(addresses).apply(LOCAL_ADDRESS);
        return TestSupport.supplierFrom(localSupplier);
    }

    /**
     * Returns a logger with the given name from the shared test logging service.
     *
     * @param name the logger name
     * @return the logger
     */
    static ILogger getLogger(String name) {
        return LOGGING_SERVICE.getLogger(name);
    }

    /**
     * Returns a logger for the given class from the shared test logging service.
     *
     * @param clazz the class to get the logger for
     * @return the logger
     */
    static ILogger getLogger(Class clazz) {
        return LOGGING_SERVICE.getLogger(clazz);
    }

    /**
     * Renders a list with one item per line: each item is converted with
     * {@code String.valueOf} and the results are joined by {@code "\n"}.
     *
     * @param list the list to render
     * @return the textual form; the empty string for an empty list
     */
    public static String listToString(List<?> list) {
        StringBuilder text = new StringBuilder();
        String separator = "";
        for (Object item : list) {
            text.append(separator).append(String.valueOf(item));
            separator = "\n";
        }
        return text.toString();
    }

    /**
     * Wraps a single processor instance in a supplier that hands it out
     * exactly once.
     *
     * @param processor the instance to supply
     * @return a supplier that returns {@code processor} on the first call and
     *         throws {@link RuntimeException} on any later call (or on the
     *         first call if {@code processor} is {@code null})
     */
    private static DistributedSupplier<Processor> singletonSupplier(Processor processor) {
        // One-element array serves as a mutable slot the lambda can clear.
        Processor[] holder = new Processor[]{processor};
        return () -> {
            Processor instance = holder[0];
            if (instance == null) {
                throw new RuntimeException("More than one instance requested");
            }
            holder[0] = null;
            return instance;
        };
    }

    // Initialises the shared logging service and the local address used when
    // querying meta-suppliers. Failure to create the address is rethrown as
    // an unchecked exception.
    static {
        LOGGING_SERVICE = new LoggingServiceImpl("test-group", null, BuildInfoProvider.getBuildInfo());
        try {
            LOCAL_ADDRESS = new Address("localhost", 5701);
        }
        catch (UnknownHostException e) {
            throw new RuntimeException(e);
        }
    }

    /** An input item paired with the ordinal of the input it belongs to. */
    private static class ObjectWithOrdinal {
        /** Index of the input list the item came from. */
        final int ordinal;
        /** The input item itself. */
        final Object item;

        /**
         * @param ordinal index of the input list the item came from
         * @param item    the input item
         */
        ObjectWithOrdinal(int ordinal, Object item) {
            this.ordinal = ordinal;
            this.item = item;
        }
    }
}

