/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.operators.coordination;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import org.apache.flink.core.testutils.CommonTestUtils;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.coordination.MockOperatorCoordinatorContext;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator;
import org.apache.flink.runtime.operators.coordination.TestingOperatorCoordinator;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ListAssert;
import org.assertj.core.api.ObjectAssert;
import org.assertj.core.api.ThrowingConsumer;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

class RecreateOnResetOperatorCoordinatorTest {
    private static final OperatorID OPERATOR_ID = new OperatorID(1234L, 5678L);
    private static final int NUM_SUBTASKS = 1;

    RecreateOnResetOperatorCoordinatorTest() {
    }

    @Test
    void testQuiesceableContextForwardsProperties() {
        MockOperatorCoordinatorContext context = new MockOperatorCoordinatorContext(OPERATOR_ID, 1);
        RecreateOnResetOperatorCoordinator.QuiesceableContext quiesceableContext = new RecreateOnResetOperatorCoordinator.QuiesceableContext((OperatorCoordinator.Context)context);
        Assertions.assertThat((Comparable)quiesceableContext.getOperatorId()).isEqualTo((Object)OPERATOR_ID);
        Assertions.assertThat((int)quiesceableContext.currentParallelism()).isEqualTo(1);
    }

    @Test
    void testQuiesceableContextNotQuiesced() {
        MockOperatorCoordinatorContext context = new MockOperatorCoordinatorContext(OPERATOR_ID, 1);
        RecreateOnResetOperatorCoordinator.QuiesceableContext quiesceableContext = new RecreateOnResetOperatorCoordinator.QuiesceableContext((OperatorCoordinator.Context)context);
        Exception exception = new Exception();
        quiesceableContext.failJob((Throwable)exception);
        Assertions.assertThat((boolean)context.isJobFailed()).isTrue();
        Assertions.assertThat((Throwable)context.getJobFailureReason()).isEqualTo((Object)exception);
    }

    @Test
    void testQuiescedContext() {
        MockOperatorCoordinatorContext context = new MockOperatorCoordinatorContext(OPERATOR_ID, 1);
        RecreateOnResetOperatorCoordinator.QuiesceableContext quiesceableContext = new RecreateOnResetOperatorCoordinator.QuiesceableContext((OperatorCoordinator.Context)context);
        quiesceableContext.quiesce();
        quiesceableContext.failJob((Throwable)new Exception());
        Assertions.assertThat((boolean)context.isJobFailed()).isFalse();
    }

    @Test
    void testResetToCheckpoint() throws Exception {
        TestingCoordinatorProvider provider = new TestingCoordinatorProvider(null);
        MockOperatorCoordinatorContext context = new MockOperatorCoordinatorContext(OPERATOR_ID, 1);
        RecreateOnResetOperatorCoordinator coordinator = this.createCoordinator(provider, context);
        RecreateOnResetOperatorCoordinator.QuiesceableContext contextBeforeReset = coordinator.getQuiesceableContext();
        TestingOperatorCoordinator internalCoordinatorBeforeReset = this.getInternalCoordinator(coordinator);
        byte[] stateToRestore = new byte[]{};
        coordinator.resetToCheckpoint(1L, stateToRestore);
        coordinator.waitForAllAsyncCallsFinish();
        Assertions.assertThat((boolean)contextBeforeReset.isQuiesced()).isTrue();
        Assertions.assertThat((byte[])internalCoordinatorBeforeReset.getLastRestoredCheckpointState()).isNull();
        TestingOperatorCoordinator internalCoordinatorAfterReset = this.getInternalCoordinator(coordinator);
        Assertions.assertThat((byte[])internalCoordinatorAfterReset.getLastRestoredCheckpointState()).isEqualTo((Object)stateToRestore);
        Assertions.assertThat((Object)internalCoordinatorBeforeReset).isNotEqualTo((Object)internalCoordinatorAfterReset);
    }

    @Test
    void testResetToCheckpointTimeout() throws Exception {
        long closingTimeoutMs = 1L;
        TestingCoordinatorProvider provider = new TestingCoordinatorProvider(new CountDownLatch(1));
        MockOperatorCoordinatorContext context = new MockOperatorCoordinatorContext(OPERATOR_ID, 1);
        RecreateOnResetOperatorCoordinator coordinator = (RecreateOnResetOperatorCoordinator)provider.create(context, 1L);
        coordinator.resetToCheckpoint(2L, new byte[0]);
        CommonTestUtils.waitUtil(context::isJobFailed, (Duration)Duration.ofSeconds(5L), (String)"The job should fail due to resetToCheckpoint() timeout.");
    }

