/*
 * Decompiled with CFR 0.152.
 */
package io.fabric8.kubernetes.client.http;

import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpHandler;
import com.sun.net.httpserver.HttpServer;
import io.fabric8.kubernetes.client.http.AsyncBody;
import io.fabric8.kubernetes.client.http.HttpClient;
import io.fabric8.kubernetes.client.http.HttpRequest;
import io.fabric8.kubernetes.client.http.WebSocket;
import java.io.Closeable;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketException;
import java.net.URI;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
import javax.net.ServerSocketFactory;
import okhttp3.Protocol;
import okhttp3.Response;
import okhttp3.WebSocketListener;
import okhttp3.mockwebserver.MockResponse;
import okhttp3.mockwebserver.MockWebServer;
import org.assertj.core.api.AbstractCollectionAssert;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.DisabledOnOs;
import org.junit.jupiter.api.condition.OS;

public abstract class AbstractSimultaneousConnectionsTest {
    // Connection count for the plain HTTP/1.x test; equals the 2048 requested in http1Connections.
    private static final int MAX_HTTP_1_CONNECTIONS = 2048;
    // Connection count for the WebSocket tests; equals the 1024 requested in both WebSocket test methods.
    private static final int MAX_HTTP_1_WS_CONNECTIONS = 1024;
    // Socket factory installed on mockWebServer; it records every accepted socket so they can be counted and closed.
    private RegisteredServerSocketFactory serverSocketFactory;
    // OkHttp mock server; only started by withHttp1() for the upgraded-WebSocket test.
    private MockWebServer mockWebServer;
    // Executor that runs the handlers of httpServer.
    private ExecutorService httpExecutor;
    // JDK HTTP server created on port 0 (system-assigned port) and started before every test.
    private HttpServer httpServer;
    // Builder obtained from getHttpClientFactory(), preconfigured with a 60 second connect timeout.
    private HttpClient.Builder clientBuilder;

    /**
     * Builds a fresh fixture before every test: a mock web server wired to the
     * socket-recording factory (not started here), a started JDK {@link HttpServer}
     * on a system-assigned port backed by a cached thread pool, and a client
     * builder with a 60 second connect timeout.
     *
     * @throws IOException if the JDK HTTP server cannot be created
     */
    @BeforeEach
    void prepareServerAndBuilder() throws IOException {
        // Mock web server side: sockets accepted through the factory are tracked.
        serverSocketFactory = new RegisteredServerSocketFactory();
        mockWebServer = new MockWebServer();
        mockWebServer.setServerSocketFactory(serverSocketFactory);

        // JDK HTTP server side: port 0 lets the system pick a free port.
        httpExecutor = Executors.newCachedThreadPool();
        final InetSocketAddress anyFreePort = new InetSocketAddress(0);
        httpServer = HttpServer.create(anyFreePort, 0);
        httpServer.setExecutor(httpExecutor);
        httpServer.start();

        // Client under test.
        final HttpClient.Factory factory = getHttpClientFactory();
        clientBuilder = factory.newBuilder().connectTimeout(60L, TimeUnit.SECONDS);
    }

    /**
     * Tears the fixture down after every test.
     *
     * <p>Every step runs even when an earlier one throws, so a failing
     * {@code mockWebServer.shutdown()} can no longer leave the JDK HTTP server
     * or its executor running for the remaining tests.
     *
     * @throws IOException if shutting down the mock web server fails
     */
    @AfterEach
    void stopServer() throws IOException {
        try {
            serverSocketFactory.close();
        } finally {
            try {
                mockWebServer.shutdown();
            } finally {
                try {
                    // A delay of 0 stops the server without waiting for in-flight exchanges.
                    httpServer.stop(0);
                } finally {
                    httpExecutor.shutdownNow();
                }
            }
        }
    }

