/*
 * Decompiled with CFR 0.152.
 */
package org.apache.dolphinscheduler.rpc.remote;

import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleStateEvent;
import java.lang.reflect.InvocationTargetException;
import lombok.Generated;
import org.apache.dolphinscheduler.rpc.client.ConsumerConfig;
import org.apache.dolphinscheduler.rpc.client.ConsumerConfigCache;
import org.apache.dolphinscheduler.rpc.client.RpcRequestCache;
import org.apache.dolphinscheduler.rpc.client.RpcRequestTable;
import org.apache.dolphinscheduler.rpc.common.RpcResponse;
import org.apache.dolphinscheduler.rpc.common.ThreadPoolManager;
import org.apache.dolphinscheduler.rpc.future.RpcFuture;
import org.apache.dolphinscheduler.rpc.protocol.EventType;
import org.apache.dolphinscheduler.rpc.protocol.MessageHeader;
import org.apache.dolphinscheduler.rpc.protocol.RpcProtocol;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ChannelHandler.Sharable
public class NettyClientHandler
extends ChannelInboundHandlerAdapter {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(NettyClientHandler.class);
    private static final ThreadPoolManager threadPoolManager = ThreadPoolManager.INSTANCE;

    public void channelInactive(ChannelHandlerContext ctx) {
        ctx.channel().close();
    }

    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        RpcProtocol rpcProtocol = (RpcProtocol)msg;
        RpcResponse rsp = (RpcResponse)rpcProtocol.getBody();
        long reqId = rpcProtocol.getMsgHeader().getRequestId();
        RpcRequestCache rpcRequest = RpcRequestTable.get(reqId);
        if (null == rpcRequest) {
            log.warn("rpc read error,this request does not exist");
            return;
        }
        threadPoolManager.addExecuteTask(() -> this.readHandler(rsp, rpcRequest, reqId));
    }

    private void readHandler(RpcResponse rsp, RpcRequestCache rpcRequest, long reqId) {
        String serviceName = rpcRequest.getServiceName();
        ConsumerConfig consumerConfig = ConsumerConfigCache.getConfigByServersName(serviceName);
        if (Boolean.FALSE.equals(consumerConfig.getAsync())) {
            RpcFuture future = rpcRequest.getRpcFuture();
            RpcRequestTable.remove(reqId);
            future.done(rsp);
            return;
        }
        if (Boolean.FALSE.equals(consumerConfig.getCallBack())) {
            return;
        }
        if (rsp.getStatus() == 0) {
            try {
                consumerConfig.getServiceCallBackClass().getDeclaredConstructor(new Class[0]).newInstance(new Object[0]).run(rsp.getResult());
            }
            catch (IllegalAccessException | InstantiationException | NoSuchMethodException | InvocationTargetException e) {
                log.error("rpc service call back error,serviceName {},rsp {}", (Object)serviceName, (Object)rsp);
            }
        } else {
            log.error("rpc response error ,serviceName {},rsp {}", (Object)serviceName, (Object)rsp);
        }
    }

    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            RpcProtocol rpcProtocol = new RpcProtocol();
            MessageHeader messageHeader = new MessageHeader();
            messageHeader.setEventType(EventType.HEARTBEAT.getType());
            rpcProtocol.setMsgHeader(messageHeader);
            ctx.channel().writeAndFlush(rpcProtocol);
            log.debug("send heart beat msg...");
        } else {
            super.userEventTriggered(ctx, evt);
        }
    }

    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        log.error("exceptionCaught : {}", (Object)cause.getMessage(), (Object)cause);
        ctx.channel().close();
    }
}

