/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.sql.impl.opt.physical;

import com.hazelcast.cluster.Address;
import com.hazelcast.function.BiConsumerEx;
import com.hazelcast.function.BiFunctionEx;
import com.hazelcast.function.ComparatorEx;
import com.hazelcast.function.ConsumerEx;
import com.hazelcast.function.FunctionEx;
import com.hazelcast.function.Functions;
import com.hazelcast.function.SupplierEx;
import com.hazelcast.function.ToLongFunctionEx;
import com.hazelcast.internal.util.MutableByte;
import com.hazelcast.jet.aggregate.AggregateOperation;
import com.hazelcast.jet.core.DAG;
import com.hazelcast.jet.core.Edge;
import com.hazelcast.jet.core.EventTimePolicy;
import com.hazelcast.jet.core.ProcessorMetaSupplier;
import com.hazelcast.jet.core.ProcessorSupplier;
import com.hazelcast.jet.core.SlidingWindowPolicy;
import com.hazelcast.jet.core.TimestampKind;
import com.hazelcast.jet.core.Vertex;
import com.hazelcast.jet.core.function.KeyedWindowResultFunction;
import com.hazelcast.jet.core.processor.Processors;
import com.hazelcast.jet.core.processor.SourceProcessors;
import com.hazelcast.jet.pipeline.ServiceFactories;
import com.hazelcast.jet.pipeline.ServiceFactory;
import com.hazelcast.jet.pipeline.SourceBuilder;
import com.hazelcast.jet.sql.impl.ExpressionUtil;
import com.hazelcast.jet.sql.impl.HazelcastPhysicalScan;
import com.hazelcast.jet.sql.impl.JetJoinInfo;
import com.hazelcast.jet.sql.impl.ObjectArrayKey;
import com.hazelcast.jet.sql.impl.aggregate.WindowUtils;
import com.hazelcast.jet.sql.impl.connector.HazelcastRexNode;
import com.hazelcast.jet.sql.impl.connector.SqlConnector;
import com.hazelcast.jet.sql.impl.connector.SqlConnectorUtil;
import com.hazelcast.jet.sql.impl.connector.map.IMapSqlConnector;
import com.hazelcast.jet.sql.impl.opt.ExpressionValues;
import com.hazelcast.jet.sql.impl.opt.WatermarkKeysAssigner;
import com.hazelcast.jet.sql.impl.opt.WatermarkThrottlingFrameSizeCalculator;
import com.hazelcast.jet.sql.impl.opt.physical.AggregateAccumulateByKeyPhysicalRel;
import com.hazelcast.jet.sql.impl.opt.physical.AggregateAccumulatePhysicalRel;
import com.hazelcast.jet.sql.impl.opt.physical.AggregateByKeyPhysicalRel;
import com.hazelcast.jet.sql.impl.opt.physical.AggregateCombineByKeyPhysicalRel;
import com.hazelcast.jet.sql.impl.opt.physical.AggregateCombinePhysicalRel;
import com.hazelcast.jet.sql.impl.opt.physical.AggregatePhysicalRel;
import com.hazelcast.jet.sql.impl.opt.physical.CalcPhysicalRel;
import com.hazelcast.jet.sql.impl.opt.physical.CreateDagVisitorBase;
import com.hazelcast.jet.sql.impl.opt.physical.DagBuildContextImpl;
import com.hazelcast.jet.sql.impl.opt.physical.DeletePhysicalRel;
import com.hazelcast.jet.sql.impl.opt.physical.DropLateItemsPhysicalRel;
import com.hazelcast.jet.sql.impl.opt.physical.FullScanPhysicalRel;
import com.hazelcast.jet.sql.impl.opt.physical.IndexScanMapPhysicalRel;
import com.hazelcast.jet.sql.impl.opt.physical.InsertPhysicalRel;
import com.hazelcast.jet.sql.impl.opt.physical.JoinHashPhysicalRel;
import com.hazelcast.jet.sql.impl.opt.physical.JoinNestedLoopPhysicalRel;
import com.hazelcast.jet.sql.impl.opt.physical.LimitPhysicalRel;
import com.hazelcast.jet.sql.impl.opt.physical.PhysicalRel;
import com.hazelcast.jet.sql.impl.opt.physical.RootRel;
import com.hazelcast.jet.sql.impl.opt.physical.SinkPhysicalRel;
import com.hazelcast.jet.sql.impl.opt.physical.SlidingWindowAggregatePhysicalRel;
import com.hazelcast.jet.sql.impl.opt.physical.SlidingWindowPhysicalRel;
import com.hazelcast.jet.sql.impl.opt.physical.SortPhysicalRel;
import com.hazelcast.jet.sql.impl.opt.physical.StreamToStreamJoinPhysicalRel;
import com.hazelcast.jet.sql.impl.opt.physical.UnionPhysicalRel;
import com.hazelcast.jet.sql.impl.opt.physical.UpdatePhysicalRel;
import com.hazelcast.jet.sql.impl.opt.physical.ValuesPhysicalRel;
import com.hazelcast.jet.sql.impl.processors.LateItemsDropP;
import com.hazelcast.jet.sql.impl.processors.RootResultConsumerSink;
import com.hazelcast.jet.sql.impl.processors.SqlHashJoinP;
import com.hazelcast.jet.sql.impl.processors.StreamToStreamJoinP;
import com.hazelcast.jet.sql.impl.schema.HazelcastTable;
import com.hazelcast.shaded.org.apache.calcite.rel.RelNode;
import com.hazelcast.shaded.org.apache.calcite.rel.SingleRel;
import com.hazelcast.shaded.org.apache.calcite.rex.RexProgram;
import com.hazelcast.spi.impl.NodeEngine;
import com.hazelcast.sql.impl.QueryException;
import com.hazelcast.sql.impl.QueryParameterMetadata;
import com.hazelcast.sql.impl.expression.ConstantExpression;
import com.hazelcast.sql.impl.expression.Expression;
import com.hazelcast.sql.impl.expression.ExpressionEvalContext;
import com.hazelcast.sql.impl.expression.MockExpressionEvalContext;
import com.hazelcast.sql.impl.optimizer.PlanObjectKey;
import com.hazelcast.sql.impl.row.JetSqlRow;
import com.hazelcast.sql.impl.schema.Table;
import com.hazelcast.sql.impl.type.QueryDataType;
import java.io.Serializable;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Function;
import javax.annotation.Nullable;

