/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.sql.impl.connector.map;

import com.hazelcast.cluster.Address;
import com.hazelcast.function.FunctionEx;
import com.hazelcast.internal.serialization.InternalSerializationService;
import com.hazelcast.internal.util.collection.PartitionIdSet;
import com.hazelcast.jet.Traverser;
import com.hazelcast.jet.Traversers;
import com.hazelcast.jet.core.Processor;
import com.hazelcast.jet.core.ProcessorMetaSupplier;
import com.hazelcast.jet.core.ProcessorSupplier;
import com.hazelcast.jet.impl.execution.init.Contexts;
import com.hazelcast.jet.impl.processor.TransformP;
import com.hazelcast.jet.impl.util.Util;
import com.hazelcast.jet.sql.impl.ExpressionUtil;
import com.hazelcast.jet.sql.impl.JetJoinInfo;
import com.hazelcast.jet.sql.impl.connector.keyvalue.KvRowProjector;
import com.hazelcast.jet.sql.impl.connector.map.QueryUtil;
import com.hazelcast.map.impl.proxy.MapProxyImpl;
import com.hazelcast.nio.ObjectDataInput;
import com.hazelcast.nio.ObjectDataOutput;
import com.hazelcast.nio.serialization.DataSerializable;
import com.hazelcast.partition.Partition;
import com.hazelcast.partition.PartitionService;
import com.hazelcast.query.Predicate;
import com.hazelcast.query.impl.getters.Extractors;
import com.hazelcast.sql.impl.expression.Expression;
import com.hazelcast.sql.impl.extract.QueryPath;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

