/*
 * Decompiled with CFR 0.152.
 */
package org.apache.dolphinscheduler.remote.handler;

import io.netty.channel.Channel;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.concurrent.GenericFutureListener;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import org.apache.dolphinscheduler.remote.NettyRemotingClient;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.future.ResponseFuture;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.remote.utils.ChannelUtils;
import org.apache.dolphinscheduler.remote.utils.Constants;
import org.apache.dolphinscheduler.remote.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ChannelHandler.Sharable
public class NettyClientHandler
extends ChannelInboundHandlerAdapter {
    private final Logger logger = LoggerFactory.getLogger(NettyClientHandler.class);
    private final NettyRemotingClient nettyRemotingClient;
    private static byte[] heartBeatData = "heart_beat".getBytes();
    private final ExecutorService callbackExecutor;
    private final ConcurrentHashMap<CommandType, Pair<NettyRequestProcessor, ExecutorService>> processors;
    private final ExecutorService defaultExecutor = Executors.newFixedThreadPool(Constants.CPUS);

    public NettyClientHandler(NettyRemotingClient nettyRemotingClient, ExecutorService callbackExecutor) {
        this.nettyRemotingClient = nettyRemotingClient;
        this.callbackExecutor = callbackExecutor;
        this.processors = new ConcurrentHashMap();
    }

    public void channelInactive(ChannelHandlerContext ctx) {
        this.nettyRemotingClient.closeChannel(ChannelUtils.toAddress(ctx.channel()));
        ctx.channel().close();
    }

    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        this.processReceived(ctx.channel(), (Command)msg);
    }

    public void registerProcessor(CommandType commandType, NettyRequestProcessor processor) {
        this.registerProcessor(commandType, processor, null);
    }

    public void registerProcessor(CommandType commandType, NettyRequestProcessor processor, ExecutorService executor) {
        ExecutorService executorRef = executor;
        if (executorRef == null) {
            executorRef = this.defaultExecutor;
        }
        this.processors.putIfAbsent(commandType, new Pair<NettyRequestProcessor, ExecutorService>(processor, executorRef));
    }

    private void processReceived(Channel channel, Command command) {
        ResponseFuture future = ResponseFuture.getFuture(command.getOpaque());
        if (future != null) {
            future.setResponseCommand(command);
            future.release();
            if (future.getInvokeCallback() != null) {
                this.callbackExecutor.submit(future::executeInvokeCallback);
            } else {
                future.putResponse(command);
            }
        } else {
            this.processByCommandType(channel, command);
        }
    }

    public void processByCommandType(Channel channel, Command command) {
        Pair<NettyRequestProcessor, ExecutorService> pair = this.processors.get((Object)command.getType());
        if (pair != null) {
            Runnable run = () -> {
                try {
                    ((NettyRequestProcessor)pair.getLeft()).process(channel, command);
                }
                catch (Exception e) {
                    this.logger.error(String.format("process command %s exception", command), (Throwable)e);
                }
            };
            try {
                pair.getRight().submit(run);
            }
            catch (RejectedExecutionException e) {
                this.logger.warn("thread pool is full, discard command {} from {}", (Object)command, (Object)ChannelUtils.getRemoteAddress(channel));
            }
        } else {
            this.logger.warn("receive response {}, but not matched any request ", (Object)command);
        }
    }

    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        this.logger.error("exceptionCaught : {}", (Object)cause.getMessage(), (Object)cause);
        this.nettyRemotingClient.closeChannel(ChannelUtils.toAddress(ctx.channel()));
        ctx.channel().close();
    }

    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            Command heartBeat = new Command();
            heartBeat.setType(CommandType.HEART_BEAT);
            heartBeat.setBody(heartBeatData);
            ctx.channel().writeAndFlush((Object)heartBeat).addListener((GenericFutureListener)ChannelFutureListener.CLOSE_ON_FAILURE);
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("Client send heart beat to: {}", (Object)ChannelUtils.getRemoteAddress(ctx.channel()));
            }
        } else {
            super.userEventTriggered(ctx, evt);
        }
    }
}

