/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.rest;

import java.io.IOException;
import java.io.Serializable;
import java.net.BindException;
import java.net.InetSocketAddress;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.nio.file.attribute.FileAttribute;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.ServiceLoader;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.runtime.io.network.netty.InboundChannelHandlerFactory;
import org.apache.flink.runtime.io.network.netty.SSLHandlerFactory;
import org.apache.flink.runtime.net.RedirectingSslHandler;
import org.apache.flink.runtime.rest.FileUploadHandler;
import org.apache.flink.runtime.rest.FlinkHttpObjectAggregator;
import org.apache.flink.runtime.rest.HttpMethodWrapper;
import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
import org.apache.flink.runtime.rest.RestService;
import org.apache.flink.runtime.rest.handler.PipelineErrorHandler;
import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
import org.apache.flink.runtime.rest.handler.router.Router;
import org.apache.flink.runtime.rest.handler.router.RouterHandler;
import org.apache.flink.runtime.rest.versioning.RestAPIVersion;
import org.apache.flink.shaded.netty4.io.netty.bootstrap.ServerBootstrap;
import org.apache.flink.shaded.netty4.io.netty.bootstrap.ServerBootstrapConfig;
import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInitializer;
import org.apache.flink.shaded.netty4.io.netty.channel.EventLoopGroup;
import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoopGroup;
import org.apache.flink.shaded.netty4.io.netty.channel.socket.SocketChannel;
import org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioServerSocketChannel;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpServerCodec;
import org.apache.flink.shaded.netty4.io.netty.handler.stream.ChunkedWriteHandler;
import org.apache.flink.util.AutoCloseableAsync;
import org.apache.flink.util.ConfigurationException;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.NetUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
import org.apache.flink.util.concurrent.FutureUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class RestServerEndpoint
implements RestService {
    protected final Logger log = LoggerFactory.getLogger(this.getClass());
    private final Object lock = new Object();
    private final Configuration configuration;
    private final String restAddress;
    private final String restBindAddress;
    private final String restBindPortRange;
    @Nullable
    private final SSLHandlerFactory sslHandlerFactory;
    private final int maxContentLength;
    protected final Path uploadDir;
    protected final Map<String, String> responseHeaders;
    private final CompletableFuture<Void> terminationFuture;
    private List<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> handlers;
    private ServerBootstrap bootstrap;
    private Channel serverChannel;
    private String restBaseUrl;
    private int port;
    private State state = State.CREATED;
    @VisibleForTesting
    List<InboundChannelHandlerFactory> inboundChannelHandlerFactories;

    public RestServerEndpoint(Configuration configuration) throws IOException, ConfigurationException {
        Preconditions.checkNotNull((Object)configuration);
        RestServerEndpointConfiguration restConfiguration = RestServerEndpointConfiguration.fromConfiguration(configuration);
        Preconditions.checkNotNull((Object)restConfiguration);
        this.configuration = configuration;
        this.restAddress = restConfiguration.getRestAddress();
        this.restBindAddress = restConfiguration.getRestBindAddress();
        this.restBindPortRange = restConfiguration.getRestBindPortRange();
        this.sslHandlerFactory = restConfiguration.getSslHandlerFactory();
        this.uploadDir = restConfiguration.getUploadDir();
        RestServerEndpoint.createUploadDir(this.uploadDir, this.log, true);
        this.maxContentLength = restConfiguration.getMaxContentLength();
        this.responseHeaders = restConfiguration.getResponseHeaders();
        this.terminationFuture = new CompletableFuture();
        this.inboundChannelHandlerFactories = new ArrayList<InboundChannelHandlerFactory>();
        ServiceLoader<InboundChannelHandlerFactory> loader = ServiceLoader.load(InboundChannelHandlerFactory.class);
        Iterator<InboundChannelHandlerFactory> factories = loader.iterator();
        while (factories.hasNext()) {
            try {
                InboundChannelHandlerFactory factory = factories.next();
                if (factory == null) continue;
                this.inboundChannelHandlerFactories.add(factory);
                this.log.info("Loaded channel inbound factory: {}", (Object)factory);
            }
            catch (Throwable e) {
                this.log.error("Could not load channel inbound factory.", e);
                throw e;
            }
        }
        this.inboundChannelHandlerFactories.sort(Comparator.comparingInt(InboundChannelHandlerFactory::priority).reversed());
    }

    protected abstract List<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> initializeHandlers(CompletableFuture<String> var1);

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public final void start() throws Exception {
        Object object = this.lock;
        synchronized (object) {
            Iterator portsIterator;
            Preconditions.checkState((this.state == State.CREATED ? 1 : 0) != 0, (Object)"The RestServerEndpoint cannot be restarted.");
            this.log.info("Starting rest endpoint.");
            final Router router = new Router();
            final CompletableFuture<String> restAddressFuture = new CompletableFuture<String>();
            this.handlers = this.initializeHandlers(restAddressFuture);
            Collections.sort(this.handlers, RestHandlerUrlComparator.INSTANCE);
            RestServerEndpoint.checkAllEndpointsAndHandlersAreUnique(this.handlers);
            this.handlers.forEach(handler -> RestServerEndpoint.registerHandler(router, (Tuple2<RestHandlerSpecification, ChannelInboundHandler>)handler, this.log));
            ChannelInitializer<SocketChannel> initializer = new ChannelInitializer<SocketChannel>(){

                protected void initChannel(SocketChannel ch) throws ConfigurationException {
                    RouterHandler handler = new RouterHandler(router, RestServerEndpoint.this.responseHeaders);
                    if (RestServerEndpoint.this.isHttpsEnabled()) {
                        ch.pipeline().addLast("ssl", (ChannelHandler)new RedirectingSslHandler(RestServerEndpoint.this.restAddress, restAddressFuture, RestServerEndpoint.this.sslHandlerFactory));
                    }
                    ch.pipeline().addLast(new ChannelHandler[]{new HttpServerCodec()}).addLast(new ChannelHandler[]{new FileUploadHandler(RestServerEndpoint.this.uploadDir)}).addLast(new ChannelHandler[]{new FlinkHttpObjectAggregator(RestServerEndpoint.this.maxContentLength, RestServerEndpoint.this.responseHeaders)});
                    for (InboundChannelHandlerFactory factory : RestServerEndpoint.this.inboundChannelHandlerFactories) {
                        Optional<ChannelHandler> channelHandler = factory.createHandler(RestServerEndpoint.this.configuration, RestServerEndpoint.this.responseHeaders);
                        if (!channelHandler.isPresent()) continue;
                        ch.pipeline().addLast(new ChannelHandler[]{channelHandler.get()});
                    }
                    ch.pipeline().addLast(new ChannelHandler[]{new ChunkedWriteHandler()}).addLast(handler.getName(), (ChannelHandler)handler).addLast(new ChannelHandler[]{new PipelineErrorHandler(RestServerEndpoint.this.log, RestServerEndpoint.this.responseHeaders)});
                }
            };
            NioEventLoopGroup bossGroup = new NioEventLoopGroup(1, (ThreadFactory)new ExecutorThreadFactory("flink-rest-server-netty-boss"));
            NioEventLoopGroup workerGroup = new NioEventLoopGroup(0, (ThreadFactory)new ExecutorThreadFactory("flink-rest-server-netty-worker"));
            this.bootstrap = new ServerBootstrap();
            ((ServerBootstrap)this.bootstrap.group((EventLoopGroup)bossGroup, (EventLoopGroup)workerGroup).channel(NioServerSocketChannel.class)).childHandler((ChannelHandler)initializer);
            try {
                portsIterator = NetUtils.getPortRangeFromString((String)this.restBindPortRange);
            }
            catch (IllegalConfigurationException e) {
                throw e;
            }
            catch (Exception e) {
                throw new IllegalArgumentException("Invalid port range definition: " + this.restBindPortRange);
            }
            int chosenPort = 0;
            while (portsIterator.hasNext()) {
                try {
                    chosenPort = (Integer)portsIterator.next();
                    ChannelFuture channel = this.restBindAddress == null ? this.bootstrap.bind(chosenPort) : this.bootstrap.bind(this.restBindAddress, chosenPort);
                    this.serverChannel = channel.syncUninterruptibly().channel();
                    break;
                }
                catch (Exception e) {
                    if (e instanceof BindException) continue;
                    throw e;
                }
            }
            if (this.serverChannel == null) {
                throw new BindException("Could not start rest endpoint on any port in port range " + this.restBindPortRange);
            }
            this.log.debug("Binding rest endpoint to {}:{}.", (Object)this.restBindAddress, (Object)chosenPort);
            InetSocketAddress bindAddress = (InetSocketAddress)this.serverChannel.localAddress();
            String advertisedAddress = bindAddress.getAddress().isAnyLocalAddress() ? this.restAddress : bindAddress.getAddress().getHostAddress();
            this.port = bindAddress.getPort();
            this.log.info("Rest endpoint listening at {}:{}", (Object)advertisedAddress, (Object)this.port);
            this.restBaseUrl = new URL(this.determineProtocol(), advertisedAddress, this.port, "").toString();
            restAddressFuture.complete(this.restBaseUrl);
            this.state = State.RUNNING;
            this.startInternal();
        }
    }

    protected abstract void startInternal() throws Exception;

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Nullable
    public InetSocketAddress getServerAddress() {
        Object object = this.lock;
        synchronized (object) {
            this.assertRestServerHasBeenStarted();
            Channel server = this.serverChannel;
            if (server != null) {
                try {
                    return (InetSocketAddress)server.localAddress();
                }
                catch (Exception e) {
                    this.log.error("Cannot access local server address", (Throwable)e);
                }
            }
            return null;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public String getRestBaseUrl() {
        Object object = this.lock;
        synchronized (object) {
            this.assertRestServerHasBeenStarted();
            return this.restBaseUrl;
        }
    }

    private void assertRestServerHasBeenStarted() {
        Preconditions.checkState((this.state != State.CREATED ? 1 : 0) != 0, (Object)"The RestServerEndpoint has not been started yet.");
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public int getRestPort() {
        Object object = this.lock;
        synchronized (object) {
            this.assertRestServerHasBeenStarted();
            return this.port;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public CompletableFuture<Void> closeAsync() {
        Object object = this.lock;
        synchronized (object) {
            this.log.info("Shutting down rest endpoint.");
            if (this.state == State.RUNNING) {
                CompletableFuture shutDownFuture = FutureUtils.composeAfterwards(this.closeHandlersAsync(), this::shutDownInternal);
                shutDownFuture.whenComplete((ignored, throwable) -> {
                    this.log.info("Shut down complete.");
                    if (throwable != null) {
                        this.terminationFuture.completeExceptionally((Throwable)throwable);
                    } else {
                        this.terminationFuture.complete(null);
                    }
                });
                this.state = State.SHUTDOWN;
            } else if (this.state == State.CREATED) {
                this.terminationFuture.complete(null);
                this.state = State.SHUTDOWN;
            }
            return this.terminationFuture;
        }
    }

    private FutureUtils.ConjunctFuture<Void> closeHandlersAsync() {
        return FutureUtils.waitForAll((Collection)this.handlers.stream().map(tuple -> (ChannelInboundHandler)tuple.f1).filter(handler -> handler instanceof AutoCloseableAsync).map(handler -> ((AutoCloseableAsync)handler).closeAsync()).collect(Collectors.toList()));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected CompletableFuture<Void> shutDownInternal() {
        Object object = this.lock;
        synchronized (object) {
            CompletableFuture channelFuture = new CompletableFuture();
            if (this.serverChannel != null) {
                this.serverChannel.close().addListener(finished -> {
                    if (finished.isSuccess()) {
                        channelFuture.complete(null);
                    } else {
                        channelFuture.completeExceptionally(finished.cause());
                    }
                });
                this.serverChannel = null;
            }
            CompletableFuture<Void> channelTerminationFuture = new CompletableFuture<Void>();
            channelFuture.thenRun(() -> {
                CompletableFuture<Object> groupFuture = new CompletableFuture<Object>();
                CompletableFuture<Object> childGroupFuture = new CompletableFuture<Object>();
                Time gracePeriod = Time.seconds((long)10L);
                if (this.bootstrap != null) {
                    ServerBootstrapConfig config = this.bootstrap.config();
                    EventLoopGroup group = config.group();
                    if (group != null) {
                        group.shutdownGracefully(0L, gracePeriod.toMilliseconds(), TimeUnit.MILLISECONDS).addListener(finished -> {
                            if (finished.isSuccess()) {
                                groupFuture.complete(null);
                            } else {
                                groupFuture.completeExceptionally(finished.cause());
                            }
                        });
                    } else {
                        groupFuture.complete(null);
                    }
                    EventLoopGroup childGroup = config.childGroup();
                    if (childGroup != null) {
                        childGroup.shutdownGracefully(0L, gracePeriod.toMilliseconds(), TimeUnit.MILLISECONDS).addListener(finished -> {
                            if (finished.isSuccess()) {
                                childGroupFuture.complete(null);
                            } else {
                                childGroupFuture.completeExceptionally(finished.cause());
                            }
                        });
                    } else {
                        childGroupFuture.complete(null);
                    }
                    this.bootstrap = null;
                } else {
                    groupFuture.complete(null);
                    childGroupFuture.complete(null);
                }
                FutureUtils.ConjunctFuture combinedFuture = FutureUtils.completeAll(Arrays.asList(groupFuture, childGroupFuture));
                combinedFuture.whenComplete((ignored, throwable) -> {
                    if (throwable != null) {
                        channelTerminationFuture.completeExceptionally((Throwable)throwable);
                    } else {
                        channelTerminationFuture.complete(null);
                    }
                });
            });
            return channelTerminationFuture;
        }
    }

    private boolean isHttpsEnabled() {
        return this.sslHandlerFactory != null;
    }

    private String determineProtocol() {
        return this.isHttpsEnabled() ? "https" : "http";
    }

    private static void registerHandler(Router router, Tuple2<RestHandlerSpecification, ChannelInboundHandler> specificationHandler, Logger log) {
        String handlerURL = ((RestHandlerSpecification)specificationHandler.f0).getTargetRestEndpointURL();
        for (RestAPIVersion<?> supportedVersion : ((RestHandlerSpecification)specificationHandler.f0).getSupportedAPIVersions()) {
            String versionedHandlerURL = '/' + supportedVersion.getURLVersionPrefix() + handlerURL;
            log.debug("Register handler {} under {}@{}.", new Object[]{specificationHandler.f1, ((RestHandlerSpecification)specificationHandler.f0).getHttpMethod(), versionedHandlerURL});
            RestServerEndpoint.registerHandler(router, versionedHandlerURL, ((RestHandlerSpecification)specificationHandler.f0).getHttpMethod(), (ChannelInboundHandler)specificationHandler.f1);
            if (!supportedVersion.isDefaultVersion()) continue;
            log.debug("Register handler {} under {}@{}.", new Object[]{specificationHandler.f1, ((RestHandlerSpecification)specificationHandler.f0).getHttpMethod(), handlerURL});
            RestServerEndpoint.registerHandler(router, handlerURL, ((RestHandlerSpecification)specificationHandler.f0).getHttpMethod(), (ChannelInboundHandler)specificationHandler.f1);
        }
    }

    private static void registerHandler(Router router, String handlerURL, HttpMethodWrapper httpMethod, ChannelInboundHandler handler) {
        switch (httpMethod) {
            case GET: {
                router.addGet(handlerURL, handler);
                break;
            }
            case POST: {
                router.addPost(handlerURL, handler);
                break;
            }
            case DELETE: {
                router.addDelete(handlerURL, handler);
                break;
            }
            case PATCH: {
                router.addPatch(handlerURL, handler);
                break;
            }
            case PUT: {
                router.addPut(handlerURL, handler);
                break;
            }
            default: {
                throw new RuntimeException("Unsupported http method: " + (Object)((Object)httpMethod) + '.');
            }
        }
    }

    static void createUploadDir(Path uploadDir, Logger log, boolean initialCreation) throws IOException {
        if (!Files.exists(uploadDir, new LinkOption[0])) {
            if (initialCreation) {
                log.info("Upload directory {} does not exist. ", (Object)uploadDir);
            } else {
                log.warn("Upload directory {} has been deleted externally. Previously uploaded files are no longer available.", (Object)uploadDir);
            }
            RestServerEndpoint.checkAndCreateUploadDir(uploadDir, log);
        }
    }

    private static synchronized void checkAndCreateUploadDir(Path uploadDir, Logger log) throws IOException {
        if (Files.exists(uploadDir, new LinkOption[0]) && Files.isWritable(uploadDir)) {
            log.info("Using directory {} for file uploads.", (Object)uploadDir);
        } else if (Files.isWritable(Files.createDirectories(uploadDir, new FileAttribute[0]))) {
            log.info("Created directory {} for file uploads.", (Object)uploadDir);
        } else {
            log.warn("Upload directory {} cannot be created or is not writable.", (Object)uploadDir);
            throw new IOException(String.format("Upload directory %s cannot be created or is not writable.", uploadDir));
        }
    }

    private static void checkAllEndpointsAndHandlersAreUnique(List<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> handlers) {
        HashSet<String> uniqueEndpoints = new HashSet<String>();
        Set distinctHandlers = Collections.newSetFromMap(new IdentityHashMap());
        for (Tuple2<RestHandlerSpecification, ChannelInboundHandler> handler : handlers) {
            boolean isNewHandler = distinctHandlers.add(handler.f1);
            if (!isNewHandler) {
                throw new FlinkRuntimeException("Duplicate REST handler instance found. Please ensure each instance is registered only once.");
            }
            RestHandlerSpecification headers = (RestHandlerSpecification)handler.f0;
            for (RestAPIVersion<?> supportedRestAPIVersion : headers.getSupportedAPIVersions()) {
                String parameterizedEndpoint = supportedRestAPIVersion.toString() + (Object)((Object)headers.getHttpMethod()) + headers.getTargetRestEndpointURL();
                String normalizedEndpoint = parameterizedEndpoint.replaceAll(":[\\w-]+", ":param");
                boolean isNewEndpoint = uniqueEndpoints.add(normalizedEndpoint);
                if (isNewEndpoint) continue;
                throw new FlinkRuntimeException(String.format("REST handler registration overlaps with another registration for: version=%s, method=%s, url=%s.", new Object[]{supportedRestAPIVersion, headers.getHttpMethod(), headers.getTargetRestEndpointURL()}));
            }
        }
    }

    private static enum State {
        CREATED,
        RUNNING,
        SHUTDOWN;

    }

    public static final class RestHandlerUrlComparator
    implements Comparator<Tuple2<RestHandlerSpecification, ChannelInboundHandler>>,
    Serializable {
        private static final long serialVersionUID = 2388466767835547926L;
        private static final Comparator<String> CASE_INSENSITIVE_ORDER = new CaseInsensitiveOrderComparator();
        static final RestHandlerUrlComparator INSTANCE = new RestHandlerUrlComparator();

        @Override
        public int compare(Tuple2<RestHandlerSpecification, ChannelInboundHandler> o1, Tuple2<RestHandlerSpecification, ChannelInboundHandler> o2) {
            int urlComparisonResult = CASE_INSENSITIVE_ORDER.compare(((RestHandlerSpecification)o1.f0).getTargetRestEndpointURL(), ((RestHandlerSpecification)o2.f0).getTargetRestEndpointURL());
            if (urlComparisonResult != 0) {
                return urlComparisonResult;
            }
            Collection<? extends RestAPIVersion<?>> o1APIVersions = ((RestHandlerSpecification)o1.f0).getSupportedAPIVersions();
            RestAPIVersion<?> o1Version = Collections.min(o1APIVersions);
            Collection<? extends RestAPIVersion<?>> o2APIVersions = ((RestHandlerSpecification)o2.f0).getSupportedAPIVersions();
            RestAPIVersion<?> o2Version = Collections.min(o2APIVersions);
            return o1Version.compareTo(o2Version);
        }

        public static final class CaseInsensitiveOrderComparator
        implements Comparator<String>,
        Serializable {
            private static final long serialVersionUID = 8550835445193437027L;

            @Override
            public int compare(String s1, String s2) {
                int n1 = s1.length();
                int n2 = s2.length();
                int min = Math.min(n1, n2);
                for (int i = 0; i < min; ++i) {
                    char c2;
                    char c1 = s1.charAt(i);
                    if (c1 == (c2 = s2.charAt(i)) || (c1 = Character.toUpperCase(c1)) == (c2 = Character.toUpperCase(c2)) || (c1 = Character.toLowerCase(c1)) == (c2 = Character.toLowerCase(c2))) continue;
                    if (c1 == ':') {
                        return 1;
                    }
                    if (c2 == ':') {
                        return -1;
                    }
                    return c1 - c2;
                }
                return n1 - n2;
            }
        }
    }
}

