/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.disk.iomanager;

import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.core.testutils.CheckedThread;
import org.apache.flink.runtime.io.disk.iomanager.AsynchronousBufferFileWriter;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.io.disk.iomanager.RequestQueue;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
import org.apache.flink.runtime.io.network.util.TestNotificationListener;
import org.apache.flink.runtime.util.event.NotificationListener;
import org.assertj.core.api.AbstractBooleanAssert;
import org.assertj.core.api.AbstractIntegerAssert;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;

/**
 * Tests for {@link AsynchronousBufferFileWriter}.
 *
 * <p>Each test gets a fresh writer created over a new {@link RequestQueue}. Requests are added
 * with {@link #addRequest()} and completed manually with {@link #handleRequest()}, which lets the
 * tests control exactly when the writer's outstanding-request counter changes.
 */
class AsynchronousBufferFileWriterTest {
    /** Number of add/handle/subscribe rounds used to provoke a race in the concurrent test. */
    private static final int CONCURRENT_ROUNDS = 50000;

    /** I/O manager shared by all tests; closed once in {@link #shutdown()}. */
    private static final IOManager ioManager = new IOManagerAsync();

    /** Mocked buffer used as the payload of every request issued through the helpers. */
    private static final Buffer mockBuffer = Mockito.mock(Buffer.class);

    /** Writer under test, recreated before each test by {@link #setUp()}. */
    private AsynchronousBufferFileWriter writer;

    /** Closes the shared I/O manager after all tests have run. */
    @AfterAll
    static void shutdown() throws Exception {
        ioManager.close();
    }

    /** Creates a new writer on a newly created channel with its own request queue. */
    @BeforeEach
    void setUp() throws IOException {
        writer = new AsynchronousBufferFileWriter(ioManager.createChannel(), new RequestQueue<>());
    }

    /** Adding a request must raise the outstanding count to one; handling it must reset it. */
    @Test
    void testAddAndHandleRequest() throws Exception {
        addRequest();
        Assertions.assertThat(writer.getNumberOfOutstandingRequests())
                .withFailMessage("Didn't increment number of outstanding requests.")
                .isOne();

        handleRequest();
        Assertions.assertThat(writer.getNumberOfOutstandingRequests())
                .withFailMessage("Didn't decrement number of outstanding requests.")
                .isZero();
    }

    /**
     * Writing to an already closed writer must fail with an {@link IOException}, recycle the
     * buffer and leave the outstanding-request count at zero.
     */
    @Test
    void testAddWithFailingWriter() throws Exception {
        AsynchronousBufferFileWriter closedWriter =
                new AsynchronousBufferFileWriter(ioManager.createChannel(), new RequestQueue<>());
        closedWriter.close();

        NetworkBuffer buffer =
                new NetworkBuffer(
                        MemorySegmentFactory.allocateUnpooledSegment(4096),
                        FreeingBufferRecycler.INSTANCE);

        Assertions.assertThatThrownBy(() -> closedWriter.writeBlock(buffer))
                .isInstanceOf(IOException.class);

        if (!buffer.isRecycled()) {
            // Release the buffer ourselves before reporting the failure.
            buffer.recycleBuffer();
            Assertions.fail("buffer not recycled");
        }

        Assertions.assertThat(closedWriter.getNumberOfOutstandingRequests())
                .withFailMessage("Shouldn't increment number of outstanding requests.")
                .isZero();
    }

    /**
     * Registering a listener must be rejected while nothing is outstanding, accepted once a
     * request is pending, and the listener must be notified once that request is handled.
     */
    @Test
    void testSubscribe() throws Exception {
        TestNotificationListener listener = new TestNotificationListener();

        Assertions.assertThat(writer.registerAllRequestsProcessedListener(listener))
                .withFailMessage("Allowed to subscribe w/o any outstanding requests.")
                .isFalse();

        addRequest();
        Assertions.assertThat(writer.registerAllRequestsProcessedListener(listener))
                .withFailMessage("Didn't allow to subscribe.")
                .isTrue();

        handleRequest();
        Assertions.assertThat(listener.getNumberOfNotifications())
                .withFailMessage("Listener was not notified.")
                .isOne();
    }

    /**
     * Closes the writer on a separate thread while two requests are outstanding, then handles
     * both requests and checks that the registered listener was notified exactly once.
     */
    @Test
    void testSubscribeAndClose() throws Exception {
        TestNotificationListener listener = new TestNotificationListener();

        addRequest();
        addRequest();
        writer.registerAllRequestsProcessedListener(listener);

        // NOTE(review): close() runs on its own thread, presumably because it waits for the
        // outstanding requests that are only handled below - confirm against the writer.
        CheckedThread asyncCloseThread =
                new CheckedThread() {
                    @Override
                    public void go() throws Exception {
                        writer.close();
                    }
                };
        asyncCloseThread.start();

        handleRequest();
        handleRequest();

        // Wait for the close thread to finish before asserting.
        asyncCloseThread.sync();

        Assertions.assertThat(listener.getNumberOfNotifications())
                .withFailMessage("Listener was not notified.")
                .isOne();
    }

    /**
     * Repeatedly runs a subscription and a request completion concurrently and checks that the
     * listener is notified exactly when the subscription was reported as successful.
     */
    @Test
    void testConcurrentSubscribeAndHandleRequest() throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        TestNotificationListener listener = new TestNotificationListener();

        Callable<Boolean> subscriber = () -> writer.registerAllRequestsProcessedListener(listener);
        Callable<Void> requestHandler =
                () -> {
                    handleRequest();
                    return null;
                };

        try {
            // Repeat many times to provoke an interleaving that exposes a race.
            for (int round = 0; round < CONCURRENT_ROUNDS; ++round) {
                listener.reset();
                addRequest();

                Future<Void> handleRequestFuture = executor.submit(requestHandler);
                Future<Boolean> subscribeFuture = executor.submit(subscriber);

                handleRequestFuture.get();
                boolean subscribed = subscribeFuture.get();

                Assertions.assertThat(listener.getNumberOfNotifications())
                        .withFailMessage(
                                subscribed
                                        ? "Race: Successfully subscribed, but was never notified."
                                        : "Race: Never subscribed successfully, but was notified.")
                        .isEqualTo(subscribed ? 1 : 0);
            }
        } finally {
            executor.shutdownNow();
        }
    }

    /** Issues one write request for the shared mock buffer on the writer under test. */
    private void addRequest() throws IOException {
        writer.writeBlock(mockBuffer);
    }

    /** Reports the shared mock buffer as processed, with no error, to the writer under test. */
    private void handleRequest() {
        writer.handleProcessedBuffer(mockBuffer, null);
    }
}

