/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.operators;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.OperatorStateStore;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.mocks.MockSourceReader;
import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
import org.apache.flink.api.connector.source.mocks.MockSourceSplitSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.io.SimpleVersionedSerialization;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.operators.coordination.MockOperatorEventGateway;
import org.apache.flink.runtime.operators.testutils.MockEnvironment;
import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
import org.apache.flink.runtime.state.OperatorStateBackendParametersImpl;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateInitializationContextImpl;
import org.apache.flink.runtime.state.TestTaskStateManager;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.SourceOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
import org.apache.flink.streaming.api.operators.StreamTaskStateInitializer;
import org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl;
import org.apache.flink.streaming.api.operators.source.TestingSourceOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask;
import org.apache.flink.streaming.runtime.tasks.StreamMockEnvironment;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
import org.apache.flink.streaming.util.MockOutput;
import org.apache.flink.streaming.util.MockStreamConfig;
import org.apache.flink.util.Preconditions;

/**
 * Test fixture that wires a {@link SourceOperator} to mock collaborators: a
 * {@link MockSourceReader}, a {@link MockOperatorEventGateway} and a
 * {@link TestProcessingTimeService}.
 *
 * <p>The operator is built and has {@code initializeState} invoked on it inside the
 * constructor. {@link #close()} closes the operator and then checks that the mock reader
 * reports itself as closed, so the context is meant to be used in a try-with-resources block.
 */
public class SourceOperatorTestContext implements AutoCloseable {
    public static final int SUBTASK_INDEX = 1;
    public static final MockSourceSplit MOCK_SPLIT = new MockSourceSplit(1234, 10);

    // All collaborators are assigned exactly once, in the primary constructor.
    private final MockSourceReader mockSourceReader;
    private final MockOperatorEventGateway mockGateway;
    private final TestProcessingTimeService timeService;
    private final SourceOperator<Integer, MockSourceSplit> operator;

    /**
     * Creates a context equivalent to {@code new SourceOperatorTestContext(false)}.
     *
     * @throws Exception if building or initializing the operator fails
     */
    public SourceOperatorTestContext() throws Exception {
        this(false);
    }

    /**
     * Creates a context that uses {@link WatermarkStrategy#noWatermarks()}.
     *
     * @param idle forwarded to the primary constructor
     * @throws Exception if building or initializing the operator fails
     */
    public SourceOperatorTestContext(boolean idle) throws Exception {
        this(idle, WatermarkStrategy.<Integer>noWatermarks());
    }

    /**
     * Creates a context without per-split outputs and without split reassignment on recovery,
     * emitting into a {@link MockOutput} backed by a fresh, empty list.
     *
     * @param idle forwarded to the primary constructor
     * @param watermarkStrategy watermark strategy handed to the operator
     * @throws Exception if building or initializing the operator fails
     */
    public SourceOperatorTestContext(boolean idle, WatermarkStrategy<Integer> watermarkStrategy) throws Exception {
        this(idle, false, watermarkStrategy, new MockOutput<Integer>(new ArrayList<>()), false);
    }

    /**
     * Primary constructor: creates the mocks, builds the operator and initializes its state.
     *
     * @param idle when {@code true} the mock reader is created with
     *     {@code WAIT_UNTIL_ALL_SPLITS_ASSIGNED}, otherwise with {@code DO_NOT_WAIT_FOR_SPLITS};
     *     the flag is also passed to the reader as its second constructor argument
     * @param usePerSplitOutputs passed through to the {@link MockSourceReader} constructor
     * @param watermarkStrategy watermark strategy handed to the operator
     * @param output output handed to the operator through its {@link StreamOperatorParameters}
     * @param supportsSplitReassignmentOnRecovery passed through to the operator constructor
     * @throws Exception if building or initializing the operator fails
     */
    public SourceOperatorTestContext(boolean idle, boolean usePerSplitOutputs, WatermarkStrategy<Integer> watermarkStrategy, Output<StreamRecord<Integer>> output, boolean supportsSplitReassignmentOnRecovery) throws Exception {
        MockSourceReader.WaitingForSplits waitingForSplits = idle
                ? MockSourceReader.WaitingForSplits.WAIT_UNTIL_ALL_SPLITS_ASSIGNED
                : MockSourceReader.WaitingForSplits.DO_NOT_WAIT_FOR_SPLITS;
        this.mockSourceReader = new MockSourceReader(waitingForSplits, idle, usePerSplitOutputs);
        this.mockGateway = new MockOperatorEventGateway();
        this.timeService = new TestProcessingTimeService();

        // The same environment instance backs both the containing task and the state initializer.
        Environment env = getTestingEnvironment();
        StreamTask<?, ?> containingTask = new SourceOperatorStreamTask<Integer>(env);
        StreamConfig streamConfig = new MockStreamConfig(new Configuration(), 1);
        StreamOperatorParameters<Integer> parameters = new StreamOperatorParameters<Integer>(
                containingTask,
                streamConfig,
                output,
                () -> this.timeService,
                null,
                null);

        // NOTE(review): the two positional ints are presumably subtask index and parallelism;
        // SUBTASK_INDEX has the same value (1) as the literal it replaces - confirm against
        // TestingSourceOperator's constructor.
        this.operator = new TestingSourceOperator<Integer>(
                parameters,
                this.mockSourceReader,
                watermarkStrategy,
                this.timeService,
                this.mockGateway,
                SUBTASK_INDEX,
                5,
                true,
                supportsSplitReassignmentOnRecovery);

        StreamTaskStateInitializer stateInitializer =
                new StreamTaskStateInitializerImpl(env, new HashMapStateBackend());
        this.operator.initializeState(stateInitializer);
    }