    /**
     * Supplies the {@link HttpClient.Factory} under test.
     *
     * <p>Called once per test from {@code prepareServerAndBuilder()}, which uses the
     * returned factory's {@code newBuilder()} to create the client builder.
     *
     * @return the factory whose clients the tests exercise
     */
    protected abstract HttpClient.Factory getHttpClientFactory();

    /**
     * Limits the mock web server to the HTTP/1.1 protocol and then starts it.
     *
     * @throws IOException propagated from starting the mock web server
     */
    private void withHttp1() throws IOException {
        final java.util.List<Protocol> http1Only = Collections.singletonList(Protocol.HTTP_1_1);
        mockWebServer.setProtocols(http1Only);
        mockWebServer.start();
    }

    /**
     * Issues {@code MAX_HTTP_1_CONNECTIONS} requests against the JDK HTTP server
     * while the server withholds every response.
     *
     * <p>The {@code DelayedResponseHandler} parks each exchange and only answers
     * (with 204) once the expected number of exchanges has arrived. After sending
     * a request the test meets the handler on its two-party barrier, so the next
     * request is sent only after the previous one has reached the server.
     *
     * @throws Exception if a barrier wait fails or the responses do not complete within 60 seconds
     */
    @Test
    @DisplayName(value="Should be able to make 2048 simultaneous HTTP/1.x connections before processing the response")
    @DisabledOnOs(value={OS.WINDOWS})
    public void http1Connections() throws Exception {
        final DelayedResponseHandler handler = new DelayedResponseHandler(MAX_HTTP_1_CONNECTIONS, exchange -> exchange.sendResponseHeaders(204, -1L));
        httpServer.createContext("/http", handler);
        try (HttpClient client = clientBuilder.build()) {
            // Typed (not raw) so that toArray(...) yields CompletableFuture[] and the
            // extractors below see the response type.
            final Set<CompletableFuture<HttpResponse<AsyncBody>>> asyncResponses = ConcurrentHashMap.newKeySet();
            final HttpRequest request = client.newHttpRequestBuilder()
                    .uri(String.format("http://localhost:%s/http", httpServer.getAddress().getPort()))
                    .build();
            for (int it = 0; it < MAX_HTTP_1_CONNECTIONS; ++it) {
                asyncResponses.add(client.consumeBytes(request, (value, asyncBody) -> asyncBody.consume()));
                // Rendezvous with the server-side handler before opening the next connection.
                handler.await();
            }
            CompletableFuture.allOf(asyncResponses.toArray(new CompletableFuture[0])).get(60L, TimeUnit.SECONDS);
            Assertions.assertThat(asyncResponses)
                    .hasSize(MAX_HTTP_1_CONNECTIONS)
                    .extracting(CompletableFuture::join)
                    .extracting(response -> {
                        response.body().consume();
                        return response.code();
                    })
                    .containsOnly(204);
        }
    }

    /**
     * Starts {@code MAX_HTTP_1_WS_CONNECTIONS} WebSocket handshakes against the JDK
     * HTTP server, which parks every exchange and answers 404 only after all of
     * them have arrived.
     *
     * <p>After each handshake attempt the test meets the handler on its two-party
     * barrier, so connections are opened one at a time while earlier ones remain
     * unanswered. The test passes when the handler reports the expected number of
     * parked exchanges within 60 seconds.
     *
     * @throws Exception if a barrier wait fails or the connection count is not reported in time
     */
    @Test
    @DisplayName(value="Should be able to make 1024 simultaneous HTTP connections before upgrading to WebSocket")
    @DisabledOnOs(value={OS.WINDOWS})
    public void http1WebSocketConnectionsBeforeUpgrade() throws Exception {
        final DelayedResponseHandler handler = new DelayedResponseHandler(MAX_HTTP_1_WS_CONNECTIONS, exchange -> exchange.sendResponseHeaders(404, -1L));
        httpServer.createContext("/http", handler);
        // The endpoint does not change between iterations, so build it once.
        final URI endpoint = URI.create(String.format("http://localhost:%s/http", httpServer.getAddress().getPort()));
        try (HttpClient client = clientBuilder.build()) {
            for (int it = 0; it < MAX_HTTP_1_WS_CONNECTIONS; ++it) {
                // The returned future is intentionally not awaited: the server withholds its answer.
                client.newWebSocketBuilder().uri(endpoint).buildAsync(new WebSocket.Listener(){});
                handler.await();
            }
        }
        Assertions.assertThat(handler.connectionCount.get(60L, TimeUnit.SECONDS)).isEqualTo(MAX_HTTP_1_WS_CONNECTIONS);
    }

