/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.rest;

import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.core.testutils.FlinkCompletableFutureAssert;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.rest.HttpMethodWrapper;
import org.apache.flink.runtime.rest.RestClient;
import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
import org.apache.flink.runtime.rest.messages.MessageHeaders;
import org.apache.flink.runtime.rest.messages.MessageParameters;
import org.apache.flink.runtime.rest.messages.RequestBody;
import org.apache.flink.runtime.rest.messages.RuntimeMessageHeaders;
import org.apache.flink.runtime.rest.versioning.RestAPIVersion;
import org.apache.flink.runtime.rest.versioning.RuntimeRestAPIVersion;
import org.apache.flink.shaded.netty4.io.netty.channel.ConnectTimeoutException;
import org.apache.flink.shaded.netty4.io.netty.channel.DefaultSelectStrategyFactory;
import org.apache.flink.shaded.netty4.io.netty.channel.SelectStrategy;
import org.apache.flink.shaded.netty4.io.netty.channel.SelectStrategyFactory;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorExtension;
import org.apache.flink.util.NetUtils;
import org.apache.flink.util.concurrent.Executors;
import org.apache.flink.util.function.CheckedSupplier;
import org.assertj.core.api.AbstractThrowableAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.InstanceOfAssertFactories;
import org.assertj.core.api.InstanceOfAssertFactory;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

class RestClientTest {
    @RegisterExtension
    static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_EXTENSION = TestingUtils.defaultExecutorExtension();
    private static final String unroutableIp = "240.0.0.0";
    private static final long TIMEOUT = 10L;

    RestClientTest() {
    }

    @Test
    void testConnectionTimeout() throws Exception {
        Configuration config = new Configuration();
        config.setLong(RestOptions.CONNECTION_TIMEOUT, 1L);
        try (RestClient restClient = new RestClient(config, Executors.directExecutor());){
            CompletableFuture future = restClient.sendRequest(unroutableIp, 80, (MessageHeaders)new TestMessageHeaders(), (MessageParameters)EmptyMessageParameters.getInstance(), (RequestBody)EmptyRequestBody.getInstance());
            ((AbstractThrowableAssert)FlinkAssertions.assertThatFuture((CompletableFuture)future).eventuallyFailsWith(ExecutionException.class).withCauseInstanceOf(ConnectTimeoutException.class).extracting(Throwable::getCause, Assertions.as((InstanceOfAssertFactory)InstanceOfAssertFactories.THROWABLE))).hasMessageContaining(unroutableIp);
        }
    }

    @Test
    public void testInvalidVersionRejection() throws Exception {
        try (RestClient restClient = new RestClient(new Configuration(), Executors.directExecutor());){
            ((AbstractThrowableAssert)Assertions.assertThatThrownBy(() -> restClient.sendRequest(unroutableIp, 80, (MessageHeaders)new TestMessageHeaders(), (MessageParameters)EmptyMessageParameters.getInstance(), (RequestBody)EmptyRequestBody.getInstance(), Collections.emptyList(), (RestAPIVersion)RuntimeRestAPIVersion.V0)).as("The request should have been rejected due to a version mismatch.", new Object[0])).isInstanceOf(IllegalArgumentException.class);
        }
    }

