package org.apache.flink.streaming.runtime.operators;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MetricOptions;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.testutils.MockEnvironment;
import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
import org.apache.flink.streaming.runtime.tasks.RegularOperatorChain;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.TimerService;
import org.apache.flink.streaming.util.CollectorOutput;
import org.apache.flink.streaming.util.MockStreamTask;
import org.apache.flink.streaming.util.MockStreamTaskBuilder;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;

/**
 * Tests the emission of latency markers by {@link StreamSource} operators.
 *
 * <p>Each test configures the latency tracking interval through the {@link ExecutionConfig}
 * and/or the task manager {@link Configuration}, runs a source that only advances a
 * {@link TestProcessingTimeService}, and checks which latency markers were collected.
 */
class StreamSourceOperatorLatencyMetricsTest {

    /** Last processing time the test source advances the clock to. */
    private static final long MAX_PROCESSING_TIME = 100L;

    /** Latency tracking interval used by the tests that enable latency tracking. */
    private static final long LATENCY_MARK_INTERVAL = 10L;

    /** Verifies that no latency markers are emitted when nothing is configured. */
    @Test
    void testLatencyMarkEmissionDisabled() throws Exception {
        testLatencyMarkEmission(
                0,
                (operator, timeProvider) ->
                        setupSourceOperator(
                                operator,
                                new ExecutionConfig(),
                                MockEnvironment.builder().build(),
                                timeProvider));
    }

    /** Verifies that latency markers are emitted when enabled via the {@link ExecutionConfig}. */
    @Test
    void testLatencyMarkEmissionEnabledViaExecutionConfig() throws Exception {
        testLatencyMarkEmission(
                11,
                (operator, timeProvider) -> {
                    ExecutionConfig executionConfig = new ExecutionConfig();
                    executionConfig.setLatencyTrackingInterval(LATENCY_MARK_INTERVAL);

                    setupSourceOperator(
                            operator,
                            executionConfig,
                            MockEnvironment.builder().build(),
                            timeProvider);
                });
    }

    /** Verifies that latency markers are emitted when enabled via the task manager config. */
    @Test
    void testLatencyMarkEmissionEnabledViaFlinkConfig() throws Exception {
        testLatencyMarkEmission(
                11,
                (operator, timeProvider) -> {
                    Configuration tmConfig = new Configuration();
                    tmConfig.set(MetricOptions.LATENCY_INTERVAL, LATENCY_MARK_INTERVAL);

                    Environment env =
                            MockEnvironment.builder()
                                    .setTaskManagerRuntimeInfo(
                                            new TestingTaskManagerRuntimeInfo(tmConfig))
                                    .build();

                    setupSourceOperator(operator, new ExecutionConfig(), env, timeProvider);
                });
    }

    /**
     * Verifies that an interval set on the {@link ExecutionConfig} enables latency markers even
     * though the task manager config sets the interval to zero.
     */
    @Test
    void testLatencyMarkEmissionEnabledOverrideViaExecutionConfig() throws Exception {
        testLatencyMarkEmission(
                11,
                (operator, timeProvider) -> {
                    ExecutionConfig executionConfig = new ExecutionConfig();
                    executionConfig.setLatencyTrackingInterval(LATENCY_MARK_INTERVAL);

                    Configuration tmConfig = new Configuration();
                    tmConfig.set(MetricOptions.LATENCY_INTERVAL, 0L);

                    Environment env =
                            MockEnvironment.builder()
                                    .setTaskManagerRuntimeInfo(
                                            new TestingTaskManagerRuntimeInfo(tmConfig))
                                    .build();

                    setupSourceOperator(operator, executionConfig, env, timeProvider);
                });
    }

    /**
     * Verifies that a zero interval set on the {@link ExecutionConfig} disables latency markers
     * even though the task manager config sets a positive interval.
     */
    @Test
    void testLatencyMarkEmissionDisabledOverrideViaExecutionConfig() throws Exception {
        testLatencyMarkEmission(
                0,
                (operator, timeProvider) -> {
                    Configuration tmConfig = new Configuration();
                    tmConfig.set(MetricOptions.LATENCY_INTERVAL, LATENCY_MARK_INTERVAL);

                    Environment env =
                            MockEnvironment.builder()
                                    .setTaskManagerRuntimeInfo(
                                            new TestingTaskManagerRuntimeInfo(tmConfig))
                                    .build();

                    ExecutionConfig executionConfig = new ExecutionConfig();
                    executionConfig.setLatencyTrackingInterval(0L);

                    setupSourceOperator(operator, executionConfig, env, timeProvider);
                });
    }