    /**
     * Opens {@code MAX_HTTP_1_WS_CONNECTIONS} upgraded WebSocket connections against
     * the mock web server (HTTP/1.1 only) and checks that all of them are open at
     * the same time.
     *
     * <p>For every connection the server-side listener and the test meet on a
     * two-party barrier; the server then records its socket and sends "go on",
     * which the client-side listener counts on a latch. Server sockets are closed
     * in the {@code finally} block regardless of the outcome.
     *
     * <p>NOTE(review): this method was reconstructed by a decompiler, which warned
     * "Removed try catching itself - possible behaviour change"; compare with the
     * original source if it is available.
     *
     * @throws Exception if a barrier wait on the test thread fails
     */
    @Test
    @DisplayName(value="Should be able to make 1024 simultaneous upgraded WebSocket connections")
    @DisabledOnOs(value={OS.WINDOWS})
    public void http1WebSocketConnections() throws Exception {
        withHttp1();
        // Typed (not raw) sets so the cleanup loop below can iterate okhttp3.WebSocket elements.
        final Set<okhttp3.WebSocket> serverSockets = ConcurrentHashMap.newKeySet();
        final Set<WebSocket> clientSockets = ConcurrentHashMap.newKeySet();
        final CyclicBarrier cyclicBarrier = new CyclicBarrier(2);
        final CountDownLatch latch = new CountDownLatch(MAX_HTTP_1_WS_CONNECTIONS);
        final MockResponse upgradeResponse = new MockResponse().withWebSocketUpgrade(new WebSocketListener(){

            @Override
            public void onOpen(okhttp3.WebSocket webSocket, Response response) {
                try {
                    cyclicBarrier.await(1L, TimeUnit.SECONDS);
                }
                catch (Exception ignored) {
                    // Deliberately best effort: even if the rendezvous fails, the socket is
                    // still recorded and greeted so the final assertions decide the outcome.
                }
                serverSockets.add(webSocket);
                webSocket.send("go on");
            }
        });
        for (int i = 0; i < MAX_HTTP_1_WS_CONNECTIONS; ++i) {
            mockWebServer.enqueue(upgradeResponse);
        }
        final URI serverUri = mockWebServer.url("/").uri();
        try (HttpClient client = clientBuilder.build()) {
            for (int it = 0; it < MAX_HTTP_1_WS_CONNECTIONS; ++it) {
                client.newWebSocketBuilder().uri(serverUri).buildAsync(new WebSocket.Listener(){

                    @Override
                    public void onMessage(WebSocket webSocket, String text) {
                        clientSockets.add(webSocket);
                        latch.countDown();
                        webSocket.request();
                    }
                });
                // Wait for the server side to see this connection before opening the next one.
                cyclicBarrier.await(1L, TimeUnit.SECONDS);
            }
            Assertions.assertThat(latch.await(60L, TimeUnit.SECONDS)).isTrue();
            Assertions.assertThat(serverSockets.size())
                    .isEqualTo(MAX_HTTP_1_WS_CONNECTIONS)
                    .isLessThanOrEqualTo((int)serverSocketFactory.activeConnections());
        }
        finally {
            for (okhttp3.WebSocket socket : serverSockets) {
                socket.close(1000, "done");
            }
        }
    }

