/*
 * Decompiled with CFR 0.152.
 */
package org.apache.dolphinscheduler.remote.handler;

import io.netty.channel.Channel;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.concurrent.GenericFutureListener;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import lombok.Generated;
import org.apache.dolphinscheduler.remote.NettyRemotingClient;
import org.apache.dolphinscheduler.remote.command.Message;
import org.apache.dolphinscheduler.remote.command.MessageType;
import org.apache.dolphinscheduler.remote.future.ResponseFuture;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.remote.utils.ChannelUtils;
import org.apache.dolphinscheduler.remote.utils.Constants;
import org.apache.dolphinscheduler.remote.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ChannelHandler.Sharable
public class NettyClientHandler
extends ChannelInboundHandlerAdapter {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(NettyClientHandler.class);
    private final NettyRemotingClient nettyRemotingClient;
    private static byte[] heartBeatData = "heart_beat".getBytes();
    private final ExecutorService callbackExecutor;
    private final ConcurrentHashMap<MessageType, Pair<NettyRequestProcessor, ExecutorService>> processors;
    private final ExecutorService defaultExecutor = Executors.newFixedThreadPool(Constants.CPUS);

    public NettyClientHandler(NettyRemotingClient nettyRemotingClient, ExecutorService callbackExecutor) {
        this.nettyRemotingClient = nettyRemotingClient;
        this.callbackExecutor = callbackExecutor;
        this.processors = new ConcurrentHashMap();
    }

    public void channelInactive(ChannelHandlerContext ctx) {
        this.nettyRemotingClient.closeChannel(ChannelUtils.toAddress(ctx.channel()));
        ctx.channel().close();
    }

    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        this.processReceived(ctx.channel(), (Message)msg);
    }

    public void registerProcessor(MessageType messageType, NettyRequestProcessor processor) {
        this.registerProcessor(messageType, processor, null);
    }

    public void registerProcessor(MessageType messageType, NettyRequestProcessor processor, ExecutorService executor) {
        ExecutorService executorRef = executor;
        if (executorRef == null) {
            executorRef = this.defaultExecutor;
        }
        this.processors.putIfAbsent(messageType, new Pair<NettyRequestProcessor, ExecutorService>(processor, executorRef));
    }

    private void processReceived(Channel channel, Message message) {
        ResponseFuture future = ResponseFuture.getFuture(message.getOpaque());
        if (future != null) {
            future.setResponseCommand(message);
            future.release();
            if (future.getInvokeCallback() != null) {
                future.removeFuture();
                this.callbackExecutor.submit(future::executeInvokeCallback);
            } else {
                future.putResponse(message);
            }
        } else {
            this.processByCommandType(channel, message);
        }
    }

    public void processByCommandType(Channel channel, Message message) {
        Pair<NettyRequestProcessor, ExecutorService> pair = this.processors.get((Object)message.getType());
        if (pair != null) {
            Runnable run = () -> {
                try {
                    ((NettyRequestProcessor)pair.getLeft()).process(channel, message);
                }
                catch (Exception e) {
                    log.error(String.format("process command %s exception", message), (Throwable)e);
                }
            };
            try {
                pair.getRight().submit(run);
            }
            catch (RejectedExecutionException e) {
                log.warn("thread pool is full, discard command {} from {}", (Object)message, (Object)ChannelUtils.getRemoteAddress(channel));
            }
        } else {
            log.warn("receive response {}, but not matched any request ", (Object)message);
        }
    }

    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        log.error("exceptionCaught : {}", (Object)cause.getMessage(), (Object)cause);
        this.nettyRemotingClient.closeChannel(ChannelUtils.toAddress(ctx.channel()));
        ctx.channel().close();
    }

    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            Message heartBeat = new Message();
            heartBeat.setType(MessageType.HEART_BEAT);
            heartBeat.setBody(heartBeatData);
            ctx.channel().writeAndFlush((Object)heartBeat).addListener((GenericFutureListener)ChannelFutureListener.CLOSE_ON_FAILURE);
            if (log.isDebugEnabled()) {
                log.debug("Client send heart beat to: {}", (Object)ChannelUtils.getRemoteAddress(ctx.channel()));
            }
        } else {
            super.userEventTriggered(ctx, evt);
        }
    }
}

