/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.datastream.impl.stream;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import org.apache.flink.api.common.state.StateDeclaration;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.dsv2.Sink;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.datastream.api.function.OneInputStreamProcessFunction;
import org.apache.flink.datastream.api.function.TwoInputNonBroadcastStreamProcessFunction;
import org.apache.flink.datastream.api.function.TwoOutputStreamProcessFunction;
import org.apache.flink.datastream.api.stream.BroadcastStream;
import org.apache.flink.datastream.api.stream.GlobalStream;
import org.apache.flink.datastream.api.stream.KeyedPartitionStream;
import org.apache.flink.datastream.api.stream.NonKeyedPartitionStream;
import org.apache.flink.datastream.api.stream.ProcessConfigurable;
import org.apache.flink.datastream.impl.ExecutionEnvironmentImpl;
import org.apache.flink.datastream.impl.operators.ProcessOperator;
import org.apache.flink.datastream.impl.operators.TwoInputNonBroadcastProcessOperator;
import org.apache.flink.datastream.impl.operators.TwoOutputProcessOperator;
import org.apache.flink.datastream.impl.stream.AbstractDataStream;
import org.apache.flink.datastream.impl.stream.BroadcastStreamImpl;
import org.apache.flink.datastream.impl.stream.KeyedPartitionStreamImpl;
import org.apache.flink.datastream.impl.stream.NonKeyedPartitionStreamImpl;
import org.apache.flink.datastream.impl.utils.StreamUtils;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.SimpleUdfStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.api.transformations.DataStreamV2SinkTransformation;
import org.apache.flink.streaming.api.transformations.OneInputTransformation;
import org.apache.flink.streaming.api.transformations.PartitionTransformation;
import org.apache.flink.streaming.api.transformations.TwoInputTransformation;
import org.apache.flink.streaming.runtime.partitioner.ShufflePartitioner;
import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
import org.apache.flink.util.OutputTag;

