/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.functions.source;

import java.io.IOException;
import java.util.Collections;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.DefaultOpenContext;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.io.RichInputFormat;
import org.apache.flink.api.common.io.statistics.BaseStatistics;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.core.io.InputSplitAssigner;
import org.apache.flink.metrics.groups.OperatorMetricGroup;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
import org.apache.flink.runtime.operators.testutils.MockEnvironment;
import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
import org.apache.flink.streaming.api.functions.source.legacy.InputFormatSourceFunction;
import org.apache.flink.streaming.api.functions.source.legacy.SourceFunction;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

class InputFormatSourceFunctionTest {
    InputFormatSourceFunctionTest() {
    }

    @Test
    void testNormalOp() throws Exception {
        this.testFormatLifecycle(false);
    }

    @Test
    void testCancelation() throws Exception {
        this.testFormatLifecycle(true);
    }

    private void testFormatLifecycle(boolean midCancel) throws Exception {
        int noOfSplits = 5;
        int cancelAt = 2;
        LifeCycleTestInputFormat format = new LifeCycleTestInputFormat();
        InputFormatSourceFunction reader = new InputFormatSourceFunction((InputFormat)format, TypeInformation.of(Integer.class));
        try (MockEnvironment environment = new MockEnvironmentBuilder().setTaskName("no").setManagedMemorySize(131072L).build();){
            reader.setRuntimeContext((RuntimeContext)new MockRuntimeContext(format, 5, environment));
            Assertions.assertThat((boolean)format.isConfigured).isFalse();
            Assertions.assertThat((boolean)format.isInputFormatOpen).isFalse();
            Assertions.assertThat((boolean)format.isSplitOpen).isFalse();
            reader.open(DefaultOpenContext.INSTANCE);
            Assertions.assertThat((boolean)format.isConfigured).isTrue();
            TestSourceContext ctx = new TestSourceContext((InputFormatSourceFunction<Integer>)reader, format, midCancel, 2);
            reader.run((SourceFunction.SourceContext)ctx);
            int splitsSeen = ctx.getSplitsSeen();
            Assertions.assertThat((boolean)(midCancel ? splitsSeen == 2 : splitsSeen == 5)).isTrue();
            Assertions.assertThat((boolean)format.isSplitOpen).isFalse();
            Assertions.assertThat((boolean)format.isInputFormatOpen).isFalse();
        }
    }

    private static class MockRuntimeContext
    extends StreamingRuntimeContext {
        private final int noOfSplits;
        private int nextSplit = 0;
        private final LifeCycleTestInputFormat format;
        private InputSplit[] inputSplits;

        private MockRuntimeContext(LifeCycleTestInputFormat format, int noOfSplits, Environment environment) {
            super((AbstractStreamOperator)new MockStreamOperator(), environment, Collections.emptyMap());
            this.noOfSplits = noOfSplits;
            this.format = format;
        }

        public OperatorMetricGroup getMetricGroup() {
            return UnregisteredMetricsGroup.createOperatorMetricGroup();
        }

        public InputSplitProvider getInputSplitProvider() {
            try {
                this.inputSplits = this.format.createInputSplits(this.noOfSplits);
                Assertions.assertThat((Object[])this.inputSplits).hasSize(this.noOfSplits);
            }
            catch (IOException e) {
                e.printStackTrace();
            }
            return new InputSplitProvider(){

                public InputSplit getNextInputSplit(ClassLoader userCodeClassLoader) {
                    if (nextSplit < inputSplits.length) {
                        return inputSplits[nextSplit++];
                    }
                    return null;
                }
            };
        }

        private static class MockStreamOperator
        extends AbstractStreamOperator<Integer> {
            private static final long serialVersionUID = -1153976702711944427L;

            private MockStreamOperator() {
            }

            public ExecutionConfig getExecutionConfig() {
                return new ExecutionConfig();
            }

            public OperatorID getOperatorID() {
                return new OperatorID();
            }
        }
    }