public class CreateTopLevelDagVisitor
extends CreateDagVisitorBase<Vertex> {
    private static final ExpressionEvalContext MOCK_EEC = new MockExpressionEvalContext();
    private static final int HIGH_PRIORITY = 1;
    private static final int LOW_PRIORITY = 10;
    private final Set<PlanObjectKey> objectKeys = new HashSet<PlanObjectKey>();
    private final NodeEngine nodeEngine;
    private final Address localMemberAddress;
    private final WatermarkKeysAssigner watermarkKeysAssigner;
    private long watermarkThrottlingFrameSize = -1L;
    private final Map<String, List<Map<String, Expression<?>>>> partitionStrategyCandidates;
    private final DagBuildContextImpl dagBuildContext;

    public CreateTopLevelDagVisitor(NodeEngine nodeEngine, QueryParameterMetadata parameterMetadata, WatermarkKeysAssigner watermarkKeysAssigner, Set<PlanObjectKey> usedViews, @Nullable Map<String, List<Map<String, Expression<?>>>> partitionStrategyCandidates) {
        super(new DAG());
        this.nodeEngine = nodeEngine;
        this.localMemberAddress = nodeEngine.getThisAddress();
        this.watermarkKeysAssigner = watermarkKeysAssigner;
        this.objectKeys.addAll(usedViews);
        this.partitionStrategyCandidates = partitionStrategyCandidates;
        this.dagBuildContext = new DagBuildContextImpl(nodeEngine, this.getDag(), parameterMetadata);
    }

    @Override
    public Vertex onValues(ValuesPhysicalRel rel) {
        List<ExpressionValues> values = rel.values();
        return this.dag.newUniqueVertex("Values", SourceProcessors.convenientSourceP(ExpressionEvalContext::from, (BiConsumerEx & Serializable)(context, buffer) -> {
            values.forEach(vs -> vs.toValues((ExpressionEvalContext)context).forEach(arg_0 -> ((SourceBuilder.SourceBuffer)buffer).add(arg_0)));
            buffer.close();
        }, (FunctionEx & Serializable)ctx -> null, (BiConsumerEx & Serializable)(ctx, states) -> {}, (ConsumerEx)ConsumerEx.noop(), (int)0, (boolean)true, null));
    }

    @Override
    public Vertex onInsert(InsertPhysicalRel rel) {
        this.watermarkThrottlingFrameSize = WatermarkThrottlingFrameSizeCalculator.calculate((PhysicalRel)rel.getInput(), MOCK_EEC);
        Object table = rel.getTable().unwrap(HazelcastTable.class).getTarget();
        this.collectObjectKeys((Table)table);
        this.dagBuildContext.setTable((Table)table);
        this.dagBuildContext.setRel(rel);
        SqlConnector.VertexWithInputConfig vertexWithConfig = SqlConnectorUtil.getJetSqlConnector(table).insertProcessor(this.dagBuildContext);
        Vertex vertex = vertexWithConfig.vertex();
        this.connectInput(rel.getInput(), vertex, vertexWithConfig.configureEdgeFn());
        return vertex;
    }

    @Override
    public Vertex onSink(SinkPhysicalRel rel) {
        this.watermarkThrottlingFrameSize = WatermarkThrottlingFrameSizeCalculator.calculate((PhysicalRel)rel.getInput(), MOCK_EEC);
        Object table = rel.getTable().unwrap(HazelcastTable.class).getTarget();
        this.collectObjectKeys((Table)table);
        this.dagBuildContext.setTable((Table)table);
        this.dagBuildContext.setRel(rel);
        Vertex vertex = SqlConnectorUtil.getJetSqlConnector(table).sinkProcessor(this.dagBuildContext);
        this.connectInput(rel.getInput(), vertex, null);
        return vertex;
    }

    @Override
    public Vertex onUpdate(UpdatePhysicalRel rel) {
        this.watermarkThrottlingFrameSize = WatermarkThrottlingFrameSizeCalculator.calculate(rel, MOCK_EEC);
        Object table = rel.getTable().unwrap(HazelcastTable.class).getTarget();
        this.dagBuildContext.setTable((Table)table);
        this.dagBuildContext.setRel(rel);
        Vertex vertex = SqlConnectorUtil.getJetSqlConnector(table).updateProcessor(this.dagBuildContext, rel.getUpdateColumnList(), HazelcastRexNode.wrap(rel.getSourceExpressionList()), HazelcastRexNode.wrap(rel.getPredicate()), rel.getInput() != null);
        if (rel.getInput() != null) {
            this.connectInput(rel.getInput(), vertex, null);
        }
        return vertex;
    }

    @Override
    public Vertex onDelete(DeletePhysicalRel rel) {
        this.watermarkThrottlingFrameSize = WatermarkThrottlingFrameSizeCalculator.calculate(rel, MOCK_EEC);
        Object table = rel.getTable().unwrap(HazelcastTable.class).getTarget();
        this.dagBuildContext.setTable((Table)table);
        this.dagBuildContext.setRel(rel);
        Vertex vertex = SqlConnectorUtil.getJetSqlConnector(table).deleteProcessor(this.dagBuildContext, HazelcastRexNode.wrap(rel.getPredicate()), rel.getInput() != null);
        if (rel.getInput() != null) {
            this.connectInput(rel.getInput(), vertex, null);
        }
        return vertex;
    }

    @Override
    public Vertex onFullScan(FullScanPhysicalRel rel) {
        Byte wmKey;
        HazelcastTable hazelcastTable = rel.getTable().unwrap(HazelcastTable.class);
        Object table = hazelcastTable.getTarget();
        this.collectObjectKeys((Table)table);
        BiFunctionEx<ExpressionEvalContext, Byte, EventTimePolicy<JetSqlRow>> policyProvider = rel.eventTimePolicyProvider(rel.watermarkedColumnIndex(), rel.lagExpression(), this.watermarkThrottlingFrameSize);
        Map<Integer, MutableByte> fieldsKey = this.watermarkKeysAssigner.getWatermarkedFieldsKey(rel);
        if (fieldsKey != null) {
            wmKey = fieldsKey.get(rel.watermarkedColumnIndex()).getValue();
        } else {
            assert (rel.watermarkedColumnIndex() < 0);
            wmKey = null;
        }
        this.dagBuildContext.setTable((Table)table);
        this.dagBuildContext.setRel(rel);
        List<Map<String, Expression<?>>> partitionStrategyCandidate = null;
        if (this.partitionStrategyCandidates != null) {
            partitionStrategyCandidate = this.partitionStrategyCandidates.get(((Table)table).getSqlName());
        }
        Object sqlConnector = SqlConnectorUtil.getJetSqlConnector(table);
        return sqlConnector.fullScanReader(this.dagBuildContext, HazelcastRexNode.wrap(rel.filter()), HazelcastRexNode.wrap(rel.projection()), partitionStrategyCandidate, policyProvider != null ? (FunctionEx & Serializable)context -> (EventTimePolicy)policyProvider.apply(context, (Object)wmKey) : null);
    }

    @Override
    public Vertex onMapIndexScan(IndexScanMapPhysicalRel rel) {
        Object table = rel.getTable().unwrap(HazelcastTable.class).getTarget();
        this.collectObjectKeys((Table)table);
        this.dagBuildContext.setTable((Table)table);
        this.dagBuildContext.setRel(rel);
        return ((IMapSqlConnector)SqlConnectorUtil.getJetSqlConnector(table)).indexScanReader(this.dagBuildContext, this.localMemberAddress, rel.getIndex(), HazelcastRexNode.wrap(rel.filter()), HazelcastRexNode.wrap(rel.projection()), rel.getIndexFilter(), rel.getComparator(), rel.isDescending(), rel.requiresSort());
    }

    @Override
    public Vertex onCalc(CalcPhysicalRel rel) {
        Vertex vertex;
        RexProgram program = rel.getProgram();
        this.dagBuildContext.setTable(null);
        this.dagBuildContext.setRel(rel);
        List<Expression<?>> projection = this.dagBuildContext.convertProjection(HazelcastRexNode.wrap(rel.projection()));
        boolean projectionsCooperative = projection.stream().allMatch(Expression::isCooperative);
        if (program.getCondition() != null) {
            Expression<Boolean> filterExpr = this.dagBuildContext.convertFilter(HazelcastRexNode.wrap(rel.filter()));
            assert (filterExpr != null);
            vertex = this.dag.newUniqueVertex("Calc", Processors.mapUsingServiceP((ServiceFactory)ServiceFactories.nonSharedService((FunctionEx & Serializable)ctx -> ExpressionUtil.calcFn(projection, filterExpr, ExpressionEvalContext.from((ProcessorMetaSupplier.Context)ctx))).setCooperative(projectionsCooperative && filterExpr.isCooperative()), (BiFunctionEx & Serializable)(calcFn, row) -> (JetSqlRow)calcFn.apply(row)));
        } else {
            vertex = this.dag.newUniqueVertex("Project", Processors.mapUsingServiceP((ServiceFactory)ServiceFactories.nonSharedService((FunctionEx & Serializable)ctx -> ExpressionUtil.projectionFn(projection, ExpressionEvalContext.from((ProcessorMetaSupplier.Context)ctx))).setCooperative(projectionsCooperative), (BiFunctionEx & Serializable)(projectionFn, row) -> (JetSqlRow)projectionFn.apply(row)));
        }
        this.connectInputPreserveCollation(rel, vertex);
        return vertex;
    }

    @Override
    public Vertex onSort(SortPhysicalRel rel) {
        ComparatorEx<JetSqlRow> comparator = ExpressionUtil.comparisonFn(rel.getCollations());
        Vertex sortVertex = this.dag.newUniqueVertex("Sort", ProcessorMetaSupplier.of((SupplierEx)Processors.sortP(comparator)));
        this.connectInput(rel.getInput(), sortVertex, null);
        Vertex combineVertex = this.dag.newUniqueVertex("SortCombine", ProcessorMetaSupplier.forceTotalParallelismOne((ProcessorSupplier)ProcessorSupplier.of((SupplierEx)Processors.mapP((FunctionEx)FunctionEx.identity())), (Address)this.localMemberAddress));
        Edge edge = Edge.between((Vertex)sortVertex, (Vertex)combineVertex).ordered(comparator).distributeTo(this.localMemberAddress).allToOne();
        this.dag.edge(edge);
        return combineVertex;
    }

    @Override
    public Vertex onAggregate(AggregatePhysicalRel rel) {
        AggregateOperation<?, JetSqlRow> aggregateOperation = rel.aggrOp();
        Vertex vertex = this.dag.newUniqueVertex("Aggregate", ProcessorMetaSupplier.forceTotalParallelismOne((ProcessorSupplier)ProcessorSupplier.of((SupplierEx)Processors.aggregateP(aggregateOperation)), (Address)this.localMemberAddress));
        this.connectInput(rel.getInput(), vertex, edge -> edge.distributeTo(this.localMemberAddress).allToOne());
        return vertex;
    }

    @Override
    public Vertex onAccumulate(AggregateAccumulatePhysicalRel rel) {
        AggregateOperation<?, JetSqlRow> aggregateOperation = rel.aggrOp();
        Vertex vertex = this.dag.newUniqueVertex("Accumulate", Processors.accumulateP(aggregateOperation));
        this.connectInput(rel.getInput(), vertex, null);
        return vertex;
    }

    @Override
    public Vertex onCombine(AggregateCombinePhysicalRel rel) {
        AggregateOperation<?, JetSqlRow> aggregateOperation = rel.aggrOp();
        Vertex vertex = this.dag.newUniqueVertex("Combine", ProcessorMetaSupplier.forceTotalParallelismOne((ProcessorSupplier)ProcessorSupplier.of((SupplierEx)Processors.combineP(aggregateOperation)), (Address)this.localMemberAddress));
        this.connectInput(rel.getInput(), vertex, edge -> edge.distributeTo(this.localMemberAddress).allToOne());
        return vertex;
    }

    @Override
    public Vertex onAggregateByKey(AggregateByKeyPhysicalRel rel) {
        FunctionEx<JetSqlRow, ObjectArrayKey> groupKeyFn = rel.groupKeyFn();
        AggregateOperation<?, JetSqlRow> aggregateOperation = rel.aggrOp();
        Vertex vertex = this.dag.newUniqueVertex("AggregateByKey", Processors.aggregateByKeyP(Collections.singletonList(groupKeyFn), aggregateOperation, (BiFunctionEx & Serializable)(key, value) -> value));
        this.connectInput(rel.getInput(), vertex, edge -> edge.distributed().partitioned(groupKeyFn));
        return vertex;
    }

    @Override
    public Vertex onAccumulateByKey(AggregateAccumulateByKeyPhysicalRel rel) {
        FunctionEx<JetSqlRow, ObjectArrayKey> groupKeyFn = rel.groupKeyFn();
        AggregateOperation<?, JetSqlRow> aggregateOperation = rel.aggrOp();
        Vertex vertex = this.dag.newUniqueVertex("AccumulateByKey", Processors.accumulateByKeyP(Collections.singletonList(groupKeyFn), aggregateOperation));
        this.connectInput(rel.getInput(), vertex, edge -> edge.partitioned(groupKeyFn));
        return vertex;
    }

    @Override
    public Vertex onCombineByKey(AggregateCombineByKeyPhysicalRel rel) {
        AggregateOperation<?, JetSqlRow> aggregateOperation = rel.aggrOp();
        Vertex vertex = this.dag.newUniqueVertex("CombineByKey", Processors.combineByKeyP(aggregateOperation, (BiFunctionEx & Serializable)(key, value) -> value));
        this.connectInput(rel.getInput(), vertex, edge -> edge.distributed().partitioned(Functions.entryKey()));
        return vertex;
    }

    @Override
    public Vertex onSlidingWindow(SlidingWindowPhysicalRel rel) {
        int orderingFieldIndex = rel.orderingFieldIndex();
        FunctionEx<ExpressionEvalContext, SlidingWindowPolicy> windowPolicySupplier = rel.windowPolicyProvider();
        Vertex vertex = this.dag.newUniqueVertex("Sliding-Window", Processors.flatMapUsingServiceP((ServiceFactory)ServiceFactories.nonSharedService((FunctionEx & Serializable)ctx -> {
            ExpressionEvalContext evalContext = ExpressionEvalContext.from((ProcessorMetaSupplier.Context)ctx);
            SlidingWindowPolicy windowPolicy = (SlidingWindowPolicy)windowPolicySupplier.apply((Object)evalContext);
            return row -> WindowUtils.addWindowBounds(row, orderingFieldIndex, windowPolicy);
        }), Function::apply));
        this.connectInput(rel.getInput(), vertex, null);
        return vertex;
    }

    @Override
    public Vertex onSlidingWindowAggregate(SlidingWindowAggregatePhysicalRel rel) {
        byte watermarkKey;
        FunctionEx<JetSqlRow, ?> groupKeyFn = rel.groupKeyFn();
        AggregateOperation<?, JetSqlRow> aggregateOperation = rel.aggrOp();
        Expression<?> timestampExpression = rel.timestampExpression();
        ToLongFunctionEx & Serializable timestampFn = (ToLongFunctionEx & Serializable)row -> WindowUtils.extractMillis(timestampExpression.eval(row.getRow(), MOCK_EEC));
        SlidingWindowPolicy windowPolicy = (SlidingWindowPolicy)rel.windowPolicyProvider().apply((Object)MOCK_EEC);
        KeyedWindowResultFunction<? super Object, ? super JetSqlRow, ?> resultMapping = rel.outputValueMapping();
        Map<Integer, MutableByte> watermarkedFieldsKeys = this.watermarkKeysAssigner.getWatermarkedFieldsKey(rel);
        MutableByte mutableWatermarkKey = watermarkedFieldsKeys.isEmpty() ? this.watermarkKeysAssigner.getInputWatermarkKey(rel) : watermarkedFieldsKeys.get(rel.timestampFieldIndex());
        byte by = watermarkKey = mutableWatermarkKey != null ? mutableWatermarkKey.getValue() : watermarkedFieldsKeys.get(rel.watermarkedFields().findFirst(rel.getGroupSet())).getValue();
        if (rel.numStages() == 1) {
            Vertex vertex = this.dag.newUniqueVertex("Sliding-Window-AggregateByKey", Processors.aggregateToSlidingWindowP(Collections.singletonList(groupKeyFn), Collections.singletonList(timestampFn), (TimestampKind)TimestampKind.EVENT, (SlidingWindowPolicy)windowPolicy, (long)0L, aggregateOperation, resultMapping, (byte)watermarkKey));
            this.connectInput(rel.getInput(), vertex, edge -> edge.distributeTo(this.localMemberAddress).allToOne());
            return vertex;
        }
        assert (rel.numStages() == 2);
        Vertex vertex1 = this.dag.newUniqueVertex("Sliding-Window-AccumulateByKey", Processors.accumulateByFrameP(Collections.singletonList(groupKeyFn), Collections.singletonList(timestampFn), (TimestampKind)TimestampKind.EVENT, (SlidingWindowPolicy)windowPolicy, aggregateOperation, (byte)watermarkKey));
        Vertex vertex2 = this.dag.newUniqueVertex("Sliding-Window-CombineByKey", Processors.combineToSlidingWindowP((SlidingWindowPolicy)windowPolicy, aggregateOperation, resultMapping, (byte)watermarkKey));
        this.connectInput(rel.getInput(), vertex1, edge -> edge.partitioned(groupKeyFn));
        this.dag.edge(Edge.between((Vertex)vertex1, (Vertex)vertex2).distributed().partitioned(Functions.entryKey()));
        return vertex2;
    }

    @Override
    public Vertex onDropLateItems(DropLateItemsPhysicalRel rel) {
        Expression<?> timestampExpression = rel.timestampExpression();
        byte key = this.watermarkKeysAssigner.getWatermarkedFieldsKey(rel).get(rel.wmField()).getValue();
        SupplierEx & Serializable lateItemsDropPSupplier = (SupplierEx & Serializable)() -> new LateItemsDropP(key, timestampExpression);
        Vertex vertex = this.dag.newUniqueVertex("Drop-Late-Items", (SupplierEx)lateItemsDropPSupplier);
        this.connectInput(rel.getInput(), vertex, null);
        return vertex;
    }

    @Override
    public Vertex onNestedLoopJoin(JoinNestedLoopPhysicalRel rel) {
        assert (rel.getRight() instanceof HazelcastPhysicalScan) : rel.getRight().getClass();
        Object rightTable = rel.getRight().getTable().unwrap(HazelcastTable.class).getTarget();
        this.collectObjectKeys((Table)rightTable);
        this.dagBuildContext.setTable((Table)rightTable);
        this.dagBuildContext.setRel(rel);
        SqlConnector.VertexWithInputConfig vertexWithConfig = SqlConnectorUtil.getJetSqlConnector(rightTable).nestedLoopReader(this.dagBuildContext, HazelcastRexNode.wrap(rel.rightFilter()), HazelcastRexNode.wrap(rel.rightProjection()), rel.joinInfo(this.dagBuildContext.getParameterMetadata()));
        Vertex vertex = vertexWithConfig.vertex();
        this.connectInput(rel.getLeft(), vertex, vertexWithConfig.configureEdgeFn());
        return vertex;
    }

    @Override
    public Vertex onHashJoin(JoinHashPhysicalRel rel) {
        JetJoinInfo joinInfo = rel.joinInfo(this.dagBuildContext.getParameterMetadata());
        Vertex joinVertex = this.dag.newUniqueVertex("Hash Join", (ProcessorSupplier)SqlHashJoinP.supplier(joinInfo, rel.getRight().getRowType().getFieldCount()));
        this.connectJoinInput(joinInfo, rel.getLeft(), rel.getRight(), joinVertex);
        return joinVertex;
    }

    @Override
    public Vertex onStreamToStreamJoin(StreamToStreamJoinPhysicalRel rel) {
        JetJoinInfo joinInfo = rel.joinInfo(this.dagBuildContext.getParameterMetadata());
        HashMap<Byte, ToLongFunctionEx<JetSqlRow>> leftExtractors = new HashMap<Byte, ToLongFunctionEx<JetSqlRow>>();
        HashMap<Byte, ToLongFunctionEx<JetSqlRow>> rightExtractors = new HashMap<Byte, ToLongFunctionEx<JetSqlRow>>();
        Map<Integer, MutableByte> refByteMap = this.watermarkKeysAssigner.getWatermarkedFieldsKey(rel.getLeft());
        for (Map.Entry<Integer, ToLongFunctionEx<JetSqlRow>> entry : rel.leftTimeExtractors().entrySet()) {
            Byte by = refByteMap.get(entry.getKey()).getValue();
            leftExtractors.put(by, entry.getValue());
        }
        refByteMap = this.watermarkKeysAssigner.getWatermarkedFieldsKey(rel.getRight());
        for (Map.Entry<Integer, ToLongFunctionEx<JetSqlRow>> entry : rel.rightTimeExtractors().entrySet()) {
            Byte by = refByteMap.get(entry.getKey()).getValue();
            rightExtractors.put(by, entry.getValue());
        }
        refByteMap = this.watermarkKeysAssigner.getWatermarkedFieldsKey(rel);
        HashMap<Byte, Map<Byte, Long>> postponeTimeMap = new HashMap<Byte, Map<Byte, Long>>();
        for (Map.Entry<Integer, Map<Integer, Long>> entry : rel.postponeTimeMap().entrySet()) {
            HashMap<Byte, Long> map = new HashMap<Byte, Long>();
            for (Map.Entry<Integer, Long> innerEntry : entry.getValue().entrySet()) {
                map.put(refByteMap.get(innerEntry.getKey()).getValue(), innerEntry.getValue());
            }
            postponeTimeMap.put(refByteMap.get(entry.getKey()).getValue(), map);
        }
        for (MutableByte mutableByte : refByteMap.values()) {
            postponeTimeMap.putIfAbsent(mutableByte.getValue(), Collections.emptyMap());
        }
        Vertex vertex = this.dag.newUniqueVertex("Stream-Stream Join", (ProcessorSupplier)new StreamToStreamJoinP.StreamToStreamJoinProcessorSupplier(joinInfo, leftExtractors, rightExtractors, postponeTimeMap, rel.getLeft().getRowType().getFieldCount(), rel.getRight().getRowType().getFieldCount()));
        this.connectStreamToStreamJoinInput(joinInfo, rel.getLeft(), rel.getRight(), vertex);
        return vertex;
    }

    @Override
    public Vertex onUnion(UnionPhysicalRel rel) {
        if (!rel.all) {
            throw new RuntimeException("Union[all=false] rel should never be produced");
        }
        Vertex merger = this.dag.newUniqueVertex("UnionMerger", ProcessorSupplier.of((SupplierEx)Processors.mapP((FunctionEx)FunctionEx.identity())));
        int ordinal = 0;
        for (RelNode input : rel.getInputs()) {
            Vertex inputVertex = ((PhysicalRel)input).accept(this);
            Edge edge = Edge.from((Vertex)inputVertex).to(merger, ordinal++);
            this.dag.edge(edge);
        }
        return merger;
    }

    @Override
    public Vertex onLimit(LimitPhysicalRel rel) {
        throw QueryException.error((String)"FETCH/OFFSET is only supported for the top-level SELECT");
    }

    @Override
    public Vertex onRoot(RootRel rootRel) {
        this.watermarkThrottlingFrameSize = WatermarkThrottlingFrameSizeCalculator.calculate((PhysicalRel)rootRel.getInput(), MOCK_EEC);
        RelNode input = rootRel.getInput();
        Expression<?> fetch = ConstantExpression.create(Long.MAX_VALUE, QueryDataType.BIGINT);
        Expression<?> offset = ConstantExpression.create(0L, QueryDataType.BIGINT);
        if (input instanceof LimitPhysicalRel) {
            LimitPhysicalRel limit = (LimitPhysicalRel)input;
            if (limit.fetch() != null) {
                fetch = limit.fetch(this.dagBuildContext.getParameterMetadata());
            }
            if (limit.offset() != null) {
                offset = limit.offset(this.dagBuildContext.getParameterMetadata());
            }
            input = limit.getInput();
        }
        Vertex vertex = this.dag.newUniqueVertex("ClientSink", RootResultConsumerSink.rootResultConsumerSink(this.localMemberAddress, fetch, offset));
        this.connectInput(input, vertex, edge -> edge.distributeTo(this.localMemberAddress).allToOne());
        return vertex;
    }

    public void optimizeFinishedDag() {
        CreateTopLevelDagVisitor.decreaseParallelism(this.dag, this.nodeEngine.getConfig().getJetConfig().getCooperativeThreadCount());
    }

    static void decreaseParallelism(DAG dag, int defaultParallelism) {
        if (defaultParallelism == 1) {
            return;
        }
        HashSet<Vertex> verticesToChangeParallelism = new HashSet<Vertex>();
        for (Vertex vertex2 : dag) {
            for (Edge edge : dag.getInboundEdges(vertex2.getName())) {
                if (!CreateTopLevelDagVisitor.shouldChangeLocalParallelism(edge) || !edge.isLocal()) continue;
                verticesToChangeParallelism.add(edge.getSource());
                verticesToChangeParallelism.add(edge.getDestination());
                edge.isolated();
            }
        }
        int newParallelism = (int)Math.max(2.0, Math.sqrt(defaultParallelism));
        verticesToChangeParallelism.forEach(vertex -> {
            if (vertex.getMetaSupplier().preferredLocalParallelism() == -1) {
                vertex.localParallelism(newParallelism);
            }
        });
    }

    private static boolean shouldChangeLocalParallelism(Edge edge) {
        if (edge.getDestination() == null) {
            return false;
        }
        return edge.getSource().getLocalParallelism() == -1 && edge.getDestination().getLocalParallelism() == -1;
    }

    public Set<PlanObjectKey> getObjectKeys() {
        return this.objectKeys;
    }

    private Vertex connectInput(RelNode inputRel, Vertex thisVertex, @Nullable Consumer<Edge> configureEdgeFn) {
        Vertex inputVertex = ((PhysicalRel)inputRel).accept(this);
        Edge edge = Edge.between((Vertex)inputVertex, (Vertex)thisVertex);
        if (configureEdgeFn != null) {
            configureEdgeFn.accept(edge);
        }
        this.dag.edge(edge);
        return inputVertex;
    }

    private void connectJoinInput(JetJoinInfo joinInfo, RelNode leftInputRel, RelNode rightInputRel, Vertex joinVertex) {
        Vertex leftInput = ((PhysicalRel)leftInputRel).accept(this);
        Vertex rightInput = ((PhysicalRel)rightInputRel).accept(this);
        Edge left = Edge.between((Vertex)leftInput, (Vertex)joinVertex).priority(10).broadcast().distributed();
        Edge right = Edge.from((Vertex)rightInput).to(joinVertex, 1).priority(1).unicast().local();
        if (joinInfo.isLeftOuter()) {
            left = left.unicast().local();
            right = right.broadcast().distributed();
        }
        if (joinInfo.isEquiJoin()) {
            left = left.distributed().partitioned(ObjectArrayKey.projectFn(joinInfo.leftEquiJoinIndices()));
            right = right.distributed().partitioned(ObjectArrayKey.projectFn(joinInfo.rightEquiJoinIndices()));
        }
        this.dag.edge(left);
        this.dag.edge(right);
    }

    private void connectStreamToStreamJoinInput(JetJoinInfo joinInfo, RelNode leftInputRel, RelNode rightInputRel, Vertex joinVertex) {
        Vertex leftInput = ((PhysicalRel)leftInputRel).accept(this);
        Vertex rightInput = ((PhysicalRel)rightInputRel).accept(this);
        Edge left = Edge.from((Vertex)leftInput).to(joinVertex, 0);
        Edge right = Edge.from((Vertex)rightInput).to(joinVertex, 1);
        if (joinInfo.isEquiJoin()) {
            left = left.distributed().partitioned(ObjectArrayKey.projectFn(joinInfo.leftEquiJoinIndices()));
            right = right.distributed().partitioned(ObjectArrayKey.projectFn(joinInfo.rightEquiJoinIndices()));
        } else if (joinInfo.isRightOuter()) {
            left = left.distributed().broadcast();
            right = right.unicast().local();
        } else {
            left = left.unicast().local();
            right = right.distributed().broadcast();
        }
        this.dag.edge(left);
        this.dag.edge(right);
    }

    private void connectInputPreserveCollation(SingleRel rel, Vertex vertex) {
        boolean preserveCollation = !rel.getTraitSet().getCollation().getFieldCollations().isEmpty();
        Vertex inputVertex = this.connectInput(rel.getInput(), vertex, preserveCollation ? Edge::isolated : null);
        if (preserveCollation) {
            int cooperativeThreadCount = this.nodeEngine.getConfig().getJetConfig().getCooperativeThreadCount();
            int explicitLP = inputVertex.determineLocalParallelism(cooperativeThreadCount);
            inputVertex.determineLocalParallelism(explicitLP);
            vertex.localParallelism(explicitLP);
        }
    }

    private void collectObjectKeys(Table table) {
        PlanObjectKey objectKey = table.getObjectKey();
        if (objectKey != null) {
            this.objectKeys.add(objectKey);
        }
    }
}

