/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.test;

import com.hazelcast.jet.Processor;
import com.hazelcast.jet.ProcessorMetaSupplier;
import com.hazelcast.jet.ProcessorSupplier;
import com.hazelcast.jet.function.DistributedSupplier;
import com.hazelcast.jet.test.JetAssert;
import com.hazelcast.jet.test.TestInbox;
import com.hazelcast.jet.test.TestOutbox;
import com.hazelcast.jet.test.TestProcessorContext;
import com.hazelcast.jet.test.TestProcessorMetaSupplierContext;
import com.hazelcast.jet.test.TestProcessorSupplierContext;
import com.hazelcast.nio.Address;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import javax.annotation.Nonnull;

public final class TestSupport {
    private static final Address LOCAL_ADDRESS;

    private TestSupport() {
    }

    public static <T, U> void testProcessor(@Nonnull DistributedSupplier<Processor> supplier, @Nonnull List<T> input, @Nonnull List<U> expectedOutput) {
        TestSupport.testProcessor(TestSupport.processorFrom(supplier), input, expectedOutput, true);
    }

    public static <T, U> void testProcessor(@Nonnull ProcessorSupplier supplier, @Nonnull List<T> input, @Nonnull List<U> expectedOutput) {
        TestSupport.testProcessor(TestSupport.processorFrom(supplier), input, expectedOutput, true);
    }

    public static <T, U> void testProcessor(@Nonnull ProcessorMetaSupplier supplier, @Nonnull List<T> input, @Nonnull List<U> expectedOutput) {
        TestSupport.testProcessor(TestSupport.processorFrom(supplier), input, expectedOutput, true);
    }

    public static <T, U> void testProcessor(@Nonnull Processor processor, @Nonnull List<T> input, @Nonnull List<U> expectedOutput) {
        TestSupport.testProcessor(processor, input, expectedOutput, true);
    }

    public static <T, U> void testProcessor(@Nonnull DistributedSupplier<Processor> supplier, @Nonnull List<T> input, @Nonnull List<U> expectedOutput, boolean assertProgress) {
        TestSupport.testProcessor(TestSupport.processorFrom(supplier), input, expectedOutput, assertProgress);
    }

    public static <T, U> void testProcessor(@Nonnull ProcessorSupplier supplier, @Nonnull List<T> input, @Nonnull List<U> expectedOutput, boolean assertProgress) {
        TestSupport.testProcessor(TestSupport.processorFrom(supplier), input, expectedOutput, assertProgress);
    }

    public static <T, U> void testProcessor(@Nonnull ProcessorMetaSupplier supplier, @Nonnull List<T> input, @Nonnull List<U> expectedOutput, boolean assertProgress) {
        TestSupport.testProcessor(TestSupport.processorFrom(supplier), input, expectedOutput, assertProgress);
    }

    public static <T, U> void testProcessor(@Nonnull Processor processor, @Nonnull List<T> input, @Nonnull List<U> expectedOutput, boolean assertProgress) {
        boolean done;
        TestInbox inbox = new TestInbox();
        inbox.addAll(input);
        int outboxCapacity = processor.isCooperative() ? 1 : Integer.MAX_VALUE;
        TestOutbox outbox = new TestOutbox(outboxCapacity);
        Queue<Object> bucket = outbox.queueWithOrdinal(0);
        ArrayList<Object> actualOutput = new ArrayList<Object>();
        processor.init(outbox, new TestProcessorContext());
        int lastInboxSize = inbox.size();
        int lastOutboxSize = bucket.size();
        while (!inbox.isEmpty()) {
            processor.process(0, inbox);
            if (processor.isCooperative() && bucket.size() == 1) {
                processor.process(0, inbox);
            }
            TestSupport.drainOutbox(bucket, actualOutput);
            if (assertProgress) {
                JetAssert.assertTrue("process() call without progress", lastInboxSize > inbox.size() || lastOutboxSize < actualOutput.size());
            }
            lastInboxSize = inbox.size();
            lastOutboxSize = actualOutput.size();
        }
        do {
            done = processor.complete();
            TestSupport.drainOutbox(bucket, actualOutput);
            if (assertProgress) {
                JetAssert.assertTrue("complete() call without progress", done || lastOutboxSize < actualOutput.size());
            }
            lastOutboxSize = actualOutput.size();
        } while (!done);
        JetAssert.assertEquals("processor output doesn't match", expectedOutput, actualOutput);
    }

    public static void drainOutbox(Queue<Object> outboxBucket, List<Object> outputList) {
        Object o;
        while ((o = outboxBucket.poll()) != null) {
            outputList.add(o);
        }
    }

    public static Processor processorFrom(DistributedSupplier<Processor> supplier) {
        return supplier.get();
    }

    public static Processor processorFrom(ProcessorSupplier supplier) {
        supplier.init(new TestProcessorSupplierContext());
        return supplier.get(1).iterator().next();
    }

    public static Processor processorFrom(ProcessorMetaSupplier supplier) {
        supplier.init(new TestProcessorMetaSupplierContext());
        return TestSupport.processorFrom(supplier.get(Collections.singletonList(LOCAL_ADDRESS)).apply(LOCAL_ADDRESS));
    }

    static {
        try {
            LOCAL_ADDRESS = new Address("localhost", 5701);
        }
        catch (UnknownHostException e) {
            throw new RuntimeException(e);
        }
    }
}

