/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.checkpoint.channel;

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.Collections;
import java.util.Deque;
import java.util.LinkedList;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateSerializer;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateSerializerImpl;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequest;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestDispatcher;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestDispatcherImpl;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutor;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
import org.apache.flink.runtime.checkpoint.channel.CheckpointStartRequest;
import org.apache.flink.runtime.checkpoint.channel.ResultSubpartitionInfo;
import org.apache.flink.runtime.checkpoint.channel.TestException;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.state.CheckpointStorage;
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
import org.apache.flink.runtime.state.storage.JobManagerCheckpointStorage;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.function.BiConsumerWithException;
import org.assertj.core.api.AbstractThrowableAssert;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

class ChannelStateWriteRequestExecutorImplTest {
    private static final JobID JOB_ID = new JobID();
    private static final JobVertexID JOB_VERTEX_ID = new JobVertexID();
    private static final int SUBTASK_INDEX = 0;

    ChannelStateWriteRequestExecutorImplTest() {
    }

    /**
     * Runs the close-after-submit scenario with {@link ChannelStateWriteRequestExecutor#submit}
     * and expects it to end in an {@link IllegalStateException}.
     */
    @Test
    void testCloseAfterSubmit() {
        // The method reference is typed by the helper's generic parameter. Routing it through a
        // raw BiConsumerWithException cast would give it a two-Object function type that
        // ChannelStateWriteRequestExecutor::submit cannot match.
        Assertions.assertThatThrownBy(
                        () -> this.testCloseAfterSubmit(ChannelStateWriteRequestExecutor::submit))
                .isInstanceOf(IllegalStateException.class);
    }

    /**
     * Runs the close-after-submit scenario with
     * {@link ChannelStateWriteRequestExecutor#submitPriority} and expects it to end in an
     * {@link IllegalStateException}.
     */
    @Test
    void testCloseAfterSubmitPriority() {
        // The method reference is typed by the helper's generic parameter. Routing it through a
        // raw BiConsumerWithException cast would give it a two-Object function type that
        // ChannelStateWriteRequestExecutor::submitPriority cannot match.
        Assertions.assertThatThrownBy(
                        () -> this.testCloseAfterSubmit(ChannelStateWriteRequestExecutor::submitPriority))
                .isInstanceOf(IllegalStateException.class);
    }

    /**
     * Runs the submit-failure scenario with {@link ChannelStateWriteRequestExecutor#submit}.
     *
     * @throws Exception if the scenario helper fails unexpectedly
     */
    @Test
    void testSubmitFailure() throws Exception {
        // No cast: the helper's parameter type already provides the target type for the method
        // reference, and a raw functional-interface cast would not type-check.
        this.testSubmitFailure(ChannelStateWriteRequestExecutor::submit);
    }

    /**
     * Runs the submit-failure scenario with
     * {@link ChannelStateWriteRequestExecutor#submitPriority}.
     *
     * @throws Exception if the scenario helper fails unexpectedly
     */
    @Test
    void testSubmitPriorityFailure() throws Exception {
        // No cast: the helper's parameter type already provides the target type for the method
        // reference, and a raw functional-interface cast would not type-check.
        this.testSubmitFailure(ChannelStateWriteRequestExecutor::submitPriority);
    }

    /**
     * Submits a request to an executor backed by a {@link WorkerClosingDeque}, i.e. a queue that
     * releases the registered subtask every time an element is added to it.
     *
     * <p>If the submission returns normally, the queue must be empty and the request must not
     * have been cancelled. The calling tests expect this helper to end in an
     * {@link IllegalStateException}.
     *
     * @param requestFun the submission variant under test (normal or priority)
     * @throws Exception whatever the executor or {@code requestFun} throws
     */
    private void testCloseAfterSubmit(BiConsumerWithException<ChannelStateWriteRequestExecutor, ChannelStateWriteRequest, Exception> requestFun) throws Exception {
        WorkerClosingDeque closingDeque = new WorkerClosingDeque();
        ChannelStateWriteRequestExecutorImpl worker =
                new ChannelStateWriteRequestExecutorImpl(
                        ChannelStateWriteRequestDispatcher.NO_OP, closingDeque, 5, e -> {});
        closingDeque.setWorker(worker);
        worker.registerSubtask(JOB_VERTEX_ID, SUBTASK_INDEX);

        TestWriteRequest request = new TestWriteRequest(JOB_VERTEX_ID, SUBTASK_INDEX);
        // Arguments are passed with their real static types; casting them to Object does not
        // type-check against the generic accept(T, U) signature.
        requestFun.accept(worker, request);

        Assertions.assertThat(closingDeque).isEmpty();
        Assertions.assertThat(request.isCancelled()).isFalse();
    }

