package org.apache.flink.streaming.api.functions.source;

import java.io.IOException;
import java.util.Collections;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.DefaultOpenContext;
import org.apache.flink.api.common.io.RichInputFormat;
import org.apache.flink.api.common.io.statistics.BaseStatistics;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.core.io.InputSplitAssigner;
import org.apache.flink.metrics.groups.OperatorMetricGroup;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
import org.apache.flink.runtime.operators.testutils.MockEnvironment;
import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/flink/streaming/api/functions/source/InputFormatSourceFunctionTest.class */
class InputFormatSourceFunctionTest {

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/api/functions/source/InputFormatSourceFunctionTest$LifeCycleTestInputFormat.class */
    public static class LifeCycleTestInputFormat extends RichInputFormat<Integer, InputSplit> {
        private static final long serialVersionUID = 7408902249499583273L;
        private boolean isConfigured;
        private boolean isInputFormatOpen;
        private boolean isSplitOpen;
        private boolean eos;
        private int splitCounter;
        private int reachedEndCalls;
        private int nextRecordCalls;

        private LifeCycleTestInputFormat() {
            this.isConfigured = false;
            this.isInputFormatOpen = false;
            this.isSplitOpen = false;
            this.eos = false;
            this.splitCounter = 0;
            this.reachedEndCalls = 0;
            this.nextRecordCalls = 0;
        }

        public void openInputFormat() {
            Assertions.assertThat(this.isConfigured).isTrue();
            Assertions.assertThat(this.isInputFormatOpen).isFalse();
            Assertions.assertThat(this.isSplitOpen).isFalse();
            this.isInputFormatOpen = true;
        }

        public void closeInputFormat() {
            Assertions.assertThat(this.isSplitOpen).isFalse();
            this.isInputFormatOpen = false;
        }

        public void configure(Configuration configuration) {
            Assertions.assertThat(this.isConfigured).isFalse();
            this.isConfigured = true;
        }

        public BaseStatistics getStatistics(BaseStatistics baseStatistics) throws IOException {
            return null;
        }

        public InputSplit[] createInputSplits(int i) throws IOException {
            Assertions.assertThat(this.isConfigured).isTrue();
            InputSplit[] inputSplitArr = new InputSplit[i];
            for (int i2 = 0; i2 < i; i2++) {
                final int i3 = i2;
                inputSplitArr[i3] = new InputSplit() { // from class: org.apache.flink.streaming.api.functions.source.InputFormatSourceFunctionTest.LifeCycleTestInputFormat.1
                    private static final long serialVersionUID = -1480792932361908285L;

                    public int getSplitNumber() {
                        return i3;
                    }
                };
            }
            return inputSplitArr;
        }

        public InputSplitAssigner getInputSplitAssigner(InputSplit[] inputSplitArr) {
            return null;
        }

        public void open(InputSplit inputSplit) throws IOException {
            Assertions.assertThat(this.isInputFormatOpen).isTrue();
            Assertions.assertThat(this.isConfigured).isTrue();
            Assertions.assertThat(this.isSplitOpen).isFalse();
            this.isSplitOpen = true;
            this.eos = false;
        }

        public boolean reachedEnd() throws IOException {
            Assertions.assertThat(this.isInputFormatOpen).isTrue();
            Assertions.assertThat(this.isConfigured).isTrue();
            Assertions.assertThat(this.isSplitOpen).isTrue();
            if (!this.eos) {
                this.reachedEndCalls++;
            }
            return this.eos;
        }

        public Integer nextRecord(Integer num) throws IOException {
            Assertions.assertThat(this.isInputFormatOpen).isTrue();
            Assertions.assertThat(this.isConfigured).isTrue();
            Assertions.assertThat(this.isSplitOpen).isTrue();
            int i = this.nextRecordCalls + 1;
            this.nextRecordCalls = i;
            Assertions.assertThat(i).isEqualTo(this.reachedEndCalls);
            this.eos = true;
            int i2 = this.splitCounter;
            this.splitCounter = i2 + 1;
            return Integer.valueOf(i2);
        }

