/*
 * Decompiled with CFR 0.152.
 */
package org.newsclub.net.unix;

import com.kohlschutter.annotations.compiletime.SuppressFBWarnings;
import com.kohlschutter.testutil.TestAsyncUtil;
import com.kohlschutter.testutil.TestStackTraceUtil;
import com.kohlschutter.util.SystemPropertyUtil;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.PortUnreachableException;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketAddress;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.DatagramChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.channels.spi.AbstractSelector;
import java.nio.channels.spi.SelectorProvider;
import java.time.Duration;
import java.util.Locale;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Assumptions;
import org.junit.jupiter.api.MethodOrderer;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestMethodOrder;
import org.junit.platform.commons.JUnitException;
import org.newsclub.net.unix.AFSocketCapability;
import org.newsclub.net.unix.AFSocketCapabilityRequirement;
import org.newsclub.net.unix.AddressSpecifics;
import org.newsclub.net.unix.SocketTestBase;
import org.newsclub.net.unix.TestUtil;
import org.opentest4j.AssertionFailedError;

/*
 * Multiple versions of this class in jar - see https://www.benf.org/other/cfr/multi-version-jar.html
 */
@TestMethodOrder(value=MethodOrderer.MethodName.class)
@SuppressFBWarnings(value={"THROWS_METHOD_THROWS_CLAUSE_THROWABLE", "THROWS_METHOD_THROWS_CLAUSE_BASIC_EXCEPTION"})
public abstract class ThroughputTest<A extends SocketAddress>
extends SocketTestBase<A> {
    protected static final int ENABLED = SystemPropertyUtil.getIntSystemProperty((String)"org.newsclub.net.unix.throughput-test.enabled", (int)1);
    protected static final int PAYLOAD_SIZE = SystemPropertyUtil.getIntSystemProperty((String)"org.newsclub.net.unix.throughput-test.payload-size", (int)2048);
    protected static final int PAYLOAD_SIZE_DATAGRAM = SystemPropertyUtil.getIntSystemProperty((String)"org.newsclub.net.unix.throughput-test.payload-size.datagram", (int)PAYLOAD_SIZE);
    protected static final int NUM_SECONDS = SystemPropertyUtil.getIntSystemProperty((String)"org.newsclub.net.unix.throughput-test.seconds", (int)0);
    protected static final int GRACE_TIME_NUM_SECONDS = SystemPropertyUtil.getIntSystemProperty((String)"org.newsclub.net.unix.throughput-test.gracetime.seconds", (int)5);
    protected static final int NUM_MILLISECONDS = Math.max(50, NUM_SECONDS * 1000);

    protected ThroughputTest(AddressSpecifics<A> asp) {
        super(asp);
    }

    private static byte[] createTestData(int size) {
        byte[] buf = new byte[size];
        for (int i = 0; i < buf.length; ++i) {
            buf[i] = (byte)(i % 256);
        }
        return buf;
    }

    private static void reportResults(String testType, String s) {
        if (NUM_SECONDS == 0) {
            return;
        }
        System.out.println("ThroughputTest (" + testType + "): " + s);
    }

    @Test
    public void testSocket() throws Exception {
        Assumptions.assumeTrue((ENABLED > 0 ? 1 : 0) != 0, (String)"Throughput tests are disabled");
        Assumptions.assumeTrue((PAYLOAD_SIZE > 0 ? 1 : 0) != 0, (String)"Payload must be positive");
        Assertions.assertTimeoutPreemptively((Duration)Duration.ofSeconds(NUM_SECONDS + GRACE_TIME_NUM_SECONDS), () -> {
            try (SocketTestBase.ServerThread serverThread = new SocketTestBase.ServerThread(){

                @Override
                protected void handleConnection(Socket sock) throws IOException {
                    byte[] buf = new byte[PAYLOAD_SIZE];
                    try (InputStream inputStream = sock.getInputStream();
                         OutputStream outputStream = sock.getOutputStream();){
                        int read;
                        while ((read = inputStream.read(buf)) >= 0) {
                            outputStream.write(buf, 0, read);
                        }
                    }
                }
            };){
                AtomicBoolean keepRunning = new AtomicBoolean(true);
                TestAsyncUtil.runAsyncDelayed((long)NUM_MILLISECONDS, (TimeUnit)TimeUnit.MILLISECONDS, () -> keepRunning.set(false));
                try (Socket sock = this.connectTo(serverThread.getServerAddress());){
                    byte[] buf = ThroughputTest.createTestData(PAYLOAD_SIZE);
                    try (InputStream inputStream = sock.getInputStream();
                         OutputStream outputStream = sock.getOutputStream();){
                        long readTotal = 0L;
                        long time = System.currentTimeMillis();
                        while (keepRunning.get()) {
                            int remaining;
                            block27: {
                                outputStream.write(buf);
                                remaining = buf.length;
                                int offset = 0;
                                try {
                                    int read;
                                    while (remaining > 0 && (read = inputStream.read(buf, offset, remaining)) >= 0) {
                                        if (read <= 0) continue;
                                        remaining -= read;
                                        offset += read;
                                        readTotal += (long)read;
                                    }
                                }
                                catch (SocketTimeoutException e) {
                                    if (!keepRunning.get()) break block27;
                                    throw e;
                                }
                            }
                            Assertions.assertEquals((int)0, (int)remaining);
                        }
                        time = System.currentTimeMillis() - time;
                        ThroughputTest.reportResults(this.stbTestType() + " byte[]", 1000.0f * (float)readTotal / (float)time / 1000.0f / 1000.0f + " MB/s for payload size " + PAYLOAD_SIZE);
                    }
                }
            }
        });
    }

    @Test
    public void testSocketChannel() throws Exception {
        Assumptions.assumeTrue((ENABLED > 0 ? 1 : 0) != 0, (String)"Throughput tests are disabled");
        Assumptions.assumeTrue((PAYLOAD_SIZE > 0 ? 1 : 0) != 0, (String)"Payload must be positive");
        Assertions.assertTimeoutPreemptively((Duration)Duration.ofSeconds(NUM_SECONDS + GRACE_TIME_NUM_SECONDS), () -> this.runtestSocketChannel(false));
    }

    @Test
    public void testSocketChannelDirectBuffer() throws Exception {
        Assumptions.assumeTrue((ENABLED > 0 ? 1 : 0) != 0, (String)"Throughput tests are disabled");
        Assumptions.assumeTrue((PAYLOAD_SIZE > 0 ? 1 : 0) != 0, (String)"Payload must be positive");
        Assertions.assertTimeoutPreemptively((Duration)Duration.ofSeconds(NUM_SECONDS + GRACE_TIME_NUM_SECONDS), () -> this.runtestSocketChannel(true));
    }

    private void runtestSocketChannel(boolean direct) throws Exception {
        SelectorProvider sp = this.selectorProvider();
        ServerSocketChannel ssc = sp.openServerSocketChannel();
        this.runTestSocketChannel(this.stbTestType() + " SocketChannel", this.getServerBindAddress(), ssc, () -> {
            SocketChannel sc = sp.openSocketChannel();
            this.connectSocket(sc, ssc.getLocalAddress());
            return sc;
        }, direct);
    }

    @Override
    protected boolean shouldDoConnectionCheckUponAccept() {
        return true;
    }

    protected void runTestSocketChannel(String implId, final SocketAddress sba, final ServerSocketChannel ssc, SupplierWithException<SocketChannel, IOException> sscSupp, final boolean direct) throws Exception {
        final AtomicBoolean keepRunning = new AtomicBoolean(true);
        try (SocketTestBase.ServerThread unused = new SocketTestBase.ServerThread(){

            @Override
            protected ServerSocket startServer() throws IOException {
                ThroughputTest.this.bindServerSocket(ssc, sba);
                return null;
            }

            @Override
            public void shutdown() throws IOException {
                super.shutdown();
                ssc.close();
            }

            @Override
            protected void onServerSocketClose() {
                keepRunning.set(false);
                super.onServerSocketClose();
            }

            @Override
            protected void acceptAndHandleConnection() throws IOException {
                ByteBuffer bb = direct ? ByteBuffer.allocateDirect(PAYLOAD_SIZE) : ByteBuffer.allocate(PAYLOAD_SIZE);
                try (SocketChannel sc = ssc.accept();){
                    try {
                        bb.clear();
                        while (sc.read(bb) >= 0) {
                            bb.flip();
                            sc.write(bb);
                            bb.clear();
                        }
                    }
                    catch (SocketException | SocketTimeoutException e) {
                        if (keepRunning.get()) {
                            throw e;
                        }
                    }
                }
            }

            @Override
            protected void handleConnection(Socket sock) throws IOException {
                throw new IllegalStateException();
            }
        };){
            TestAsyncUtil.runAsyncDelayed((long)NUM_MILLISECONDS, (TimeUnit)TimeUnit.MILLISECONDS, () -> keepRunning.set(false));
            try (SocketChannel sc = sscSupp.get();){
                ByteBuffer bb = direct ? ByteBuffer.allocateDirect(PAYLOAD_SIZE) : ByteBuffer.allocate(PAYLOAD_SIZE);
                long readTotal = 0L;
                long time = System.currentTimeMillis();
                while (keepRunning.get()) {
                    long read;
                    bb.clear();
                    bb.put(ThroughputTest.createTestData(PAYLOAD_SIZE));
                    bb.flip();
                    long remaining = sc.write(bb);
                    bb.clear();
                    while (remaining > 0L && (read = (long)sc.read(bb)) >= 0L) {
                        remaining -= read;
                        readTotal += read;
                        bb.clear();
                    }
                    Assertions.assertEquals((long)0L, (long)remaining);
                }
                time = System.currentTimeMillis() - time;
                ThroughputTest.reportResults(implId + " direct=" + direct, 1000.0f * (float)readTotal / (float)time / 1000.0f / 1000.0f + " MB/s for payload size " + PAYLOAD_SIZE);
            }
        }
    }

    @AFSocketCapabilityRequirement(value={AFSocketCapability.CAPABILITY_UNIX_DATAGRAMS})
    @Test
    public void testDatagramPacket() throws Exception {
        try {
            Assertions.assertTimeoutPreemptively((Duration)Duration.ofSeconds(NUM_SECONDS + GRACE_TIME_NUM_SECONDS), () -> {
                SocketAddress dsAddr = this.newTempAddressForDatagram();
                SocketAddress dcAddr = this.newTempAddressForDatagram();
                try (final DatagramSocket ds = this.newDatagramSocket();
                     DatagramSocket dc = this.newDatagramSocket();){
                    if (!ds.isBound()) {
                        ds.bind(dsAddr);
                    }
                    if (!dc.isBound()) {
                        dc.bind(dcAddr);
                    }
                    dsAddr = ds.getLocalSocketAddress();
                    dcAddr = dc.getLocalSocketAddress();
                    Assertions.assertNotEquals((Object)dsAddr, (Object)dcAddr);
                    dc.connect(dsAddr);
                    final AtomicBoolean keepRunning = new AtomicBoolean(true);
                    TestAsyncUtil.runAsyncDelayed((long)NUM_MILLISECONDS, (TimeUnit)TimeUnit.MILLISECONDS, () -> keepRunning.set(false));
                    final AtomicLong readTotal = new AtomicLong();
                    long sentTotal = 0L;
                    new Thread(){
                        final DatagramPacket dp = new DatagramPacket(new byte[PAYLOAD_SIZE_DATAGRAM], PAYLOAD_SIZE_DATAGRAM);

                        @Override
                        public void run() {
                            try {
                                while (!Thread.interrupted() && !ds.isClosed()) {
                                    try {
                                        ds.receive(this.dp);
                                    }
                                    catch (SocketTimeoutException e) {
                                        continue;
                                    }
                                    int read = this.dp.getLength();
                                    if (read != PAYLOAD_SIZE_DATAGRAM && read != 0) {
                                        throw new IOException("Unexpected response length: " + read);
                                    }
                                    readTotal.addAndGet(this.dp.getLength());
                                }
                            }
                            catch (SocketException e) {
                                if (keepRunning.get()) {
                                    TestStackTraceUtil.printStackTrace((Throwable)e);
                                }
                            }
                            catch (IOException e) {
                                TestStackTraceUtil.printStackTrace((Throwable)e);
                            }
                        }
                    }.start();
                    long time = System.currentTimeMillis();
                    DatagramPacket dp = new DatagramPacket(new byte[PAYLOAD_SIZE_DATAGRAM], PAYLOAD_SIZE_DATAGRAM);
                    byte[] data = dp.getData();
                    for (int i = 0; i < data.length; ++i) {
                        data[i] = (byte)i;
                    }
                    while (keepRunning.get()) {
                        try {
                            dc.send(dp);
                        }
                        catch (PortUnreachableException e) {
                            e.addSuppressed(new Exception(dp.getSocketAddress().toString()));
                            throw e;
                        }
                        sentTotal += (long)PAYLOAD_SIZE_DATAGRAM;
                    }
                    time = System.currentTimeMillis() - time;
                    keepRunning.set(false);
                    ds.close();
                    long readTotal0 = readTotal.get();
                    ThroughputTest.reportResults(this.stbTestType() + " DatagramPacket", 1000.0f * (float)readTotal0 / (float)time / 1000.0f / 1000.0f + " MB/s for datagram payload size " + PAYLOAD_SIZE_DATAGRAM + "; " + String.format(Locale.ENGLISH, "%.1f%% packet loss", Float.valueOf(100.0f * (1.0f - (float)readTotal0 / (float)sentTotal))));
                }
            });
        }
        catch (JUnitException e) {
            TestStackTraceUtil.printStackTrace((Throwable)e);
        }
    }

    @Test
    @AFSocketCapabilityRequirement(value={AFSocketCapability.CAPABILITY_UNIX_DATAGRAMS})
    public void testDatagramChannel() throws Exception {
        try {
            Assertions.assertTimeoutPreemptively((Duration)Duration.ofSeconds(NUM_SECONDS + GRACE_TIME_NUM_SECONDS), () -> this.testDatagramChannel(false, true));
        }
        catch (AssertionFailedError e) {
            if (TestUtil.isHaikuOS()) {
                throw TestUtil.haikuBug18535(e);
            }
            throw e;
        }
        catch (JUnitException e) {
            TestStackTraceUtil.printStackTrace((Throwable)e);
        }
    }

    @Test
    @AFSocketCapabilityRequirement(value={AFSocketCapability.CAPABILITY_UNIX_DATAGRAMS})
    public void testDatagramChannelDirect() throws Exception {
        try {
            Assertions.assertTimeoutPreemptively((Duration)Duration.ofSeconds(NUM_SECONDS + GRACE_TIME_NUM_SECONDS), () -> this.testDatagramChannel(true, true));
        }
        catch (AssertionFailedError e) {
            if (TestUtil.isHaikuOS()) {
                throw TestUtil.haikuBug18535(e);
            }
            throw e;
        }
        catch (JUnitException e) {
            TestStackTraceUtil.printStackTrace((Throwable)e);
        }
    }

    @Test
    @AFSocketCapabilityRequirement(value={AFSocketCapability.CAPABILITY_UNIX_DATAGRAMS})
    public void testDatagramChannelNonBlocking() throws Exception {
        try {
            Assertions.assertTimeoutPreemptively((Duration)Duration.ofSeconds(NUM_SECONDS + GRACE_TIME_NUM_SECONDS), () -> this.testDatagramChannel(false, false));
        }
        catch (JUnitException e) {
            TestStackTraceUtil.printStackTrace((Throwable)e);
        }
    }

    @Test
    @AFSocketCapabilityRequirement(value={AFSocketCapability.CAPABILITY_UNIX_DATAGRAMS})
    public void testDatagramChannelNonBlockingDirect() throws Exception {
        try {
            Assertions.assertTimeoutPreemptively((Duration)Duration.ofSeconds(NUM_SECONDS + GRACE_TIME_NUM_SECONDS), () -> this.testDatagramChannel(true, false));
        }
        catch (JUnitException e) {
            TestStackTraceUtil.printStackTrace((Throwable)e);
        }
    }

    private void testDatagramChannel(boolean direct, boolean blocking) throws Exception {
        SocketAddress dsAddr = this.newTempAddressForDatagram();
        SocketAddress dcAddr = this.newTempAddressForDatagram();
        try (DatagramChannel ds = this.newDatagramChannel();
             DatagramChannel dc = this.newDatagramChannel();){
            SelectorProvider sp;
            if (!ds.socket().isBound()) {
                ds.bind(dsAddr);
                dsAddr = ds.getLocalAddress();
            }
            if (!dc.socket().isBound()) {
                dc.bind(dcAddr);
                dcAddr = dc.getLocalAddress();
            }
            dc.connect(dsAddr);
            ds.connect(dcAddr);
            if (blocking) {
                sp = null;
            } else {
                sp = this.selectorProvider();
                dc.configureBlocking(false);
                ds.configureBlocking(false);
            }
            this.testSocketDatagramChannel(this.stbTestType() + " DatagramChannel", ds, dc, sp, direct, blocking);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void testSocketDatagramChannel(String id, final DatagramChannel ds, DatagramChannel dc, SelectorProvider sp, final boolean direct, boolean blocking) throws IOException {
        long time;
        final AtomicBoolean keepRunning = new AtomicBoolean(true);
        TestAsyncUtil.runAsyncDelayed((long)NUM_MILLISECONDS, (TimeUnit)TimeUnit.MILLISECONDS, () -> {
            keepRunning.set(false);
            try {
                ds.close();
            }
            catch (IOException iOException) {
                // empty catch block
            }
        });
        final AtomicLong readTotal = new AtomicLong();
        long sentTotal = 0L;
        final CompletableFuture bytesRead = new CompletableFuture();
        try (final AbstractSelector readSelector = sp == null ? null : sp.openSelector();){
            new Thread(){

                @Override
                public void run() {
                    ByteBuffer receiveBuffer = direct ? ByteBuffer.allocateDirect(PAYLOAD_SIZE_DATAGRAM) : ByteBuffer.allocate(PAYLOAD_SIZE_DATAGRAM);
                    try {
                        SelectionKey key = readSelector != null ? ds.register(readSelector, 1) : null;
                        while (!Thread.interrupted() && keepRunning.get() && !bytesRead.isCancelled()) {
                            if (readSelector != null) {
                                int numReady = readSelector.select();
                                if (numReady == 0) continue;
                                Assertions.assertEquals((int)1, (int)numReady);
                                Objects.requireNonNull(key);
                            }
                            int read = ds.read(receiveBuffer);
                            receiveBuffer.rewind();
                            if (read != PAYLOAD_SIZE_DATAGRAM && read != 0 && read != -1) {
                                throw new IOException("Unexpected response length: " + read);
                            }
                            readTotal.addAndGet(read);
                        }
                        bytesRead.complete(readTotal.get());
                    }
                    catch (SocketException | ClosedChannelException e) {
                        if (keepRunning.get()) {
                            keepRunning.set(false);
                            bytesRead.completeExceptionally(e);
                        } else {
                            bytesRead.complete(readTotal.get());
                        }
                    }
                    catch (Exception e) {
                        keepRunning.set(false);
                        bytesRead.completeExceptionally(e);
                    }
                }
            }.start();
            time = System.currentTimeMillis();
            ByteBuffer sendBuffer = direct ? ByteBuffer.allocateDirect(PAYLOAD_SIZE_DATAGRAM) : ByteBuffer.allocate(PAYLOAD_SIZE_DATAGRAM);
            try (AbstractSelector writeSelector = sp == null ? null : sp.openSelector();){
                if (sp != null) {
                    dc.register(writeSelector, 4);
                }
                while (keepRunning.get()) {
                    int written;
                    if (writeSelector != null) {
                        int numReady = writeSelector.select();
                        if (numReady == 0) continue;
                        Assertions.assertEquals((int)1, (int)numReady);
                    }
                    try {
                        written = dc.write(sendBuffer);
                    }
                    catch (SocketException e) {
                        if (keepRunning.get()) {
                            throw e;
                        }
                        written = 0;
                    }
                    if (written != PAYLOAD_SIZE_DATAGRAM && written != 0) {
                        throw new IOException("Unexpected written length: " + written);
                    }
                    sentTotal += (long)PAYLOAD_SIZE_DATAGRAM;
                    sendBuffer.rewind();
                }
            }
            finally {
                time = System.currentTimeMillis() - time;
                keepRunning.set(false);
                ds.close();
            }
        }
        try {
            bytesRead.get(NUM_MILLISECONDS, TimeUnit.MILLISECONDS);
        }
        catch (TimeoutException e) {
            if (NUM_SECONDS != 0) {
                TestStackTraceUtil.printStackTrace((Throwable)e);
            }
        }
        catch (InterruptedException | ExecutionException e) {
            TestStackTraceUtil.printStackTrace((Throwable)e);
        }
        long readTotal0 = readTotal.get();
        ThroughputTest.reportResults(id + " direct=" + direct + ";blocking=" + blocking, 1000.0f * (float)readTotal0 / (float)time / 1000.0f / 1000.0f + " MB/s for datagram payload size " + PAYLOAD_SIZE_DATAGRAM + "; " + String.format(Locale.ENGLISH, "%.1f%% packet loss", Float.valueOf(100.0f * (1.0f - (float)readTotal0 / (float)sentTotal))));
    }

    protected String stbTestType() {
        return "junixsocket";
    }

    /*
     * Multiple versions of this class in jar - see https://www.benf.org/other/cfr/multi-version-jar.html
     */
    @FunctionalInterface
    protected static interface SupplierWithException<T, E extends Exception> {
        public T get() throws E;
    }
}

