/*
 * Decompiled with CFR 0.152.
 */
package org.apache.dolphinscheduler.rpc.remote;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.Epoll;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.timeout.IdleStateHandler;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.Generated;
import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.remote.utils.NettyUtils;
import org.apache.dolphinscheduler.rpc.client.RpcRequestCache;
import org.apache.dolphinscheduler.rpc.client.RpcRequestTable;
import org.apache.dolphinscheduler.rpc.codec.NettyDecoder;
import org.apache.dolphinscheduler.rpc.codec.NettyEncoder;
import org.apache.dolphinscheduler.rpc.common.RpcRequest;
import org.apache.dolphinscheduler.rpc.common.RpcResponse;
import org.apache.dolphinscheduler.rpc.future.RpcFuture;
import org.apache.dolphinscheduler.rpc.protocol.RpcProtocol;
import org.apache.dolphinscheduler.rpc.remote.NettyClientHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class NettyClient {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(NettyClient.class);
    private final EventLoopGroup workerGroup;
    private final NettyClientConfig clientConfig;
    private final Bootstrap bootstrap = new Bootstrap();
    private final AtomicBoolean isStarted = new AtomicBoolean(false);
    private final ConcurrentHashMap<Host, Channel> channels = new ConcurrentHashMap(128);

    public static NettyClient getInstance() {
        return NettyClientInner.INSTANCE;
    }

    private Channel getChannel(Host host) {
        Channel channel = this.channels.get(host);
        if (channel != null && channel.isActive()) {
            return channel;
        }
        return this.createChannel(host, true);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public Channel createChannel(Host host, boolean isSync) {
        try {
            ChannelFuture future;
            Bootstrap bootstrap = this.bootstrap;
            synchronized (bootstrap) {
                future = this.bootstrap.connect((SocketAddress)new InetSocketAddress(host.getIp(), host.getPort()));
            }
            if (isSync) {
                future.sync();
            }
            if (future.isSuccess()) {
                Channel channel = future.channel();
                this.channels.put(host, channel);
                return channel;
            }
        }
        catch (Exception ex) {
            log.warn(String.format("connect to %s error", host), (Throwable)ex);
        }
        return null;
    }

    private NettyClient(NettyClientConfig clientConfig) {
        this.clientConfig = clientConfig;
        this.workerGroup = Epoll.isAvailable() ? new EpollEventLoopGroup(clientConfig.getWorkerThreads(), new ThreadFactory(){
            private AtomicInteger threadIndex = new AtomicInteger(0);

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, String.format("NettyClient_%d", this.threadIndex.incrementAndGet()));
            }
        }) : new NioEventLoopGroup(clientConfig.getWorkerThreads(), new ThreadFactory(){
            private AtomicInteger threadIndex = new AtomicInteger(0);

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, String.format("NettyClient_%d", this.threadIndex.incrementAndGet()));
            }
        });
        this.start();
    }

    private void start() {
        ((Bootstrap)((Bootstrap)((Bootstrap)((Bootstrap)((Bootstrap)((Bootstrap)((Bootstrap)((Bootstrap)this.bootstrap.group(this.workerGroup)).channel(NettyUtils.getSocketChannelClass())).option(ChannelOption.SO_KEEPALIVE, (Object)this.clientConfig.isSoKeepalive())).option(ChannelOption.TCP_NODELAY, (Object)this.clientConfig.isTcpNoDelay())).option(ChannelOption.SO_SNDBUF, (Object)this.clientConfig.getSendBufferSize())).option(ChannelOption.SO_RCVBUF, (Object)this.clientConfig.getReceiveBufferSize())).option(ChannelOption.CONNECT_TIMEOUT_MILLIS, (Object)this.clientConfig.getConnectTimeoutMillis())).handler((ChannelHandler)new LoggingHandler(LogLevel.DEBUG))).handler((ChannelHandler)new ChannelInitializer<SocketChannel>(){

            public void initChannel(SocketChannel ch) {
                ch.pipeline().addLast(new ChannelHandler[]{new NettyEncoder()}).addLast(new ChannelHandler[]{new NettyDecoder(RpcResponse.class)}).addLast("client-idle-handler", (ChannelHandler)new IdleStateHandler(6000L, 0L, 0L, TimeUnit.MILLISECONDS)).addLast(new ChannelHandler[]{new NettyClientHandler()});
            }
        });
        this.isStarted.compareAndSet(false, true);
    }

    public RpcResponse sendMsg(Host host, RpcProtocol<RpcRequest> protocol, Boolean async) {
        Channel channel = this.getChannel(host);
        assert (channel != null);
        RpcRequest request = protocol.getBody();
        RpcRequestCache rpcRequestCache = new RpcRequestCache();
        String serviceName = request.getClassName() + request.getMethodName();
        rpcRequestCache.setServiceName(serviceName);
        long reqId = protocol.getMsgHeader().getRequestId();
        RpcFuture future = null;
        if (Boolean.FALSE.equals(async)) {
            future = new RpcFuture(request, reqId);
            rpcRequestCache.setRpcFuture(future);
        }
        RpcRequestTable.put(protocol.getMsgHeader().getRequestId(), rpcRequestCache);
        channel.writeAndFlush(protocol);
        RpcResponse result = null;
        if (Boolean.TRUE.equals(async)) {
            result = new RpcResponse();
            result.setStatus((byte)0);
            result.setResult(true);
            return result;
        }
        try {
            assert (future != null);
            result = future.get();
        }
        catch (InterruptedException e) {
            log.error("send msg error\uff0cservice name is {}", (Object)serviceName, (Object)e);
            Thread.currentThread().interrupt();
        }
        return result;
    }

    public void close() {
        if (this.isStarted.compareAndSet(true, false)) {
            try {
                this.closeChannels();
                if (this.workerGroup != null) {
                    this.workerGroup.shutdownGracefully();
                }
            }
            catch (Exception ex) {
                log.error("netty client close exception", (Throwable)ex);
            }
            log.info("netty client closed");
        }
    }

    private void closeChannels() {
        for (Channel channel : this.channels.values()) {
            channel.close();
        }
        this.channels.clear();
    }

    private static class NettyClientInner {
        private static final NettyClient INSTANCE = new NettyClient(new NettyClientConfig());

        private NettyClientInner() {
        }
    }
}