    /**
     * Submits a request to an executor that has a registered subtask but was never started, and
     * verifies that the submission is rejected and leaves nothing behind.
     *
     * <p>The submission must throw an {@link IllegalStateException}; afterwards the request must
     * be cancelled and the executor's queue must be empty.
     *
     * @param submitAction the submission variant under test (normal or priority)
     * @throws Exception if setting up the executor fails
     */
    private void testSubmitFailure(BiConsumerWithException<ChannelStateWriteRequestExecutor, ChannelStateWriteRequest, Exception> submitAction) throws Exception {
        TestWriteRequest request = new TestWriteRequest(JOB_VERTEX_ID, SUBTASK_INDEX);
        Deque<ChannelStateWriteRequest> deque = new ArrayDeque<>();
        ChannelStateWriteRequestExecutorImpl executor =
                new ChannelStateWriteRequestExecutorImpl(
                        ChannelStateWriteRequestDispatcher.NO_OP, deque, 5, e -> {});
        executor.registerSubtask(JOB_VERTEX_ID, SUBTASK_INDEX);

        // Only the submission itself is expected to fail; a missing exception is reported by
        // AssertJ instead of a hand-rolled RuntimeException.
        Assertions.assertThatThrownBy(() -> submitAction.accept(executor, request))
                .isInstanceOf(IllegalStateException.class);

        Assertions.assertThat(request.isCancelled()).isTrue();
        Assertions.assertThat(deque).isEmpty();
    }

    /**
     * Verifies the clean-up performed when the only subtask is released before the executor runs:
     * the dispatcher is told to fail, the queue is drained and the pending request is cancelled.
     *
     * @throws IOException if releasing the subtask fails
     */
    @Test
    void testCleanup() throws IOException {
        TestWriteRequest request = new TestWriteRequest(JOB_VERTEX_ID, SUBTASK_INDEX);
        // Typed with the request base class: a Deque<TestWriteRequest> is not assignable to the
        // executor's queue parameter because generic types are invariant.
        Deque<ChannelStateWriteRequest> deque = new ArrayDeque<>();
        deque.add(request);

        TestRequestDispatcher requestProcessor = new TestRequestDispatcher();
        ChannelStateWriteRequestExecutorImpl worker =
                new ChannelStateWriteRequestExecutorImpl(requestProcessor, deque, 5, e -> {});
        worker.registerSubtask(JOB_VERTEX_ID, SUBTASK_INDEX);
        worker.releaseSubtask(JOB_VERTEX_ID, SUBTASK_INDEX);
        // run() is invoked directly on the test thread; start() is never called here.
        worker.run();

        Assertions.assertThat(requestProcessor.isStopped()).isTrue();
        Assertions.assertThat(deque).isEmpty();
        Assertions.assertThat(request.isCancelled()).isTrue();
    }

    /**
     * Interrupts the worker thread both before and after a request is submitted and verifies that
     * the request queue is still drained.
     *
     * @throws Exception if submitting fails or the test thread is interrupted while waiting
     */
    @Test
    void testIgnoresInterruptsWhileRunning() throws Exception {
        TestRequestDispatcher requestProcessor = new TestRequestDispatcher();
        Deque<ChannelStateWriteRequest> deque = new ArrayDeque<>();
        ChannelStateWriteRequestExecutorImpl worker =
                new ChannelStateWriteRequestExecutorImpl(requestProcessor, deque, 5, e -> {});
        worker.registerSubtask(JOB_VERTEX_ID, SUBTASK_INDEX);
        try {
            worker.start();
            worker.getThread().interrupt();
            worker.submit(new TestWriteRequest(JOB_VERTEX_ID, SUBTASK_INDEX));
            worker.getThread().interrupt();

            // Bounded wait: if the worker never drains the queue the test fails instead of
            // hanging the whole build.
            // NOTE(review): the deque is polled from the test thread without any locking visible
            // here - confirm that this is acceptable for the executor's queue.
            final long deadlineNanos = System.nanoTime() + 30_000_000_000L;
            while (!deque.isEmpty()) {
                if (System.nanoTime() - deadlineNanos > 0L) {
                    Assertions.fail("worker did not drain the request queue within 30 seconds");
                }
                Thread.sleep(100L);
            }
        } finally {
            worker.releaseSubtask(JOB_VERTEX_ID, SUBTASK_INDEX);
        }
    }

