package org.apache.flink.runtime.checkpoint;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTestingUtils;
import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
import org.apache.flink.runtime.checkpoint.channel.ResultSubpartitionInfo;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.RestoreMode;
import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
import org.apache.flink.runtime.persistence.PossibleInconsistentStateException;
import org.apache.flink.runtime.state.InputChannelStateHandle;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.runtime.state.OperatorStreamStateHandle;
import org.apache.flink.runtime.state.ResultSubpartitionStateHandle;
import org.apache.flink.runtime.state.SharedStateRegistry;
import org.apache.flink.runtime.state.StateHandleID;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorExtension;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.concurrent.Executors;
import org.apache.flink.util.concurrent.ManuallyTriggeredScheduledExecutor;
import org.apache.flink.util.concurrent.ScheduledExecutor;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.class */
class CheckpointCoordinatorFailureTest {

    @RegisterExtension
    static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_RESOURCE = TestingUtils.defaultExecutorExtension();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest$FailingCompletedCheckpointStore.class */
    public static final class FailingCompletedCheckpointStore extends AbstractCompleteCheckpointStore {
        private final Exception addCheckpointFailure;

        public FailingCompletedCheckpointStore(Exception exc) {
            super(SharedStateRegistry.DEFAULT_FACTORY.create(Executors.directExecutor(), Collections.emptyList(), RestoreMode.DEFAULT));
            this.addCheckpointFailure = exc;
        }

        public CompletedCheckpoint addCheckpointAndSubsumeOldestOne(CompletedCheckpoint completedCheckpoint, CheckpointsCleaner checkpointsCleaner, Runnable runnable) throws Exception {
            throw this.addCheckpointFailure;
        }

        public CompletedCheckpoint getLatestCheckpoint() throws Exception {
            throw new UnsupportedOperationException("Not implemented.");
        }

        public void shutdown(JobStatus jobStatus, CheckpointsCleaner checkpointsCleaner) throws Exception {
            throw new UnsupportedOperationException("Not implemented.");
        }

        public List<CompletedCheckpoint> getAllCheckpoints() throws Exception {
            return Collections.emptyList();
        }

        public int getNumberOfRetainedCheckpoints() {
            return -1;
        }

        public int getMaxNumberOfRetainedCheckpoints() {
            return 1;
        }

        public boolean requiresExternalizedCheckpoints() {
            return false;
        }
    }

    CheckpointCoordinatorFailureTest() {
    }

    @Test
    void testFailingCompletedCheckpointStoreAdd() throws Exception {
        JobVertexID jobVertexID = new JobVertexID();
        ScheduledExecutor manuallyTriggeredScheduledExecutor = new ManuallyTriggeredScheduledExecutor();
        ExecutionGraph build = new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder().addJobVertex(jobVertexID).build((ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor());
        ExecutionVertex executionVertex = build.getJobVertex(jobVertexID).getTaskVertices()[0];
        CheckpointCoordinator build2 = new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder().setCompletedCheckpointStore(new FailingCompletedCheckpointStore(new Exception("The failing completed checkpoint store failed again... :-("))).setTimer(manuallyTriggeredScheduledExecutor).build(build);
        build2.triggerCheckpoint(false);
        manuallyTriggeredScheduledExecutor.triggerAll();
        Assertions.assertThat(build2.getNumberOfPendingCheckpoints()).isOne();
        PendingCheckpoint pendingCheckpoint = (PendingCheckpoint) build2.getPendingCheckpoints().values().iterator().next();
        Assertions.assertThat(pendingCheckpoint.isDisposed()).isFalse();
        long longValue = ((Long) build2.getPendingCheckpoints().keySet().iterator().next()).longValue();
        KeyedStateHandle keyedStateHandle = (KeyedStateHandle) Mockito.mock(KeyedStateHandle.class);
        Mockito.when(keyedStateHandle.getStateHandleId()).thenReturn(StateHandleID.randomStateHandleId());
        KeyedStateHandle keyedStateHandle2 = (KeyedStateHandle) Mockito.mock(KeyedStateHandle.class);
        Mockito.when(keyedStateHandle2.getStateHandleId()).thenReturn(StateHandleID.randomStateHandleId());
        OperatorStateHandle operatorStateHandle = (OperatorStateHandle) Mockito.mock(OperatorStreamStateHandle.class);
        OperatorSubtaskState operatorSubtaskState = (OperatorSubtaskState) Mockito.spy(OperatorSubtaskState.builder().setManagedOperatorState(operatorStateHandle).setRawOperatorState((OperatorStateHandle) Mockito.mock(OperatorStreamStateHandle.class)).setManagedKeyedState(keyedStateHandle).setRawKeyedState(keyedStateHandle2).setInputChannelState(StateObjectCollection.singleton(new InputChannelStateHandle(new InputChannelInfo(0, 1), (StreamStateHandle) Mockito.mock(StreamStateHandle.class), Collections.singletonList(1L)))).setResultSubpartitionState(StateObjectCollection.singleton(new ResultSubpartitionStateHandle(new ResultSubpartitionInfo(0, 1), (StreamStateHandle) Mockito.mock(StreamStateHandle.class), Collections.singletonList(1L)))).build());
        TaskStateSnapshot taskStateSnapshot = (TaskStateSnapshot) Mockito.spy(new TaskStateSnapshot());
        taskStateSnapshot.putSubtaskStateByOperatorID(new OperatorID(), operatorSubtaskState);
        Mockito.when(taskStateSnapshot.getSubtaskStateByOperatorID(OperatorID.fromJobVertexID(executionVertex.getJobvertexId()))).thenReturn(operatorSubtaskState);
        try {
            build2.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(build.getJobID(), executionVertex.getCurrentExecutionAttempt().getAttemptId(), longValue, new CheckpointMetrics(), taskStateSnapshot), "Unknown location");
            Assertions.fail("Expected a checkpoint exception because the completed checkpoint store could not store the completed checkpoint.");
        } catch (CheckpointException e) {
        }
        Assertions.assertThat(pendingCheckpoint.isDisposed()).isTrue();
        ((OperatorStateHandle) Mockito.verify(operatorSubtaskState.getManagedOperatorState().iterator().next())).discardState();
        ((OperatorStateHandle) Mockito.verify(operatorSubtaskState.getRawOperatorState().iterator().next())).discardState();
        ((KeyedStateHandle) Mockito.verify(operatorSubtaskState.getManagedKeyedState().iterator().next())).discardState();
        ((KeyedStateHandle) Mockito.verify(operatorSubtaskState.getRawKeyedState().iterator().next())).discardState();
        ((StreamStateHandle) Mockito.verify(((InputChannelStateHandle) operatorSubtaskState.getInputChannelState().iterator().next()).getDelegate())).discardState();
        ((StreamStateHandle) Mockito.verify(((ResultSubpartitionStateHandle) operatorSubtaskState.getResultSubpartitionState().iterator().next()).getDelegate())).discardState();
    }

