/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.operators.collect;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.Random;
import java.util.UUID;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.runtime.operators.coordination.CoordinationRequestHandler;
import org.apache.flink.streaming.api.operators.collect.AbstractCollectResultBuffer;
import org.apache.flink.streaming.api.operators.collect.CheckpointedCollectResultBuffer;
import org.apache.flink.streaming.api.operators.collect.CollectResultIterator;
import org.apache.flink.streaming.api.operators.collect.utils.CollectSinkFunctionTestWrapper;
import org.apache.flink.streaming.api.operators.collect.utils.TestJobClient;
import org.apache.flink.util.OptionalFailure;
import org.apache.flink.util.function.RunnableWithException;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

class CollectSinkFunctionRandomITCase {
    private static final int MAX_RESULTS_PER_BATCH = 3;
    private static final JobID TEST_JOB_ID = new JobID();
    private static final String UID = UUID.randomUUID().toString();
    private static final TypeSerializer<Integer> serializer = IntSerializer.INSTANCE;
    private CollectSinkFunctionTestWrapper<Integer> functionWrapper;
    private boolean jobFinished;

    CollectSinkFunctionRandomITCase() {
    }

    @Test
    void testUncheckpointedFunction() throws Exception {
        for (int testCount = 30; testCount > 0; --testCount) {
            this.functionWrapper = new CollectSinkFunctionTestWrapper<Integer>(serializer, 12);
            this.jobFinished = false;
            ArrayList<Integer> expected = new ArrayList<Integer>();
            for (int i = 0; i < 50; ++i) {
                expected.add(i);
            }
            ThreadWithException feeder = new ThreadWithException(new UncheckpointedDataFeeder(expected));
            List<Integer> actual = this.runFunctionRandomTest(feeder);
            this.assertResultsEqualAfterSort(expected, actual);
            this.functionWrapper.closeWrapper();
        }
    }

    @Test
    void testCheckpointedFunction() throws Exception {
        for (int testCount = 30; testCount > 0; --testCount) {
            this.functionWrapper = new CollectSinkFunctionTestWrapper<Integer>(serializer, 12);
            this.jobFinished = false;
            ArrayList<Integer> expected = new ArrayList<Integer>();
            for (int i = 0; i < 50; ++i) {
                expected.add(i);
            }
            ThreadWithException feeder = new ThreadWithException(new CheckpointedDataFeeder(expected));
            List<Integer> actual = this.runFunctionRandomTest(feeder);
            this.assertResultsEqualAfterSort(expected, actual);
            this.functionWrapper.closeWrapper();
        }
    }

    private List<Integer> runFunctionRandomTest(Thread feeder) throws Exception {
        CollectClient collectClient = new CollectClient();
        ThreadWithException client = new ThreadWithException(collectClient);
        Thread.UncaughtExceptionHandler exceptionHandler = (t, e) -> {
            feeder.interrupt();
            client.interrupt();
            e.printStackTrace();
        };
        feeder.setUncaughtExceptionHandler(exceptionHandler);
        client.setUncaughtExceptionHandler(exceptionHandler);
        feeder.start();
        client.start();
        feeder.join();
        client.join();
        return collectClient.results;
    }

    private void assertResultsEqualAfterSort(List<Integer> expected, List<Integer> actual) {
        Collections.sort(expected);
        Collections.sort(actual);
        Assertions.assertThat(actual).isEqualTo(expected);
    }

