/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.sql.impl.connector.map;

import com.hazelcast.cluster.Address;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.jet.core.DAG;
import com.hazelcast.jet.core.Edge;
import com.hazelcast.jet.core.ProcessorMetaSupplier;
import com.hazelcast.jet.core.ProcessorSupplier;
import com.hazelcast.jet.core.Vertex;
import com.hazelcast.jet.core.processor.SinkProcessors;
import com.hazelcast.jet.core.test.TestProcessorMetaSupplierContext;
import com.hazelcast.jet.core.test.TestProcessorSupplierContext;
import com.hazelcast.jet.impl.connector.ReadMapOrCacheP;
import com.hazelcast.jet.sql.SqlTestSupport;
import com.hazelcast.jet.sql.impl.connector.map.SpecificPartitionsImapReaderPms;
import com.hazelcast.map.IMap;
import com.hazelcast.shaded.org.apache.commons.lang3.ArrayUtils;
import com.hazelcast.sql.impl.expression.ConstantExpression;
import com.hazelcast.sql.impl.type.QueryDataType;
import com.hazelcast.test.Accessors;
import com.hazelcast.test.HazelcastSerialClassRunner;
import com.hazelcast.test.annotation.ParallelJVMTest;
import com.hazelcast.test.annotation.QuickTest;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import javax.annotation.Nonnull;
import org.assertj.core.api.AbstractIntArrayAssert;
import org.assertj.core.api.Assertions;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;

