/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.queryablestate.network;

import java.net.BindException;
import java.net.InetSocketAddress;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.queryablestate.network.AbstractServerHandler;
import org.apache.flink.queryablestate.network.NettyBufferPool;
import org.apache.flink.queryablestate.network.messages.MessageBody;
import org.apache.flink.shaded.guava33.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.flink.shaded.netty4.io.netty.bootstrap.ServerBootstrap;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInitializer;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelOption;
import org.apache.flink.shaded.netty4.io.netty.channel.EventLoopGroup;
import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoopGroup;
import org.apache.flink.shaded.netty4.io.netty.channel.socket.SocketChannel;
import org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioServerSocketChannel;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import org.apache.flink.shaded.netty4.io.netty.handler.stream.ChunkedWriteHandler;
import org.apache.flink.util.ExecutorUtils;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base class for Netty-backed servers that exchange {@link MessageBody} messages.
 *
 * <p>The server is given a set of candidate ports. {@link #start()} walks through them and keeps
 * the first one on which a bind succeeds; {@link #shutdownServer()} tears down the event loop
 * group, the request handler and the query executor. Subclasses supply the request handler via
 * {@link #initializeHandler()}.
 *
 * @param <REQ> first type argument of the {@link AbstractServerHandler} (presumably the request
 *     type - confirm against the handler)
 * @param <RESP> second type argument of the {@link AbstractServerHandler} (presumably the
 *     response type - confirm against the handler)
 */
@Internal
public abstract class AbstractServerBase<REQ extends MessageBody, RESP extends MessageBody> {
    /** Logger named after the concrete (sub)class of this server. */
    protected final Logger log = LoggerFactory.getLogger(this.getClass());

    /** Value applied as {@code WRITE_BUFFER_LOW_WATER_MARK} on accepted child channels. */
    private static final int LOW_WATER_MARK = 8192;

    /** Value applied as {@code WRITE_BUFFER_HIGH_WATER_MARK} on accepted child channels. */
    private static final int HIGH_WATER_MARK = 32768;

    /** Name of the server; used in log messages and in the names of the created threads. */
    private final String serverName;

    /** Host name or address the server socket is bound to. */
    private final String bindAddress;

    /** Candidate ports (each within 0..65535) that {@code start()} tries to bind to. */
    private final Set<Integer> bindPortRange;

    /** Number of threads of the Netty event loop group (also passed to the buffer pool). */
    private final int numEventLoopThreads;

    /** Number of threads of the fixed-size query executor. */
    private final int numQueryThreads;

    /**
     * Holds the future of an ongoing or finished shutdown; contains {@code null} as long as no
     * shutdown has been requested (and again after the clean-up of a failed bind attempt).
     */
    private final AtomicReference<CompletableFuture<Void>> serverShutdownFuture = new AtomicReference<>(null);

    /** Netty bootstrap of the most recent bind attempt; {@code null} before the first attempt. */
    private ServerBootstrap bootstrap;

    /** Query executor of the most recent bind attempt; {@code null} before the first attempt. */
    private ExecutorService queryExecutor;

    /** Address the server is bound to; {@code null} until a bind attempt has succeeded. */
    private InetSocketAddress serverAddress;

    /** Request handler of the most recent bind attempt; {@code null} before the first attempt. */
    private AbstractServerHandler<REQ, RESP> handler;

    protected AbstractServerBase(String serverName, String bindAddress, Iterator<Integer> bindPortIterator, Integer numEventLoopThreads, Integer numQueryThreads) {
        Preconditions.checkNotNull(bindPortIterator);
        Preconditions.checkArgument((numEventLoopThreads >= 1 ? 1 : 0) != 0, (Object)"Non-positive number of event loop threads.");
        Preconditions.checkArgument((numQueryThreads >= 1 ? 1 : 0) != 0, (Object)"Non-positive number of query threads.");
        this.serverName = (String)Preconditions.checkNotNull((Object)serverName);
        this.bindAddress = (String)Preconditions.checkNotNull((Object)bindAddress);
        this.numEventLoopThreads = numEventLoopThreads;
        this.numQueryThreads = numQueryThreads;
        this.bindPortRange = new HashSet<Integer>();
        while (bindPortIterator.hasNext()) {
            int port = bindPortIterator.next();
            Preconditions.checkArgument((port >= 0 && port <= 65535 ? 1 : 0) != 0, (Object)("Invalid port configuration. Port must be between 0 and 65535, but was " + port + "."));
            this.bindPortRange.add(port);
        }
    }

    /**
     * Builds a fixed-size pool of {@code numQueryThreads} daemon threads whose names carry the
     * server name.
     *
     * @return a newly created executor
     */
    private ExecutorService createQueryExecutor() {
        final ThreadFactoryBuilder factoryBuilder = new ThreadFactoryBuilder();
        factoryBuilder.setDaemon(true);
        factoryBuilder.setNameFormat("Flink " + getServerName() + " Thread %d");
        final ThreadFactory daemonThreads = factoryBuilder.build();
        return Executors.newFixedThreadPool(numQueryThreads, daemonThreads);
    }

    /**
     * Gives subclasses access to the query executor.
     *
     * @return the executor created by the most recent bind attempt, or {@code null} if no bind
     *     attempt has been made yet
     */
    protected ExecutorService getQueryExecutor() {
        return queryExecutor;
    }

    /**
     * Returns the name this server was constructed with.
     *
     * @return the server name (never {@code null})
     */
    public String getServerName() {
        return serverName;
    }

    /**
     * Creates the handler that processes the messages arriving at this server.
     *
     * <p>Invoked once per bind attempt; the returned instance is added to the pipeline of every
     * channel accepted by that attempt and is shut down together with the server.
     *
     * @return the request handler; must not be {@code null}
     */
    public abstract AbstractServerHandler<REQ, RESP> initializeHandler();

    /**
     * Returns the address the server is bound to.
     *
     * @return the bound address
     * @throws IllegalStateException if no bind attempt has succeeded yet
     */
    public InetSocketAddress getServerAddress() {
        final boolean started = serverAddress != null;
        Preconditions.checkState(started, "Server " + serverName + " has not been started.");
        return serverAddress;
    }

    /**
     * Starts the server by trying the configured ports one after the other until a bind
     * succeeds.
     *
     * @throws IllegalStateException if the server was already started or a shutdown was requested
     * @throws FlinkRuntimeException if none of the configured ports could be bound
     * @throws Throwable any other failure raised by a bind attempt
     */
    public void start() throws Throwable {
        final boolean pristine = serverAddress == null && serverShutdownFuture.get() == null;
        Preconditions.checkState(pristine, serverName + " is already running @ " + serverAddress + ". ");

        // Stop at the first port that can be bound.
        for (int candidatePort : bindPortRange) {
            if (attemptToBind(candidatePort)) {
                break;
            }
        }

        if (serverAddress != null) {
            log.info("Started {} @ {}.", serverName, serverAddress);
            return;
        }

        log.info("Unable to start {}. All ports in provided range ({}) are occupied.", serverName, bindPortRange);
        throw new FlinkRuntimeException("Unable to start " + serverName + ". All ports in provided range are occupied.");
    }

    /**
     * Tries to bind the server to the configured bind address on the given port.
     *
     * <p>Every attempt creates a fresh query executor, request handler and event loop group. If
     * the bind fails with a {@link BindException}, these resources are shut down again and the
     * shutdown future is reset so that a later attempt (or a later shutdown) starts from a clean
     * state.
     *
     * @param port the port to bind to
     * @return {@code true} if the server is now bound, {@code false} if the port was unavailable
     * @throws InterruptedException if the thread is interrupted while binding or while waiting
     *     for the clean-up of a failed attempt (the interrupt flag is restored in the latter case)
     * @throws Throwable any bind failure other than a {@link BindException}
     */
    private boolean attemptToBind(int port) throws Throwable {
        log.debug("Attempting to start {} on port {}.", serverName, port);

        this.queryExecutor = createQueryExecutor();
        this.handler = initializeHandler();

        final NettyBufferPool bufferPool = new NettyBufferPool(numEventLoopThreads);

        final ThreadFactory threadFactory =
                new ThreadFactoryBuilder()
                        .setDaemon(true)
                        .setNameFormat("Flink " + serverName + " EventLoop Thread %d")
                        .build();

        final NioEventLoopGroup nioGroup = new NioEventLoopGroup(numEventLoopThreads, threadFactory);

        this.bootstrap =
                new ServerBootstrap()
                        .localAddress(bindAddress, port)
                        .group(nioGroup)
                        .channel(NioServerSocketChannel.class)
                        .option(ChannelOption.ALLOCATOR, bufferPool)
                        .childOption(ChannelOption.ALLOCATOR, bufferPool)
                        .childHandler(new ServerChannelInitializer<>(handler));

        // Low mark first, then high mark (same order as before).
        bootstrap.childOption(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, LOW_WATER_MARK);
        bootstrap.childOption(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, HIGH_WATER_MARK);

        try {
            final ChannelFuture future = bootstrap.bind().sync();
            if (future.isSuccess()) {
                final InetSocketAddress localAddress = (InetSocketAddress) future.channel().localAddress();
                serverAddress = new InetSocketAddress(localAddress.getAddress(), localAddress.getPort());
                return true;
            }
            // Rethrow the failure so that a BindException ends up in the catch clause below.
            throw future.cause();
        } catch (BindException e) {
            log.debug("Failed to start {} on port {}: {}.", serverName, port, e.getMessage());
            try {
                // Release the resources of this attempt and clear the shutdown future afterwards;
                // otherwise the next bind attempt / a later shutdown would see a stale future.
                shutdownServer()
                        .whenComplete((ignoredV, ignoredT) -> serverShutdownFuture.set(null))
                        .get();
            } catch (InterruptedException interrupted) {
                // Do not swallow the interruption: restore the flag and stop trying further ports.
                Thread.currentThread().interrupt();
                throw interrupted;
            } catch (Exception r) {
                // Best effort: a failed clean-up must not prevent trying the next port.
                // The exception is passed as trailing argument so the stack trace is logged.
                log.warn("Problem while shutting down {}: {}", serverName, r.getMessage(), r);
            }
            return false;
        }
    }

    /**
     * Shuts down the server and everything it owns: the Netty event loop group, the request
     * handler and the query executor.
     *
     * <p>Only the first call triggers the shutdown; later calls return whatever future is
     * currently registered as the shutdown future.
     *
     * @return a future that completes once all three parts are shut down, exceptionally if any
     *     of them failed
     */
    public CompletableFuture<Void> shutdownServer() {
        final CompletableFuture<Void> shutdownFuture = new CompletableFuture<>();
        if (!serverShutdownFuture.compareAndSet(null, shutdownFuture)) {
            // Somebody else already initiated the shutdown.
            return serverShutdownFuture.get();
        }

        log.info("Shutting down {} @ {}", serverName, serverAddress);

        // Part 1: the event loop group (if there is one that is still alive).
        final CompletableFuture<Void> groupShutdownFuture = new CompletableFuture<>();
        final EventLoopGroup group = bootstrap == null ? null : bootstrap.config().group();
        if (group == null || group.isShutdown()) {
            groupShutdownFuture.complete(null);
        } else {
            group.shutdownGracefully(0L, 0L, TimeUnit.MILLISECONDS)
                    .addListener(
                            finished -> {
                                if (!finished.isSuccess()) {
                                    groupShutdownFuture.completeExceptionally(finished.cause());
                                } else {
                                    groupShutdownFuture.complete(null);
                                }
                            });
        }

        // Part 2: the request handler.
        final CompletableFuture<Void> handlerShutdownFuture = new CompletableFuture<>();
        if (handler != null) {
            handler.shutdown()
                    .whenComplete(
                            (ignored, failure) -> {
                                if (failure == null) {
                                    handlerShutdownFuture.complete(null);
                                } else {
                                    handlerShutdownFuture.completeExceptionally(failure);
                                }
                            });
        } else {
            handlerShutdownFuture.complete(null);
        }

        // Part 3: the query executor, shut down asynchronously with a 10 minute grace period.
        final CompletableFuture<Void> queryExecShutdownFuture =
                CompletableFuture.runAsync(
                        () -> {
                            if (queryExecutor != null) {
                                ExecutorUtils.gracefulShutdown(10L, TimeUnit.MINUTES, queryExecutor);
                            }
                        });

        // Relay the combined outcome to the future handed out to callers.
        CompletableFuture.allOf(queryExecShutdownFuture, groupShutdownFuture, handlerShutdownFuture)
                .whenComplete(
                        (ignored, failure) -> {
                            if (failure == null) {
                                shutdownFuture.complete(null);
                            } else {
                                shutdownFuture.completeExceptionally(failure);
                            }
                        });

        return serverShutdownFuture.get();
    }

    /**
     * Tells whether the event loop group is gone.
     *
     * @return {@code true} if no bootstrap was ever created or its event loop group has
     *     terminated, {@code false} otherwise
     */
    @VisibleForTesting
    public boolean isEventGroupShutdown() {
        if (bootstrap == null) {
            return true;
        }
        return bootstrap.config().group().isTerminated();
    }

    private static final class ServerChannelInitializer<REQ extends MessageBody, RESP extends MessageBody>
    extends ChannelInitializer<SocketChannel> {
        private final AbstractServerHandler<REQ, RESP> sharedRequestHandler;

        ServerChannelInitializer(AbstractServerHandler<REQ, RESP> sharedRequestHandler) {
            this.sharedRequestHandler = (AbstractServerHandler)((Object)Preconditions.checkNotNull(sharedRequestHandler, (String)"MessageBody handler"));
        }

        protected void initChannel(SocketChannel channel) throws Exception {
            channel.pipeline().addLast(new ChannelHandler[]{new ChunkedWriteHandler()}).addLast(new ChannelHandler[]{new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4)}).addLast(new ChannelHandler[]{this.sharedRequestHandler});
        }
    }
}