    @Test
    public void testConnectionClosedHandling() throws Exception {
        Configuration config = new Configuration();
        config.setLong(RestOptions.IDLENESS_TIMEOUT, 5000L);
        try (ServerSocket serverSocket = new ServerSocket(0);
             RestClient restClient = new RestClient(config, (Executor)EXECUTOR_EXTENSION.getExecutor());){
            String targetAddress = "localhost";
            int targetPort = serverSocket.getLocalPort();
            CompletableFuture socketCompletableFuture = CompletableFuture.supplyAsync(CheckedSupplier.unchecked(() -> NetUtils.acceptWithoutTimeout((ServerSocket)serverSocket)));
            CompletableFuture responseFuture = restClient.sendRequest("localhost", targetPort, (MessageHeaders)new TestMessageHeaders(), (MessageParameters)EmptyMessageParameters.getInstance(), (RequestBody)EmptyRequestBody.getInstance(), Collections.emptyList());
            Socket connectionSocket = null;
            try {
                connectionSocket = (Socket)socketCompletableFuture.get(10L, TimeUnit.SECONDS);
            }
            catch (TimeoutException ignored) {
                socketCompletableFuture.cancel(true);
            }
            if (connectionSocket != null) {
                connectionSocket.close();
            }
            FlinkAssertions.assertThatFuture((CompletableFuture)responseFuture).eventuallyFailsWith(ExecutionException.class).withCauseInstanceOf(IOException.class);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testRestClientClosedHandling() throws Exception {
        Configuration config = new Configuration();
        config.setLong(RestOptions.IDLENESS_TIMEOUT, 5000L);
        try (Socket connectionSocket = null;
             ServerSocket serverSocket = new ServerSocket(0);
             RestClient restClient = new RestClient(config, (Executor)EXECUTOR_EXTENSION.getExecutor());){
            String targetAddress = "localhost";
            int targetPort = serverSocket.getLocalPort();
            CompletableFuture socketCompletableFuture = CompletableFuture.supplyAsync(CheckedSupplier.unchecked(() -> NetUtils.acceptWithoutTimeout((ServerSocket)serverSocket)));
            CompletableFuture responseFuture = restClient.sendRequest("localhost", targetPort, (MessageHeaders)new TestMessageHeaders(), (MessageParameters)EmptyMessageParameters.getInstance(), (RequestBody)EmptyRequestBody.getInstance(), Collections.emptyList());
            try {
                connectionSocket = (Socket)socketCompletableFuture.get(10L, TimeUnit.SECONDS);
            }
            catch (TimeoutException ignored) {
                socketCompletableFuture.cancel(true);
            }
            restClient.close();
            FlinkAssertions.assertThatFuture((CompletableFuture)responseFuture).eventuallyFailsWith(ExecutionException.class).withCauseInstanceOf(IOException.class);
        }
    }

    @Test
    public void testCloseClientBeforeRequest() throws Exception {
        try (RestClient restClient = new RestClient(new Configuration(), Executors.directExecutor());){
            restClient.close();
            CompletableFuture future = restClient.sendRequest(unroutableIp, 80, (MessageHeaders)new TestMessageHeaders(), (MessageParameters)EmptyMessageParameters.getInstance(), (RequestBody)EmptyRequestBody.getInstance());
            ((AbstractThrowableAssert)FlinkAssertions.assertThatFuture((CompletableFuture)future).eventuallyFailsWith(ExecutionException.class).withCauseInstanceOf(IllegalStateException.class).extracting(Throwable::getCause, Assertions.as((InstanceOfAssertFactory)InstanceOfAssertFactories.THROWABLE))).hasMessage("RestClient is already closed");
        }
    }

    @Test
    public void testCloseClientWhileProcessingRequest() throws Exception {
        OneShotLatch connectTriggered = new OneShotLatch();
        OneShotLatch closeTriggered = new OneShotLatch();
        SelectStrategy fallbackSelectStrategy = DefaultSelectStrategyFactory.INSTANCE.newSelectStrategy();
        SelectStrategyFactory selectStrategyFactory = () -> (selectSupplier, hasTasks) -> {
            connectTriggered.trigger();
            closeTriggered.awaitQuietly();
            return fallbackSelectStrategy.calculateStrategy(selectSupplier, hasTasks);
        };
        try (RestClient restClient = new RestClient(new Configuration(), Executors.directExecutor(), selectStrategyFactory);){
            Assertions.assertThat((Collection)restClient.getResponseChannelFutures()).isEmpty();
            CompletableFuture requestFuture = restClient.sendRequest(unroutableIp, 80, (MessageHeaders)new TestMessageHeaders(), (MessageParameters)EmptyMessageParameters.getInstance(), (RequestBody)EmptyRequestBody.getInstance());
            Assertions.assertThat((Collection)restClient.getResponseChannelFutures()).hasSize(1);
            connectTriggered.await();
            CompletableFuture closeFuture = restClient.closeAsync();
            closeTriggered.trigger();
            ((FlinkCompletableFutureAssert)FlinkAssertions.assertThatFuture((CompletableFuture)closeFuture).as("Close should have had completed.", new Object[0])).eventuallySucceeds();
            ((AbstractThrowableAssert)FlinkAssertions.assertThatFuture((CompletableFuture)requestFuture).eventuallyFailsWith(ExecutionException.class).withCauseInstanceOf(IllegalStateException.class).extracting(Throwable::getCause, Assertions.as((InstanceOfAssertFactory)InstanceOfAssertFactories.THROWABLE))).hasMessage("executor not accepting a task");
        }
    }

    @Test
    public void testResponseChannelFuturesResolvedExceptionallyOnClose() throws Exception {
        try (RestClient restClient = new RestClient(new Configuration(), Executors.directExecutor());){
            CompletableFuture responseChannelFuture = new CompletableFuture();
            restClient.getResponseChannelFutures().add(responseChannelFuture);
            restClient.close();
            Assertions.assertThat((Collection)restClient.getResponseChannelFutures()).isEmpty();
            ((AbstractThrowableAssert)FlinkAssertions.assertThatFuture(responseChannelFuture).eventuallyFailsWith(ExecutionException.class).withCauseInstanceOf(IllegalStateException.class).extracting(Throwable::getCause, Assertions.as((InstanceOfAssertFactory)InstanceOfAssertFactories.THROWABLE))).hasMessage("RestClient closed before request completed");
        }
    }

    private static class TestMessageHeaders
    implements RuntimeMessageHeaders<EmptyRequestBody, EmptyResponseBody, EmptyMessageParameters> {
        private TestMessageHeaders() {
        }

        public Class<EmptyRequestBody> getRequestClass() {
            return EmptyRequestBody.class;
        }

        public Class<EmptyResponseBody> getResponseClass() {
            return EmptyResponseBody.class;
        }

        public HttpResponseStatus getResponseStatusCode() {
            return HttpResponseStatus.OK;
        }

        public String getDescription() {
            return "";
        }

        public EmptyMessageParameters getUnresolvedMessageParameters() {
            return EmptyMessageParameters.getInstance();
        }

        public HttpMethodWrapper getHttpMethod() {
            return HttpMethodWrapper.GET;
        }

        public String getTargetRestEndpointURL() {
            return "/";
        }
    }
}