public class GlobalStreamImpl<T>
extends AbstractDataStream<T>
implements GlobalStream<T> {
    public GlobalStreamImpl(ExecutionEnvironmentImpl environment, Transformation<T> transformation) {
        super(environment, transformation);
    }

    public <OUT> GlobalStream.ProcessConfigurableAndGlobalStream<OUT> process(OneInputStreamProcessFunction<T, OUT> processFunction) {
        StreamUtils.validateStates(processFunction.usesStates(), new HashSet<StateDeclaration.RedistributionMode>(Collections.singletonList(StateDeclaration.RedistributionMode.IDENTICAL)));
        TypeInformation<OUT> outType = StreamUtils.getOutputTypeForOneInputProcessFunction(processFunction, this.getType());
        ProcessOperator<T, OUT> operator = new ProcessOperator<T, OUT>(processFunction);
        return StreamUtils.wrapWithConfigureHandle(this.transform("Global Process", outType, operator));
    }

    public <OUT1, OUT2> GlobalStream.TwoGlobalStreams<OUT1, OUT2> process(TwoOutputStreamProcessFunction<T, OUT1, OUT2> processFunction) {
        StreamUtils.validateStates(processFunction.usesStates(), new HashSet<StateDeclaration.RedistributionMode>(Collections.singletonList(StateDeclaration.RedistributionMode.IDENTICAL)));
        Tuple2<TypeInformation<OUT1>, TypeInformation<OUT2>> twoOutputType = StreamUtils.getOutputTypesForTwoOutputProcessFunction(processFunction, this.getType());
        TypeInformation firstOutputType = (TypeInformation)twoOutputType.f0;
        TypeInformation secondOutputType = (TypeInformation)twoOutputType.f1;
        OutputTag secondOutputTag = new OutputTag("Second-Output", secondOutputType);
        TwoOutputProcessOperator<T, OUT1, OUT2> operator = new TwoOutputProcessOperator<T, OUT1, OUT2>(processFunction, secondOutputTag);
        GlobalStreamImpl firstStream = this.transform("Two-Output-Operator", firstOutputType, operator);
        GlobalStreamImpl secondStream = new GlobalStreamImpl(this.environment, firstStream.getSideOutputTransform(secondOutputTag));
        return TwoGlobalStreamsImpl.of(firstStream, secondStream);
    }

    public <T_OTHER, OUT> GlobalStream.ProcessConfigurableAndGlobalStream<OUT> connectAndProcess(GlobalStream<T_OTHER> other, TwoInputNonBroadcastStreamProcessFunction<T, T_OTHER, OUT> processFunction) {
        StreamUtils.validateStates(processFunction.usesStates(), new HashSet<StateDeclaration.RedistributionMode>(Arrays.asList(StateDeclaration.RedistributionMode.NONE, StateDeclaration.RedistributionMode.IDENTICAL)));
        TypeInformation<OUT> outTypeInfo = StreamUtils.getOutputTypeForTwoInputNonBroadcastProcessFunction(processFunction, this.getType(), ((GlobalStreamImpl)other).getType());
        TwoInputNonBroadcastProcessOperator<T, T_OTHER, OUT> processOperator = new TwoInputNonBroadcastProcessOperator<T, T_OTHER, OUT>(processFunction);
        TwoInputTransformation<T, T_OTHER, OUT> outTransformation = StreamUtils.getTwoInputTransformation("Global-Global-TwoInput-Process", this, (GlobalStreamImpl)other, outTypeInfo, processOperator);
        outTransformation.setParallelism(1, true);
        this.environment.addOperator((Transformation<?>)outTransformation);
        return StreamUtils.wrapWithConfigureHandle(new GlobalStreamImpl<T>(this.environment, outTransformation));
    }

    public ProcessConfigurable<?> toSink(Sink<T> sink) {
        DataStreamV2SinkTransformation<T, T> sinkTransformation = StreamUtils.addSinkOperator(this, sink, this.getType());
        sinkTransformation.setParallelism(1, true);
        return StreamUtils.wrapWithConfigureHandle(new GlobalStreamImpl<T>(this.environment, sinkTransformation));
    }

    public <K> KeyedPartitionStream<K, T> keyBy(KeySelector<T, K> keySelector) {
        return new KeyedPartitionStreamImpl<K, T>(this, keySelector);
    }

    public NonKeyedPartitionStream<T> shuffle() {
        return new NonKeyedPartitionStreamImpl(this.environment, new PartitionTransformation(this.getTransformation(), (StreamPartitioner)new ShufflePartitioner()));
    }

    public BroadcastStream<T> broadcast() {
        return new BroadcastStreamImpl(this.environment, this.getTransformation());
    }

    private <R> GlobalStreamImpl<R> transform(String operatorName, TypeInformation<R> outputTypeInfo, OneInputStreamOperator<T, R> operator) {
        this.transformation.getOutputType();
        OneInputTransformation resultTransform = new OneInputTransformation(this.transformation, operatorName, (StreamOperatorFactory)SimpleUdfStreamOperatorFactory.of(operator), outputTypeInfo, 1, true);
        GlobalStreamImpl<T> returnStream = new GlobalStreamImpl<T>(this.environment, resultTransform);
        this.environment.addOperator((Transformation<?>)resultTransform);
        return returnStream;
    }

    private static class TwoGlobalStreamsImpl<OUT1, OUT2>
    implements GlobalStream.TwoGlobalStreams<OUT1, OUT2> {
        private final GlobalStreamImpl<OUT1> firstStream;
        private final GlobalStreamImpl<OUT2> secondStream;

        public static <OUT1, OUT2> TwoGlobalStreamsImpl<OUT1, OUT2> of(GlobalStreamImpl<OUT1> firstStream, GlobalStreamImpl<OUT2> secondStream) {
            return new TwoGlobalStreamsImpl<OUT1, OUT2>(firstStream, secondStream);
        }

        private TwoGlobalStreamsImpl(GlobalStreamImpl<OUT1> firstStream, GlobalStreamImpl<OUT2> secondStream) {
            this.firstStream = firstStream;
            this.secondStream = secondStream;
        }

        public GlobalStream.ProcessConfigurableAndGlobalStream<OUT1> getFirst() {
            return StreamUtils.wrapWithConfigureHandle(this.firstStream);
        }

        public GlobalStream.ProcessConfigurableAndGlobalStream<OUT2> getSecond() {
            return StreamUtils.wrapWithConfigureHandle(this.secondStream);
        }
    }
}

