/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.translators;

import java.util.Collection;
import java.util.Collections;
import java.util.List;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.streaming.api.graph.SimpleTransformationTranslator;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.streaming.api.graph.StreamNode;
import org.apache.flink.streaming.api.graph.TransformationTranslator;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
import org.apache.flink.streaming.api.transformations.CacheTransformation;
import org.apache.flink.streaming.api.transformations.PhysicalTransformation;
import org.apache.flink.streaming.api.transformations.SideOutputTransformation;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.util.Preconditions;

/**
 * Translates a {@link CacheTransformation} into nodes and edges of a {@link StreamGraph}.
 *
 * <p>In batch mode, a transformation that is not cached yet gets a {@value
 * #CACHE_PRODUCER_OPERATOR_NAME} node that is connected to its input through an edge carrying the
 * cache's {@link IntermediateDataSetID}. A transformation that is already cached is translated
 * into a {@value #CACHE_CONSUMER_OPERATOR_NAME} source node that is configured to consume that
 * dataset id.
 *
 * <p>In streaming mode, a transformation that is not cached is transparent: the stream node ids of
 * its single input are returned unchanged.
 *
 * @param <OUT> the element type produced by the cached transformation
 * @param <T> the concrete {@link CacheTransformation} type handled by this translator
 */
@Internal
public class CacheTransformationTranslator<OUT, T extends CacheTransformation<OUT>>
        extends SimpleTransformationTranslator<OUT, T> {

    /** Operator name given to the node that reads from an existing cache. */
    public static final String CACHE_CONSUMER_OPERATOR_NAME = "CacheRead";

    /** Operator name given to the node that is attached to the input in order to produce a cache. */
    public static final String CACHE_PRODUCER_OPERATOR_NAME = "CacheWrite";

    /**
     * Batch translation: produces the cache if it does not exist yet, otherwise consumes it.
     *
     * @param transformation the cache transformation to translate
     * @param context the translation context giving access to the stream graph
     * @return the ids of the stream nodes that downstream transformations should connect to
     * @throws IllegalStateException if the transformation does not have exactly one input
     * @throws RuntimeException if the input is neither a {@link PhysicalTransformation} nor a
     *     {@link SideOutputTransformation}
     */
    @Override
    protected Collection<Integer> translateForBatchInternal(
            T transformation, TransformationTranslator.Context context) {
        if (transformation.isCached()) {
            return consumeCache(transformation, context);
        }

        final List<Transformation<?>> inputs = transformation.getInputs();
        Preconditions.checkState(
                inputs.size() == 1, "There could be only one transformation input to cache");
        final Transformation<?> input = inputs.get(0);

        if (input instanceof PhysicalTransformation) {
            return physicalTransformationProduceCache(transformation, context, input);
        }
        if (input instanceof SideOutputTransformation) {
            return sideOutputTransformationProduceCache(
                    transformation, context, (SideOutputTransformation<?>) input);
        }
        throw new RuntimeException(
                String.format("Unsupported transformation %s", input.getClass()));
    }

    /**
     * Streaming translation: consumes the cache if it exists, otherwise passes the input through.
     *
     * @param transformation the cache transformation to translate
     * @param context the translation context giving access to the stream graph
     * @return the id of the cache consumer node, or the stream node ids of the single input when
     *     the transformation is not cached
     * @throws IllegalStateException if the transformation is not cached and does not have exactly
     *     one input
     */
    @Override
    protected Collection<Integer> translateForStreamingInternal(
            T transformation, TransformationTranslator.Context context) {
        if (transformation.isCached()) {
            return consumeCache(transformation, context);
        }

        final List<Transformation<?>> inputs = transformation.getInputs();
        Preconditions.checkState(
                inputs.size() == 1, "There could be only one transformation input to cache");
        // No cache node is created here; downstream simply connects to the input's nodes.
        return context.getStreamNodeIds(inputs.get(0));
    }

    /**
     * Produces the cache from a side output: adds the cache producer node, registers a virtual
     * side-output node on top of the side output's upstream node and connects that virtual node to
     * the cache producer node.
     *
     * @return a singleton collection holding the id of the newly created virtual side-output node
     */
    private Collection<Integer> sideOutputTransformationProduceCache(
            T transformation,
            TransformationTranslator.Context context,
            SideOutputTransformation<?> input) {
        final StreamGraph streamGraph = context.getStreamGraph();
        // Only the first input of the side-output transformation is considered.
        final Transformation<?> physicalTransformation = input.getInputs().get(0);

        final Collection<Integer> cacheNodeIds = context.getStreamNodeIds(physicalTransformation);
        Preconditions.checkState(
                cacheNodeIds.size() == 1, "We expect only one stream node for the input transform");
        final Integer cacheNodeId = cacheNodeIds.iterator().next();

        addCacheProduceNode(streamGraph, transformation, context, physicalTransformation);

        final int virtualId = Transformation.getNewNodeId();
        streamGraph.addVirtualSideOutputNode(cacheNodeId, virtualId, input.getOutputTag());
        streamGraph.addEdge(
                virtualId,
                transformation.getId(),
                0,
                new IntermediateDataSetID(transformation.getDatasetId()));
        return Collections.singletonList(virtualId);
    }

    /**
     * Produces the cache from a physical transformation: adds the cache producer node and connects
     * the input's stream node to it.
     *
     * @return a singleton list holding the id of the input's stream node (not the id of the cache
     *     producer node)
     */
    private List<Integer> physicalTransformationProduceCache(
            T transformation, TransformationTranslator.Context context, Transformation<?> input) {
        final StreamGraph streamGraph = context.getStreamGraph();

        final Collection<Integer> cachedNodeIds = context.getStreamNodeIds(input);
        Preconditions.checkState(
                cachedNodeIds.size() == 1,
                "We expect only one stream node for the input transform");
        final Integer cacheNodeId = cachedNodeIds.iterator().next();

        addCacheProduceNode(streamGraph, transformation, context, input);
        streamGraph.addEdge(
                cacheNodeId,
                transformation.getId(),
                0,
                new IntermediateDataSetID(transformation.getDatasetId()));
        return Collections.singletonList(cacheNodeId);
    }

    /**
     * Adds the {@value #CACHE_PRODUCER_OPERATOR_NAME} node for the given cache transformation to
     * the stream graph and copies parallelism settings from {@code input}.
     *
     * @param streamGraph the graph to add the node to
     * @param cacheTransformation the cache transformation whose id the new node gets
     * @param context the translation context supplying the slot sharing group
     * @param input the transformation whose parallelism and max parallelism are copied
     */
    private void addCacheProduceNode(
            StreamGraph streamGraph,
            T cacheTransformation,
            TransformationTranslator.Context context,
            Transformation<?> input) {
        final SimpleOperatorFactory<OUT> operatorFactory =
                SimpleOperatorFactory.of(new NoOpStreamOperator<>());
        operatorFactory.setChainingStrategy(ChainingStrategy.HEAD);

        // The input type is taken from the cache transformation's direct input, which on the
        // side-output path is the side-output transformation rather than `input`.
        streamGraph.addOperator(
                cacheTransformation.getId(),
                context.getSlotSharingGroup(),
                cacheTransformation.getCoLocationGroupKey(),
                operatorFactory,
                cacheTransformation.getInputs().get(0).getOutputType(),
                null,
                CACHE_PRODUCER_OPERATOR_NAME);

        streamGraph.setParallelism(
                cacheTransformation.getId(),
                input.getParallelism(),
                input.isParallelismConfigured());
        streamGraph.setMaxParallelism(cacheTransformation.getId(), input.getMaxParallelism());
    }

    /**
     * Adds a {@value #CACHE_CONSUMER_OPERATOR_NAME} legacy source node for the given cache
     * transformation and marks it as consumer of the cache's dataset id.
     *
     * @return a singleton list holding the id of the cache transformation
     */
    private List<Integer> consumeCache(
            T transformation, TransformationTranslator.Context context) {
        final StreamGraph streamGraph = context.getStreamGraph();
        final SimpleOperatorFactory<OUT> operatorFactory =
                SimpleOperatorFactory.of(new IdentityStreamOperator<>());
        final TypeInformation<OUT> outputType =
                transformation.getTransformationToCache().getOutputType();

        streamGraph.addLegacySource(
                transformation.getId(),
                context.getSlotSharingGroup(),
                transformation.getCoLocationGroupKey(),
                operatorFactory,
                outputType,
                outputType,
                CACHE_CONSUMER_OPERATOR_NAME);

        // The parallelism value comes from the cached transformation, while the "configured"
        // flag is read from the cache transformation itself.
        streamGraph.setParallelism(
                transformation.getId(),
                transformation.getTransformationToCache().getParallelism(),
                transformation.isParallelismConfigured());
        streamGraph.setMaxParallelism(
                transformation.getId(),
                transformation.getTransformationToCache().getMaxParallelism());

        final StreamNode streamNode = streamGraph.getStreamNode(transformation.getId());
        streamNode.setConsumeClusterDatasetId(
                new IntermediateDataSetID(transformation.getDatasetId()));
        return Collections.singletonList(transformation.getId());
    }

    /**
     * Operator used by the cache consumer node; it forwards every record unchanged.
     *
     * @param <T> the record type
     */
    public static class IdentityStreamOperator<T> extends AbstractStreamOperator<T>
            implements OneInputStreamOperator<T, T> {
        private static final long serialVersionUID = 4517845269225218313L;

        @Override
        public void processElement(StreamRecord<T> element) throws Exception {
            output.collect(element);
        }
    }

    /**
     * Operator used by the cache producer node; it emits nothing.
     *
     * @param <T> the record type
     */
    public static class NoOpStreamOperator<T> extends AbstractStreamOperator<T>
            implements OneInputStreamOperator<T, T> {
        private static final long serialVersionUID = 4517845269225218313L;

        @Override
        public void processElement(StreamRecord<T> element) throws Exception {
            // Intentionally empty: incoming records are not forwarded by this operator.
            // NOTE(review): presumably the cached data is materialized by the runtime via the
            // edge's IntermediateDataSetID rather than by this operator; confirm.
        }
    }
}

