/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.sql.impl.processors;

import com.hazelcast.function.FunctionEx;
import com.hazelcast.function.SupplierEx;
import com.hazelcast.function.ToLongFunctionEx;
import com.hazelcast.jet.core.Processor;
import com.hazelcast.jet.core.ProcessorSupplier;
import com.hazelcast.jet.core.Watermark;
import com.hazelcast.jet.core.test.TestSupport;
import com.hazelcast.jet.datamodel.Tuple2;
import com.hazelcast.jet.sql.SqlTestSupport;
import com.hazelcast.jet.sql.impl.JetJoinInfo;
import com.hazelcast.jet.sql.impl.processors.StreamToStreamJoinP;
import com.hazelcast.shaded.com.google.common.collect.ImmutableMap;
import com.hazelcast.shaded.org.apache.calcite.rel.core.JoinRelType;
import com.hazelcast.sql.impl.expression.ColumnExpression;
import com.hazelcast.sql.impl.expression.ConstantExpression;
import com.hazelcast.sql.impl.expression.Expression;
import com.hazelcast.sql.impl.expression.math.MinusFunction;
import com.hazelcast.sql.impl.expression.predicate.AndPredicate;
import com.hazelcast.sql.impl.expression.predicate.ComparisonMode;
import com.hazelcast.sql.impl.expression.predicate.ComparisonPredicate;
import com.hazelcast.sql.impl.row.JetSqlRow;
import com.hazelcast.sql.impl.type.QueryDataType;
import com.hazelcast.test.HazelcastSerialClassRunner;
import com.hazelcast.test.annotation.ParallelJVMTest;
import com.hazelcast.test.annotation.QuickTest;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiPredicate;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;

