/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.testframe.testsuites;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.commons.math3.util.Precision;
import org.apache.flink.annotation.Experimental;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RpcOptions;
import org.apache.flink.connector.testframe.environment.TestEnvironment;
import org.apache.flink.connector.testframe.environment.TestEnvironmentSettings;
import org.apache.flink.connector.testframe.external.ExternalSystemDataReader;
import org.apache.flink.connector.testframe.external.sink.DataStreamSinkExternalContext;
import org.apache.flink.connector.testframe.external.sink.DataStreamSinkV2ExternalContext;
import org.apache.flink.connector.testframe.external.sink.TestingSinkSettings;
import org.apache.flink.connector.testframe.junit.extensions.ConnectorTestingExtension;
import org.apache.flink.connector.testframe.junit.extensions.TestCaseInvocationContextProvider;
import org.apache.flink.connector.testframe.source.FromElementsSource;
import org.apache.flink.connector.testframe.utils.CollectIteratorAssertions;
import org.apache.flink.connector.testframe.utils.MetricQuerier;
import org.apache.flink.core.execution.CheckpointingMode;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.runtime.rest.RestClient;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.operators.collect.CollectResultIterator;
import org.apache.flink.streaming.api.operators.collect.CollectSinkOperatorFactory;
import org.apache.flink.streaming.api.operators.collect.CollectStreamSink;
import org.apache.flink.streaming.util.RestartStrategyUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TestLoggerExtension;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;
import org.opentest4j.TestAbortedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ExtendWith(value={ConnectorTestingExtension.class, TestLoggerExtension.class, TestCaseInvocationContextProvider.class})
@TestInstance(value=TestInstance.Lifecycle.PER_CLASS)
@Experimental
public abstract class SinkTestSuiteBase<T extends Comparable<T>> {
    private static final Logger LOG = LoggerFactory.getLogger(SinkTestSuiteBase.class);

    @TestTemplate
    @DisplayName(value="Test data stream sink")
    public void testBasicSink(TestEnvironment testEnv, DataStreamSinkExternalContext<T> externalContext, CheckpointingMode semantic) throws Exception {
        TestingSinkSettings sinkSettings = this.getTestingSinkSettings(semantic);
        List<T> testRecords = this.generateTestData(sinkSettings, externalContext);
        StreamExecutionEnvironment execEnv = testEnv.createExecutionEnvironment(TestEnvironmentSettings.builder().setConnectorJarPaths(externalContext.getConnectorJarPaths()).build());
        execEnv.enableCheckpointing(50L);
        SingleOutputStreamOperator dataStream = execEnv.fromData(testRecords).name("sourceInSinkTest").setParallelism(1).returns(externalContext.getProducedType());
        this.tryCreateSink((DataStream<T>)dataStream, externalContext, sinkSettings).setParallelism(1).name("sinkInSinkTest");
        JobClient jobClient = execEnv.executeAsync("DataStream Sink Test");
        CommonTestUtils.waitForJobStatus((JobClient)jobClient, Collections.singletonList(JobStatus.FINISHED));
        this.checkResultWithSemantic(externalContext.createSinkDataReader(sinkSettings), testRecords, semantic);
    }

    @TestTemplate
    @DisplayName(value="Test sink restarting from a savepoint")
    public void testStartFromSavepoint(TestEnvironment testEnv, DataStreamSinkExternalContext<T> externalContext, CheckpointingMode semantic) throws Exception {
        this.restartFromSavepoint(testEnv, externalContext, semantic, 2, 2);
    }

    @TestTemplate
    @DisplayName(value="Test sink restarting with a higher parallelism")
    public void testScaleUp(TestEnvironment testEnv, DataStreamSinkExternalContext<T> externalContext, CheckpointingMode semantic) throws Exception {
        this.restartFromSavepoint(testEnv, externalContext, semantic, 2, 4);
    }