    /**
     * Runs a {@link StreamSource} wrapping a {@link ProcessingTimeServiceSource} and verifies the
     * collected output.
     *
     * @param numberLatencyMarkers exact number of elements expected in the output; each of the
     *     first {@code numberLatencyMarkers} elements must be a latency marker
     * @param operatorSetup callback that configures and sets up the operator before it is run
     * @throws Exception if setting up, running, finishing or closing the operator fails
     */
    private void testLatencyMarkEmission(
            int numberLatencyMarkers, OperatorSetupOperation operatorSetup) throws Exception {
        final List<StreamElement> output = new ArrayList<>();

        final TestProcessingTimeService testProcessingTimeService = new TestProcessingTimeService();
        testProcessingTimeService.setCurrentTime(0L);
        final List<Long> processingTimes =
                Arrays.asList(1L, LATENCY_MARK_INTERVAL, 11L, 21L, MAX_PROCESSING_TIME);

        final StreamSource<Long, ProcessingTimeServiceSource> operator =
                new StreamSource<>(
                        new ProcessingTimeServiceSource(
                                testProcessingTimeService, processingTimes));

        operatorSetup.setupSourceOperator(operator, testProcessingTimeService);

        final RegularOperatorChain<?, ?> operatorChain =
                new RegularOperatorChain<>(
                        operator.getContainingTask(),
                        StreamTask.createRecordWriterDelegate(
                                operator.getOperatorConfig(),
                                new MockEnvironmentBuilder().build()));

        // Close the chain exactly once, whether or not running the operator succeeds. The
        // assertions are kept outside of this block so that a failing assertion can neither
        // trigger a second close() nor be masked by an exception thrown from it.
        try {
            operator.run(new Object(), new CollectorOutput<>(output), operatorChain);
            operator.finish();
        } finally {
            operatorChain.close();
        }

        Assertions.assertThat(output).hasSize(numberLatencyMarkers);

        // Emission time the next marker is checked against: 0, then one interval later, ...
        long timestamp = 0L;
        int expectedLatencyIndex = 0;

        for (int i = 0; i < numberLatencyMarkers; i++) {
            final StreamElement element = output.get(i);
            Assertions.assertThat(element.isLatencyMarker()).isTrue();
            Assertions.assertThat(element.asLatencyMarker().getOperatorId())
                    .isEqualTo(operator.getOperatorID());
            Assertions.assertThat(element.asLatencyMarker().getSubtaskIndex()).isZero();

            // The expected marked time is the first processing time set by the source that is
            // not smaller than the current emission time; skip all smaller ones.
            while (timestamp > processingTimes.get(expectedLatencyIndex)) {
                expectedLatencyIndex++;
            }
            Assertions.assertThat(element.asLatencyMarker().getMarkedTime())
                    .isEqualTo(processingTimes.get(expectedLatencyIndex));

            timestamp += LATENCY_MARK_INTERVAL;
        }
    }

    /**
     * Sets up the given source operator inside a {@link MockStreamTask}.
     *
     * <p>The task is built with a fresh {@link StreamConfig} (memory state backend, event time,
     * new operator ID), the given execution config, environment and timer service. The operator
     * receives a processing time service created by the task's factory and a mocked output.
     *
     * @param streamSource operator to set up
     * @param executionConfig execution config handed to the mock task
     * @param environment environment the mock task is built on
     * @param timerService timer service handed to the mock task
     * @param <T> type of the records produced by the source
     * @throws Exception if building the mock task fails
     */
    // The mocked Output is raw; it is only passed to setup() and never used to emit records here.
    @SuppressWarnings({"unchecked", "rawtypes"})
    public static <T> void setupSourceOperator(StreamSource<T, ?> streamSource, ExecutionConfig executionConfig, Environment environment, TimerService timerService) throws Exception {
        final StreamConfig cfg = new StreamConfig(new Configuration());
        cfg.setStateBackend(new MemoryStateBackend());
        cfg.setTimeCharacteristic(TimeCharacteristic.EventTime);
        cfg.setOperatorID(new OperatorID());
        cfg.serializeAllConfigs();

        final MockStreamTask mockTask =
                new MockStreamTaskBuilder(environment)
                        .setConfig(cfg)
                        .setExecutionConfig(executionConfig)
                        .setTimerService(timerService)
                        .build();

        streamSource.setProcessingTimeService(
                mockTask.getProcessingTimeServiceFactory()
                        .createProcessingTimeService((MailboxExecutor) null));
        streamSource.setup(mockTask, cfg, (Output) Mockito.mock(Output.class));
    }

    /** Callback that configures and sets up the source operator for a single test case. */
    @FunctionalInterface
    public interface OperatorSetupOperation {
        /**
         * Sets up the given operator.
         *
         * @param streamSource operator under test
         * @param testProcessingTimeService time service driven by the test source
         * @throws Exception if the setup fails
         */
        void setupSourceOperator(StreamSource<Long, ?> streamSource, TestProcessingTimeService testProcessingTimeService) throws Exception;
    }

    /**
     * Source that emits no records; it only sets the current time of a
     * {@link TestProcessingTimeService} to each of the given processing times, in order, until
     * the list is exhausted or the source is cancelled.
     */
    public static final class ProcessingTimeServiceSource implements SourceFunction<Long> {
        private final TestProcessingTimeService processingTimeService;
        private final List<Long> processingTimes;
        private boolean cancelled = false;

        private ProcessingTimeServiceSource(TestProcessingTimeService testProcessingTimeService, List<Long> list) {
            this.processingTimeService = testProcessingTimeService;
            this.processingTimes = list;
        }

        /** Advances the time service through all processing times unless cancelled first. */
        @Override
        public void run(SourceFunction.SourceContext<Long> sourceContext) throws Exception {
            for (Long processingTime : processingTimes) {
                if (cancelled) {
                    return;
                }
                processingTimeService.setCurrentTime(processingTime);
            }
        }

        /** Requests {@link #run} to stop before applying the next processing time. */
        @Override
        public void cancel() {
            cancelled = true;
        }
    }
}