@Category(value={QuickTest.class, ParallelJVMTest.class})
@RunWith(value=HazelcastSerialClassRunner.class)
public class StreamToStreamJoinPInnerTest
extends SqlTestSupport {
    /**
     * Output checker for tests whose processor may be restored from a snapshot.
     * <p>
     * When snapshots are disabled, or the snapshot-restore interval is
     * {@code Integer.MAX_VALUE}, it delegates to {@code TestSupport.SAME_ITEMS_ANY_ORDER}.
     * Otherwise it:
     * <ol>
     *   <li>compares all non-watermark items as multisets (order is ignored);</li>
     *   <li>requires the expected watermarks of each key to be strictly increasing;</li>
     *   <li>requires the actual watermarks of each key to be non-decreasing
     *       (a repeated value is tolerated);</li>
     *   <li>for every expected watermark key, accepts a missing actual watermark and
     *       otherwise requires the last actual value not to exceed the last expected one;</li>
     *   <li>throws an {@code AssertionError} (instead of returning {@code false}) when the
     *       actual output contains a watermark key that the expected output lacks.</li>
     * </ol>
     */
    protected static final BiPredicate<List<?>, List<?>> SAME_ITEMS_ANY_ORDER_EQUIVALENT_WMS = (expected, actual) -> {
        TestSupport.TestMode mode = ((TestSupport.TestContext) TestSupport.TEST_CONTEXT.get()).getTestMode();
        boolean restoresFromSnapshot = mode.isSnapshotsEnabled()
                && mode.snapshotRestoreInterval() != Integer.MAX_VALUE;
        if (!restoresFromSnapshot) {
            return TestSupport.SAME_ITEMS_ANY_ORDER.test(expected, actual);
        }

        // Non-watermark items: compare occurrence counts, ignoring order.
        Function<List<?>, Map<Object, Integer>> countNonWatermarks = items -> items.stream()
                .filter(item -> !(item instanceof Watermark))
                .collect(Collectors.toMap(FunctionEx.identity(), item -> 1, Integer::sum));
        Map<Object, Integer> expectedCounts = countNonWatermarks.apply(expected);
        Map<Object, Integer> actualCounts = countNonWatermarks.apply(actual);
        if (!expectedCounts.equals(actualCounts)) {
            return false;
        }

        // Last seen watermark timestamp per key, for each side.
        Map<Byte, Long> expectedWms = new HashMap<>();
        Map<Byte, Long> actualWms = new HashMap<>();

        // Expected watermarks must strictly increase per key.
        for (Object item : expected) {
            if (item instanceof Watermark) {
                Watermark watermark = (Watermark) item;
                long timestamp = watermark.timestamp();
                Long previous = expectedWms.put(watermark.key(), timestamp);
                if (previous != null && previous >= timestamp) {
                    return false;
                }
            }
        }

        // Actual watermarks may repeat but must never go backwards per key.
        for (Object item : actual) {
            if (item instanceof Watermark) {
                Watermark watermark = (Watermark) item;
                long timestamp = watermark.timestamp();
                Long previous = actualWms.put(watermark.key(), timestamp);
                if (previous != null && previous > timestamp) {
                    return false;
                }
            }
        }

        // The last actual watermark of a key, if there is one, must not be ahead of the expected one.
        for (Map.Entry<Byte, Long> expectedEntry : expectedWms.entrySet()) {
            Long lastActual = actualWms.get(expectedEntry.getKey());
            long lastExpected = expectedEntry.getValue();
            if (lastActual != null && lastActual > lastExpected) {
                return false;
            }
        }

        // Whatever remains are watermark keys that were never expected.
        actualWms.keySet().removeAll(expectedWms.keySet());
        Assert.assertTrue("unexpected WM keys received: " + actualWms, actualWms.isEmpty());
        return true;
    };
    /**
     * Watermark key -> timestamp extractor map passed to the {@code StreamToStreamJoinP}
     * constructor as its second argument. The default maps key 0 to a function reading
     * column 0 of the row as a {@code Long}. Not final: several tests reassign it.
     */
    private Map<Byte, ToLongFunctionEx<JetSqlRow>> leftExtractors = Collections.singletonMap((byte)0, (ToLongFunctionEx & Serializable)l -> (Long)l.getRow().get(0));
    /**
     * Watermark key -> timestamp extractor map passed to the {@code StreamToStreamJoinP}
     * constructor as its third argument. The default maps key 1 to a function reading
     * column 0 of the row as a {@code Long}. Not final: several tests reassign it.
     */
    private Map<Byte, ToLongFunctionEx<JetSqlRow>> rightExtractors = Collections.singletonMap((byte)1, (ToLongFunctionEx & Serializable)r -> (Long)r.getRow().get(0));
    /**
     * Watermark key -> (watermark key -> long) map that each test fills in before creating the
     * processor. It is turned into the join condition by
     * {@code createConditionFromPostponeTimeMap} and is also passed to the
     * {@code StreamToStreamJoinP} constructor.
     */
    private final Map<Byte, Map<Byte, Long>> postponeTimeMap = new HashMap<Byte, Map<Byte, Long>>();

    /**
     * One-time class setup: delegates to the inherited {@code initialize} with
     * the arguments {@code 1} and {@code null}.
     */
    @BeforeClass
    public static void beforeClass() {
        initialize(1, null);
    }

    /**
     * Join with one watermark key per input (the default extractors) and a postpone
     * time of 0 in both directions, i.e. the generated condition is
     * {@code col0 >= col1 - 0 AND col1 >= col0 - 0}. The processor is created with
     * {@code assumeEquiJoin = true}.
     */
    @Test
    public void test_equalTimes_singleWmKeyPerInput() {
        postponeTimeMap.put((byte) 0, Collections.singletonMap((byte) 1, 0L));
        postponeTimeMap.put((byte) 1, Collections.singletonMap((byte) 0, 0L));

        ProcessorSupplier joinSupplier = ProcessorSupplier.of(createProcessor(1, 1, true));

        TestSupport.verifyProcessor(joinSupplier)
                .hazelcastInstance(instance())
                .outputChecker(TestSupport.SAME_ITEMS_ANY_ORDER)
                .expectExactOutput(new TestSupport.TestEvent[]{
                        TestSupport.in(wm(0L, (byte) 1)),
                        TestSupport.in(0, jetRow(1L)),
                        TestSupport.in(1, jetRow(1L)),
                        TestSupport.out(jetRow(1L, 1L)),
                        TestSupport.in(0, jetRow(2L)),
                        TestSupport.in(1, jetRow(2L)),
                        TestSupport.out(jetRow(2L, 2L)),
                        TestSupport.in(wm(2L, (byte) 0)),
                        TestSupport.out(wm(0L, (byte) 0)),
                        TestSupport.out(wm(0L, (byte) 1))
                });
    }

    /**
     * Left input carries two watermark keys (0 and 1, read from columns 0 and 1), the right
     * input carries key 2 (read from its column 0). Only keys 1 and 2 constrain each other:
     * the generated condition is {@code col1 >= col2 - 1 AND col2 >= col1 - 4}.
     * Processor assertions in between inspect the content of both join buffers.
     */
    @Test
    public void test_twoWmKeysOnLeft() {
        postponeTimeMap.put((byte) 0, Collections.emptyMap());
        postponeTimeMap.put((byte) 1, Collections.singletonMap((byte) 2, 1L));
        postponeTimeMap.put((byte) 2, Collections.singletonMap((byte) 1, 4L));

        leftExtractors = new HashMap<>();
        leftExtractors.put((byte) 0, l -> (Long) l.getRow().get(0));
        leftExtractors.put((byte) 1, l -> (Long) l.getRow().get(1));
        rightExtractors = Collections.singletonMap((byte) 2, r -> (Long) r.getRow().get(0));

        SupplierEx<Processor> joinSupplier = createProcessor(2, 1, false);

        TestSupport.verifyProcessor(joinSupplier)
                .hazelcastInstance(instance())
                .outputChecker(SAME_ITEMS_ANY_ORDER_EQUIVALENT_WMS)
                .expectExactOutput(new TestSupport.TestEvent[]{
                        TestSupport.in(0, jetRow(12L, 9L)),
                        TestSupport.in(1, jetRow(9L)),
                        TestSupport.out(jetRow(12L, 9L, 9L)),
                        TestSupport.in(0, jetRow(12L, 13L)),
                        TestSupport.out(jetRow(12L, 13L, 9L)),
                        TestSupport.processorAssertion((StreamToStreamJoinP p) -> {
                            Assert.assertEquals(
                                    Arrays.asList(jetRow(12L, 9L), jetRow(12L, 13L)),
                                    p.buffer[0].content());
                            Assert.assertEquals(jetRow(9L), p.buffer[1].content().iterator().next());
                        }),
                        TestSupport.in(1, wm(15L, (byte) 2)),
                        TestSupport.processorAssertion((StreamToStreamJoinP p) -> {
                            Assert.assertEquals(
                                    Collections.singletonList(jetRow(12L, 13L)),
                                    p.buffer[0].content());
                            Assert.assertEquals(jetRow(9L), p.buffer[1].content().iterator().next());
                        }),
                        TestSupport.in(0, wm(12L, (byte) 1)),
                        TestSupport.out(wm(11L, (byte) 1)),
                        TestSupport.out(wm(11L, (byte) 2)),
                        TestSupport.processorAssertion((StreamToStreamJoinP p) -> {
                            Assert.assertEquals(
                                    Collections.singletonList(jetRow(12L, 13L)),
                                    p.buffer[0].content());
                            Assert.assertTrue(p.buffer[1].isEmpty());
                        }),
                        TestSupport.in(0, wm(13L, (byte) 0)),
                        TestSupport.in(1, jetRow(16L)),
                        TestSupport.in(0, wm(13L, (byte) 1)),
                        TestSupport.out(wm(12L, (byte) 2)),
                        TestSupport.in(1, jetRow(16L))
                });
    }

    /**
     * Both inputs carry two watermark keys: 0 and 1 on the left (columns 0 and 1 of the left
     * row), 2 and 3 on the right (columns 0 and 1 of the right row). Key 0 is constrained
     * against keys 2 and 3 in both directions; key 1 has no constraints. Since no
     * key-to-column remapping is passed, the generated condition uses the watermark keys
     * as column indexes of the joined row.
     */
    @Test
    public void test_twoWmKeysOnEachInput() {
        // Inner-map keys must be Byte: postponeTimeMap is a Map<Byte, Map<Byte, Long>> and
        // createConditionFromPostponeTimeMap unboxes the keys as bytes. A bare int literal
        // would be boxed to Integer and break both lookups and the unboxing.
        postponeTimeMap.put((byte) 0, ImmutableMap.of((byte) 2, 2L, (byte) 3, 4L));
        postponeTimeMap.put((byte) 1, ImmutableMap.of());
        leftExtractors = new HashMap<>();
        leftExtractors.put((byte) 0, l -> (Long) l.getRow().get(0));
        leftExtractors.put((byte) 1, l -> (Long) l.getRow().get(1));

        postponeTimeMap.put((byte) 2, ImmutableMap.of((byte) 0, 1L));
        postponeTimeMap.put((byte) 3, ImmutableMap.of((byte) 0, 3L));
        rightExtractors = new HashMap<>();
        rightExtractors.put((byte) 2, r -> (Long) r.getRow().get(0));
        rightExtractors.put((byte) 3, r -> (Long) r.getRow().get(1));

        SupplierEx<Processor> joinSupplier = createProcessor(2, 2, false);

        // hazelcastInstance(...) is set once; the original chain set the same instance twice.
        TestSupport.verifyProcessor(joinSupplier)
                .hazelcastInstance(instance())
                .outputChecker(SAME_ITEMS_ANY_ORDER_EQUIVALENT_WMS)
                .expectExactOutput(new TestSupport.TestEvent[]{
                        TestSupport.in(0, jetRow(12L, 10L)),
                        TestSupport.in(1, jetRow(12L, 10L)),
                        TestSupport.out(jetRow(12L, 10L, 12L, 10L)),
                        TestSupport.in(0, jetRow(12L, 13L)),
                        TestSupport.out(jetRow(12L, 13L, 12L, 10L)),
                        TestSupport.in(1, jetRow(12L, 13L)),
                        TestSupport.out(jetRow(12L, 10L, 12L, 13L)),
                        TestSupport.out(jetRow(12L, 13L, 12L, 13L)),
                        TestSupport.in(0, wm(15L, (byte) 0)),
                        TestSupport.in(1, wm(15L, (byte) 2)),
                        TestSupport.out(wm(13L, (byte) 2)),
                        TestSupport.out(wm(14L, (byte) 0))
                });
    }

    @Test
    public void test_joinWithAdditionalCondition() {
        this.postponeTimeMap.put((byte)0, (Map<Byte, Long>)ImmutableMap.of((Object)1, (Object)1L));
        this.postponeTimeMap.put((byte)1, (Map<Byte, Long>)ImmutableMap.of((Object)0, (Object)1L));
        this.leftExtractors = ImmutableMap.of((Object)0, (ToLongFunctionEx & Serializable)l -> (Long)l.getRow().get(0));
        this.rightExtractors = ImmutableMap.of((Object)1, (ToLongFunctionEx & Serializable)r -> (Long)r.getRow().get(0));
        AndPredicate condition = AndPredicate.create((Expression[])new Expression[]{StreamToStreamJoinPInnerTest.createConditionFromPostponeTimeMap(this.postponeTimeMap, 1, 2), ComparisonPredicate.create((Expression)ColumnExpression.create((int)1, (QueryDataType)QueryDataType.INT), (Expression)ConstantExpression.create((Object)10, (QueryDataType)QueryDataType.INT), (ComparisonMode)ComparisonMode.GREATER_THAN)});
        JetJoinInfo joinInfo = new JetJoinInfo(JoinRelType.INNER, new int[0], new int[0], (Expression)condition, (Expression)condition);
        SupplierEx & Serializable supplier = (SupplierEx & Serializable)() -> new StreamToStreamJoinP(joinInfo, this.leftExtractors, this.rightExtractors, this.postponeTimeMap, Tuple2.tuple2((Object)2, (Object)1));
        TestSupport.verifyProcessor((SupplierEx)supplier).hazelcastInstance(StreamToStreamJoinPInnerTest.instance()).expectExactOutput(new TestSupport.TestEvent[]{TestSupport.in((int)1, (Object)StreamToStreamJoinPInnerTest.jetRow(3L)), TestSupport.in((int)0, (Object)StreamToStreamJoinPInnerTest.jetRow(2L, 2)), TestSupport.in((int)0, (Object)StreamToStreamJoinPInnerTest.jetRow(2L, 42)), TestSupport.out((Object)StreamToStreamJoinPInnerTest.jetRow(2L, 42, 3L))});
    }

    /**
     * Four left rows with the same timestamp are buffered, then a single right row arrives
     * and is expected to produce four joined rows. Watermark key 1 is remapped to column 2
     * of the joined row, the postpone time is 0 in both directions, and the processor is
     * created with {@code assumeEquiJoin = true}. The verifier runs with
     * {@code cooperativeTimeout(0)}.
     */
    @Test
    public void test_joinWithMultipleRowsAtOnce() {
        postponeTimeMap.put((byte) 0, Collections.singletonMap((byte) 1, 0L));
        postponeTimeMap.put((byte) 1, Collections.singletonMap((byte) 0, 0L));

        ProcessorSupplier joinSupplier = ProcessorSupplier.of(createProcessor(2, 1, true, 1, 2));

        TestSupport.verifyProcessor(joinSupplier)
                .hazelcastInstance(instance())
                .outputChecker(TestSupport.SAME_ITEMS_ANY_ORDER)
                .cooperativeTimeout(0L)
                .expectExactOutput(new TestSupport.TestEvent[]{
                        TestSupport.in(0, jetRow(1L, 42)),
                        TestSupport.in(0, jetRow(1L, 43)),
                        TestSupport.in(0, jetRow(1L, 44)),
                        TestSupport.in(0, jetRow(1L, 45)),
                        TestSupport.in(1, jetRow(1L)),
                        TestSupport.out(jetRow(1L, 42, 1L)),
                        TestSupport.out(jetRow(1L, 43, 1L)),
                        TestSupport.out(jetRow(1L, 44, 1L)),
                        TestSupport.out(jetRow(1L, 45, 1L))
                });
    }

    /**
     * After a watermark of 10 for key 0 on the left input, the test asserts the processor's
     * {@code wmState} ({@code Long.MIN_VALUE + 1} for key 0, 9 for key 1), then feeds a right
     * row with timestamp 8 and asserts that both join buffers stay empty.
     */
    @Test
    public void test_nonLateItemOutOfLimit() {
        // Inner-map keys must be Byte; a bare int literal would be boxed to Integer.
        postponeTimeMap.put((byte) 0, ImmutableMap.of((byte) 1, 1L));
        postponeTimeMap.put((byte) 1, ImmutableMap.of((byte) 0, 1L));
        leftExtractors = Collections.singletonMap((byte) 0, l -> (Long) l.getRow().get(0));
        rightExtractors = Collections.singletonMap((byte) 1, r -> (Long) r.getRow().get(0));

        SupplierEx<Processor> joinSupplier = createProcessor(1, 1, false);

        TestSupport.verifyProcessor(joinSupplier)
                .hazelcastInstance(instance())
                .expectExactOutput(new TestSupport.TestEvent[]{
                        TestSupport.in(0, wm(10L, (byte) 0)),
                        // Expected keys are Byte as well: a map with Integer keys can never
                        // be equal to a map keyed by watermark-key bytes.
                        TestSupport.processorAssertion((StreamToStreamJoinP p) -> Assert.assertEquals(
                                ImmutableMap.of((byte) 0, Long.MIN_VALUE + 1, (byte) 1, 9L),
                                p.wmState)),
                        TestSupport.in(1, jetRow(8L)),
                        TestSupport.processorAssertion((StreamToStreamJoinP p) -> Assert.assertEquals(
                                0L,
                                (long) (p.buffer[0].size() + p.buffer[1].size())))
                });
    }

    /**
     * A left row with timestamp 0 is buffered, a watermark of 10 for key 0 follows, and then
     * a right row with timestamp 0 arrives. The joined row is expected in the output, and the
     * final assertion checks that the right-side buffer ({@code buffer[1]}) is empty.
     */
    @Test
    public void test_nonLateItemOutOfLimit_hasMatchInBuffer() {
        postponeTimeMap.put((byte) 0, Collections.singletonMap((byte) 1, 0L));
        postponeTimeMap.put((byte) 1, Collections.singletonMap((byte) 0, 0L));

        ProcessorSupplier joinSupplier = ProcessorSupplier.of(createProcessor(1, 1, true));

        TestSupport.verifyProcessor(joinSupplier)
                .hazelcastInstance(instance())
                .outputChecker(TestSupport.SAME_ITEMS_ANY_ORDER)
                .expectExactOutput(new TestSupport.TestEvent[]{
                        TestSupport.in(0, jetRow(0L)),
                        TestSupport.in(0, wm(10L, (byte) 0)),
                        TestSupport.in(1, jetRow(0L)),
                        TestSupport.out(jetRow(0L, 0L)),
                        TestSupport.processorAssertion((StreamToStreamJoinP p) ->
                                Assert.assertEquals(0L, (long) p.buffer[1].size()))
                });
    }

    /**
     * Both inputs receive a watermark of 10, watermarks of 9 are expected for both keys in
     * the output, and then a right row with timestamp 8 arrives. The final assertion checks
     * that neither join buffer holds any row.
     */
    @Test
    public void test_dropLateItems() {
        // Inner-map keys must be Byte; a bare int literal would be boxed to Integer.
        postponeTimeMap.put((byte) 0, ImmutableMap.of((byte) 1, 1L));
        postponeTimeMap.put((byte) 1, ImmutableMap.of((byte) 0, 1L));
        leftExtractors = Collections.singletonMap((byte) 0, l -> (Long) l.getRow().get(0));
        rightExtractors = Collections.singletonMap((byte) 1, r -> (Long) r.getRow().get(0));

        SupplierEx<Processor> joinSupplier = createProcessor(1, 1, false);

        TestSupport.verifyProcessor(joinSupplier)
                .hazelcastInstance(instance())
                .outputChecker(SAME_ITEMS_ANY_ORDER_EQUIVALENT_WMS)
                .expectExactOutput(new TestSupport.TestEvent[]{
                        TestSupport.in(0, wm(10L, (byte) 0)),
                        TestSupport.in(1, wm(10L, (byte) 1)),
                        TestSupport.out(wm(9L, (byte) 1)),
                        TestSupport.out(wm(9L, (byte) 0)),
                        TestSupport.in(1, jetRow(8L)),
                        TestSupport.processorAssertion((StreamToStreamJoinP p) -> {
                            Assert.assertEquals(0L, (long) p.buffer[0].size());
                            Assert.assertEquals(0L, (long) p.buffer[1].size());
                        })
                });
    }

    /**
     * Builds a join condition out of a postpone-time map.
     * <p>
     * For every {@code outerKey -> (innerKey -> postpone)} entry one predicate
     * {@code column(outerKey) >= column(innerKey) - postpone} is created (all operands typed
     * as {@code BIGINT}); the predicates are then combined with {@code AND}.
     *
     * @param postponeTimeMap    watermark key -> (watermark key -> postpone time)
     * @param wmKeyToColumnIndex flat list of {@code (wmKey, columnIndex)} pairs overriding the
     *                           column used for a watermark key; a key without an override
     *                           uses the key value itself as the column index. Both values of
     *                           a pair are narrowed to {@code byte}.
     * @return the conjunction of all generated comparison predicates
     * @throws IllegalArgumentException if {@code wmKeyToColumnIndex} has an odd number of
     *                                  elements, i.e. a key without its column index
     */
    static Expression<Boolean> createConditionFromPostponeTimeMap(Map<Byte, Map<Byte, Long>> postponeTimeMap, int ... wmKeyToColumnIndex) {
        // Fail fast with a clear message instead of an ArrayIndexOutOfBoundsException
        // halfway through the pairing loop below.
        if (wmKeyToColumnIndex.length % 2 != 0) {
            throw new IllegalArgumentException("wmKeyToColumnIndex must contain (wmKey, columnIndex) pairs, "
                    + "but has " + wmKeyToColumnIndex.length + " elements");
        }
        Map<Byte, Byte> columnIndexByWmKey = new HashMap<>();
        for (int i = 0; i < wmKeyToColumnIndex.length; i += 2) {
            columnIndexByWmKey.put((byte) wmKeyToColumnIndex[i], (byte) wmKeyToColumnIndex[i + 1]);
        }

        List<ComparisonPredicate> conditions = new ArrayList<>();
        for (Map.Entry<Byte, Map<Byte, Long>> outerEntry : postponeTimeMap.entrySet()) {
            for (Map.Entry<Byte, Long> innerEntry : outerEntry.getValue().entrySet()) {
                // Without an explicit mapping, the watermark key doubles as the column index.
                byte leftColumnIndex = columnIndexByWmKey.getOrDefault(outerEntry.getKey(), outerEntry.getKey());
                byte rightColumnIndex = columnIndexByWmKey.getOrDefault(innerEntry.getKey(), innerEntry.getKey());

                // leftColumn >= rightColumn - postponeTime
                conditions.add(ComparisonPredicate.create(
                        ColumnExpression.create(leftColumnIndex, QueryDataType.BIGINT),
                        MinusFunction.create(
                                ColumnExpression.create(rightColumnIndex, QueryDataType.BIGINT),
                                ConstantExpression.create(innerEntry.getValue(), QueryDataType.BIGINT),
                                QueryDataType.BIGINT),
                        ComparisonMode.GREATER_THAN_OR_EQUAL));
            }
        }
        return AndPredicate.create(conditions.toArray(new Expression[0]));
    }

    /**
     * Creates a supplier of inner-join {@code StreamToStreamJoinP} processors configured from
     * this test's current fields.
     * <p>
     * The join condition is derived from {@code postponeTimeMap} at the time of this call;
     * the extractor maps and {@code postponeTimeMap} themselves are read from the fields
     * each time the returned supplier is invoked.
     *
     * @param leftColumnCount    first component of the column-count tuple passed to the processor
     * @param rightColumnCount   second component of the column-count tuple passed to the processor
     * @param assumeEquiJoin     if {@code true}, both equi-join index arrays are {@code {0}};
     *                           otherwise they are empty
     * @param wmKeyToColumnIndex {@code (wmKey, columnIndex)} pairs forwarded to
     *                           {@code createConditionFromPostponeTimeMap}
     * @return the processor supplier
     */
    private SupplierEx<Processor> createProcessor(int leftColumnCount, int rightColumnCount, boolean assumeEquiJoin, int ... wmKeyToColumnIndex) {
        Expression<Boolean> joinCondition = createConditionFromPostponeTimeMap(postponeTimeMap, wmKeyToColumnIndex);

        // A fresh int[1] holds a single 0, i.e. "join on column 0" on both sides.
        int[] equiJoinIndices;
        if (assumeEquiJoin) {
            equiJoinIndices = new int[1];
        } else {
            equiJoinIndices = new int[0];
        }

        // The same array and the same condition are deliberately used for both parameters.
        JetJoinInfo joinInfo = new JetJoinInfo(
                JoinRelType.INNER, equiJoinIndices, equiJoinIndices, joinCondition, joinCondition);

        return () -> new StreamToStreamJoinP(
                joinInfo,
                leftExtractors,
                rightExtractors,
                postponeTimeMap,
                Tuple2.tuple2(leftColumnCount, rightColumnCount));
    }
}

