/*
 * Decompiled with CFR 0.152.
 */
package org.newsclub.net.unix;

import com.kohlschutter.testutil.AvailabilityRequirement;
import com.kohlschutter.testutil.SystemPropertyRequirement;
import com.kohlschutter.util.SystemPropertyUtil;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.lang.reflect.InvocationTargetException;
import java.net.DatagramPacket;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ProtocolFamily;
import java.net.SocketAddress;
import java.net.SocketException;
import java.net.SocketOption;
import java.net.StandardProtocolFamily;
import java.net.StandardSocketOptions;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.DatagramChannel;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.channels.spi.SelectorProvider;
import java.util.Locale;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Assumptions;
import org.junit.jupiter.api.MethodOrderer;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestMethodOrder;
import org.newsclub.net.unix.AFUNIXDatagramChannel;
import org.newsclub.net.unix.AFUNIXDatagramSocket;
import org.newsclub.net.unix.AFUNIXSelectorProvider;
import org.newsclub.net.unix.AFUNIXServerSocket;
import org.newsclub.net.unix.AFUNIXServerSocketChannel;
import org.newsclub.net.unix.AFUNIXSocket;
import org.newsclub.net.unix.AFUNIXSocketAddress;
import org.newsclub.net.unix.AFUNIXSocketCapability;
import org.newsclub.net.unix.AFUNIXSocketCapabilityRequirement;
import org.newsclub.net.unix.SocketTestBase;

/*
 * Multiple versions of this class in jar - see https://www.benf.org/other/cfr/multi-version-jar.html
 */
