/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.operators;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.network.api.writer.RecordWriterDelegate;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.operators.testutils.MockEnvironment;
import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
import org.apache.flink.runtime.plugable.SerializationDelegate;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.StreamMap;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamTaskStateInitializer;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.OperatorChain;
import org.apache.flink.streaming.runtime.tasks.RegularOperatorChain;
import org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.util.MockStreamTaskBuilder;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;

/**
 * Tests that a chain of operators behind a single head operator forwards records to every
 * chained branch, both with and without object reuse enabled on the execution environment.
 *
 * <p>Each test builds a small topology, extracts the second job vertex from the generated
 * {@link JobGraph}, wires its operators into a {@link TestOperatorChain} by hand and pushes the
 * records 1, 2 and 3 through the head operator. The chained sinks record what they receive in
 * the static result lists, which are then asserted on.
 */
public class StreamOperatorChainingTest {
    // Static on purpose: the sinks are anonymous functions that are re-created from the
    // StreamConfig (presumably via serialization - confirm), so they cannot write into
    // instance state of this test object.
    private static List<String> sink1Results;
    private static List<String> sink2Results;
    private static List<String> sink3Results;

    /** Runs the fan-out chaining scenario with object reuse enabled. */
    @Test
    public void testMultiChainingWithObjectReuse() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().enableObjectReuse();
        testMultiChaining(env);
    }

    /** Runs the fan-out chaining scenario with object reuse disabled. */
    @Test
    public void testMultiChainingWithoutObjectReuse() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().disableObjectReuse();
        testMultiChaining(env);
    }

    /**
     * Verifies that a head map operator feeding two chained map/sink branches delivers every
     * record to both branches, in order.
     *
     * @param env the environment to build the topology in; its object-reuse setting is chosen by
     *     the calling test
     * @throws Exception if building or driving the operator chain fails
     */
    private void testMultiChaining(StreamExecutionEnvironment env) throws Exception {
        // The test expects the source to end up in its own vertex (two vertices in total);
        // parallelism 2 is presumably what keeps the source from being chained - confirm.
        env.setParallelism(2);

        // These elements only define the stream type; the records that are asserted on are
        // injected directly into the head operator by processElementsThroughChain.
        DataStreamSource<Integer> source = env.fromData(1, 2, 3);

        sink1Results = new ArrayList<>();
        sink2Results = new ArrayList<>();

        SingleOutputStreamOperator<Integer> input = source.map(value -> value);

        input.map(value -> "First: " + value)
                .addSink(
                        new SinkFunction<String>() {
                            @Override
                            public void invoke(String value, SinkFunction.Context ctx)
                                    throws Exception {
                                sink1Results.add(value);
                            }
                        });

        input.map(value -> "Second: " + value)
                .addSink(
                        new SinkFunction<String>() {
                            @Override
                            public void invoke(String value, SinkFunction.Context ctx)
                                    throws Exception {
                                sink2Results.add(value);
                            }
                        });

        processElementsThroughChain(env);

        MatcherAssert.assertThat(
                sink1Results, Matchers.contains("First: 1", "First: 2", "First: 3"));
        MatcherAssert.assertThat(
                sink2Results, Matchers.contains("Second: 1", "Second: 2", "Second: 3"));
    }

    /**
     * Builds a {@link MockEnvironment} for the hand-wired task.
     *
     * @param taskName name to give the mock task
     * @return a mock environment with 3 MiB of managed memory and a buffer size of 1024
     */
    private MockEnvironment createMockEnvironment(String taskName) {
        return new MockEnvironmentBuilder()
                .setTaskName(taskName)
                .setManagedMemorySize(3 * 1024 * 1024)
                .setInputSplitProvider(new MockInputSplitProvider())
                .setBufferSize(1024)
                .build();
    }

    /** Runs the side-output chaining scenario with object reuse enabled. */
    @Test
    public void testMultiChainingWithSplitWithObjectReuse() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().enableObjectReuse();
        testMultiChainingWithSplit(env);
    }

    /** Runs the side-output chaining scenario with object reuse disabled. */
    @Test
    public void testMultiChainingWithSplitWithoutObjectReuse() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().disableObjectReuse();
        testMultiChainingWithSplit(env);
    }

    /**
     * Verifies chaining when a process function routes records to two side outputs: the value 1
     * goes to the "one" output (consumed by two map/sink branches), everything else goes to the
     * "other" output (consumed by one branch).
     *
     * @param env the environment to build the topology in; its object-reuse setting is chosen by
     *     the calling test
     * @throws Exception if building or driving the operator chain fails
     */
    private void testMultiChainingWithSplit(StreamExecutionEnvironment env) throws Exception {
        // See testMultiChaining: the test expects the source to live in a separate vertex.
        env.setParallelism(2);

        // These elements only define the stream type; the asserted records are injected
        // directly into the head operator below.
        DataStreamSource<Integer> source = env.fromData(1, 2, 3);

        sink1Results = new ArrayList<>();
        sink2Results = new ArrayList<>();
        sink3Results = new ArrayList<>();

        SingleOutputStreamOperator<Integer> input = source.map(value -> value);

        // Anonymous subclasses so that the tags carry their element type information.
        OutputTag<Integer> oneOutput = new OutputTag<Integer>("one") {};
        OutputTag<Integer> otherOutput = new OutputTag<Integer>("other") {};

        SingleOutputStreamOperator<Object> split =
                input.process(
                        new ProcessFunction<Integer, Object>() {
                            private static final long serialVersionUID = 1L;

                            @Override
                            public void processElement(
                                    Integer value, Context ctx, Collector<Object> out)
                                    throws Exception {
                                // Nothing is emitted on the main output; records only leave
                                // through one of the two side outputs.
                                if (value.equals(1)) {
                                    ctx.output(oneOutput, value);
                                } else {
                                    ctx.output(otherOutput, value);
                                }
                            }
                        });

        split.getSideOutput(oneOutput)
                .map(value -> "First 1: " + value)
                .addSink(
                        new SinkFunction<String>() {
                            @Override
                            public void invoke(String value, SinkFunction.Context ctx)
                                    throws Exception {
                                sink1Results.add(value);
                            }
                        });

        split.getSideOutput(oneOutput)
                .map(value -> "First 2: " + value)
                .addSink(
                        new SinkFunction<String>() {
                            @Override
                            public void invoke(String value, SinkFunction.Context ctx)
                                    throws Exception {
                                sink2Results.add(value);
                            }
                        });

        split.getSideOutput(otherOutput)
                .map(value -> "Second: " + value)
                .addSink(
                        new SinkFunction<String>() {
                            @Override
                            public void invoke(String value, SinkFunction.Context ctx)
                                    throws Exception {
                                sink3Results.add(value);
                            }
                        });

        processElementsThroughChain(env);

        MatcherAssert.assertThat(sink1Results, Matchers.contains("First 1: 1"));
        MatcherAssert.assertThat(sink2Results, Matchers.contains("First 2: 1"));
        MatcherAssert.assertThat(sink3Results, Matchers.contains("Second: 2", "Second: 3"));
    }

    /**
     * Generates the job graph for {@code env}, hand-wires the operators of its second vertex into
     * a {@link TestOperatorChain} and feeds the records 1, 2 and 3 into the head operator.
     *
     * <p>The job graph is asserted to consist of exactly two vertices; the vertex at index 1 of
     * the topologically sorted list is the one that gets driven.
     *
     * @param env the environment whose topology has already been defined by the caller
     * @throws Exception if the chain cannot be built, opened or driven
     */
    private void processElementsThroughChain(StreamExecutionEnvironment env) throws Exception {
        JobGraph jobGraph = env.getStreamGraph().getJobGraph();
        List<JobVertex> vertices = jobGraph.getVerticesSortedTopologicallyFromSources();
        Assert.assertEquals(2, vertices.size());

        JobVertex chainedVertex = vertices.get(1);
        Configuration configuration = chainedVertex.getConfiguration();
        StreamConfig streamConfig = new StreamConfig(configuration);

        StreamMap<Integer, Integer> headOperator =
                streamConfig.getStreamOperator(Thread.currentThread().getContextClassLoader());

        try (MockEnvironment environment = createMockEnvironment(chainedVertex.getName())) {
            StreamTask<Integer, StreamMap<Integer, Integer>> mockTask =
                    createMockTask(streamConfig, environment);
            OperatorChain<Integer, StreamMap<Integer, Integer>> operatorChain =
                    createOperatorChain(streamConfig, environment, mockTask);

            headOperator.setup(mockTask, streamConfig, operatorChain.getMainOperatorOutput());

            // TestOperatorChain ignores the initializer argument and only opens the operators.
            operatorChain.initializeStateAndOpenOperators(null);

            headOperator.processElement(new StreamRecord<>(1));
            headOperator.processElement(new StreamRecord<>(2));
            headOperator.processElement(new StreamRecord<>(3));
        }
    }

    /**
     * Creates the operator chain under test for the given task.
     *
     * @param streamConfig configuration of the chained vertex
     * @param environment environment the record writers are created against
     * @param task the task that owns the chain
     * @return a {@link TestOperatorChain} bound to {@code task}
     */
    private <IN, OT extends StreamOperator<IN>> OperatorChain<IN, OT> createOperatorChain(
            StreamConfig streamConfig, Environment environment, StreamTask<IN, OT> task) {
        return new TestOperatorChain<>(
                task, StreamTask.createRecordWriterDelegate(streamConfig, environment));
    }

    /**
     * Creates a mock stream task for the given configuration.
     *
     * <p>Note that the task's own {@link ExecutionConfig} always has object reuse enabled,
     * regardless of how the calling test configured the {@link StreamExecutionEnvironment}.
     *
     * @param streamConfig configuration of the chained vertex
     * @param environment environment the task runs in
     * @return the mock task
     * @throws Exception if the builder fails to create the task
     */
    @SuppressWarnings("unchecked") // the builder's result is converted to the requested type parameters
    private <IN, OT extends StreamOperator<IN>> StreamTask<IN, OT> createMockTask(
            StreamConfig streamConfig, Environment environment) throws Exception {
        return new MockStreamTaskBuilder(environment)
                .setConfig(streamConfig)
                .setExecutionConfig(new ExecutionConfig().enableObjectReuse())
                .build();
    }

    /**
     * Operator chain that skips state initialization and merely opens every operator, so the
     * tests can pass {@code null} as the state initializer.
     */
    private static class TestOperatorChain<IN, OUT extends StreamOperator<IN>>
            extends RegularOperatorChain<IN, OUT> {

        public TestOperatorChain(
                StreamTask<IN, OUT> task,
                RecordWriterDelegate<SerializationDelegate<StreamRecord<IN>>>
                        recordWriterDelegate) {
            super(task, recordWriterDelegate);
        }

        /**
         * Opens all operators of the chain without initializing any state.
         *
         * @param streamTaskStateInitializer ignored
         * @throws Exception if an operator fails to open
         */
        @Override
        public void initializeStateAndOpenOperators(
                StreamTaskStateInitializer streamTaskStateInitializer) throws Exception {
            for (StreamOperatorWrapper<?, ?> operatorWrapper : getAllOperators(true)) {
                StreamOperator<?> operator = operatorWrapper.getStreamOperator();
                operator.open();
            }
        }
    }
}