    /**
     * Starts an executor backed by a real dispatcher, submits a checkpoint start request followed
     * by two write requests whose data futures are never completed, and then releases the
     * subtask. The test passes when none of these steps throws.
     *
     * @throws Exception if submitting a request or releasing the subtask fails
     */
    @Test
    void testCanBeClosed() throws Exception {
        final long checkpointId = 1L;
        ChannelStateWriteRequestDispatcher processor =
                new ChannelStateWriteRequestDispatcherImpl(
                        new JobManagerCheckpointStorage(), JOB_ID, new ChannelStateSerializerImpl());
        ChannelStateWriteRequestExecutorImpl worker =
                new ChannelStateWriteRequestExecutorImpl(processor, 5, e -> {});
        worker.registerSubtask(JOB_VERTEX_ID, SUBTASK_INDEX);
        try {
            worker.start();
            worker.submit(
                    new CheckpointStartRequest(
                            JOB_VERTEX_ID,
                            SUBTASK_INDEX,
                            checkpointId,
                            new ChannelStateWriter.ChannelStateWriteResult(),
                            CheckpointStorageLocationReference.getDefault()));
            // Two identical write requests, each with its own never-completed data future.
            for (int i = 0; i < 2; i++) {
                worker.submit(
                        ChannelStateWriteRequest.write(
                                JOB_VERTEX_ID,
                                SUBTASK_INDEX,
                                checkpointId,
                                new ResultSubpartitionInfo(0, 0),
                                new CompletableFuture<>()));
            }
        } finally {
            worker.releaseSubtask(JOB_VERTEX_ID, SUBTASK_INDEX);
        }
    }

    /**
     * Verifies that a request whose ready-future is not yet complete does not block requests of
     * another subtask.
     *
     * <p>The first batch holds three requests per subtask; the second request of subtask 0 is
     * bound to an incomplete future. Once subtask 1 has seen its whole first batch, subtask 0
     * must be at a counter value of one. After the second batch is submitted and the future is
     * completed, both subtasks must reach their final counts.
     *
     * @throws Exception if a submission fails or waiting on a future fails
     */
    @Test
    void testSkipUnreadyDataFuture() throws Exception {
        final int subtaskA = 0;
        final int subtaskB = 1;
        final int expectedFirstBatchB = 3;
        final int expectedTotalA = 4;
        final int expectedTotalB = 4;

        // Future gating the second request of subtask A until it is completed further down.
        final CompletableFuture<Object> pendingData = new CompletableFuture<>();

        Collection<ChannelStateWriteRequest> firstBatch = new LinkedList<>();
        firstBatch.add(new TestWriteRequest(JOB_VERTEX_ID, subtaskB));
        firstBatch.add(new TestWriteRequest(JOB_VERTEX_ID, subtaskA));
        firstBatch.add(new TestWriteRequest(JOB_VERTEX_ID, subtaskA, pendingData));
        firstBatch.add(new TestWriteRequest(JOB_VERTEX_ID, subtaskA));
        firstBatch.add(new TestWriteRequest(JOB_VERTEX_ID, subtaskB));
        firstBatch.add(new TestWriteRequest(JOB_VERTEX_ID, subtaskB));

        Collection<ChannelStateWriteRequest> secondBatch = new LinkedList<>();
        secondBatch.add(new TestWriteRequest(JOB_VERTEX_ID, subtaskA));
        secondBatch.add(new TestWriteRequest(JOB_VERTEX_ID, subtaskB));

        final CompletableFuture<Void> firstBatchSeen = new CompletableFuture<>();
        final CompletableFuture<Void> everythingSeen = new CompletableFuture<>();
        // Counters start at -1; presumably each subtask receives one extra dispatch (e.g. for its
        // registration) that must not be counted - TODO confirm against the executor.
        final AtomicInteger seenA = new AtomicInteger(-1);
        final AtomicInteger seenB = new AtomicInteger(-1);

        TestRequestDispatcher countingDispatcher = new TestRequestDispatcher() {

            @Override
            public void dispatch(ChannelStateWriteRequest request) {
                final int index = request.getSubtaskIndex();
                if (index != subtaskA && index != subtaskB) {
                    throw new IllegalStateException(String.format("Unknown subtask index %s.", index));
                }
                if (index == subtaskA) {
                    seenA.incrementAndGet();
                } else if (seenB.incrementAndGet() == expectedFirstBatchB) {
                    firstBatchSeen.complete(null);
                }
                if (seenA.get() == expectedTotalA && seenB.get() == expectedTotalB) {
                    everythingSeen.complete(null);
                }
            }
        };

        ChannelStateWriteRequestExecutorImpl worker =
                new ChannelStateWriteRequestExecutorImpl(countingDispatcher, 5, e -> {});
        worker.registerSubtask(JOB_VERTEX_ID, subtaskA);
        worker.registerSubtask(JOB_VERTEX_ID, subtaskB);
        try {
            worker.start();
            for (ChannelStateWriteRequest pending : firstBatch) {
                worker.submit(pending);
            }
            firstBatchSeen.get();
            // Subtask A is stuck behind its gated request while subtask B was fully processed.
            Assertions.assertThat(seenA.get()).isOne();
            Assertions.assertThat(seenB.get()).isEqualTo(expectedFirstBatchB);

            for (ChannelStateWriteRequest pending : secondBatch) {
                worker.submit(pending);
            }
            pendingData.complete(Collections.emptyList());
            everythingSeen.get();
            Assertions.assertThat(seenA.get()).isEqualTo(expectedTotalA);
            Assertions.assertThat(seenB.get()).isEqualTo(expectedTotalB);
        } finally {
            worker.releaseSubtask(JOB_VERTEX_ID, subtaskA);
            worker.releaseSubtask(JOB_VERTEX_ID, subtaskB);
        }
    }

