/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.rest;

import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.runtime.rest.HttpMethodWrapper;
import org.apache.flink.runtime.rest.RestClient;
import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
import org.apache.flink.runtime.rest.messages.MessageHeaders;
import org.apache.flink.runtime.rest.messages.MessageParameters;
import org.apache.flink.runtime.rest.messages.RequestBody;
import org.apache.flink.runtime.rest.versioning.RestAPIVersion;
import org.apache.flink.runtime.testutils.TestingUtils;
import org.apache.flink.shaded.netty4.io.netty.channel.ConnectTimeoutException;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.NetUtils;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.concurrent.Executors;
import org.apache.flink.util.function.CheckedSupplier;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;

/**
 * Tests for {@link RestClient}: connection timeout reporting, rejection of unsupported API
 * versions, and failure of pending requests when either side of the connection is closed.
 */
public class RestClientTest
extends TestLogger {
    /** Target address used by tests that expect the connection attempt itself to fail. */
    private static final String unroutableIp = "240.0.0.0";

    /** Maximum time, in seconds, to wait for the local server socket to accept a connection. */
    private static final long TIMEOUT = 10L;

    /**
     * Sends a request to {@link #unroutableIp} with {@link RestOptions#CONNECTION_TIMEOUT} set to
     * 1 and verifies that the returned future fails with a {@link ConnectTimeoutException} whose
     * message contains the target address.
     */
    @Test
    public void testConnectionTimeout() throws Exception {
        Configuration config = new Configuration();
        config.setLong(RestOptions.CONNECTION_TIMEOUT, 1L);
        try (RestClient restClient = new RestClient(config, Executors.directExecutor())) {
            restClient
                    .sendRequest(
                            unroutableIp,
                            80,
                            new TestMessageHeaders(),
                            EmptyMessageParameters.getInstance(),
                            EmptyRequestBody.getInstance())
                    .get(60L, TimeUnit.SECONDS);
            // Without this the test would pass vacuously if the request ever completed normally.
            Assert.fail("The request should have failed with a connection timeout.");
        } catch (ExecutionException e) {
            Throwable throwable = ExceptionUtils.stripExecutionException(e);
            Assert.assertThat(throwable, Matchers.instanceOf(ConnectTimeoutException.class));
            Assert.assertThat(throwable.getMessage(), Matchers.containsString(unroutableIp));
        }
    }

    /**
     * Verifies that requesting {@link RestAPIVersion#V0} with {@link TestMessageHeaders} is
     * rejected synchronously with an {@link IllegalArgumentException}.
     */
    @Test
    public void testInvalidVersionRejection() throws Exception {
        try (RestClient restClient = new RestClient(new Configuration(), Executors.directExecutor())) {
            restClient.sendRequest(
                    unroutableIp,
                    80,
                    new TestMessageHeaders(),
                    EmptyMessageParameters.getInstance(),
                    EmptyRequestBody.getInstance(),
                    Collections.emptyList(),
                    RestAPIVersion.V0);
            Assert.fail("The request should have been rejected due to a version mismatch.");
        } catch (IllegalArgumentException expected) {
            // Expected: the version mismatch is reported before any request is sent.
        }
    }

    /**
     * Accepts the client's connection on a local {@link ServerSocket}, closes the accepted socket
     * and then checks the pending response: if the response future fails, the failure must
     * contain an {@link IOException}, otherwise the failure is rethrown.
     */
    @Test
    public void testConnectionClosedHandling() throws Exception {
        Configuration config = new Configuration();
        config.setLong(RestOptions.IDLENESS_TIMEOUT, 5000L);
        try (ServerSocket serverSocket = new ServerSocket(0);
                RestClient restClient = new RestClient(config, TestingUtils.defaultExecutor())) {
            String targetAddress = "localhost";
            int targetPort = serverSocket.getLocalPort();

            // Accept the incoming connection asynchronously so the request can be sent meanwhile.
            CompletableFuture<Socket> socketCompletableFuture =
                    CompletableFuture.supplyAsync(
                            CheckedSupplier.unchecked(
                                    () -> NetUtils.acceptWithoutTimeout(serverSocket)));

            CompletableFuture<EmptyResponseBody> responseFuture =
                    restClient.sendRequest(
                            targetAddress,
                            targetPort,
                            new TestMessageHeaders(),
                            EmptyMessageParameters.getInstance(),
                            EmptyRequestBody.getInstance(),
                            Collections.emptyList());

            Socket connectionSocket = null;
            try {
                connectionSocket = socketCompletableFuture.get(TIMEOUT, TimeUnit.SECONDS);
            } catch (TimeoutException ignored) {
                // No connection was accepted in time; still check how the response completes.
                socketCompletableFuture.cancel(true);
            }

            if (connectionSocket != null) {
                // Close the server side of the connection while the request is pending.
                connectionSocket.close();
            }

            try {
                responseFuture.get();
            } catch (ExecutionException ee) {
                // Only failures that carry an IOException are acceptable here.
                if (!ExceptionUtils.findThrowable(ee, IOException.class).isPresent()) {
                    throw ee;
                }
            }
        }
    }

    /**
     * Closes the {@link RestClient} while a request is pending and then checks the response: if
     * the response future fails, the failure must contain an {@link IOException}, otherwise the
     * failure is rethrown. The accepted server-side socket, if any, is closed afterwards.
     */
    @Test
    public void testRestClientClosedHandling() throws Exception {
        Configuration config = new Configuration();
        config.setLong(RestOptions.IDLENESS_TIMEOUT, 5000L);

        // Declared outside the try-with-resources header: it is assigned later, and resource
        // variables are implicitly final.
        Socket connectionSocket = null;

        try (ServerSocket serverSocket = new ServerSocket(0);
                RestClient restClient = new RestClient(config, TestingUtils.defaultExecutor())) {
            String targetAddress = "localhost";
            int targetPort = serverSocket.getLocalPort();

            // Accept the incoming connection asynchronously so the request can be sent meanwhile.
            CompletableFuture<Socket> socketCompletableFuture =
                    CompletableFuture.supplyAsync(
                            CheckedSupplier.unchecked(
                                    () -> NetUtils.acceptWithoutTimeout(serverSocket)));

            CompletableFuture<EmptyResponseBody> responseFuture =
                    restClient.sendRequest(
                            targetAddress,
                            targetPort,
                            new TestMessageHeaders(),
                            EmptyMessageParameters.getInstance(),
                            EmptyRequestBody.getInstance(),
                            Collections.emptyList());

            try {
                connectionSocket = socketCompletableFuture.get(TIMEOUT, TimeUnit.SECONDS);
            } catch (TimeoutException ignored) {
                // No connection was accepted in time; still check how the response completes.
                socketCompletableFuture.cancel(true);
            }

            // Close the client explicitly while the request is still pending.
            restClient.close();

            try {
                responseFuture.get();
            } catch (ExecutionException ee) {
                // Only failures that carry an IOException are acceptable here.
                if (!ExceptionUtils.findThrowable(ee, IOException.class).isPresent()) {
                    throw ee;
                }
            }
        } finally {
            if (connectionSocket != null) {
                connectionSocket.close();
            }
        }
    }

    /**
     * Minimal {@link MessageHeaders} implementation used by the tests: a {@code GET} on
     * {@code "/"} with empty request, response and parameter types.
     */
    private static class TestMessageHeaders
    implements MessageHeaders<EmptyRequestBody, EmptyResponseBody, EmptyMessageParameters> {
        private TestMessageHeaders() {
        }

        @Override
        public Class<EmptyRequestBody> getRequestClass() {
            return EmptyRequestBody.class;
        }

        @Override
        public Class<EmptyResponseBody> getResponseClass() {
            return EmptyResponseBody.class;
        }

        @Override
        public HttpResponseStatus getResponseStatusCode() {
            return HttpResponseStatus.OK;
        }

        @Override
        public String getDescription() {
            return "";
        }

        @Override
        public EmptyMessageParameters getUnresolvedMessageParameters() {
            return EmptyMessageParameters.getInstance();
        }

        @Override
        public HttpMethodWrapper getHttpMethod() {
            return HttpMethodWrapper.GET;
        }

        @Override
        public String getTargetRestEndpointURL() {
            return "/";
        }
    }
}