@RunWith(value=HazelcastSerialClassRunner.class)
@Category(value={QuickTest.class, ParallelJVMTest.class})
public class SpecificPartitionsImapReaderPmsTest
extends SqlTestSupport {
    private static final int MEMBERS = 5;
    private static final int ITERATIONS = 1000;
    private int coordinatorOwnedPartitionKey;
    private int coordinatorOwnedPartitionId;
    private int[] perMemberOwnedPKey;
    private int[] perMemberOwnedPId;
    private String mapName;
    private String sinkName;
    private IMap<Integer, Integer> sourceMap;
    private IMap<Integer, Integer> sinkMap;
    Map<Address, int[]> partitionAssignment;
    Map<Integer, Address> reversedPartitionAssignment;

    @BeforeClass
    public static void beforeClass() throws Exception {
        SpecificPartitionsImapReaderPmsTest.initialize((int)5, null);
    }

    @Before
    public void before() throws Exception {
        this.partitionAssignment = SpecificPartitionsImapReaderPmsTest.getPartitionAssignment((HazelcastInstance)SpecificPartitionsImapReaderPmsTest.instance());
        this.reversedPartitionAssignment = new HashMap<Integer, Address>();
        for (Map.Entry<Address, int[]> entry : this.partitionAssignment.entrySet()) {
            for (int pId : entry.getValue()) {
                this.reversedPartitionAssignment.put(pId, entry.getKey());
            }
        }
        this.mapName = SpecificPartitionsImapReaderPmsTest.randomName();
        this.sinkName = SpecificPartitionsImapReaderPmsTest.randomName();
        this.sourceMap = SpecificPartitionsImapReaderPmsTest.instance().getMap(this.mapName);
        this.sinkMap = SpecificPartitionsImapReaderPmsTest.instance().getMap(this.sinkName);
        this.perMemberOwnedPKey = new int[4];
        this.perMemberOwnedPId = new int[4];
        Address coordinatorAddress = Accessors.getAddress((HazelcastInstance)SpecificPartitionsImapReaderPmsTest.instance());
        for (int i = 1; i < 1000; ++i) {
            int pIdCandidate = SpecificPartitionsImapReaderPmsTest.instance().getPartitionService().getPartition((Object)i).getPartitionId();
            if (!this.reversedPartitionAssignment.get(pIdCandidate).equals((Object)coordinatorAddress)) continue;
            this.coordinatorOwnedPartitionKey = i;
            this.coordinatorOwnedPartitionId = pIdCandidate;
            break;
        }
        HashSet<Address> clusterAddresses = new HashSet<Address>(this.partitionAssignment.keySet());
        clusterAddresses.remove(coordinatorAddress);
        int j = 0;
        for (int i = 1; i < 1000 && !clusterAddresses.isEmpty(); ++i) {
            int pIdCandidate = Accessors.getNodeEngineImpl((HazelcastInstance)SpecificPartitionsImapReaderPmsTest.instance()).getPartitionService().getPartitionId((Object)i);
            Address address = this.reversedPartitionAssignment.get(pIdCandidate);
            if (!clusterAddresses.contains(address)) continue;
            this.perMemberOwnedPKey[j] = i;
            this.perMemberOwnedPId[j++] = pIdCandidate;
            clusterAddresses.remove(address);
        }
    }

    @Test
    public void test_nonPrunableScan_unitTest() throws Exception {
        SpecificPartitionsImapReaderPms readPms = (SpecificPartitionsImapReaderPms)SpecificPartitionsImapReaderPms.mapReader((String)this.mapName, null, null);
        readPms.init((ProcessorMetaSupplier.Context)SpecificPartitionsImapReaderPmsTest.getMetaSupplierContext());
        Assert.assertNull((Object)readPms.partitionsToScan);
        Function psf = readPms.get(new ArrayList<Address>(this.partitionAssignment.keySet()));
        for (HazelcastInstance hz : SpecificPartitionsImapReaderPmsTest.instances()) {
            TestProcessorSupplierContext psCtx = new TestProcessorSupplierContext().setHazelcastInstance(hz);
            Address address = Accessors.getAddress((HazelcastInstance)hz);
            ReadMapOrCacheP.LocalProcessorSupplier ps = (ReadMapOrCacheP.LocalProcessorSupplier)psf.apply(address);
            ps.init((ProcessorSupplier.Context)psCtx);
            ((AbstractIntArrayAssert)Assertions.assertThat((int[])ps.getPartitionsToScan()).as("Should scan all partitions owned by member", new Object[0])).containsExactly(psCtx.memberPartitions());
        }
    }

    @Test
    public void test_prunableScan_unitTest() throws Exception {
        for (int partitionCountToUse = 0; partitionCountToUse < this.perMemberOwnedPKey.length + 1; ++partitionCountToUse) {
            ArrayList<List<ConstantExpression>> expressions = new ArrayList<List<ConstantExpression>>();
            if (partitionCountToUse > 0) {
                expressions.add(List.of(ConstantExpression.create((Object)this.coordinatorOwnedPartitionKey, (QueryDataType)QueryDataType.INT)));
            }
            for (int i = 0; i < partitionCountToUse - 1; ++i) {
                expressions.add(List.of(ConstantExpression.create((Object)this.perMemberOwnedPKey[i], (QueryDataType)QueryDataType.INT)));
            }
            SpecificPartitionsImapReaderPms readPms = (SpecificPartitionsImapReaderPms)SpecificPartitionsImapReaderPms.mapReader((String)this.mapName, null, expressions);
            readPms.init((ProcessorMetaSupplier.Context)SpecificPartitionsImapReaderPmsTest.getMetaSupplierContext());
            int[] expected = partitionCountToUse > 0 ? ArrayUtils.add((int[])Arrays.copyOf(this.perMemberOwnedPId, partitionCountToUse - 1), (int)this.coordinatorOwnedPartitionId) : new int[]{};
            ((AbstractIntArrayAssert)((AbstractIntArrayAssert)Assertions.assertThat((int[])readPms.partitionsToScan).as("Should scan expected partitions", new Object[0])).containsExactlyInAnyOrder(expected).as("Partitions list should be sorted", new Object[0])).isSorted();
        }
    }

    @Test
    public void test_prunableScan_psUnitTest() throws Exception {
        ArrayList<List<ConstantExpression>> expressions = new ArrayList<List<ConstantExpression>>();
        expressions.add(List.of(ConstantExpression.create((Object)this.coordinatorOwnedPartitionKey, (QueryDataType)QueryDataType.INT)));
        SpecificPartitionsImapReaderPms readPms = (SpecificPartitionsImapReaderPms)SpecificPartitionsImapReaderPms.mapReader((String)this.mapName, null, expressions);
        TestProcessorSupplierContext context = new TestProcessorSupplierContext().setHazelcastInstance(SpecificPartitionsImapReaderPmsTest.instance());
        readPms.init((ProcessorMetaSupplier.Context)context);
        Address coordinatorAddress = Accessors.getAddress((HazelcastInstance)SpecificPartitionsImapReaderPmsTest.instance());
        ReadMapOrCacheP.LocalProcessorSupplier ps = (ReadMapOrCacheP.LocalProcessorSupplier)readPms.get(List.of(coordinatorAddress)).apply(coordinatorAddress);
        ps.init((ProcessorSupplier.Context)context);
        ((AbstractIntArrayAssert)Assertions.assertThat((int[])ps.getPartitionsToScan()).as("Should scan only expected partitions", new Object[0])).containsExactly(new int[]{this.coordinatorOwnedPartitionId});
    }

    @Test
    public void test_prunableScanDuplicates_unitTest() throws Exception {
        ArrayList<List<ConstantExpression>> expressions = new ArrayList<List<ConstantExpression>>();
        for (int i = 0; i < 5; ++i) {
            expressions.add(List.of(ConstantExpression.create((Object)this.coordinatorOwnedPartitionKey, (QueryDataType)QueryDataType.INT)));
        }
        SpecificPartitionsImapReaderPms readPms = (SpecificPartitionsImapReaderPms)SpecificPartitionsImapReaderPms.mapReader((String)this.mapName, null, expressions);
        readPms.init((ProcessorMetaSupplier.Context)SpecificPartitionsImapReaderPmsTest.getMetaSupplierContext());
        ((AbstractIntArrayAssert)Assertions.assertThat((int[])readPms.partitionsToScan).as("Should scan partition once", new Object[0])).containsExactly(new int[]{this.coordinatorOwnedPartitionId});
    }

    @Test
    public void test_nonPrunableScan() {
        this.sourceMap.put((Object)0, (Object)0);
        DAG dag = new DAG();
        ProcessorMetaSupplier readPms = SpecificPartitionsImapReaderPms.mapReader((String)this.mapName, null, null);
        Vertex source = dag.newVertex("source", readPms);
        Vertex sink = dag.newVertex("sink", SinkProcessors.writeMapP((String)this.sinkName));
        dag.edge(Edge.between((Vertex)source, (Vertex)sink));
        SpecificPartitionsImapReaderPmsTest.instance().getJet().newLightJob(dag).join();
        Assert.assertEquals((Object)0, (Object)SpecificPartitionsImapReaderPmsTest.instance().getMap(this.sinkName).get((Object)0));
    }

    @Test
    public void test_prunableSinglePartition() {
        int partitionsToUse = 1;
        DAG dag = this.setupPrunableDag(this.sourceMap, partitionsToUse);
        SpecificPartitionsImapReaderPmsTest.instance().getJet().newLightJob(dag).join();
        this.assertPrunability(partitionsToUse);
    }

    @Test
    public void test_prunableMultiplePartitions() {
        int partitionsToUse = 3;
        DAG dag = this.setupPrunableDag(this.sourceMap, partitionsToUse);
        SpecificPartitionsImapReaderPmsTest.instance().getJet().newLightJob(dag).join();
        this.assertPrunability(partitionsToUse);
    }

    @Test
    public void test_prunableNoPartitions() {
        int partitionsToUse = 0;
        DAG dag = this.setupPrunableDag(this.sourceMap, partitionsToUse);
        SpecificPartitionsImapReaderPmsTest.instance().getJet().newLightJob(dag).join();
        this.assertPrunability(partitionsToUse);
    }

    private DAG setupPrunableDag(IMap<Integer, Integer> map, int partitionCountToUse) {
        ArrayList<List<ConstantExpression>> expressions = new ArrayList<List<ConstantExpression>>();
        if (partitionCountToUse > 0) {
            map.put((Object)this.coordinatorOwnedPartitionKey, (Object)this.coordinatorOwnedPartitionKey);
            expressions.add(List.of(ConstantExpression.create((Object)this.coordinatorOwnedPartitionKey, (QueryDataType)QueryDataType.INT)));
        }
        for (int i = 0; i < partitionCountToUse - 1; ++i) {
            map.put((Object)this.perMemberOwnedPKey[i], (Object)this.perMemberOwnedPKey[i]);
            expressions.add(List.of(ConstantExpression.create((Object)this.perMemberOwnedPKey[i], (QueryDataType)QueryDataType.INT)));
        }
        Assert.assertEquals((long)partitionCountToUse, (long)expressions.size());
        DAG dag = new DAG();
        ProcessorMetaSupplier mapReader = SpecificPartitionsImapReaderPms.mapReader((String)this.mapName, null, expressions);
        Vertex source = dag.newVertex("source", mapReader);
        Vertex sink = dag.newVertex("sink", SinkProcessors.writeMapP((String)this.sinkName));
        dag.edge(Edge.between((Vertex)source, (Vertex)sink));
        return dag;
    }

    private void assertPrunability(int partitionCountUsed) {
        Assert.assertEquals((long)partitionCountUsed, (long)this.sinkMap.size());
        if (partitionCountUsed > 0) {
            Assert.assertEquals((Object)this.coordinatorOwnedPartitionKey, (Object)SpecificPartitionsImapReaderPmsTest.instance().getMap(this.sinkName).get((Object)this.coordinatorOwnedPartitionKey));
        }
        for (int i = 0; i < partitionCountUsed - 1; ++i) {
            Assert.assertEquals((Object)this.perMemberOwnedPKey[i], (Object)SpecificPartitionsImapReaderPmsTest.instance().getMap(this.sinkName).get((Object)this.perMemberOwnedPKey[i]));
        }
    }

    @Nonnull
    private static TestProcessorMetaSupplierContext getMetaSupplierContext() {
        return new TestProcessorMetaSupplierContext().setHazelcastInstance(SpecificPartitionsImapReaderPmsTest.instance());
    }
}