    /**
     * Verifies that an exception thrown by the dispatcher while the executor runs is reported
     * when the subtask is released: the release must throw an {@link IOException} whose cause
     * chain contains the very same {@link TestException} instance.
     *
     * @throws IOException if the release fails with an exception that does not carry the
     *     expected {@link TestException}
     */
    @Test
    void testRecordsException() throws IOException {
        final TestException testException = new TestException();
        TestRequestDispatcher throwingRequestProcessor = new TestRequestDispatcher() {

            @Override
            public void dispatch(ChannelStateWriteRequest request) {
                throw testException;
            }
        };
        // Typed with the request base class: a Deque<TestWriteRequest> is not assignable to the
        // executor's queue parameter because generic types are invariant.
        Deque<ChannelStateWriteRequest> deque = new ArrayDeque<>();
        deque.add(new TestWriteRequest(JOB_VERTEX_ID, SUBTASK_INDEX));

        ChannelStateWriteRequestExecutorImpl worker =
                new ChannelStateWriteRequestExecutorImpl(throwingRequestProcessor, deque, 5, e -> {});
        worker.registerSubtask(JOB_VERTEX_ID, SUBTASK_INDEX);
        // run() is invoked directly on the test thread; start() is never called here.
        worker.run();
        try {
            worker.releaseSubtask(JOB_VERTEX_ID, SUBTASK_INDEX);
        } catch (IOException e) {
            // Identity comparison: only the exact instance thrown by the dispatcher counts.
            boolean carriesTestException =
                    ExceptionUtils.findThrowable(e, TestException.class)
                            .filter(found -> found == testException)
                            .isPresent();
            if (!carriesTestException) {
                throw e;
            }
            return;
        }
        Assertions.fail("exception not thrown");
    }

    /**
     * Verifies that both {@code submit} and {@code submitPriority} reject a request whose job
     * vertex was never registered with an {@link IllegalArgumentException}, while a request of the
     * registered subtask is accepted.
     *
     * @throws Exception if submitting the valid request or releasing the subtask fails
     */
    @Test
    void testSubmitRequestOfUnregisteredSubtask() throws Exception {
        ChannelStateWriteRequestExecutorImpl worker =
                new ChannelStateWriteRequestExecutorImpl(
                        ChannelStateWriteRequestDispatcher.NO_OP, 5, e -> {});
        worker.registerSubtask(JOB_VERTEX_ID, SUBTASK_INDEX);
        try {
            worker.start();
            worker.submit(new TestWriteRequest(JOB_VERTEX_ID, SUBTASK_INDEX));

            Assertions.assertThatThrownBy(
                            () -> worker.submit(new TestWriteRequest(new JobVertexID(), SUBTASK_INDEX)))
                    .isInstanceOf(IllegalArgumentException.class)
                    .hasMessageContaining("is not yet registered.");
            Assertions.assertThatThrownBy(
                            () -> worker.submitPriority(new TestWriteRequest(new JobVertexID(), SUBTASK_INDEX)))
                    .isInstanceOf(IllegalArgumentException.class)
                    .hasMessageContaining("is not yet registered.");
        } finally {
            // Release the only registered subtask so the started worker is not left behind after
            // the test, mirroring the clean-up of the other tests that call start().
            worker.releaseSubtask(JOB_VERTEX_ID, SUBTASK_INDEX);
        }
    }