    @TestTemplate
    @DisplayName(value="Test sink restarting with a lower parallelism")
    public void testScaleDown(TestEnvironment testEnv, DataStreamSinkExternalContext<T> externalContext, CheckpointingMode semantic) throws Exception {
        this.restartFromSavepoint(testEnv, externalContext, semantic, 4, 2);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void restartFromSavepoint(TestEnvironment testEnv, DataStreamSinkExternalContext<T> externalContext, CheckpointingMode semantic, int beforeParallelism, int afterParallelism) throws Exception {
        String savepointPath;
        TestingSinkSettings sinkSettings = this.getTestingSinkSettings(semantic);
        StreamExecutionEnvironment execEnv = testEnv.createExecutionEnvironment(TestEnvironmentSettings.builder().setConnectorJarPaths(externalContext.getConnectorJarPaths()).build());
        RestartStrategyUtils.configureNoRestartStrategy((StreamExecutionEnvironment)execEnv);
        List<T> testRecords = this.generateTestData(sinkSettings, externalContext);
        int numBeforeSuccess = testRecords.size() / 2;
        DataStreamSource source = execEnv.fromSource(new FromElementsSource<T>(Boundedness.CONTINUOUS_UNBOUNDED, testRecords, numBeforeSuccess), WatermarkStrategy.noWatermarks(), "beforeRestartSource").setParallelism(1);
        SingleOutputStreamOperator dataStream = source.returns(externalContext.getProducedType());
        this.tryCreateSink((DataStream<T>)dataStream, externalContext, sinkSettings).name("Sink restart test").setParallelism(beforeParallelism);
        CollectResultIterator<T> iterator = this.addCollectSink((DataStream<T>)source);
        JobClient jobClient = execEnv.executeAsync("Restart Test");
        iterator.setJobClient(jobClient);
        ExecutorService executorService = Executors.newCachedThreadPool();
        try {
            CommonTestUtils.waitForAllTaskRunning(() -> MetricQuerier.getJobDetails(new RestClient(new Configuration(), (Executor)executorService), testEnv.getRestEndpoint(), jobClient.getJobID()));
            this.waitExpectedSizeData(iterator, numBeforeSuccess);
            savepointPath = (String)jobClient.stopWithSavepoint(false, testEnv.getCheckpointUri(), SavepointFormatType.CANONICAL).get(30L, TimeUnit.SECONDS);
            CommonTestUtils.waitForJobStatus((JobClient)jobClient, Collections.singletonList(JobStatus.FINISHED));
        }
        catch (Exception e) {
            executorService.shutdown();
            this.killJob(jobClient);
            throw e;
        }
        List<T> target = testRecords.subList(0, numBeforeSuccess);
        this.checkResultWithSemantic(externalContext.createSinkDataReader(sinkSettings), target, semantic);
        StreamExecutionEnvironment restartEnv = testEnv.createExecutionEnvironment(TestEnvironmentSettings.builder().setConnectorJarPaths(externalContext.getConnectorJarPaths()).setSavepointRestorePath(savepointPath).build());
        restartEnv.enableCheckpointing(50L);
        DataStreamSource restartSource = restartEnv.fromSource(new FromElementsSource<T>(Boundedness.CONTINUOUS_UNBOUNDED, testRecords, testRecords.size()), WatermarkStrategy.noWatermarks(), "restartSource").setParallelism(1);
        SingleOutputStreamOperator sinkStream = restartSource.returns(externalContext.getProducedType());
        this.tryCreateSink((DataStream<T>)sinkStream, externalContext, sinkSettings).setParallelism(afterParallelism);
        this.addCollectSink((DataStream<T>)restartSource);
        JobClient restartJobClient = restartEnv.executeAsync("Restart Test");
        try {
            this.checkResultWithSemantic(externalContext.createSinkDataReader(sinkSettings), testRecords, semantic);
        }
        finally {
            executorService.shutdown();
            this.killJob(restartJobClient);
            iterator.close();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @TestTemplate
    @DisplayName(value="Test sink metrics")
    public void testMetrics(TestEnvironment testEnv, DataStreamSinkExternalContext<T> externalContext, CheckpointingMode semantic) throws Exception {
        TestingSinkSettings sinkSettings = this.getTestingSinkSettings(semantic);
        int parallelism = 1;
        List<T> testRecords = this.generateTestData(sinkSettings, externalContext);
        String sinkName = "metricTestSink" + testRecords.hashCode();
        StreamExecutionEnvironment env = testEnv.createExecutionEnvironment(TestEnvironmentSettings.builder().setConnectorJarPaths(externalContext.getConnectorJarPaths()).build());
        env.enableCheckpointing(50L);
        DataStreamSource source = env.fromSource(new FromElementsSource<T>(Boundedness.CONTINUOUS_UNBOUNDED, testRecords, testRecords.size()), WatermarkStrategy.noWatermarks(), "metricTestSource").setParallelism(1);
        SingleOutputStreamOperator dataStream = source.returns(externalContext.getProducedType());
        this.tryCreateSink((DataStream<T>)dataStream, externalContext, sinkSettings).name(sinkName).setParallelism(parallelism);
        JobClient jobClient = env.executeAsync("Metrics Test");
        MetricQuerier queryRestClient = new MetricQuerier(new Configuration());
        ExecutorService executorService = Executors.newCachedThreadPool();
        try {
            CommonTestUtils.waitForAllTaskRunning(() -> MetricQuerier.getJobDetails(new RestClient(new Configuration(), (Executor)executorService), testEnv.getRestEndpoint(), jobClient.getJobID()));
            CommonTestUtils.waitUntilCondition(() -> {
                try {
                    return this.compareSinkMetrics(queryRestClient, testEnv, externalContext, jobClient.getJobID(), sinkName, "numRecordsSend", testRecords.size());
                }
                catch (Exception e) {
                    return false;
                }
            });
        }
        finally {
            executorService.shutdown();
            this.killJob(jobClient);
        }
    }

    protected List<T> generateTestData(TestingSinkSettings testingSinkSettings, DataStreamSinkExternalContext<T> externalContext) {
        return externalContext.generateTestData(testingSinkSettings, ThreadLocalRandom.current().nextLong());
    }

    private List<T> pollAndAppendResultData(List<T> result, ExternalSystemDataReader<T> reader, List<T> expected, int retryTimes, CheckpointingMode semantic) {
        long timeoutMs = 1000L;
        int retryIndex = 0;
        while (retryIndex++ < retryTimes && !this.checkGetEnoughRecordsWithSemantic(expected, result, semantic)) {
            result.addAll(reader.poll(Duration.ofMillis(timeoutMs)));
        }
        return result;
    }

    private boolean checkGetEnoughRecordsWithSemantic(List<T> expected, List<T> result, CheckpointingMode semantic) {
        Preconditions.checkNotNull(expected);
        Preconditions.checkNotNull(result);
        if (CheckpointingMode.EXACTLY_ONCE.equals((Object)semantic)) {
            return expected.size() <= result.size();
        }
        if (CheckpointingMode.AT_LEAST_ONCE.equals((Object)semantic)) {
            HashSet<Integer> matchedIndex = new HashSet<Integer>();
            for (Comparable record : expected) {
                int before = matchedIndex.size();
                for (int i = 0; i < result.size(); ++i) {
                    if (matchedIndex.contains(i) || !record.equals(result.get(i))) continue;
                    matchedIndex.add(i);
                    break;
                }
                if (before != matchedIndex.size()) continue;
                return false;
            }
            return true;
        }
        throw new IllegalStateException(String.format("%s delivery guarantee doesn't support test.", semantic.name()));
    }

    protected void checkResultWithSemantic(ExternalSystemDataReader<T> reader, List<T> testData, CheckpointingMode semantic) throws Exception {
        ArrayList result = new ArrayList();
        CommonTestUtils.waitUntilCondition(() -> {
            this.pollAndAppendResultData(result, reader, testData, 30, semantic);
            try {
                CollectIteratorAssertions.assertThat(this.sort(result).iterator()).matchesRecordsFromSource(Arrays.asList(this.sort(testData)), semantic);
                return true;
            }
            catch (Throwable t) {
                return false;
            }
        });
    }

    @Deprecated
    protected void checkResultWithSemantic(ExternalSystemDataReader<T> reader, List<T> testData, org.apache.flink.streaming.api.CheckpointingMode semantic) throws Exception {
        this.checkResultWithSemantic(reader, testData, org.apache.flink.streaming.api.CheckpointingMode.convertToCheckpointingMode((org.apache.flink.streaming.api.CheckpointingMode)semantic));
    }

    private boolean compareSinkMetrics(MetricQuerier metricQuerier, TestEnvironment testEnv, DataStreamSinkExternalContext<T> context, JobID jobId, String sinkName, String metricsName, long expectedSize) throws Exception {
        double sumNumRecordsOut = metricQuerier.getAggregatedMetricsByRestAPI(testEnv.getRestEndpoint(), jobId, sinkName, metricsName, this.getSinkMetricFilter(context));
        if (Precision.equals((double)expectedSize, (double)sumNumRecordsOut)) {
            return true;
        }
        LOG.info("expected:<{}> but was <{}>({})", new Object[]{expectedSize, sumNumRecordsOut, metricsName});
        return false;
    }

    private List<T> sort(List<T> list) {
        return list.stream().sorted().collect(Collectors.toList());
    }

    private TestingSinkSettings getTestingSinkSettings(CheckpointingMode checkpointingMode) {
        return TestingSinkSettings.builder().setCheckpointingMode(checkpointingMode).build();
    }

    private void killJob(JobClient jobClient) throws Exception {
        CommonTestUtils.terminateJob((JobClient)jobClient);
        CommonTestUtils.waitForJobStatus((JobClient)jobClient, Collections.singletonList(JobStatus.CANCELED));
    }

    private DataStreamSink<T> tryCreateSink(DataStream<T> dataStream, DataStreamSinkExternalContext<T> context, TestingSinkSettings sinkSettings) {
        try {
            if (context instanceof DataStreamSinkV2ExternalContext) {
                Sink sinkV2 = ((DataStreamSinkV2ExternalContext)context).createSink(sinkSettings);
                return dataStream.sinkTo(sinkV2);
            }
            throw new IllegalArgumentException(String.format("The supported context are DataStreamSinkV1ExternalContext and DataStreamSinkV2ExternalContext, but actual is %s.", context.getClass()));
        }
        catch (UnsupportedOperationException e) {
            throw new TestAbortedException("Cannot create a sink satisfying given options.", (Throwable)e);
        }
    }

    private String getSinkMetricFilter(DataStreamSinkExternalContext<T> context) {
        if (context instanceof DataStreamSinkV2ExternalContext) {
            return "Writer";
        }
        throw new IllegalArgumentException(String.format("Get unexpected sink context: %s", context.getClass()));
    }

    protected CollectResultIterator<T> addCollectSink(DataStream<T> stream) {
        TypeSerializer serializer = stream.getType().createSerializer(stream.getExecutionConfig().getSerializerConfig());
        String accumulatorName = "dataStreamCollect_" + String.valueOf(UUID.randomUUID());
        CollectSinkOperatorFactory factory = new CollectSinkOperatorFactory(serializer, accumulatorName);
        CollectStreamSink sink = new CollectStreamSink(stream, factory);
        String operatorUid = "dataStreamCollect";
        sink.name("Data stream collect sink");
        sink.uid(operatorUid);
        stream.getExecutionEnvironment().addOperator(sink.getTransformation());
        return new CollectResultIterator(operatorUid, serializer, accumulatorName, stream.getExecutionEnvironment().getCheckpointConfig(), ((Duration)RpcOptions.ASK_TIMEOUT_DURATION.defaultValue()).toMillis());
    }

    private void waitExpectedSizeData(CollectResultIterator<T> iterator, int targetNum) {
        FlinkAssertions.assertThatFuture(CompletableFuture.supplyAsync(() -> {
            int count;
            for (count = 0; count < targetNum && iterator.hasNext(); ++count) {
                iterator.next();
            }
            if (count < targetNum) {
                throw new IllegalStateException(String.format("Fail to get %d records.", targetNum));
            }
            return true;
        })).eventuallySucceeds();
    }
}