    private static class RegisteredServerSocketFactory
    extends ServerSocketFactory
    implements Closeable {
        private final Set<Socket> connections = new HashSet<Socket>();

        private RegisteredServerSocketFactory() {
        }

        final long activeConnections() {
            return this.connections.stream().filter(Socket::isConnected).filter(s -> !s.isClosed()).count();
        }

        @Override
        public final void close() {
            for (Socket socket : this.connections) {
                try {
                    socket.close();
                }
                catch (IOException iOException) {}
            }
        }

        @Override
        public ServerSocket createServerSocket() throws IOException {
            return new ServerSocket(){

                @Override
                public Socket accept() throws IOException {
                    Socket socket = super.accept();
                    connections.add(socket);
                    return socket;
                }
            };
        }

        @Override
        public ServerSocket createServerSocket(int port) throws IOException {
            throw new SocketException("not implemented");
        }

        @Override
        public ServerSocket createServerSocket(int port, int backlog) throws IOException {
            throw new SocketException("not implemented");
        }

        @Override
        public ServerSocket createServerSocket(int port, int backlog, InetAddress ifAddress) throws IOException {
            throw new SocketException("not implemented");
        }
    }

    /**
     * {@link HttpHandler} that parks incoming exchanges instead of answering them.
     *
     * <p>Each call to {@link #handle(HttpExchange)} stores the exchange and then
     * waits on a two-party barrier that the test releases through {@link #await()}.
     * Once the number of stored exchanges equals {@code requestCount},
     * {@code connectionCount} is completed, which triggers the delegate handler
     * for every stored exchange on a dedicated single-thread executor; that
     * executor is shut down when the run finishes.
     */
    private static class DelayedResponseHandler
    implements HttpHandler {
        private final int requestCount;
        // Two parties: the server thread inside handle() and the test thread calling await().
        private final CyclicBarrier barrier;
        private final Set<HttpExchange> exchanges;
        // Completed with requestCount once that many exchanges have been parked.
        private final CompletableFuture<Integer> connectionCount;
        private final ExecutorService executorService;

        /**
         * @param requestCount number of exchanges to collect before responding
         * @param handler delegate that produces the response for each parked exchange
         */
        private DelayedResponseHandler(int requestCount, HttpHandler handler) {
            this.requestCount = requestCount;
            this.barrier = new CyclicBarrier(2);
            this.exchanges = ConcurrentHashMap.newKeySet();
            this.connectionCount = new CompletableFuture<>();
            this.executorService = Executors.newFixedThreadPool(1);
            this.connectionCount.thenRunAsync(() -> {
                for (HttpExchange exchange : this.exchanges) {
                    try {
                        handler.handle(exchange);
                    }
                    catch (IOException ignored) {
                        // Best effort: one failing response must not stop the remaining ones.
                    }
                }
            }, this.executorService).whenComplete((unused, throwable) -> this.executorService.shutdownNow());
        }

        /**
         * Parks the exchange, waits for the test to acknowledge it, and completes
         * {@code connectionCount} when the expected number of exchanges is reached.
         *
         * @throws RuntimeException if the barrier wait fails
         */
        @Override
        public void handle(HttpExchange exchange) throws IOException {
            this.exchanges.add(exchange);
            this.await();
            if (this.exchanges.size() == this.requestCount) {
                this.connectionCount.complete(this.requestCount);
            }
        }

        /**
         * Waits up to one second for the other party to reach the barrier.
         *
         * @throws RuntimeException if the wait is interrupted, times out, or the
         *         barrier is broken; the underlying exception is kept as the cause
         */
        public final void await() {
            try {
                this.barrier.await(1L, TimeUnit.SECONDS);
            }
            catch (InterruptedException ex) {
                // Restore the interrupt status so callers further up can observe it.
                Thread.currentThread().interrupt();
                throw new RuntimeException("Failed to await the barrier", ex);
            }
            catch (Exception ex) {
                throw new RuntimeException("Failed to await the barrier", ex);
            }
        }
    }
}