    /**
     * Verifies that {@code submitPriority} accepts a request without a pending ready-future but
     * rejects one bound to an incomplete future with an {@link IllegalStateException}.
     *
     * @throws Exception if submitting the valid request or releasing the subtask fails
     */
    @Test
    void testSubmitPriorityUnreadyRequest() throws Exception {
        ChannelStateWriteRequestExecutorImpl worker =
                new ChannelStateWriteRequestExecutorImpl(
                        ChannelStateWriteRequestDispatcher.NO_OP, 5, e -> {});
        worker.registerSubtask(JOB_VERTEX_ID, SUBTASK_INDEX);
        try {
            worker.start();
            worker.submitPriority(new TestWriteRequest(JOB_VERTEX_ID, SUBTASK_INDEX));

            Assertions.assertThatThrownBy(
                            () -> worker.submitPriority(
                                    new TestWriteRequest(JOB_VERTEX_ID, SUBTASK_INDEX, new CompletableFuture<>())))
                    .isInstanceOf(IllegalStateException.class)
                    .hasMessage("The priority request must be ready.");
        } finally {
            // Release the only registered subtask so the started worker is not left behind after
            // the test, mirroring the clean-up of the other tests that call start().
            worker.releaseSubtask(JOB_VERTEX_ID, SUBTASK_INDEX);
        }
    }

    /**
     * Registers as many subtasks as the executor was configured for, checking that it reports
     * {@code isRegistering()} before each registration and no longer afterwards, and that one
     * more registration is rejected with an {@link IllegalStateException}.
     *
     * @throws Exception declared for parity with the other tests
     */
    @Test
    void testRegisterSubtaskAfterRegisterCompleted() throws Exception {
        final int capacity = 5;
        ChannelStateWriteRequestExecutorImpl executor =
                new ChannelStateWriteRequestExecutorImpl(
                        ChannelStateWriteRequestDispatcher.NO_OP, capacity, e -> {});

        int registered = 0;
        while (registered < capacity) {
            Assertions.assertThat(executor.isRegistering()).isTrue();
            // Every registration uses a fresh job vertex.
            executor.registerSubtask(new JobVertexID(), SUBTASK_INDEX);
            registered++;
        }

        Assertions.assertThat(executor.isRegistering()).isFalse();
        Assertions.assertThatThrownBy(() -> executor.registerSubtask(new JobVertexID(), SUBTASK_INDEX))
                .isInstanceOf(IllegalStateException.class)
                .hasMessage("This executor has been registered.");
    }

    /**
     * Verifies that submitting a checkpoint start request while fewer subtasks than the
     * configured maximum are registered ends the registration phase: the executor stops
     * reporting {@code isRegistering()}, the registration callback receives the executor, and
     * further registrations are rejected.
     *
     * @throws Exception if submitting, waiting for the dispatch or releasing the subtask fails
     */
    @Test
    void testSubmitStartRequestBeforeRegisterCompleted() throws Exception {
        final CompletableFuture<Void> dispatcherFuture = new CompletableFuture<>();
        TestRequestDispatcher dispatcher = new TestRequestDispatcher() {

            @Override
            public void dispatch(ChannelStateWriteRequest request) {
                // Signal the test thread once the start request reaches the dispatcher.
                if (request instanceof CheckpointStartRequest) {
                    dispatcherFuture.complete(null);
                }
            }
        };
        int maxSubtasksPerChannelStateFile = 5;
        CompletableFuture<ChannelStateWriteRequestExecutor> workerFuture = new CompletableFuture<>();
        ChannelStateWriteRequestExecutorImpl worker =
                new ChannelStateWriteRequestExecutorImpl(
                        dispatcher, maxSubtasksPerChannelStateFile, workerFuture::complete);
        worker.start();
        try {
            worker.registerSubtask(JOB_VERTEX_ID, SUBTASK_INDEX);
            Assertions.assertThat(worker.isRegistering()).isTrue();

            worker.submit(
                    ChannelStateWriteRequest.start(
                            JOB_VERTEX_ID,
                            SUBTASK_INDEX,
                            1L,
                            new ChannelStateWriter.ChannelStateWriteResult(),
                            CheckpointStorageLocationReference.getDefault()));
            dispatcherFuture.get();

            Assertions.assertThat(worker.isRegistering()).isFalse();
            Assertions.assertThat(workerFuture).isCompletedWithValue(worker);
            Assertions.assertThatThrownBy(() -> worker.registerSubtask(new JobVertexID(), SUBTASK_INDEX))
                    .isInstanceOf(IllegalStateException.class)
                    .hasMessage("This executor has been registered.");
        } finally {
            // Release the only registered subtask so the started worker is not left behind after
            // the test, mirroring the clean-up of the other tests that call start().
            worker.releaseSubtask(JOB_VERTEX_ID, SUBTASK_INDEX);
        }
    }