    private static class ThreadWithException
    extends Thread {
        private final RunnableWithException runnable;

        private ThreadWithException(RunnableWithException runnable) {
            this.runnable = runnable;
        }

        @Override
        public void run() {
            try {
                this.runnable.run();
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    }

    private class CollectClient
    implements RunnableWithException {
        private final List<Integer> results = new ArrayList<Integer>();
        private final CollectResultIterator<Integer> iterator = new CollectResultIterator((AbstractCollectResultBuffer)new CheckpointedCollectResultBuffer(serializer), UID, "tableCollectAccumulator", 0);

        private CollectClient() {
            TestJobClient.JobInfoProvider infoProvider = new TestJobClient.JobInfoProvider(){

                public boolean isJobFinished() {
                    return CollectSinkFunctionRandomITCase.this.jobFinished;
                }

                public Map<String, OptionalFailure<Object>> getAccumulatorResults() {
                    HashMap<String, OptionalFailure<Object>> accumulatorResults = new HashMap<String, OptionalFailure<Object>>();
                    accumulatorResults.put("tableCollectAccumulator", OptionalFailure.of(CollectSinkFunctionRandomITCase.this.functionWrapper.getAccumulatorLocalValue()));
                    return accumulatorResults;
                }
            };
            TestJobClient jobClient = new TestJobClient(TEST_JOB_ID, UID, (CoordinationRequestHandler)CollectSinkFunctionRandomITCase.this.functionWrapper.getCoordinator(), infoProvider);
            this.iterator.setJobClient((JobClient)jobClient);
        }

        public void run() throws Exception {
            Random random = new Random();
            while (this.iterator.hasNext()) {
                this.results.add((Integer)this.iterator.next());
                if (!random.nextBoolean()) continue;
                try {
                    Thread.sleep(5L);
                }
                catch (InterruptedException interruptedException) {}
            }
            this.iterator.close();
        }
    }

    private static class CheckpointCountdown {
        private final long id;
        private final List<Integer> data;
        private int countdown;

        private CheckpointCountdown(long id, List<Integer> data, int countdown) {
            this.id = id;
            this.data = new ArrayList<Integer>(data);
            this.countdown = countdown;
        }

        private boolean tick() {
            if (this.countdown > 0) {
                --this.countdown;
                return this.countdown == 0;
            }
            return false;
        }
    }

    private class CheckpointedDataFeeder
    implements RunnableWithException {
        private LinkedList<Integer> data;
        private List<Integer> checkpointedData;
        private long checkpointId;
        private long lastSuccessCheckpointId;
        private final List<CheckpointCountdown> checkpointCountdowns;

        private CheckpointedDataFeeder(List<Integer> data) {
            this.data = new LinkedList<Integer>(data);
            this.checkpointedData = new ArrayList<Integer>(data);
            this.checkpointId = 0L;
            this.lastSuccessCheckpointId = 0L;
            this.checkpointCountdowns = new ArrayList<CheckpointCountdown>();
        }

        public void run() throws Exception {
            Random random = new Random();
            CollectSinkFunctionRandomITCase.this.functionWrapper.openFunctionWithState();
            while (this.data.size() > 0) {
                ListIterator<CheckpointCountdown> iterator = this.checkpointCountdowns.listIterator();
                while (iterator.hasNext()) {
                    CheckpointCountdown countdown = iterator.next();
                    if (countdown.id < this.lastSuccessCheckpointId) {
                        iterator.remove();
                        continue;
                    }
                    if (!countdown.tick()) continue;
                    this.checkpointedData = countdown.data;
                    CollectSinkFunctionRandomITCase.this.functionWrapper.checkpointComplete(countdown.id);
                    this.lastSuccessCheckpointId = countdown.id;
                    iterator.remove();
                }
                int r = random.nextInt(10);
                if (r < 6) {
                    int size = Math.min(this.data.size(), random.nextInt(9) + 1);
                    for (int i = 0; i < size; ++i) {
                        CollectSinkFunctionRandomITCase.this.functionWrapper.invoke(this.data.removeFirst());
                    }
                } else if (r < 9) {
                    ++this.checkpointId;
                    if (random.nextBoolean()) {
                        this.checkpointCountdowns.add(new CheckpointCountdown(this.checkpointId, this.data, random.nextInt(3) + 1));
                    }
                    CollectSinkFunctionRandomITCase.this.functionWrapper.checkpointFunction(this.checkpointId);
                } else {
                    this.checkpointCountdowns.clear();
                    Collections.shuffle(this.checkpointedData);
                    this.data = new LinkedList<Integer>(this.checkpointedData);
                    CollectSinkFunctionRandomITCase.this.functionWrapper.closeFunctionAbnormally();
                    CollectSinkFunctionRandomITCase.this.functionWrapper.openFunctionWithState();
                }
                if (!random.nextBoolean()) continue;
                Thread.sleep(random.nextInt(10));
            }
            CollectSinkFunctionRandomITCase.this.functionWrapper.closeFunctionNormally();
            CollectSinkFunctionRandomITCase.this.jobFinished = true;
        }
    }

    private class UncheckpointedDataFeeder
    implements RunnableWithException {
        private LinkedList<Integer> data;
        private final List<Integer> originalData;
        private boolean failedBefore;

        private UncheckpointedDataFeeder(List<Integer> data) {
            this.data = new LinkedList<Integer>(data);
            this.originalData = new ArrayList<Integer>(data);
            this.failedBefore = false;
        }

        public void run() throws Exception {
            Random random = new Random();
            CollectSinkFunctionRandomITCase.this.functionWrapper.openFunction();
            while (this.data.size() > 0) {
                int size = Math.min(this.data.size(), random.nextInt(9) + 1);
                for (int i = 0; i < size; ++i) {
                    CollectSinkFunctionRandomITCase.this.functionWrapper.invoke(this.data.removeFirst());
                }
                if (!this.failedBefore && this.data.size() < this.originalData.size() / 2) {
                    if (random.nextBoolean()) {
                        this.data = new LinkedList<Integer>(this.originalData);
                        Collections.shuffle(this.data);
                        CollectSinkFunctionRandomITCase.this.functionWrapper.closeFunctionAbnormally();
                        CollectSinkFunctionRandomITCase.this.functionWrapper.openFunction();
                    }
                    this.failedBefore = true;
                }
                if (!random.nextBoolean()) continue;
                Thread.sleep(random.nextInt(10));
            }
            CollectSinkFunctionRandomITCase.this.functionWrapper.closeFunctionNormally();
            CollectSinkFunctionRandomITCase.this.jobFinished = true;
        }
    }
}