        public void close() throws IOException {
            this.isSplitOpen = false;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/api/functions/source/InputFormatSourceFunctionTest$MockRuntimeContext.class */
    public static class MockRuntimeContext extends StreamingRuntimeContext {
        private final int noOfSplits;
        private int nextSplit;
        private final LifeCycleTestInputFormat format;
        private InputSplit[] inputSplits;

        /* loaded from: input_file:org/apache/flink/streaming/api/functions/source/InputFormatSourceFunctionTest$MockRuntimeContext$MockStreamOperator.class */
        private static class MockStreamOperator extends AbstractStreamOperator<Integer> {
            private static final long serialVersionUID = -1153976702711944427L;

            private MockStreamOperator() {
            }

            public ExecutionConfig getExecutionConfig() {
                return new ExecutionConfig();
            }

            public OperatorID getOperatorID() {
                return new OperatorID();
            }
        }

        private MockRuntimeContext(LifeCycleTestInputFormat lifeCycleTestInputFormat, int i, Environment environment) {
            super(new MockStreamOperator(), environment, Collections.emptyMap());
            this.nextSplit = 0;
            this.noOfSplits = i;
            this.format = lifeCycleTestInputFormat;
        }

        public OperatorMetricGroup getMetricGroup() {
            return UnregisteredMetricsGroup.createOperatorMetricGroup();
        }

        public InputSplitProvider getInputSplitProvider() {
            try {
                this.inputSplits = this.format.createInputSplits(this.noOfSplits);
                Assertions.assertThat(this.inputSplits).hasSize(this.noOfSplits);
            } catch (IOException e) {
                e.printStackTrace();
            }
            return new InputSplitProvider() { // from class: org.apache.flink.streaming.api.functions.source.InputFormatSourceFunctionTest.MockRuntimeContext.1
                public InputSplit getNextInputSplit(ClassLoader classLoader) {
                    if (MockRuntimeContext.this.nextSplit < MockRuntimeContext.this.inputSplits.length) {
                        return MockRuntimeContext.this.inputSplits[MockRuntimeContext.access$708(MockRuntimeContext.this)];
                    }
                    return null;
                }
            };
        }

        static /* synthetic */ int access$708(MockRuntimeContext mockRuntimeContext) {
            int i = mockRuntimeContext.nextSplit;
            mockRuntimeContext.nextSplit = i + 1;
            return i;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/api/functions/source/InputFormatSourceFunctionTest$TestSourceContext.class */
    public static class TestSourceContext implements SourceFunction.SourceContext<Integer> {
        private final InputFormatSourceFunction<Integer> reader;
        private final LifeCycleTestInputFormat format;
        private final boolean shouldCancel;
        private final int cancelAt;
        int splitIdx;

        private TestSourceContext(InputFormatSourceFunction<Integer> inputFormatSourceFunction, LifeCycleTestInputFormat lifeCycleTestInputFormat, boolean z, int i) {
            this.splitIdx = 0;
            this.reader = inputFormatSourceFunction;
            this.format = lifeCycleTestInputFormat;
            this.shouldCancel = z;
            this.cancelAt = i;
        }

        public void collect(Integer num) {
            Assertions.assertThat(this.format.isInputFormatOpen).isTrue();
            Assertions.assertThat(this.splitIdx).isEqualTo(num);
            if (this.shouldCancel && this.splitIdx == this.cancelAt) {
                this.reader.cancel();
            } else {
                this.splitIdx++;
            }
        }

        public void collectWithTimestamp(Integer num, long j) {
            throw new UnsupportedOperationException();
        }

        public void emitWatermark(Watermark watermark) {
            throw new UnsupportedOperationException();
        }

        public void markAsTemporarilyIdle() {
            throw new UnsupportedOperationException();
        }

        public Object getCheckpointLock() {
            return null;
        }

        public void close() {
            throw new UnsupportedOperationException();
        }

        public int getSplitsSeen() {
            return this.splitIdx;
        }
    }

    // Explicit package-private no-arg constructor; it performs no initialization.
    InputFormatSourceFunctionTest() {
    }

    /** Runs the lifecycle check without cancelling the source, so every split is consumed. */
    @Test
    void testNormalOp() throws Exception {
        final boolean cancelMidway = false;
        testFormatLifecycle(cancelMidway);
    }

    /** Runs the lifecycle check with the source being cancelled part-way through the splits. */
    @Test
    void testCancelation() throws Exception {
        final boolean cancelMidway = true;
        testFormatLifecycle(cancelMidway);
    }

    /**
     * Drives an {@link InputFormatSourceFunction} over a {@link LifeCycleTestInputFormat} and
     * verifies the format's state before opening, after opening, and after the run.
     *
     * @param midCancel if {@code true}, the source is cancelled when the element with index
     *     {@code cancelAt} is collected, and only that many elements are expected to be counted;
     *     otherwise all splits are expected to be seen
     * @throws Exception if the source function or the mock environment fails
     */
    private void testFormatLifecycle(boolean midCancel) throws Exception {
        final int noOfSplits = 5;
        final int cancelAt = 2;

        final LifeCycleTestInputFormat format = new LifeCycleTestInputFormat();
        final InputFormatSourceFunction<Integer> reader =
                new InputFormatSourceFunction<>(format, TypeInformation.of(Integer.class));

        // try-with-resources closes the environment on every path and keeps a close() failure
        // as a suppressed exception of the primary one.
        try (MockEnvironment environment =
                new MockEnvironmentBuilder()
                        .setTaskName("no")
                        .setManagedMemorySize(131072L)
                        .build()) {

            reader.setRuntimeContext(new MockRuntimeContext(format, noOfSplits, environment));

            // Setting the runtime context alone must not touch the format.
            Assertions.assertThat(format.isConfigured).isFalse();
            Assertions.assertThat(format.isInputFormatOpen).isFalse();
            Assertions.assertThat(format.isSplitOpen).isFalse();

            reader.open(DefaultOpenContext.INSTANCE);
            Assertions.assertThat(format.isConfigured).isTrue();

            final TestSourceContext ctx = new TestSourceContext(reader, format, midCancel, cancelAt);
            reader.run(ctx);

            // Comparing the values directly makes a failure report the actual count.
            final int expectedSplitsSeen = midCancel ? cancelAt : noOfSplits;
            Assertions.assertThat(ctx.getSplitsSeen()).isEqualTo(expectedSplitsSeen);

            // After run() returns, both the last split and the format itself must be closed.
            Assertions.assertThat(format.isSplitOpen).isFalse();
            Assertions.assertThat(format.isInputFormatOpen).isFalse();
        }
    }
}