    /**
     * Verifies that releasing a subtask while fewer subtasks than the configured maximum are
     * registered ends the registration phase: the executor stops reporting
     * {@code isRegistering()}, the registration callback receives the executor, and further
     * registrations are rejected.
     *
     * @throws Exception if releasing the subtask fails
     */
    @Test
    void testReleaseSubtaskBeforeRegisterCompleted() throws Exception {
        final int capacity = 5;
        CompletableFuture<ChannelStateWriteRequestExecutor> registeredExecutor = new CompletableFuture<>();
        ChannelStateWriteRequestExecutorImpl executor =
                new ChannelStateWriteRequestExecutorImpl(
                        new TestRequestDispatcher(), capacity, registeredExecutor::complete);
        executor.start();

        executor.registerSubtask(JOB_VERTEX_ID, SUBTASK_INDEX);
        Assertions.assertThat(executor.isRegistering()).isTrue();

        executor.releaseSubtask(JOB_VERTEX_ID, SUBTASK_INDEX);
        Assertions.assertThat(executor.isRegistering()).isFalse();
        Assertions.assertThat(registeredExecutor).isCompletedWithValue(executor);

        Assertions.assertThatThrownBy(() -> executor.registerSubtask(new JobVertexID(), SUBTASK_INDEX))
                .isInstanceOf(IllegalStateException.class)
                .hasMessage("This executor has been registered.");
    }

    private static class TestRequestDispatcher
    implements ChannelStateWriteRequestDispatcher {
        private boolean isStopped;

        private TestRequestDispatcher() {
        }

        public void dispatch(ChannelStateWriteRequest request) {
        }

        public void fail(Throwable cause) {
            this.isStopped = true;
        }

        public boolean isStopped() {
            return this.isStopped;
        }
    }

    /**
     * Queue that releases subtask 0 of {@code JOB_VERTEX_ID} on the configured worker right after
     * every {@link #add} or {@link #addFirst}. It lets tests release the subtask in the middle of
     * an enqueue operation.
     */
    private static class WorkerClosingDeque extends ArrayDeque<ChannelStateWriteRequest> {
        /** Executor whose subtask is released on insertion; must be set before the first add. */
        private ChannelStateWriteRequestExecutor worker;

        public void setWorker(ChannelStateWriteRequestExecutor worker) {
            this.worker = worker;
        }

        @Override
        public boolean add(@Nonnull ChannelStateWriteRequest request) {
            final boolean added = super.add(request);
            releaseWorkerSubtask();
            return added;
        }

        @Override
        public void addFirst(@Nonnull ChannelStateWriteRequest request) {
            super.addFirst(request);
            releaseWorkerSubtask();
        }

        /** Releases the subtask on the worker, rethrowing an {@link IOException} via ExceptionUtils. */
        private void releaseWorkerSubtask() {
            try {
                worker.releaseSubtask(JOB_VERTEX_ID, 0);
            } catch (IOException e) {
                ExceptionUtils.rethrow(e);
            }
        }
    }

    private static class TestWriteRequest
    extends ChannelStateWriteRequest {
        private boolean cancelled = false;
        @Nullable
        private final CompletableFuture<?> readyFuture;

        public TestWriteRequest(JobVertexID jobVertexID, int subtaskIndex) {
            this(jobVertexID, subtaskIndex, null);
        }

        public TestWriteRequest(JobVertexID jobVertexID, int subtaskIndex, @Nullable CompletableFuture<?> readyFuture) {
            super(jobVertexID, subtaskIndex, 0L, "Test");
            this.readyFuture = readyFuture;
        }

        public void cancel(Throwable cause) {
            this.cancelled = true;
        }

        public boolean isCancelled() {
            return this.cancelled;
        }

        public CompletableFuture<?> getReadyFuture() {
            if (this.readyFuture != null) {
                return this.readyFuture;
            }
            return super.getReadyFuture();
        }
    }
}

