/*
 * Decompiled with CFR 0.152.
 */
package io.fabric8.kubernetes.client.http;

import io.fabric8.kubernetes.client.RequestConfigBuilder;
import io.fabric8.kubernetes.client.http.AsyncBody;
import io.fabric8.kubernetes.client.http.HttpClient;
import io.fabric8.kubernetes.client.http.HttpRequest;
import io.fabric8.kubernetes.client.http.WebSocket;
import io.fabric8.mockwebserver.MockWebServer;
import io.fabric8.mockwebserver.MockWebServerListener;
import io.fabric8.mockwebserver.http.MockResponse;
import io.fabric8.mockwebserver.http.RecordedHttpConnection;
import io.fabric8.mockwebserver.http.Response;
import io.fabric8.mockwebserver.http.WebSocketListener;
import io.fabric8.mockwebserver.vertx.Protocol;
import io.vertx.core.Vertx;
import io.vertx.core.http.HttpServer;
import io.vertx.core.http.HttpServerOptions;
import io.vertx.core.http.HttpServerRequest;
import io.vertx.core.http.HttpVersion;
import java.net.URI;
import java.util.Collection;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
import org.assertj.core.api.AbstractCollectionAssert;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.DisabledOnOs;
import org.junit.jupiter.api.condition.OS;

public abstract class AbstractSimultaneousConnectionsTest {
    private static final int MAX_HTTP_1_CONNECTIONS = 2048;
    private static final int MAX_HTTP_1_WS_CONNECTIONS = 1024;
    private RegisteredConnections registeredConnections;
    private MockWebServer mockWebServer;
    private Vertx vertx;
    private HttpClient.Builder clientBuilder;

    @BeforeEach
    void prepareServerAndBuilder() {
        this.registeredConnections = new RegisteredConnections();
        this.mockWebServer = new MockWebServer();
        this.mockWebServer.addListener((MockWebServerListener)this.registeredConnections);
        this.vertx = Vertx.vertx();
        this.clientBuilder = this.getHttpClientFactory().newBuilder().connectTimeout(60L, TimeUnit.SECONDS);
    }

    @AfterEach
    void stopServer() {
        this.mockWebServer.shutdown();
        this.vertx.close();
    }

    protected abstract HttpClient.Factory getHttpClientFactory();

    private void withHttp1() {
        this.mockWebServer.setProtocols(Collections.singletonList(Protocol.HTTP_1_1));
        this.mockWebServer.start();
    }

    @Test
    @DisplayName(value="Should be able to make 2048 simultaneous HTTP/1.x connections before processing the response")
    @DisabledOnOs(value={OS.WINDOWS})
    public void http1Connections() throws Exception {
        ConcurrentHashMap.KeySetView asyncResponses = ConcurrentHashMap.newKeySet();
        try (DelayedResponseHttp1Server server = new DelayedResponseHttp1Server(this.vertx, 2048);
             HttpClient client = this.clientBuilder.tag((Object)((RequestConfigBuilder)new RequestConfigBuilder().withRequestRetryBackoffLimit(Integer.valueOf(0))).build()).build();){
            for (int it = 0; it < 2048; ++it) {
                HttpRequest request = client.newHttpRequestBuilder().uri(server.uri() + "?" + it).build();
                asyncResponses.add(client.consumeBytes(request, (value, asyncBody) -> asyncBody.consume()));
            }
            server.await();
            Assertions.assertThat(server.requests).hasSize(2048);
            for (HttpServerRequest serverRequest : server.requests) {
                serverRequest.response().setStatusCode(204).end();
            }
            CompletableFuture.allOf(asyncResponses.toArray(new CompletableFuture[0])).get(70L, TimeUnit.SECONDS);
            ((AbstractCollectionAssert)Assertions.assertThat(asyncResponses).hasSize(2048)).extracting(CompletableFuture::join).extracting(response -> {
                ((AsyncBody)response.body()).consume();
                return response.code();
            }).containsOnly((Object[])new Integer[]{204});
        }
    }