    @Test
    void testMethodCallsOnLongResetToCheckpoint() throws Exception {
        long closingTimeoutMs = Long.MAX_VALUE;
        CountDownLatch blockOnCloseLatch = new CountDownLatch(1);
        TestingCoordinatorProvider provider = new TestingCoordinatorProvider(blockOnCloseLatch);
        MockOperatorCoordinatorContext context = new MockOperatorCoordinatorContext(OPERATOR_ID, 2);
        RecreateOnResetOperatorCoordinator coordinator = (RecreateOnResetOperatorCoordinator)provider.create(context, Long.MAX_VALUE);
        byte[] restoredState = new byte[]{};
        TestingEvent testingEvent = new TestingEvent();
        long completedCheckpointId = 1234L;
        coordinator.resetToCheckpoint(2L, restoredState);
        coordinator.handleEventFromOperator(1, 0, (OperatorEvent)testingEvent);
        coordinator.executionAttemptFailed(1, 0, (Throwable)new Exception("Subtask Failure Exception."));
        coordinator.notifyCheckpointComplete(1234L);
        Assertions.assertThat(provider.getCreatedCoordinators()).hasSize(1);
        blockOnCloseLatch.countDown();
        CompletableFuture checkpointFuture = new CompletableFuture();
        coordinator.checkpointCoordinator(5678L, checkpointFuture);
        coordinator.waitForAllAsyncCallsFinish();
        TestingOperatorCoordinator internalCoordinatorAfterReset = this.getInternalCoordinator(coordinator);
        Assertions.assertThat(internalCoordinatorAfterReset.getLastTriggeredCheckpoint()).isEqualTo(checkpointFuture);
        Assertions.assertThat((Object)internalCoordinatorAfterReset).isEqualTo((Object)provider.getCreatedCoordinators().get(1));
        Assertions.assertThat((byte[])internalCoordinatorAfterReset.getLastRestoredCheckpointState()).isEqualTo((Object)restoredState);
        Assertions.assertThat((Object)internalCoordinatorAfterReset.getNextReceivedOperatorEvent()).isEqualTo((Object)testingEvent);
        Assertions.assertThat(internalCoordinatorAfterReset.getFailedTasks()).isEqualTo(Collections.singletonList(1));
        Assertions.assertThat((long)internalCoordinatorAfterReset.getLastCheckpointComplete()).isEqualTo(1234L);
    }

