/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.operators.coordination;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.accumulators.ListAccumulator;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.checkpoint.PrioritizedOperatorSubtaskState;
import org.apache.flink.runtime.checkpoint.StateObjectCollection;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphBuilder;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
import org.apache.flink.runtime.operators.coordination.AcknowledgeCheckpointEvent;
import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
import org.apache.flink.runtime.operators.coordination.CoordinationRequestHandler;
import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.runtime.state.OperatorStreamStateHandle;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.TaskStateManager;
import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
import org.apache.flink.runtime.taskmanager.DispatcherThreadFactory;
import org.apache.flink.runtime.testutils.MiniClusterResource;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.shaded.guava31.com.google.common.collect.Iterators;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.TestLogger;
import org.assertj.core.api.Assertions;
import org.junit.ClassRule;
import org.junit.Test;

public class CoordinatorEventsExactlyOnceITCase
extends TestLogger {
    // Task-configuration key under which buildJobVertex stores the vertex name; EventCollectingTask
    // reads it back and uses it as the name of the accumulator it publishes its collected ints to.
    private static final ConfigOption<String> ACC_NAME = ConfigOptions.key((String)"acc").stringType().noDefaultValue();
    // Names of the two job vertices built by the test. The same strings are used as TestScript keys
    // and as the accumulator names looked up on the job result.
    private static final String OPERATOR_1_NAME = "operator-1";
    private static final String OPERATOR_2_NAME = "operator-2";
    // Shared mini cluster for the test class: two task managers with one slot each.
    @ClassRule
    public static final MiniClusterResource MINI_CLUSTER = new MiniClusterResource(new MiniClusterResourceConfiguration.Builder().setNumberTaskManagers(2).setNumberSlotsPerTaskManager(1).build());

    /**
     * Runs a job with two vertices, each driven by an {@link EventSendingCoordinator}, and checks
     * that each task's accumulator contains exactly the sequence {@code 0 .. numEvents - 1}.
     *
     * <p>The event counts and delays are declared once and used both to build the vertices and
     * to state the expected result, so the two cannot drift apart.
     */
    @Test
    public void test() throws Exception {
        // Drop any per-operator script state left behind by an earlier run in the same JVM.
        TestScript.reset();

        final int numEvents1 = 200;
        final int numEvents2 = 10;
        final int delay1 = 1;
        final int delay2 = 100;

        final JobVertex task1 = buildJobVertex(OPERATOR_1_NAME, numEvents1, delay1);
        final JobVertex task2 = buildJobVertex(OPERATOR_2_NAME, numEvents2, delay2);

        final JobGraph jobGraph = JobGraphBuilder.newStreamingJobGraphBuilder()
                .setJobName("Coordinator Events Job")
                .addJobVertices(Arrays.asList(task1, task2))
                .setJobCheckpointingSettings(createCheckpointSettings())
                .build();

        final JobExecutionResult result = MINI_CLUSTER.getMiniCluster().executeJobBlocking(jobGraph);

        checkListContainsSequence(result.<List<Integer>>getAccumulatorResult(OPERATOR_1_NAME), numEvents1);
        checkListContainsSequence(result.<List<Integer>>getAccumulatorResult(OPERATOR_2_NAME), numEvents2);
    }

    /**
     * Asserts that {@code ints} is exactly the sequence {@code 0, 1, ..., length - 1}, in order.
     *
     * @param ints the list to check
     * @param length the number of consecutive integers, starting at 0, the list must contain
     */
    protected static void checkListContainsSequence(List<Integer> ints, int length) {
        // Must be Integer[] (not Object[]) to match the Integer... varargs of the list assertion.
        final Integer[] expected = new Integer[length];
        for (int i = 0; i < length; ++i) {
            expected[i] = i;
        }
        Assertions.assertThat(ints).containsExactly(expected);
    }

    /**
     * Creates a parallelism-1 vertex that runs {@link EventCollectingTask} and has an
     * {@link EventSendingCoordinator} attached.
     *
     * @param name vertex name; also stored under {@code ACC_NAME} and passed to the coordinator
     * @param numEvents number of events handed to the coordinator
     * @param delay delay value handed to the coordinator
     * @throws IOException if the coordinator provider cannot be serialized
     */
    private static JobVertex buildJobVertex(final String name, final int numEvents, final int delay) throws IOException {
        final JobVertex jobVertex = new JobVertex(name);
        // The coordinator is addressed by an operator id derived from the vertex id.
        final OperatorID operatorId = OperatorID.fromJobVertexID(jobVertex.getID());

        jobVertex.setParallelism(1);
        jobVertex.setInvokableClass(EventCollectingTask.class);
        jobVertex.getConfiguration().setString(ACC_NAME, name);

        final OperatorCoordinator.Provider coordinatorProvider = new OperatorCoordinator.Provider() {

            @Override
            public OperatorID getOperatorId() {
                return operatorId;
            }

            @Override
            public OperatorCoordinator create(OperatorCoordinator.Context context) {
                return new EventSendingCoordinator(context, name, numEvents, delay);
            }
        };

        jobVertex.addOperatorCoordinator(new SerializedValue<>(coordinatorProvider));
        return jobVertex;
    }

    /**
     * Builds the checkpointing settings for the test job: interval 10, timeout 100000, at most
     * one concurrent checkpoint, and no default state backend ({@code null}).
     */
    private static JobCheckpointingSettings createCheckpointSettings() {
        final CheckpointCoordinatorConfiguration coordinatorConfiguration =
                new CheckpointCoordinatorConfiguration.CheckpointCoordinatorConfigurationBuilder()
                        .setMaxConcurrentCheckpoints(1)
                        .setCheckpointInterval(10L)
                        .setCheckpointTimeout(100000L)
                        .build();

        return new JobCheckpointingSettings(coordinatorConfiguration, null);
    }

    /**
     * Encodes {@code value} as four bytes in little-endian order.
     *
     * @param value the integer to encode
     * @return a new 4-byte array, least significant byte first
     */
    static byte[] intToBytes(int value) {
        return ByteBuffer.allocate(Integer.BYTES)
                .order(ByteOrder.LITTLE_ENDIAN)
                .putInt(value)
                .array();
    }

    /**
     * Decodes a little-endian 4-byte array into an int.
     *
     * @param bytes the encoded value; asserted to have exactly four elements
     * @return the decoded integer
     */
    static int bytesToInt(byte[] bytes) {
        Assertions.assertThat(bytes).hasSize(4);

        final ByteBuffer view = ByteBuffer.wrap(bytes);
        view.order(ByteOrder.LITTLE_ENDIAN);
        return view.getInt(0);
    }

    /**
     * Serializes {@code state} and wraps the bytes in an in-memory state handle named "state".
     *
     * @throws IOException if serialization fails
     */
    static ByteStreamStateHandle stateToHandle(List<Integer> state) throws IOException {
        return new ByteStreamStateHandle("state", InstantiationUtil.serializeObject(state));
    }

    /**
     * Reverses {@code stateToHandle}: reads the bytes held by the handle and deserializes them
     * using the class loader of {@link EventCollectingTask}.
     *
     * @param handle the handle to read; cast to {@link ByteStreamStateHandle}
     * @throws IOException if deserialization fails
     * @throws ClassNotFoundException if a serialized class cannot be resolved
     */
    static List<Integer> handleToState(StreamStateHandle handle) throws IOException, ClassNotFoundException {
        final byte[] serialized = ((ByteStreamStateHandle) handle).getData();
        final ClassLoader classLoader = EventCollectingTask.class.getClassLoader();
        return InstantiationUtil.deserializeObject(serialized, classLoader);
    }

    /**
     * Wraps {@code handle} as the managed operator state of {@code operatorId} in a task state
     * snapshot containing only that operator.
     */
    static TaskStateSnapshot createSnapshot(StreamStateHandle handle, OperatorID operatorId) {
        final OperatorStateHandle.StateMetaInfo stateMetaInfo =
                new OperatorStateHandle.StateMetaInfo(
                        new long[] {0L}, OperatorStateHandle.Mode.SPLIT_DISTRIBUTE);

        final OperatorStateHandle operatorState =
                new OperatorStreamStateHandle(
                        Collections.singletonMap("\u00e9tat_et_moi_:_\u00e7a_fait_deux", stateMetaInfo),
                        handle);

        final OperatorSubtaskState subtaskState =
                OperatorSubtaskState.builder().setManagedOperatorState(operatorState).build();

        return new TaskStateSnapshot(Collections.singletonMap(operatorId, subtaskState));
    }

    /**
     * Returns the delegate stream handle of the single managed operator state restored for
     * {@code operatorId}, or {@code null} if the operator state is not restored.
     *
     * <p>Only the first entry of the prioritized managed operator state is read, and it is
     * expected to contain exactly one element.
     */
    @Nullable
    static StreamStateHandle readSnapshot(TaskStateManager stateManager, OperatorID operatorId) {
        final PrioritizedOperatorSubtaskState prioritizedState = stateManager.prioritizedOperatorState(operatorId);
        if (!prioritizedState.isRestored()) {
            return null;
        }

        final StateObjectCollection<OperatorStateHandle> managedState =
                stateManager
                        .prioritizedOperatorState(operatorId)
                        .getPrioritizedManagedOperatorState()
                        .get(0);
        final OperatorStateHandle onlyHandle = Iterators.getOnlyElement(managedState.iterator());
        return onlyHandle.getDelegateStateHandle();
    }

    /**
     * Per-operator bookkeeping kept in a static map keyed by operator name. It records whether
     * the artificial failure has already been triggered and holds latches that are released
     * when a task signals it is running.
     */
    protected static final class TestScript {
        private static final Map<String, TestScript> MAP_FOR_OPERATOR = new HashMap<>();

        private final Collection<CountDownLatch> recoveredTaskRunning = new ArrayList<>();
        private boolean failedBefore;

        protected TestScript() {}

        /** Returns the script registered for {@code operatorName}, creating it on first use. */
        public static TestScript getForOperator(String operatorName) {
            return MAP_FOR_OPERATOR.computeIfAbsent(operatorName, ignoredName -> new TestScript());
        }

        /** Forgets all registered scripts. */
        public static void reset() {
            MAP_FOR_OPERATOR.clear();
        }

        /** Marks that the failure has been triggered. */
        public void recordHasFailed() {
            failedBefore = true;
        }

        /** Returns whether {@link #recordHasFailed()} has been called on this script. */
        public boolean hasAlreadyFailed() {
            return failedBefore;
        }

        /** Registers a latch to be counted down by the next {@link #signalRecoveredTaskReady()}. */
        void registerHookToNotifyAfterTaskRecovered(CountDownLatch latch) {
            synchronized (recoveredTaskRunning) {
                recoveredTaskRunning.add(latch);
            }
        }

        /** Counts down every registered latch once and then discards them. */
        void signalRecoveredTaskReady() {
            synchronized (recoveredTaskRunning) {
                recoveredTaskRunning.forEach(CountDownLatch::countDown);
                recoveredTaskRunning.clear();
            }
        }
    }

    public static final class EventCollectingTask
    extends AbstractInvokable {
        private final OperatorID operatorID;
        private final String accumulatorName;
        private final LinkedBlockingQueue<Object> actions;
        private volatile boolean running = true;

        public EventCollectingTask(Environment environment) {
            super(environment);
            this.operatorID = OperatorID.fromJobVertexID((JobVertexID)environment.getJobVertexId());
            this.accumulatorName = (String)environment.getTaskConfiguration().get(ACC_NAME);
            this.actions = new LinkedBlockingQueue();
        }

        public void invoke() throws Exception {
            Object next;
            ArrayList<Integer> collectedInts = new ArrayList<Integer>();
            this.restoreState(collectedInts);
            this.getEnvironment().getOperatorCoordinatorEventGateway().sendOperatorEventToCoordinator(this.operatorID, new SerializedValue((Object)new StartEvent(-1)));
            CoordinationResponse response = (CoordinationResponse)this.getEnvironment().getOperatorCoordinatorEventGateway().sendRequestToCoordinator(this.operatorID, new SerializedValue((Object)new IntegerRequest(100))).get();
            Assertions.assertThat((Object)response).isInstanceOf(IntegerResponse.class);
            Assertions.assertThat((int)((IntegerResponse)response).value).isEqualTo(101);
            while (this.running && !((next = this.actions.take()) instanceof EndEvent)) {
                if (next instanceof IntegerEvent) {
                    collectedInts.add(((IntegerEvent)next).value);
                    continue;
                }
                if (next instanceof CheckpointMetaData) {
                    this.takeCheckpoint(((CheckpointMetaData)next).getCheckpointId(), collectedInts);
                    this.getEnvironment().getOperatorCoordinatorEventGateway().sendOperatorEventToCoordinator(this.operatorID, new SerializedValue((Object)new AcknowledgeCheckpointEvent(((CheckpointMetaData)next).getCheckpointId())));
                    continue;
                }
                throw new Exception("Unrecognized: " + next);
            }
            if (this.running) {
                ListAccumulator acc = new ListAccumulator();
                collectedInts.forEach(arg_0 -> ((ListAccumulator)acc).add(arg_0));
                this.getEnvironment().getAccumulatorRegistry().getUserMap().put(this.accumulatorName, acc);
            }
        }

        public void cancel() throws Exception {
            this.running = false;
        }

        public CompletableFuture<Boolean> triggerCheckpointAsync(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) {
            this.actions.add(checkpointMetaData);
            return CompletableFuture.completedFuture(true);
        }

        public Future<Void> notifyCheckpointCompleteAsync(long checkpointId) {
            return CompletableFuture.completedFuture(null);
        }

        public Future<Void> notifyCheckpointAbortAsync(long checkpointId, long latestCompletedCheckpointId) {
            return CompletableFuture.completedFuture(null);
        }

        public void dispatchOperatorEvent(OperatorID operator, SerializedValue<OperatorEvent> event) throws FlinkException {
            try {
                OperatorEvent opEvent = (OperatorEvent)event.deserializeValue(this.getUserCodeClassLoader());
                this.actions.add(opEvent);
            }
            catch (IOException | ClassNotFoundException e) {
                throw new FlinkException((Throwable)e);
            }
        }

        private void takeCheckpoint(long checkpointId, List<Integer> state) throws Exception {
            ByteStreamStateHandle handle = CoordinatorEventsExactlyOnceITCase.stateToHandle(state);
            TaskStateSnapshot snapshot = CoordinatorEventsExactlyOnceITCase.createSnapshot((StreamStateHandle)handle, this.operatorID);
            this.getEnvironment().acknowledgeCheckpoint(checkpointId, new CheckpointMetrics(), snapshot);
        }

        private void restoreState(List<Integer> target) throws Exception {
            StreamStateHandle stateHandle = CoordinatorEventsExactlyOnceITCase.readSnapshot(this.getEnvironment().getTaskStateManager(), this.operatorID);
            if (stateHandle != null) {
                List<Integer> list = CoordinatorEventsExactlyOnceITCase.handleToState(stateHandle);
                target.addAll(list);
            }
        }
    }

    protected static class EventSendingCoordinator
    implements OperatorCoordinator,
    CoordinationRequestHandler {
        protected final OperatorCoordinator.Context context;
        protected final int maxNumberBeforeFailure;
        protected final TestScript testScript;
        private final ExecutorService mailboxExecutor;
        private final ScheduledExecutorService scheduledExecutor;
        private final int delay;
        private final int maxNumber;
        protected int nextNumber;
        protected CompletableFuture<byte[]> nextToComplete;
        protected CompletableFuture<byte[]> requestedCheckpoint;
        private OperatorCoordinator.SubtaskGateway subtaskGateway;
        private boolean workLoopRunning;

        protected EventSendingCoordinator(OperatorCoordinator.Context context, String name, int numEvents, int delay) {
            Preconditions.checkArgument((delay > 0 ? 1 : 0) != 0);
            Preconditions.checkArgument((numEvents >= 3 ? 1 : 0) != 0);
            this.context = context;
            this.maxNumber = numEvents;
            this.delay = delay;
            this.testScript = TestScript.getForOperator(name);
            this.mailboxExecutor = Executors.newSingleThreadExecutor((ThreadFactory)new DispatcherThreadFactory(Thread.currentThread().getThreadGroup(), "Coordinator Mailbox for " + name));
            this.scheduledExecutor = Executors.newSingleThreadScheduledExecutor((ThreadFactory)new DispatcherThreadFactory(Thread.currentThread().getThreadGroup(), "Coordinator Periodic Actions for " + name));
            this.nextNumber = 0;
            this.maxNumberBeforeFailure = numEvents * 2 / 3 + new Random().nextInt(numEvents / 6);
        }

        public void start() throws Exception {
        }

        public void close() throws Exception {
            this.scheduledExecutor.shutdownNow();
            Assertions.assertThat((boolean)this.scheduledExecutor.awaitTermination(10L, TimeUnit.MINUTES)).isTrue();
            this.mailboxExecutor.shutdownNow();
            Assertions.assertThat((boolean)this.mailboxExecutor.awaitTermination(10L, TimeUnit.MINUTES)).isTrue();
        }

        public void handleEventFromOperator(int subtask, int attemptNumber, OperatorEvent event) throws Exception {
            if (subtask != 0 || !(event instanceof StartEvent)) {
                throw new Exception(String.format("Don't recognize event '%s' from task %d.", event, subtask));
            }
            this.testScript.signalRecoveredTaskReady();
            this.runInMailbox(() -> {
                Preconditions.checkState((!this.workLoopRunning ? 1 : 0) != 0);
                Preconditions.checkState((this.subtaskGateway != null ? 1 : 0) != 0);
                if (((StartEvent)event).lastValue >= 0) {
                    this.nextNumber = ((StartEvent)event).lastValue + 1;
                }
                this.workLoopRunning = true;
                this.scheduleSingleAction();
            });
        }

        public void executionAttemptFailed(int subtask, int attemptNumber, @Nullable Throwable reason) {
            CountDownLatch successorIsRunning = new CountDownLatch(1);
            this.testScript.registerHookToNotifyAfterTaskRecovered(successorIsRunning);
            this.runInMailbox(() -> {
                try {
                    successorIsRunning.await();
                }
                catch (Exception exception) {
                    // empty catch block
                }
                this.executeSingleAction();
            });
            this.runInMailbox(() -> {
                this.workLoopRunning = false;
                this.subtaskGateway = null;
            });
        }

        public void subtaskReset(int subtask, long checkpointId) {
        }

        public void executionAttemptReady(int subtask, int attemptNumber, OperatorCoordinator.SubtaskGateway gateway) {
            this.runInMailbox(() -> {
                Preconditions.checkState((!this.workLoopRunning ? 1 : 0) != 0);
                this.subtaskGateway = gateway;
            });
        }

        public void resetToCheckpoint(long checkpointId, @Nullable byte[] checkpointData) throws Exception {
            this.runInMailbox(() -> {
                this.nextNumber = checkpointData == null ? 0 : CoordinatorEventsExactlyOnceITCase.bytesToInt(checkpointData);
            });
        }

        public void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> result) throws Exception {
            this.runInMailbox(() -> {
                this.requestedCheckpoint = result;
            });
        }

        public void notifyCheckpointComplete(long checkpointId) {
        }

        protected void runInMailbox(Runnable action) {
            this.mailboxExecutor.execute(() -> {
                try {
                    action.run();
                }
                catch (Throwable t) {
                    t.printStackTrace();
                    ExceptionUtils.rethrow((Throwable)t);
                }
            });
        }

        void scheduleSingleAction() {
            block2: {
                try {
                    this.scheduledExecutor.schedule(() -> this.runInMailbox(this::executeSingleAction), (long)this.delay, TimeUnit.MILLISECONDS);
                }
                catch (RejectedExecutionException e) {
                    if (this.scheduledExecutor.isShutdown()) break block2;
                    throw e;
                }
            }
        }

        private void executeSingleAction() {
            if (!this.workLoopRunning) {
                return;
            }
            try {
                this.handleCheckpoint();
                this.sendNextEvent();
                this.checkWhetherToTriggerFailure();
            }
            catch (Throwable t) {
                t.printStackTrace();
                System.exit(-1);
            }
            this.scheduleSingleAction();
        }

        protected void handleCheckpoint() {
            if (this.nextToComplete != null) {
                int numToCheckpoint = Math.min(this.nextNumber, this.maxNumber);
                this.nextToComplete.complete(CoordinatorEventsExactlyOnceITCase.intToBytes(numToCheckpoint));
                this.nextToComplete = null;
            }
            if (this.requestedCheckpoint != null) {
                this.nextToComplete = this.requestedCheckpoint;
                this.requestedCheckpoint = null;
            }
        }

        protected void sendNextEvent() {
            if (this.nextNumber > this.maxNumber) {
                return;
            }
            if (this.nextNumber == this.maxNumber) {
                this.subtaskGateway.sendEvent((OperatorEvent)new EndEvent());
            } else {
                this.subtaskGateway.sendEvent((OperatorEvent)new IntegerEvent(this.nextNumber));
            }
            ++this.nextNumber;
        }

        private void checkWhetherToTriggerFailure() {
            if (this.nextNumber > this.maxNumberBeforeFailure && !this.testScript.hasAlreadyFailed()) {
                this.testScript.recordHasFailed();
                this.context.failJob((Throwable)new Exception("test failure"));
            }
        }

        public CompletableFuture<CoordinationResponse> handleCoordinationRequest(CoordinationRequest request) {
            if (request instanceof IntegerRequest) {
                int value = ((IntegerRequest)request).value;
                return CompletableFuture.completedFuture(new IntegerResponse(value + 1));
            }
            throw new UnsupportedOperationException("Unsupported request type: " + request);
        }
    }

    /** Coordination response carrying a single int. */
    private static final class IntegerResponse implements CoordinationResponse {
        final int value;

        private IntegerResponse(final int value) {
            this.value = value;
        }
    }

    /** Coordination request carrying a single int. */
    private static final class IntegerRequest implements CoordinationRequest {
        final int value;

        private IntegerRequest(final int value) {
            this.value = value;
        }
    }

    /** Operator event carrying one int of the sequence. */
    protected static final class IntegerEvent implements OperatorEvent {
        public final int value;

        private IntegerEvent(final int value) {
            this.value = value;
        }

        @Override
        public String toString() {
            return "IntegerEvent " + value;
        }
    }

    /** Marker operator event with no payload. */
    protected static final class EndEvent implements OperatorEvent {
        protected EndEvent() {}
    }

    /** Operator event carrying a single int, {@code lastValue}. */
    protected static final class StartEvent implements OperatorEvent {
        private final int lastValue;

        public StartEvent(final int lastValue) {
            this.lastValue = lastValue;
        }
    }
}

