/*
 * Decompiled with CFR 0.152.
 */
package org.apache.dolphinscheduler.remote;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.timeout.IdleStateHandler;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.dolphinscheduler.remote.codec.NettyDecoder;
import org.apache.dolphinscheduler.remote.codec.NettyEncoder;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
import org.apache.dolphinscheduler.remote.exceptions.RemotingException;
import org.apache.dolphinscheduler.remote.exceptions.RemotingTimeoutException;
import org.apache.dolphinscheduler.remote.exceptions.RemotingTooMuchRequestException;
import org.apache.dolphinscheduler.remote.future.InvokeCallback;
import org.apache.dolphinscheduler.remote.future.ReleaseSemaphore;
import org.apache.dolphinscheduler.remote.future.ResponseFuture;
import org.apache.dolphinscheduler.remote.handler.NettyClientHandler;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.remote.utils.CallerThreadExecutePolicy;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory;
import org.apache.dolphinscheduler.remote.utils.NettyUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class NettyRemotingClient {
    private final Logger logger = LoggerFactory.getLogger(NettyRemotingClient.class);
    private final Bootstrap bootstrap = new Bootstrap();
    private final NettyEncoder encoder = new NettyEncoder();
    private final ConcurrentHashMap<Host, Channel> channels = new ConcurrentHashMap(128);
    private final AtomicBoolean isStarted = new AtomicBoolean(false);
    private final EventLoopGroup workerGroup;
    private final NettyClientConfig clientConfig;
    private final Semaphore asyncSemaphore = new Semaphore(200, true);
    private final ExecutorService callbackExecutor;
    private final NettyClientHandler clientHandler;
    private final ScheduledExecutorService responseFutureExecutor;

    public NettyRemotingClient(NettyClientConfig clientConfig) {
        this.clientConfig = clientConfig;
        this.workerGroup = NettyUtils.useEpoll() ? new EpollEventLoopGroup(clientConfig.getWorkerThreads(), new ThreadFactory(){
            private final AtomicInteger threadIndex = new AtomicInteger(0);

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, String.format("NettyClient_%d", this.threadIndex.incrementAndGet()));
            }
        }) : new NioEventLoopGroup(clientConfig.getWorkerThreads(), new ThreadFactory(){
            private final AtomicInteger threadIndex = new AtomicInteger(0);

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, String.format("NettyClient_%d", this.threadIndex.incrementAndGet()));
            }
        });
        this.callbackExecutor = new ThreadPoolExecutor(5, 10, 1L, TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>(1000), new NamedThreadFactory("CallbackExecutor", 10), new CallerThreadExecutePolicy());
        this.clientHandler = new NettyClientHandler(this, this.callbackExecutor);
        this.responseFutureExecutor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("ResponseFutureExecutor"));
        this.start();
    }

    private void start() {
        ((Bootstrap)((Bootstrap)((Bootstrap)((Bootstrap)((Bootstrap)((Bootstrap)((Bootstrap)this.bootstrap.group(this.workerGroup)).channel(NettyUtils.getSocketChannelClass())).option(ChannelOption.SO_KEEPALIVE, (Object)this.clientConfig.isSoKeepalive())).option(ChannelOption.TCP_NODELAY, (Object)this.clientConfig.isTcpNoDelay())).option(ChannelOption.SO_SNDBUF, (Object)this.clientConfig.getSendBufferSize())).option(ChannelOption.SO_RCVBUF, (Object)this.clientConfig.getReceiveBufferSize())).option(ChannelOption.CONNECT_TIMEOUT_MILLIS, (Object)this.clientConfig.getConnectTimeoutMillis())).handler((ChannelHandler)new ChannelInitializer<SocketChannel>(){

            public void initChannel(SocketChannel ch) {
                ch.pipeline().addLast("client-idle-handler", (ChannelHandler)new IdleStateHandler(6000L, 0L, 0L, TimeUnit.MILLISECONDS)).addLast(new ChannelHandler[]{new NettyDecoder(), NettyRemotingClient.this.clientHandler, NettyRemotingClient.this.encoder});
            }
        });
        this.responseFutureExecutor.scheduleAtFixedRate(ResponseFuture::scanFutureTable, 5000L, 1000L, TimeUnit.MILLISECONDS);
        this.isStarted.compareAndSet(false, true);
    }

    public void sendAsync(Host host, Command command, long timeoutMillis, InvokeCallback invokeCallback) throws InterruptedException, RemotingException {
        Channel channel = this.getChannel(host);
        if (channel == null) {
            throw new RemotingException("network error");
        }
        long opaque = command.getOpaque();
        boolean acquired = this.asyncSemaphore.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
        if (acquired) {
            ReleaseSemaphore releaseSemaphore = new ReleaseSemaphore(this.asyncSemaphore);
            ResponseFuture responseFuture = new ResponseFuture(opaque, timeoutMillis, invokeCallback, releaseSemaphore);
            try {
                channel.writeAndFlush((Object)command).addListener(future -> {
                    if (future.isSuccess()) {
                        responseFuture.setSendOk(true);
                        return;
                    }
                    responseFuture.setSendOk(false);
                    responseFuture.setCause(future.cause());
                    responseFuture.putResponse(null);
                    try {
                        responseFuture.executeInvokeCallback();
                    }
                    catch (Exception ex) {
                        this.logger.error("execute callback error", (Throwable)ex);
                    }
                    finally {
                        responseFuture.release();
                    }
                });
            }
            catch (Exception ex) {
                responseFuture.release();
                throw new RemotingException(String.format("send command to host: %s failed", host), ex);
            }
        } else {
            String message = String.format("try to acquire async semaphore timeout: %d, waiting thread num: %d, total permits: %d", timeoutMillis, this.asyncSemaphore.getQueueLength(), this.asyncSemaphore.availablePermits());
            throw new RemotingTooMuchRequestException(message);
        }
    }

    public Command sendSync(Host host, Command command, long timeoutMillis) throws InterruptedException, RemotingException {
        Channel channel = this.getChannel(host);
        if (channel == null) {
            throw new RemotingException(String.format("connect to : %s fail", host));
        }
        long opaque = command.getOpaque();
        ResponseFuture responseFuture = new ResponseFuture(opaque, timeoutMillis, null, null);
        channel.writeAndFlush((Object)command).addListener(future -> {
            if (future.isSuccess()) {
                responseFuture.setSendOk(true);
                return;
            }
            responseFuture.setSendOk(false);
            responseFuture.setCause(future.cause());
            responseFuture.putResponse(null);
            this.logger.error("send command {} to host {} failed", (Object)command, (Object)host);
        });
        Command result = responseFuture.waitResponse();
        if (result == null) {
            if (responseFuture.isSendOK()) {
                throw new RemotingTimeoutException(host.toString(), timeoutMillis, responseFuture.getCause());
            }
            throw new RemotingException(host.toString(), responseFuture.getCause());
        }
        return result;
    }

    public void send(Host host, Command command) throws RemotingException {
        Channel channel = this.getChannel(host);
        if (channel == null) {
            throw new RemotingException(String.format("connect to : %s fail", host));
        }
        try {
            ChannelFuture future = channel.writeAndFlush((Object)command).await();
            if (!future.isSuccess()) {
                String msg = String.format("send command : %s , to :%s failed", command, host.getAddress());
                this.logger.error(msg, future.cause());
                throw new RemotingException(msg);
            }
            this.logger.debug("send command : {} , to : {} successfully.", (Object)command, (Object)host.getAddress());
        }
        catch (Exception e) {
            this.logger.error("Send command {} to address {} encounter error.", (Object)command, (Object)host.getAddress());
            throw new RemotingException(String.format("Send command : %s , to :%s encounter error", command, host.getAddress()), e);
        }
    }

    public void registerProcessor(CommandType commandType, NettyRequestProcessor processor) {
        this.registerProcessor(commandType, processor, null);
    }

    public void registerProcessor(CommandType commandType, NettyRequestProcessor processor, ExecutorService executor) {
        this.clientHandler.registerProcessor(commandType, processor, executor);
    }

    public Channel getChannel(Host host) {
        Channel channel = this.channels.get(host);
        if (channel != null && channel.isActive()) {
            return channel;
        }
        return this.createChannel(host, true);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public Channel createChannel(Host host, boolean isSync) {
        try {
            ChannelFuture future;
            Bootstrap bootstrap = this.bootstrap;
            synchronized (bootstrap) {
                future = this.bootstrap.connect((SocketAddress)new InetSocketAddress(host.getIp(), host.getPort()));
            }
            if (isSync) {
                future.sync();
            }
            if (future.isSuccess()) {
                Channel channel = future.channel();
                this.channels.put(host, channel);
                return channel;
            }
        }
        catch (Exception ex) {
            this.logger.warn(String.format("connect to %s error", host), (Throwable)ex);
        }
        return null;
    }

    public void close() {
        if (this.isStarted.compareAndSet(true, false)) {
            try {
                this.closeChannels();
                if (this.workerGroup != null) {
                    this.workerGroup.shutdownGracefully();
                }
                if (this.callbackExecutor != null) {
                    this.callbackExecutor.shutdownNow();
                }
                if (this.responseFutureExecutor != null) {
                    this.responseFutureExecutor.shutdownNow();
                }
            }
            catch (Exception ex) {
                this.logger.error("netty client close exception", (Throwable)ex);
            }
            this.logger.info("netty client closed");
        }
    }

    private void closeChannels() {
        for (Channel channel : this.channels.values()) {
            channel.close();
        }
        this.channels.clear();
    }

    public void closeChannel(Host host) {
        Channel channel = this.channels.remove(host);
        if (channel != null) {
            channel.close();
        }
    }
}