    @Test
    void testCleanupForGenericFailure() throws Exception {
        testStoringFailureHandling(new FlinkRuntimeException("Expected exception"), 1);
    }

    @Test
    void testCleanupOmissionForPossibleInconsistentStateException() throws Exception {
        testStoringFailureHandling(new PossibleInconsistentStateException(), 0);
    }

    private void testStoringFailureHandling(Exception exc, int i) throws Exception {
        JobVertexID jobVertexID = new JobVertexID();
        ExecutionGraph build = new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder().addJobVertex(jobVertexID).build((ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor());
        ExecutionVertex executionVertex = build.getJobVertex(jobVertexID).getTaskVertices()[0];
        ExecutionAttemptID attemptId = executionVertex.getCurrentExecutionAttempt().getAttemptId();
        CheckpointIDCounter standaloneCheckpointIDCounter = new StandaloneCheckpointIDCounter();
        ScheduledExecutor manuallyTriggeredScheduledExecutor = new ManuallyTriggeredScheduledExecutor();
        CompletedCheckpointStore failingCompletedCheckpointStore = new FailingCompletedCheckpointStore(exc);
        CheckpointStatsTracker checkpointStatsTracker = new CheckpointStatsTracker(Integer.MAX_VALUE, new UnregisteredMetricsGroup(), new JobID());
        final AtomicInteger atomicInteger = new AtomicInteger(0);
        CheckpointCoordinator build2 = new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder().setCheckpointIDCounter(standaloneCheckpointIDCounter).setCheckpointsCleaner(new CheckpointsCleaner() { // from class: org.apache.flink.runtime.checkpoint.CheckpointCoordinatorFailureTest.1
            private static final long serialVersionUID = 2029876992397573325L;

            public void cleanCheckpointOnFailedStoring(CompletedCheckpoint completedCheckpoint, Executor executor) {
                atomicInteger.incrementAndGet();
                super.cleanCheckpointOnFailedStoring(completedCheckpoint, executor);
            }
        }).setCompletedCheckpointStore(failingCompletedCheckpointStore).setTimer(manuallyTriggeredScheduledExecutor).setCheckpointStatsTracker(checkpointStatsTracker).build(build);
        build2.triggerCheckpoint(false);
        manuallyTriggeredScheduledExecutor.triggerAll();
        CheckpointMetrics build3 = new CheckpointMetricsBuilder().setTotalBytesPersisted(18L).setBytesPersistedOfThisCheckpoint(18L).setBytesProcessedDuringAlignment(19L).setAsyncDurationMillis(20L).setAlignmentDurationNanos(123000000L).setCheckpointStartDelayNanos(567000000L).build();
        try {
            build2.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(build.getJobID(), attemptId, standaloneCheckpointIDCounter.getLast(), build3, new TaskStateSnapshot()), "unknown location");
            Assertions.fail("CheckpointException should have been thrown.");
        } catch (CheckpointException e) {
            Assertions.assertThat(e.getCheckpointFailureReason()).isEqualTo(CheckpointFailureReason.FINALIZE_CHECKPOINT_FAILURE);
        }
        AbstractCheckpointStats checkpointById = checkpointStatsTracker.createSnapshot().getHistory().getCheckpointById(standaloneCheckpointIDCounter.getLast());
        Assertions.assertThat(checkpointById.getCheckpointId()).isEqualTo(standaloneCheckpointIDCounter.getLast());
        Assertions.assertThat(checkpointById.getStatus()).isEqualTo(CheckpointStatsStatus.FAILED);
        CheckpointCoordinatorTest.assertStatsMetrics(executionVertex.getJobvertexId(), 0, build3, checkpointById);
        Assertions.assertThat(atomicInteger.get()).isEqualTo(i);
    }
}