@SuppressFBWarnings(value={"SE_BAD_FIELD", "SE_NO_SERIALVERSIONID"}, justification="the class is never java-serialized")
final class JoinByEquiJoinProcessorSupplier
implements ProcessorSupplier,
DataSerializable {
    private JetJoinInfo joinInfo;
    private String mapName;
    private int partitionCount;
    private List<Integer> partitions;
    private KvRowProjector.Supplier rightRowProjectorSupplier;
    private transient MapProxyImpl<Object, Object> map;
    private transient InternalSerializationService serializationService;
    private transient Extractors extractors;

    private JoinByEquiJoinProcessorSupplier() {
    }

    JoinByEquiJoinProcessorSupplier(@Nonnull JetJoinInfo joinInfo, @Nonnull String mapName, int partitionCount, @Nullable List<Integer> partitions, @Nonnull KvRowProjector.Supplier rightRowProjectorSupplier) {
        assert (joinInfo.isEquiJoin() && (joinInfo.isInner() || joinInfo.isLeftOuter()));
        this.joinInfo = joinInfo;
        this.mapName = mapName;
        this.partitionCount = partitionCount;
        this.partitions = partitions;
        this.rightRowProjectorSupplier = rightRowProjectorSupplier;
    }

    public void init(@Nonnull ProcessorSupplier.Context context) {
        this.map = (MapProxyImpl)context.jetInstance().getMap(this.mapName);
        this.serializationService = ((Contexts.ProcSupplierCtx)context).serializationService();
        this.extractors = Extractors.newBuilder((InternalSerializationService)this.serializationService).build();
    }

    @Nonnull
    public Collection<? extends Processor> get(int count) {
        ArrayList<1> processors = new ArrayList<1>(count);
        for (int i = 0; i < count; ++i) {
            PartitionIdSet partitions = this.partitions == null ? null : new PartitionIdSet(this.partitionCount, this.partitions);
            QueryPath[] rightPaths = this.rightRowProjectorSupplier.paths();
            KvRowProjector rightProjector = this.rightRowProjectorSupplier.get(this.serializationService, this.extractors);
            TransformP<Object[], Object[]> processor = new TransformP<Object[], Object[]>(JoinByEquiJoinProcessorSupplier.joinFn(this.joinInfo, this.map, partitions, rightPaths, rightProjector)){

                public boolean isCooperative() {
                    return false;
                }
            };
            processors.add(processor);
        }
        return processors;
    }

    private static FunctionEx<Object[], Traverser<Object[]>> joinFn(JetJoinInfo joinInfo, MapProxyImpl<Object, Object> map, PartitionIdSet partitions, QueryPath[] rightPaths, KvRowProjector rightRowProjector) {
        return (FunctionEx & Serializable)left -> {
            Predicate<Object, Object> predicate = QueryUtil.toPredicate(left, joinInfo.leftEquiJoinIndices(), joinInfo.rightEquiJoinIndices(), rightPaths);
            if (predicate == null) {
                return joinInfo.isInner() ? Traversers.empty() : Traversers.singleton((Object)Util.extendArray((Object[])left, (int)rightRowProjector.getColumnCount()));
            }
            Set matchingRows = joinInfo.isInner() ? map.entrySet(predicate, partitions.copy()) : map.entrySet(predicate);
            List<Object[]> joined = JoinByEquiJoinProcessorSupplier.join(left, matchingRows, rightRowProjector, joinInfo.nonEquiCondition());
            return joined.isEmpty() && joinInfo.isLeftOuter() ? Traversers.singleton((Object)Util.extendArray((Object[])left, (int)rightRowProjector.getColumnCount())) : Traversers.traverseIterable(joined);
        };
    }

    private static List<Object[]> join(Object[] left, Set<Map.Entry<Object, Object>> entries, KvRowProjector rightRowProjector, Expression<Boolean> condition) {
        ArrayList<Object[]> rows = new ArrayList<Object[]>();
        for (Map.Entry<Object, Object> entry : entries) {
            Object[] joined;
            Object[] right = rightRowProjector.project(entry);
            if (right == null || (joined = ExpressionUtil.join(left, right, condition)) == null) continue;
            rows.add(joined);
        }
        return rows;
    }

    public void writeData(ObjectDataOutput out) throws IOException {
        out.writeObject((Object)this.joinInfo);
        out.writeObject((Object)this.mapName);
        out.writeInt(this.partitionCount);
        out.writeObject(this.partitions);
        out.writeObject((Object)this.rightRowProjectorSupplier);
    }

    public void readData(ObjectDataInput in) throws IOException {
        this.joinInfo = (JetJoinInfo)in.readObject();
        this.mapName = (String)in.readObject();
        this.partitionCount = in.readInt();
        this.partitions = (List)in.readObject();
        this.rightRowProjectorSupplier = (KvRowProjector.Supplier)in.readObject();
    }

    static ProcessorMetaSupplier supplier(JetJoinInfo joinInfo, String mapName, KvRowProjector.Supplier rightRowProjectorSupplier) {
        return new Supplier(joinInfo, mapName, rightRowProjectorSupplier);
    }

    @SuppressFBWarnings(value={"SE_BAD_FIELD", "SE_NO_SERIALVERSIONID"}, justification="the class is never java-serialized")
    private static final class Supplier
    implements ProcessorMetaSupplier,
    DataSerializable {
        private JetJoinInfo joinInfo;
        private String mapName;
        private KvRowProjector.Supplier rightRowProjectorSupplier;
        private transient PartitionService partitionService;

        private Supplier() {
        }

        private Supplier(JetJoinInfo joinInfo, String mapName, KvRowProjector.Supplier rightRowProjectorSupplier) {
            assert (joinInfo.isEquiJoin() && (joinInfo.isInner() || joinInfo.isLeftOuter()));
            this.joinInfo = joinInfo;
            this.mapName = mapName;
            this.rightRowProjectorSupplier = rightRowProjectorSupplier;
        }

        public void init(@Nonnull ProcessorMetaSupplier.Context context) {
            this.partitionService = context.jetInstance().getHazelcastInstance().getPartitionService();
        }

        @Nonnull
        public Function<? super Address, ? extends ProcessorSupplier> get(@Nonnull List<Address> addresses) {
            if (this.joinInfo.isInner()) {
                Set partitions = this.partitionService.getPartitions();
                int partitionCount = partitions.size();
                Map partitionsByMember = Util.assignPartitions(addresses, partitions.stream().collect(Collectors.groupingBy(partition -> partition.getOwner().getAddress(), Collectors.mapping(Partition::getPartitionId, Collectors.toList()))));
                return address -> new JoinByEquiJoinProcessorSupplier(this.joinInfo, this.mapName, partitionCount, (List)partitionsByMember.get(address), this.rightRowProjectorSupplier);
            }
            return address -> new JoinByEquiJoinProcessorSupplier(this.joinInfo, this.mapName, 0, null, this.rightRowProjectorSupplier);
        }

        public void writeData(ObjectDataOutput out) throws IOException {
            out.writeObject((Object)this.joinInfo);
            out.writeObject((Object)this.mapName);
            out.writeObject((Object)this.rightRowProjectorSupplier);
        }

        public void readData(ObjectDataInput in) throws IOException {
            this.joinInfo = (JetJoinInfo)in.readObject();
            this.mapName = (String)in.readObject();
            this.rightRowProjectorSupplier = (KvRowProjector.Supplier)in.readObject();
        }
    }
}