    private static class TestSourceContext
    implements SourceFunction.SourceContext<Integer> {
        private final InputFormatSourceFunction<Integer> reader;
        private final LifeCycleTestInputFormat format;
        private final boolean shouldCancel;
        private final int cancelAt;
        int splitIdx = 0;

        private TestSourceContext(InputFormatSourceFunction<Integer> reader, LifeCycleTestInputFormat format, boolean shouldCancel, int cancelAt) {
            this.reader = reader;
            this.format = format;
            this.shouldCancel = shouldCancel;
            this.cancelAt = cancelAt;
        }

        public void collect(Integer element) {
            Assertions.assertThat((boolean)this.format.isInputFormatOpen).isTrue();
            Assertions.assertThat((int)this.splitIdx).isEqualTo((Object)element);
            if (this.shouldCancel && this.splitIdx == this.cancelAt) {
                this.reader.cancel();
            } else {
                ++this.splitIdx;
            }
        }

        public void collectWithTimestamp(Integer element, long timestamp) {
            throw new UnsupportedOperationException();
        }

        public void emitWatermark(Watermark mark) {
            throw new UnsupportedOperationException();
        }

        public void markAsTemporarilyIdle() {
            throw new UnsupportedOperationException();
        }

        public Object getCheckpointLock() {
            return null;
        }

        public void close() {
            throw new UnsupportedOperationException();
        }

        public int getSplitsSeen() {
            return this.splitIdx;
        }
    }

    private static class LifeCycleTestInputFormat
    extends RichInputFormat<Integer, InputSplit> {
        private static final long serialVersionUID = 7408902249499583273L;
        private boolean isConfigured = false;
        private boolean isInputFormatOpen = false;
        private boolean isSplitOpen = false;
        private boolean eos = false;
        private int splitCounter = 0;
        private int reachedEndCalls = 0;
        private int nextRecordCalls = 0;

        private LifeCycleTestInputFormat() {
        }

        public void openInputFormat() {
            Assertions.assertThat((boolean)this.isConfigured).isTrue();
            Assertions.assertThat((boolean)this.isInputFormatOpen).isFalse();
            Assertions.assertThat((boolean)this.isSplitOpen).isFalse();
            this.isInputFormatOpen = true;
        }

        public void closeInputFormat() {
            Assertions.assertThat((boolean)this.isSplitOpen).isFalse();
            this.isInputFormatOpen = false;
        }

        public void configure(Configuration parameters) {
            Assertions.assertThat((boolean)this.isConfigured).isFalse();
            this.isConfigured = true;
        }

        public BaseStatistics getStatistics(BaseStatistics cachedStatistics) throws IOException {
            return null;
        }

        public InputSplit[] createInputSplits(int minNumSplits) throws IOException {
            Assertions.assertThat((boolean)this.isConfigured).isTrue();
            InputSplit[] splits = new InputSplit[minNumSplits];
            int i = 0;
            while (i < minNumSplits) {
                final int idx = i++;
                splits[idx] = new InputSplit(){
                    private static final long serialVersionUID = -1480792932361908285L;

                    public int getSplitNumber() {
                        return idx;
                    }
                };
            }
            return splits;
        }

        public InputSplitAssigner getInputSplitAssigner(InputSplit[] inputSplits) {
            return null;
        }

        public void open(InputSplit split) throws IOException {
            Assertions.assertThat((boolean)this.isInputFormatOpen).isTrue();
            Assertions.assertThat((boolean)this.isConfigured).isTrue();
            Assertions.assertThat((boolean)this.isSplitOpen).isFalse();
            this.isSplitOpen = true;
            this.eos = false;
        }

        public boolean reachedEnd() throws IOException {
            Assertions.assertThat((boolean)this.isInputFormatOpen).isTrue();
            Assertions.assertThat((boolean)this.isConfigured).isTrue();
            Assertions.assertThat((boolean)this.isSplitOpen).isTrue();
            if (!this.eos) {
                ++this.reachedEndCalls;
            }
            return this.eos;
        }

        public Integer nextRecord(Integer reuse) throws IOException {
            Assertions.assertThat((boolean)this.isInputFormatOpen).isTrue();
            Assertions.assertThat((boolean)this.isConfigured).isTrue();
            Assertions.assertThat((boolean)this.isSplitOpen).isTrue();
            Assertions.assertThat((int)(++this.nextRecordCalls)).isEqualTo(this.reachedEndCalls);
            this.eos = true;
            return this.splitCounter++;
        }

        public void close() throws IOException {
            this.isSplitOpen = false;
        }
    }
}