    @Test
    @Timeout(value=30L)
    void testConsecutiveResetToCheckpoint() throws Exception {
        long closingTimeoutMs = Long.MAX_VALUE;
        int numResets = 1000;
        TestingCoordinatorProvider provider = new TestingCoordinatorProvider();
        MockOperatorCoordinatorContext context = new MockOperatorCoordinatorContext(OPERATOR_ID, 1);
        RecreateOnResetOperatorCoordinator coordinator = (RecreateOnResetOperatorCoordinator)provider.create(context, Long.MAX_VALUE);
        for (int i = 0; i < 1000; ++i) {
            coordinator.handleEventFromOperator(1, 0, (OperatorEvent)new TestingEvent(i));
            coordinator.executionAttemptFailed(i, 0, (Throwable)new Exception());
            CompletableFuture<byte[]> future = CompletableFuture.completedFuture(new byte[i]);
            coordinator.checkpointCoordinator((long)i, future);
            int loop = i;
            future.thenRun(() -> coordinator.notifyCheckpointComplete((long)loop));
            coordinator.resetToCheckpoint((long)i, new byte[i + 1]);
        }
        coordinator.waitForAllAsyncCallsFinish();
        for (TestingOperatorCoordinator internalCoordinator : provider.getCreatedCoordinators()) {
            int indexOfCoordinator = 0;
            byte[] lastRestoredState = internalCoordinator.getLastRestoredCheckpointState();
            if (lastRestoredState != null) {
                indexOfCoordinator = lastRestoredState.length;
            }
            TestingEvent testingEvent = (TestingEvent)internalCoordinator.getNextReceivedOperatorEvent();
            List<Integer> failedTasks = internalCoordinator.getFailedTasks();
            int finalIndexOfCoordinator = indexOfCoordinator;
            Assertions.assertThat((Object)testingEvent).satisfiesAnyOf(new ThrowingConsumer[]{x -> Assertions.assertThat((Object)x).isNull(), x -> Assertions.assertThat((int)x.getId()).isEqualTo(finalIndexOfCoordinator)});
            Assertions.assertThat(failedTasks).satisfiesAnyOf(new ThrowingConsumer[]{x -> Assertions.assertThat((List)x).isEmpty(), x -> ((ObjectAssert)((ListAssert)Assertions.assertThat((List)x).hasSize(1)).element(0)).isEqualTo((Object)finalIndexOfCoordinator)});
            Assertions.assertThat((Object)internalCoordinator).satisfiesAnyOf(new ThrowingConsumer[]{x -> Assertions.assertThat((boolean)x.hasCompleteCheckpoint()).isFalse(), x -> Assertions.assertThat((long)x.getLastCheckpointComplete()).isEqualTo((long)finalIndexOfCoordinator)});
            Assertions.assertThat((Object)internalCoordinator).satisfiesAnyOf(new ThrowingConsumer[]{x -> Assertions.assertThat((boolean)x.hasTriggeredCheckpoint()).isFalse(), x -> Assertions.assertThat((byte[])x.getLastTriggeredCheckpoint().get()).hasSize(finalIndexOfCoordinator)});
        }
        coordinator.close();
        TestingOperatorCoordinator internalCoordinator = this.getInternalCoordinator(coordinator);
        CommonTestUtils.waitUtil(internalCoordinator::isClosed, (Duration)Duration.ofSeconds(5L), (String)"Timed out when waiting for the coordinator to close.");
    }

    @Test
    void testNotifyCheckpointAbortedSuccess() throws Exception {
        TestingCoordinatorProvider provider = new TestingCoordinatorProvider(null);
        MockOperatorCoordinatorContext context = new MockOperatorCoordinatorContext(OPERATOR_ID, 1);
        RecreateOnResetOperatorCoordinator coordinator = this.createCoordinator(provider, context);
        TestingOperatorCoordinator internalCoordinatorAfterReset = this.getInternalCoordinator(coordinator);
        long checkpointId = 10L;
        coordinator.notifyCheckpointAborted(checkpointId);
        Assertions.assertThat((long)internalCoordinatorAfterReset.getLastCheckpointAborted()).isEqualTo(checkpointId);
    }

    private RecreateOnResetOperatorCoordinator createCoordinator(TestingCoordinatorProvider provider, OperatorCoordinator.Context context) throws Exception {
        return (RecreateOnResetOperatorCoordinator)provider.create(context);
    }

    private TestingOperatorCoordinator getInternalCoordinator(RecreateOnResetOperatorCoordinator coordinator) throws Exception {
        return (TestingOperatorCoordinator)coordinator.getInternalCoordinator();
    }

    private static class TestingEvent
    implements OperatorEvent {
        private static final long serialVersionUID = -3289352911927668275L;
        private final int id;

        private TestingEvent() {
            this(-1);
        }

        private TestingEvent(int id) {
            this.id = id;
        }

        private int getId() {
            return this.id;
        }
    }

    private static class TestingCoordinatorProvider
    extends RecreateOnResetOperatorCoordinator.Provider {
        private final CountDownLatch blockOnCloseLatch;
        private final List<TestingOperatorCoordinator> createdCoordinators;

        public TestingCoordinatorProvider() {
            this(null);
        }

        public TestingCoordinatorProvider(CountDownLatch blockOnCloseLatch) {
            super(OPERATOR_ID);
            this.blockOnCloseLatch = blockOnCloseLatch;
            this.createdCoordinators = new ArrayList<TestingOperatorCoordinator>();
        }

        protected OperatorCoordinator getCoordinator(OperatorCoordinator.Context context) {
            TestingOperatorCoordinator testingCoordinator = new TestingOperatorCoordinator(context, this.blockOnCloseLatch);
            this.createdCoordinators.add(testingCoordinator);
            return testingCoordinator;
        }

        private List<TestingOperatorCoordinator> getCreatedCoordinators() {
            return this.createdCoordinators;
        }
    }
}