    @Test
    @DisplayName(value="Should be able to make 1024 simultaneous HTTP connections before upgrading to WebSocket")
    @DisabledOnOs(value={OS.WINDOWS})
    public void http1WebSocketConnectionsBeforeUpgrade() throws Exception {
        try (DelayedResponseHttp1Server server = new DelayedResponseHttp1Server(this.vertx, 1024);
             HttpClient client = this.clientBuilder.build();){
            for (int it = 0; it < 1024; ++it) {
                client.newWebSocketBuilder().uri(URI.create(server.uri())).buildAsync(new WebSocket.Listener(){});
            }
            server.await();
            Assertions.assertThat(server.requests).hasSize(1024);
            server.requests.forEach(request -> request.response().setStatusCode(101).end());
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    @DisplayName(value="Should be able to make 1024 simultaneous upgraded WebSocket connections")
    @DisabledOnOs(value={OS.WINDOWS})
    public void http1WebSocketConnections() throws Exception {
        this.withHttp1();
        final ConcurrentHashMap.KeySetView serverSockets = ConcurrentHashMap.newKeySet();
        final ConcurrentHashMap.KeySetView clientSockets = ConcurrentHashMap.newKeySet();
        final CyclicBarrier cyclicBarrier = new CyclicBarrier(2);
        final CountDownLatch latch = new CountDownLatch(1024);
        MockResponse response = new MockResponse().withWebSocketUpgrade(new WebSocketListener(){

            public void onOpen(io.fabric8.mockwebserver.http.WebSocket webSocket, Response response) {
                try {
                    cyclicBarrier.await(1L, TimeUnit.SECONDS);
                }
                catch (Exception exception) {
                    // empty catch block
                }
                serverSockets.add(webSocket);
                webSocket.send("go on");
            }
        });
        IntStream.range(0, 1024).forEach(i -> this.mockWebServer.enqueue(response));
        try (HttpClient client = this.clientBuilder.build();){
            for (int it = 0; it < 1024; ++it) {
                client.newWebSocketBuilder().uri(this.mockWebServer.url("/").uri()).buildAsync(new WebSocket.Listener(){

                    public void onMessage(WebSocket webSocket, String text) {
                        clientSockets.add(webSocket);
                        latch.countDown();
                        webSocket.request();
                    }
                });
                cyclicBarrier.await(1L, TimeUnit.SECONDS);
            }
            Assertions.assertThat((boolean)latch.await(60L, TimeUnit.SECONDS)).isTrue();
            Assertions.assertThat((int)serverSockets.size()).isEqualTo(1024).isLessThanOrEqualTo(this.registeredConnections.activeConnections());
        }
        finally {
            for (io.fabric8.mockwebserver.http.WebSocket socket : serverSockets) {
                socket.close(1000, "done");
            }
        }
    }

    private static class RegisteredConnections
    implements MockWebServerListener {
        private final Set<RecordedHttpConnection> connections = ConcurrentHashMap.newKeySet();

        private RegisteredConnections() {
        }

        final int activeConnections() {
            return this.connections.size();
        }

        public void onConnection(RecordedHttpConnection connection) {
            this.connections.add(connection);
            super.onConnection(connection);
        }

        public void onConnectionClosed(RecordedHttpConnection connection) {
            this.connections.remove(connection);
        }
    }

    private static class DelayedResponseHttp1Server
    implements AutoCloseable {
        private final int connections;
        private final HttpServer httpServer;
        private final Collection<HttpServerRequest> requests;
        private final CountDownLatch connectionLatch;

        private DelayedResponseHttp1Server(Vertx vertx, int connections) throws Exception {
            this.connections = connections;
            this.requests = ConcurrentHashMap.newKeySet();
            this.connectionLatch = new CountDownLatch(connections);
            this.httpServer = vertx.createHttpServer(new HttpServerOptions().setPort(0).setAlpnVersions(Collections.singletonList(HttpVersion.HTTP_1_1)));
            this.httpServer.connectionHandler(event -> this.connectionLatch.countDown());
            this.httpServer.requestHandler(this.requests::add);
            this.httpServer.listen().toCompletionStage().toCompletableFuture().get(10L, TimeUnit.SECONDS);
        }

        @Override
        public void close() throws Exception {
            this.requests.forEach(request -> request.connection().close());
            this.requests.clear();
            this.httpServer.close().toCompletionStage().toCompletableFuture().get(10L, TimeUnit.SECONDS);
        }

        private String uri() {
            return String.format("http://localhost:%s/http-1-connections", this.httpServer.actualPort());
        }

        private void await() {
            try {
                if (!this.connectionLatch.await(10L, TimeUnit.SECONDS)) {
                    throw new AssertionError((Object)("Failed to await the connection latch, remaining connections to open: " + this.connectionLatch.getCount()));
                }
                Awaitility.await().atMost(5L, TimeUnit.SECONDS).until(() -> this.requests.size() == this.connections);
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException("Failed to await the connection latch (interrupted)", e);
            }
        }
    }
}