    /**
     * Closes the operator and then checks, via {@link Preconditions#checkState(boolean)}, that
     * the mock source reader reports itself as closed.
     *
     * @throws Exception if closing the operator fails or the reader was not closed
     */
    @Override
    public void close() throws Exception {
        this.operator.close();
        Preconditions.checkState(this.mockSourceReader.isClosed());
    }

    /** Returns the processing time service handed to the operator. */
    public TestProcessingTimeService getTimeService() {
        return this.timeService;
    }

    /** Returns the operator under test. */
    public SourceOperator<Integer, MockSourceSplit> getOperator() {
        return this.operator;
    }

    /** Returns the mock event gateway handed to the operator. */
    public MockOperatorEventGateway getGateway() {
        return this.mockGateway;
    }

    /** Returns the mock source reader handed to the operator. */
    public MockSourceReader getSourceReader() {
        return this.mockSourceReader;
    }

    /**
     * Creates a state context whose splits list state contains only {@link #MOCK_SPLIT}.
     *
     * @return a state initialization context pre-populated with the serialized mock split
     * @throws Exception if serialization or state store creation fails
     */
    public StateInitializationContext createStateContext() throws Exception {
        return this.createStateContext(Collections.singletonList(MOCK_SPLIT));
    }

    /**
     * Creates a state context whose {@code SourceOperator.SPLITS_STATE_DESC} list state holds
     * the given splits, each serialized with a {@link MockSourceSplitSerializer} through
     * {@link SimpleVersionedSerialization#writeVersionAndSerialize}.
     *
     * @param initialSplits splits to store; serialized in iteration order
     * @return a state initialization context backed by a fresh operator state store
     * @throws Exception if serialization or state store creation fails
     */
    public StateInitializationContext createStateContext(Collection<MockSourceSplit> initialSplits) throws Exception {
        MockSourceSplitSerializer splitSerializer = new MockSourceSplitSerializer();
        ArrayList<byte[]> serializedSplits = new ArrayList<>(initialSplits.size());
        for (MockSourceSplit split : initialSplits) {
            serializedSplits.add(SimpleVersionedSerialization.writeVersionAndSerialize(splitSerializer, split));
        }

        // Only the operator state store is supplied; every other context argument is null.
        OperatorStateStore operatorStateStore = createOperatorStateStore();
        StateInitializationContextImpl stateContext =
                new StateInitializationContextImpl(null, operatorStateStore, null, null, null);
        stateContext.getOperatorStateStore()
                .getListState(SourceOperator.SPLITS_STATE_DESC)
                .update(serializedSplits);
        return stateContext;
    }

    /**
     * Builds an operator state store from a {@link HashMapStateBackend}, using a default
     * {@link MockEnvironment}, the operator identifier {@code "test-operator"}, no state
     * handles and a new {@link CloseableRegistry}.
     */
    private static OperatorStateStore createOperatorStateStore() throws Exception {
        Environment env = new MockEnvironmentBuilder().build();
        HashMapStateBackend stateBackend = new HashMapStateBackend();
        CloseableRegistry cancelStreamRegistry = new CloseableRegistry();
        StateBackend.OperatorStateBackendParameters backendParameters =
                new OperatorStateBackendParametersImpl(
                        env, "test-operator", Collections.emptyList(), cancelStreamRegistry);
        return stateBackend.createOperatorStateBackend(backendParameters);
    }

    /**
     * Builds the {@link StreamMockEnvironment} used by the operator's task and state initializer.
     */
    private static Environment getTestingEnvironment() {
        // NOTE(review): the meaning of the positional 1L and 1 is not visible here - confirm
        // against StreamMockEnvironment's constructor.
        return new StreamMockEnvironment(
                new Configuration(),
                new Configuration(),
                new ExecutionConfig(),
                1L,
                new MockInputSplitProvider(),
                1,
                new TestTaskStateManager());
    }
}