@TestMethodOrder(value=MethodOrderer.MethodName.class)
public class ThroughputTest
extends SocketTestBase {
    private static final int ENABLED = SystemPropertyUtil.getIntSystemProperty((String)"org.newsclub.net.unix.throughput-test.enabled", (int)1);
    private static final int PAYLOAD_SIZE = SystemPropertyUtil.getIntSystemProperty((String)"org.newsclub.net.unix.throughput-test.payload-size", (int)2048);
    private static final int NUM_SECONDS = SystemPropertyUtil.getIntSystemProperty((String)"org.newsclub.net.unix.throughput-test.seconds", (int)0);
    private static final int NUM_MILLISECONDS = Math.max(50, NUM_SECONDS * 1000);

    private static byte[] createTestData(int size) {
        byte[] buf = new byte[size];
        for (int i = 0; i < buf.length; ++i) {
            buf[i] = (byte)(i % 256);
        }
        return buf;
    }

    private static void reportResults(String testType, String s) {
        if (NUM_SECONDS == 0) {
            return;
        }
        System.out.println("ThroughputTest (" + testType + "): " + s);
    }

    @Test
    public void testJUnixSocket() throws Exception {
        Assumptions.assumeTrue((ENABLED > 0 ? 1 : 0) != 0, (String)"Throughput tests are disabled");
        Assumptions.assumeTrue((PAYLOAD_SIZE > 0 ? 1 : 0) != 0, (String)"Payload must be positive");
        try (SocketTestBase.ServerThread serverThread = new SocketTestBase.ServerThread(){

            @Override
            protected void handleConnection(AFUNIXSocket sock) throws IOException {
                byte[] buf = new byte[PAYLOAD_SIZE];
                try (InputStream inputStream = sock.getInputStream();
                     OutputStream outputStream = sock.getOutputStream();){
                    int read;
                    while ((read = inputStream.read(buf)) >= 0) {
                        outputStream.write(buf, 0, read);
                    }
                }
            }
        };){
            AtomicBoolean keepRunning = new AtomicBoolean(true);
            Executors.newSingleThreadScheduledExecutor().schedule(() -> keepRunning.set(false), (long)NUM_MILLISECONDS, TimeUnit.MILLISECONDS);
            try (AFUNIXSocket sock = this.connectToServer();){
                byte[] buf = ThroughputTest.createTestData(PAYLOAD_SIZE);
                try (InputStream inputStream = sock.getInputStream();
                     OutputStream outputStream = sock.getOutputStream();){
                    long readTotal = 0L;
                    long time = System.currentTimeMillis();
                    while (keepRunning.get()) {
                        int read;
                        outputStream.write(buf);
                        int remaining = buf.length;
                        int offset = 0;
                        while (keepRunning.get() && remaining > 0 && (read = inputStream.read(buf, offset, remaining)) >= 0) {
                            if (read <= 0) continue;
                            int pos = ThreadLocalRandom.current().nextInt(read) + offset;
                            if ((buf[pos] & 0xFF) != pos % 256) {
                                throw new IllegalStateException("Unexpected response from read: value@pos " + pos + "=" + (buf[pos] & 0xFF) + " != " + pos % 256);
                            }
                            remaining -= read;
                            offset += read;
                            readTotal += (long)read;
                        }
                    }
                    time = System.currentTimeMillis() - time;
                    ThroughputTest.reportResults("junixsocket byte[]", 1000.0f * (float)readTotal / (float)time / 1000.0f / 1000.0f + " MB/s for payload size " + PAYLOAD_SIZE);
                }
            }
        }
    }

    @Test
    public void testJUnixSocketChannel() throws Exception {
        Assumptions.assumeTrue((ENABLED > 0 ? 1 : 0) != 0, (String)"Throughput tests are disabled");
        Assumptions.assumeTrue((PAYLOAD_SIZE > 0 ? 1 : 0) != 0, (String)"Payload must be positive");
        this.runTestJUnixSocketChannel(false);
    }

    @Test
    public void testJUnixSocketChannelDirectBuffer() throws Exception {
        Assumptions.assumeTrue((ENABLED > 0 ? 1 : 0) != 0, (String)"Throughput tests are disabled");
        Assumptions.assumeTrue((PAYLOAD_SIZE > 0 ? 1 : 0) != 0, (String)"Payload must be positive");
        this.runTestJUnixSocketChannel(true);
    }

    @Test
    @AvailabilityRequirement(classes={"java.net.UnixDomainSocketAddress"}, message="This test requires Java 16 or later")
    public void testJEP380() throws Exception {
        Assumptions.assumeTrue((ENABLED > 0 ? 1 : 0) != 0, (String)"Throughput tests are disabled");
        Assumptions.assumeTrue((PAYLOAD_SIZE > 0 ? 1 : 0) != 0, (String)"Payload must be positive");
        this.runTestJEP380(false);
    }

    @Test
    @AvailabilityRequirement(classes={"java.net.UnixDomainSocketAddress"}, message="This test requires Java 16 or later")
    public void testJEP380directBuffer() throws Exception {
        Assumptions.assumeTrue((ENABLED > 0 ? 1 : 0) != 0, (String)"Throughput tests are disabled");
        Assumptions.assumeTrue((PAYLOAD_SIZE > 0 ? 1 : 0) != 0, (String)"Payload must be positive");
        this.runTestJEP380(true);
    }

    private static SocketAddress jep380SocketAddress(String path) throws IllegalAccessException, IllegalArgumentException, InvocationTargetException, SecurityException {
        try {
            Class<?> klazz = Class.forName("java.net.UnixDomainSocketAddress");
            return (SocketAddress)klazz.getMethod("of", String.class).invoke(null, path);
        }
        catch (ClassNotFoundException | NoSuchMethodException e) {
            Assumptions.assumeTrue((boolean)false, (String)"java.net.UnixDomainSocketAddress (JEP 380) not supported by JVM");
            return null;
        }
    }

    private void runTestJEP380(boolean direct) throws Exception {
        ServerSocketChannel ssc;
        SocketAddress sa = ThroughputTest.jep380SocketAddress(this.getServerAddress().getPath());
        try {
            ssc = (ServerSocketChannel)ServerSocketChannel.class.getMethod("open", ProtocolFamily.class).invoke(null, StandardProtocolFamily.valueOf("UNIX"));
        }
        catch (Exception e) {
            throw new IllegalStateException(e);
        }
        this.runTestSocketChannel("JEP380 SocketChannel", sa, ssc, () -> SocketChannel.open(sa), direct);
    }

    @Test
    @SystemPropertyRequirement(property="org.newsclub.net.unix.throughput-test.ip.enabled", value="1", message="Loopback TCP/IP testing is disabled")
    public void testTCPLoopback() throws Exception {
        Assumptions.assumeTrue((ENABLED > 0 ? 1 : 0) != 0, (String)"Throughput tests are disabled");
        Assumptions.assumeTrue((PAYLOAD_SIZE > 0 ? 1 : 0) != 0, (String)"Payload must be positive");
        this.runTestTCPLoopback(false);
    }

    @Test
    @SystemPropertyRequirement(property="org.newsclub.net.unix.throughput-test.ip.enabled", value="1", message="Loopback TCP/IP testing is disabled")
    public void testTCPLoopbackDirectBuffer() throws Exception {
        Assumptions.assumeTrue((ENABLED > 0 ? 1 : 0) != 0, (String)"Throughput tests are disabled");
        Assumptions.assumeTrue((PAYLOAD_SIZE > 0 ? 1 : 0) != 0, (String)"Payload must be positive");
        this.runTestTCPLoopback(true);
    }

    private void runTestTCPLoopback(boolean direct) throws Exception {
        InetSocketAddress sa = new InetSocketAddress(InetAddress.getLoopbackAddress(), 0);
        ServerSocketChannel ssc = ServerSocketChannel.open();
        this.runTestSocketChannel("TCP-Loopback", sa, ssc, () -> SocketChannel.open(ssc.getLocalAddress()), direct);
    }

    private void runTestJUnixSocketChannel(boolean direct) throws Exception {
        AFUNIXSocketAddress sa = this.getServerAddress();
        AFUNIXSelectorProvider sp = AFUNIXSelectorProvider.getInstance();
        AFUNIXServerSocketChannel ssc = sp.openServerSocketChannel();
        this.runTestSocketChannel("junixsocket SocketChannel", (SocketAddress)sa, (ServerSocketChannel)ssc, () -> ThroughputTest.lambda$runTestJUnixSocketChannel$3(sp, (SocketAddress)sa), direct);
    }

    private void runTestSocketChannel(String implId, final SocketAddress sa, final ServerSocketChannel ssc, SupplierWithException<SocketChannel, IOException> sscSupp, final boolean direct) throws Exception {
        final AtomicBoolean keepRunning = new AtomicBoolean(true);
        try (SocketTestBase.ServerThread serverThread = new SocketTestBase.ServerThread(){

            @Override
            protected AFUNIXServerSocket startServer() throws IOException {
                ssc.bind(sa);
                return null;
            }

            @Override
            public void shutdown() throws IOException {
                super.shutdown();
                ssc.close();
            }

            @Override
            protected void onServerSocketClose() {
                keepRunning.set(false);
                super.onServerSocketClose();
            }

            @Override
            protected void acceptAndHandleConnection() throws IOException {
                ByteBuffer bb = direct ? ByteBuffer.allocateDirect(PAYLOAD_SIZE) : ByteBuffer.allocate(PAYLOAD_SIZE);
                try (SocketChannel sc = ssc.accept();){
                    try {
                        while (sc.read(bb) >= 0) {
                            bb.flip();
                            sc.write(bb);
                            bb.flip();
                        }
                    }
                    catch (SocketException e) {
                        if (keepRunning.get()) {
                            throw e;
                        }
                    }
                }
            }

            @Override
            protected void handleConnection(AFUNIXSocket sock) throws IOException {
                throw new IllegalStateException();
            }
        };){
            Executors.newSingleThreadScheduledExecutor().schedule(() -> keepRunning.set(false), (long)NUM_MILLISECONDS, TimeUnit.MILLISECONDS);
            try (SocketChannel sc = sscSupp.get();){
                ByteBuffer bb = direct ? ByteBuffer.allocateDirect(PAYLOAD_SIZE) : ByteBuffer.allocate(PAYLOAD_SIZE);
                bb.put(ThroughputTest.createTestData(PAYLOAD_SIZE));
                bb.flip();
                long readTotal = 0L;
                long time = System.currentTimeMillis();
                while (keepRunning.get()) {
                    long read;
                    int remaining = sc.write(bb);
                    bb.flip();
                    while (keepRunning.get() && remaining > 0 && (read = (long)sc.read(bb)) >= 0L) {
                        remaining = (int)((long)remaining - read);
                        readTotal += read;
                    }
                    int pos = ThreadLocalRandom.current().nextInt(bb.limit());
                    if ((bb.get(pos) & 0xFF) == pos % 256) continue;
                    throw new IllegalStateException("Unexpected response from read");
                }
                time = System.currentTimeMillis() - time;
                ThroughputTest.reportResults(implId + " direct=" + direct, 1000.0f * (float)readTotal / (float)time / 1000.0f / 1000.0f + " MB/s for payload size " + PAYLOAD_SIZE);
            }
        }
    }

    @AFUNIXSocketCapabilityRequirement(value={AFUNIXSocketCapability.CAPABILITY_DATAGRAMS})
    @Test
    public void testJUnixSocketDatagramPacket() throws Exception {
        AFUNIXSocketAddress dsAddr = AFUNIXSocketAddress.of((File)SocketTestBase.newTempFile());
        AFUNIXSocketAddress dcAddr = AFUNIXSocketAddress.of((File)SocketTestBase.newTempFile());
        Assertions.assertNotEquals((Object)dsAddr, (Object)dcAddr);
        try (final AFUNIXDatagramSocket ds = AFUNIXDatagramSocket.newInstance();
             AFUNIXDatagramSocket dc = AFUNIXDatagramSocket.newInstance();){
            ds.bind((SocketAddress)dsAddr);
            dc.bind((SocketAddress)dcAddr);
            dc.connect((SocketAddress)dsAddr);
            ds.setReceiveBufferSize(PAYLOAD_SIZE + 82);
            final AtomicBoolean keepRunning = new AtomicBoolean(true);
            Executors.newSingleThreadScheduledExecutor().schedule(() -> keepRunning.set(false), (long)NUM_MILLISECONDS, TimeUnit.MILLISECONDS);
            final AtomicLong readTotal = new AtomicLong();
            long sentTotal = 0L;
            new Thread(){
                final DatagramPacket dp = new DatagramPacket(new byte[ThroughputTest.access$000()], ThroughputTest.access$000());

                @Override
                public void run() {
                    try {
                        while (!Thread.interrupted()) {
                            ds.receive(this.dp);
                            int read = this.dp.getLength();
                            if (read != PAYLOAD_SIZE && read != 0) {
                                throw new IOException("Unexpected response length: " + read);
                            }
                            readTotal.addAndGet(this.dp.getLength());
                        }
                    }
                    catch (SocketException e) {
                        if (keepRunning.get()) {
                            e.printStackTrace();
                        }
                    }
                    catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }.start();
            long time = System.currentTimeMillis();
            DatagramPacket dp = new DatagramPacket(new byte[PAYLOAD_SIZE], PAYLOAD_SIZE);
            byte[] data = dp.getData();
            for (int i = 0; i < data.length; ++i) {
                data[i] = (byte)i;
            }
            while (keepRunning.get()) {
                dc.send(dp);
                sentTotal += (long)PAYLOAD_SIZE;
            }
            time = System.currentTimeMillis() - time;
            ds.close();
            long readTotal0 = readTotal.get();
            ThroughputTest.reportResults("junixsocket DatagramPacket", 1000.0f * (float)readTotal0 / (float)time / 1000.0f / 1000.0f + " MB/s for payload size " + PAYLOAD_SIZE + "; " + String.format(Locale.ENGLISH, "%.1f%% packet loss", Float.valueOf(100.0f * (1.0f - (float)readTotal0 / (float)sentTotal))));
        }
    }

    @Test
    @AFUNIXSocketCapabilityRequirement(value={AFUNIXSocketCapability.CAPABILITY_DATAGRAMS})
    public void testJUnixSocketDatagramChannel() throws Exception {
        this.testJUnixSocketDatagramChannel(false);
    }

    @Test
    @AFUNIXSocketCapabilityRequirement(value={AFUNIXSocketCapability.CAPABILITY_DATAGRAMS})
    public void testJUnixSocketDatagramChannelDirect() throws Exception {
        this.testJUnixSocketDatagramChannel(true);
    }

    private void testJUnixSocketDatagramChannel(boolean direct) throws Exception {
        AFUNIXSocketAddress dsAddr = AFUNIXSocketAddress.of((File)SocketTestBase.newTempFile());
        AFUNIXSocketAddress dcAddr = AFUNIXSocketAddress.of((File)SocketTestBase.newTempFile());
        Assertions.assertNotEquals((Object)dsAddr, (Object)dcAddr);
        try (AFUNIXDatagramChannel ds = AFUNIXDatagramSocket.newInstance().getChannel();
             AFUNIXDatagramChannel dc = AFUNIXDatagramSocket.newInstance().getChannel();){
            ds.bind((SocketAddress)dsAddr);
            dc.bind((SocketAddress)dcAddr).connect((SocketAddress)dsAddr);
            this.testSocketDatagramChannel("junixsocket DatagramChannel", (DatagramChannel)ds, (DatagramChannel)dc, direct);
        }
    }

    @Test
    @SystemPropertyRequirement(property="org.newsclub.net.unix.throughput-test.ip.enabled", value="1", message="Loopback UDP/IP testing is disabled")
    public void testUDPLoopback() throws Exception {
        this.testUDPLoopbackDatagramChannel(false);
    }

    @Test
    @SystemPropertyRequirement(property="org.newsclub.net.unix.throughput-test.ip.enabled", value="1", message="Loopback UDP/IP testing is disabled")
    public void testUDPLoopbackDirectBuffer() throws Exception {
        this.testUDPLoopbackDatagramChannel(true);
    }

    private void testUDPLoopbackDatagramChannel(boolean direct) throws Exception {
        InetSocketAddress dsAddr = new InetSocketAddress(InetAddress.getLoopbackAddress(), 0);
        InetSocketAddress dcAddr = new InetSocketAddress(InetAddress.getLoopbackAddress(), 0);
        try (DatagramChannel ds = SelectorProvider.provider().openDatagramChannel();
             DatagramChannel dc = SelectorProvider.provider().openDatagramChannel();){
            ds.bind(dsAddr);
            dc.bind(dcAddr).connect(ds.getLocalAddress());
            ds.connect(dc.getLocalAddress());
            Assertions.assertNotEquals((Object)ds.getLocalAddress(), (Object)dc.getLocalAddress());
            this.testSocketDatagramChannel("UDP-Loopback DatagramChannel", ds, dc, direct);
        }
    }

    private void testSocketDatagramChannel(String id, final DatagramChannel ds, DatagramChannel dc, final boolean direct) throws IOException {
        ByteBuffer sendBuffer;
        ds.setOption((SocketOption)StandardSocketOptions.SO_RCVBUF, (Object)(PAYLOAD_SIZE + 82));
        final AtomicBoolean keepRunning = new AtomicBoolean(true);
        Executors.newSingleThreadScheduledExecutor().schedule(() -> keepRunning.set(false), (long)NUM_MILLISECONDS, TimeUnit.MILLISECONDS);
        final AtomicLong readTotal = new AtomicLong();
        long sentTotal = 0L;
        new Thread(){

            @Override
            public void run() {
                ByteBuffer receiveBuffer = direct ? ByteBuffer.allocateDirect(PAYLOAD_SIZE) : ByteBuffer.allocate(PAYLOAD_SIZE);
                try {
                    while (!Thread.interrupted()) {
                        int read = ds.read(receiveBuffer);
                        receiveBuffer.rewind();
                        if (read != PAYLOAD_SIZE && read != 0) {
                            throw new IOException("Unexpected response length: " + read);
                        }
                        readTotal.addAndGet(read);
                    }
                }
                catch (SocketException | ClosedChannelException e) {
                    if (keepRunning.get()) {
                        e.printStackTrace();
                    }
                }
                catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }.start();
        long time = System.currentTimeMillis();
        ByteBuffer byteBuffer = sendBuffer = direct ? ByteBuffer.allocateDirect(PAYLOAD_SIZE) : ByteBuffer.allocate(PAYLOAD_SIZE);
        while (keepRunning.get()) {
            int written = dc.write(sendBuffer);
            if (written != PAYLOAD_SIZE && written != 0) {
                throw new IOException("Unexpected written length: " + written);
            }
            sentTotal += (long)PAYLOAD_SIZE;
            sendBuffer.rewind();
        }
        time = System.currentTimeMillis() - time;
        ds.close();
        long readTotal0 = readTotal.get();
        ThroughputTest.reportResults(id + " direct=" + direct, 1000.0f * (float)readTotal0 / (float)time / 1000.0f / 1000.0f + " MB/s for payload size " + PAYLOAD_SIZE + "; " + String.format(Locale.ENGLISH, "%.1f%% packet loss", Float.valueOf(100.0f * (1.0f - (float)readTotal0 / (float)sentTotal))));
    }

    private static /* synthetic */ SocketChannel lambda$runTestJUnixSocketChannel$3(AFUNIXSelectorProvider sp, SocketAddress sa) throws IOException {
        return sp.openSocketChannel(sa);
    }

    /*
     * Multiple versions of this class in jar - see https://www.benf.org/other/cfr/multi-version-jar.html
     */
    @FunctionalInterface
    static interface SupplierWithException<T, E extends Exception> {
        public T get() throws E;
    }
}

