/*
 * Decompiled with CFR 0.152.
 */
package io.seata.core.rpc.netty;

import io.netty.channel.Channel;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.concurrent.GenericFutureListener;
import io.seata.common.exception.FrameworkErrorCode;
import io.seata.common.exception.FrameworkException;
import io.seata.common.loader.EnhancedServiceLoader;
import io.seata.common.thread.NamedThreadFactory;
import io.seata.common.thread.PositiveAtomicCounter;
import io.seata.core.protocol.MessageFuture;
import io.seata.core.protocol.MessageTypeAware;
import io.seata.core.protocol.ProtocolConstants;
import io.seata.core.protocol.RpcMessage;
import io.seata.core.rpc.Disposable;
import io.seata.core.rpc.hook.RpcHook;
import io.seata.core.rpc.netty.ChannelUtil;
import io.seata.core.rpc.netty.NettyClientConfig;
import io.seata.core.rpc.processor.Pair;
import io.seata.core.rpc.processor.RemotingProcessor;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.net.SocketAddress;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

public abstract class AbstractNettyRemoting
implements Disposable {
    private static final Logger LOGGER = LoggerFactory.getLogger(AbstractNettyRemoting.class);

    /** Single-threaded scheduler on which {@link #init()} registers the periodic timeout sweep. */
    protected final ScheduledExecutorService timerExecutor = new ScheduledThreadPoolExecutor(1, (ThreadFactory)new NamedThreadFactory("timeoutChecker", 1, true));

    /** Executor supplied through the constructor; shut down by {@link #destroy()}. */
    protected final ThreadPoolExecutor messageExecutor;

    /** Source of the ids handed out by {@link #getNextMessageId()}. */
    protected final PositiveAtomicCounter idGenerator = new PositiveAtomicCounter();

    /** Pending synchronous requests, keyed by message id. */
    protected final ConcurrentHashMap<Integer, MessageFuture> futures = new ConcurrentHashMap<>();

    /** Pause, in milliseconds, between re-checks of a channel that is not writable. */
    private static final long NOT_WRITEABLE_CHECK_MILLS = 10L;

    /** Wall-clock time (ms) recorded at the end of the most recent timeout sweep. */
    protected volatile long nowMills = 0L;

    /** Initial delay and period, in milliseconds, of the timeout sweep. */
    private static final int TIMEOUT_CHECK_INTERVAL = 3000;

    /** Monitor that senders wait on while a channel is not writable. */
    protected final Object lock = new Object();

    /** Not referenced inside this class; presumably maintained by subclasses (verify against them). */
    protected volatile boolean isSending = false;

    /** Group name; starts as {@code "DEFAULT"} and is changed via {@link #setGroup(String)}. */
    private String group = "DEFAULT";

    /** Message type code mapped to its processor and the (possibly {@code null}) executor to run it on. */
    protected final HashMap<Integer, Pair<RemotingProcessor, ExecutorService>> processorTable = new HashMap<>(32);

    /** Hooks invoked around requests, discovered through {@link EnhancedServiceLoader}. */
    protected final List<RpcHook> rpcHooks = EnhancedServiceLoader.loadAll(RpcHook.class);

    /** When {@code true}, the next rejected processor task triggers one jstack dump; reset afterwards. */
    boolean allowDumpStack = false;

    /**
     * Starts the periodic timeout sweep on {@link #timerExecutor}.
     *
     * <p>Every {@code TIMEOUT_CHECK_INTERVAL} milliseconds each pending future is checked; a
     * timed-out one is removed from {@link #futures} and completed with a
     * {@link TimeoutException} describing the original request. After each sweep
     * {@link #nowMills} is refreshed.
     *
     * <p>Failures are contained per entry: an exception escaping a task scheduled with
     * {@code scheduleAtFixedRate} would suppress every later execution and silently disable
     * timeout detection for the lifetime of this instance.
     */
    public void init() {
        this.timerExecutor.scheduleAtFixedRate(() -> {
            for (Map.Entry<Integer, MessageFuture> entry : this.futures.entrySet()) {
                try {
                    MessageFuture future = entry.getValue();
                    if (!future.isTimeout()) {
                        continue;
                    }
                    this.futures.remove(entry.getKey());
                    RpcMessage rpcMessage = future.getRequestMessage();
                    // String.valueOf tolerates a null body, where body.toString() would throw.
                    future.setResultMessage(new TimeoutException(String.format(
                        "msgId: %s ,msgType: %s ,msg: %s ,request timeout",
                        rpcMessage.getId(),
                        String.valueOf(rpcMessage.getMessageType()),
                        String.valueOf(rpcMessage.getBody()))));
                    if (LOGGER.isDebugEnabled()) {
                        LOGGER.debug("timeout clear future: {}", rpcMessage.getBody());
                    }
                } catch (RuntimeException e) {
                    // Keep sweeping the remaining entries and keep the schedule alive.
                    LOGGER.error("timeout check failed for msgId: {}", entry.getKey(), e);
                }
            }
            this.nowMills = System.currentTimeMillis();
        }, TIMEOUT_CHECK_INTERVAL, TIMEOUT_CHECK_INTERVAL, TimeUnit.MILLISECONDS);
    }

    /**
     * Creates the remoting base with the given executor.
     *
     * @param messageExecutor executor stored in {@link #messageExecutor}; it is shut down by
     *                        {@link #destroy()}
     */
    public AbstractNettyRemoting(ThreadPoolExecutor messageExecutor) {
        this.messageExecutor = messageExecutor;
    }

    /**
     * Draws the next message id from {@link #idGenerator}.
     *
     * @return the incremented counter value
     */
    public int getNextMessageId() {
        final int nextId = idGenerator.incrementAndGet();
        return nextId;
    }

    /**
     * Exposes the live map of pending requests (not a copy).
     *
     * @return the map of message id to pending future
     */
    public ConcurrentHashMap<Integer, MessageFuture> getFutures() {
        return futures;
    }

    /**
     * Reads the current group name.
     *
     * @return the group name, {@code "DEFAULT"} unless changed through {@link #setGroup(String)}
     */
    public String getGroup() {
        return group;
    }

    /**
     * Replaces the group name.
     *
     * @param group the new group name; stored as-is without validation
     */
    public void setGroup(String group) {
        this.group = group;
    }

    /**
     * Destroys a channel, deriving its address from the channel itself.
     *
     * @param channel the channel to destroy; its remote address is resolved through
     *                {@link #getAddressFromChannel(Channel)}
     */
    public void destroyChannel(Channel channel) {
        final String address = getAddressFromChannel(channel);
        destroyChannel(address, channel);
    }

    /**
     * Shuts down the timeout scheduler and then the message executor.
     * Both are stopped with {@code shutdown()}, so already-submitted work is not cancelled.
     */
    @Override
    public void destroy() {
        timerExecutor.shutdown();
        messageExecutor.shutdown();
    }

    /**
     * Sends a request and blocks until its response arrives or the timeout elapses.
     *
     * <p>A {@link MessageFuture} is registered in {@link #futures} under the message id before
     * writing. The registration is withdrawn again whenever this method fails, so a failed
     * send does not leave a stale entry behind for the timeout sweep.
     *
     * @param channel       target channel; when {@code null} a warning is logged and
     *                      {@code null} is returned without sending
     * @param rpcMessage    the request to write
     * @param timeoutMillis maximum wait in milliseconds, must be greater than zero
     * @return the value obtained from the message future, or {@code null} for a null channel
     * @throws FrameworkException if {@code timeoutMillis} is not positive, or the channel does
     *                            not become writable
     * @throws TimeoutException   if waiting for the response times out
     * @throws RuntimeException   wrapping any other failure raised while waiting
     */
    protected Object sendSync(Channel channel, RpcMessage rpcMessage, long timeoutMillis) throws TimeoutException {
        if (timeoutMillis <= 0L) {
            throw new FrameworkException("timeout should more than 0ms");
        }
        if (channel == null) {
            LOGGER.warn("sendSync nothing, caused by null channel.");
            return null;
        }

        MessageFuture messageFuture = new MessageFuture();
        messageFuture.setRequestMessage(rpcMessage);
        messageFuture.setTimeout(timeoutMillis);
        this.futures.put(rpcMessage.getId(), messageFuture);

        String remoteAddr;
        try {
            this.channelWritableCheck(channel, rpcMessage.getBody());
            remoteAddr = ChannelUtil.getAddressFromChannel(channel);
            this.doBeforeRpcHooks(remoteAddr, rpcMessage);
        } catch (RuntimeException | Error e) {
            // Nothing was written, so nobody will ever complete this future: drop it now.
            this.futures.remove(rpcMessage.getId(), messageFuture);
            throw e;
        }

        channel.writeAndFlush(rpcMessage).addListener((ChannelFutureListener) future -> {
            if (!future.isSuccess()) {
                // Fail the waiter immediately instead of letting it run into the timeout.
                MessageFuture failed = this.futures.remove(rpcMessage.getId());
                if (failed != null) {
                    failed.setResultMessage(future.cause());
                }
                this.destroyChannel(future.channel());
            }
        });

        try {
            Object result = messageFuture.get(timeoutMillis, TimeUnit.MILLISECONDS);
            this.doAfterRpcHooks(remoteAddr, rpcMessage, result);
            return result;
        } catch (Exception exx) {
            // Two-argument remove: only withdraws our own registration, a no-op if already gone.
            this.futures.remove(rpcMessage.getId(), messageFuture);
            if (exx instanceof InterruptedException) {
                // Preserve the interrupt for callers further up the stack.
                Thread.currentThread().interrupt();
            }
            LOGGER.error("wait response error:{},ip:{},request:{}", exx.getMessage(), channel.remoteAddress(), rpcMessage.getBody());
            if (exx instanceof TimeoutException) {
                throw (TimeoutException) exx;
            }
            throw new RuntimeException(exx);
        }
    }

    /**
     * Writes a message without waiting for any response.
     *
     * <p>Blocks only while the channel is not writable. If the write itself fails, the
     * channel is destroyed from the write listener.
     *
     * @param channel    target channel
     * @param rpcMessage the message to write
     */
    protected void sendAsync(Channel channel, RpcMessage rpcMessage) {
        channelWritableCheck(channel, rpcMessage.getBody());

        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("write message:" + rpcMessage.getBody() + ", channel:" + channel + ",active?" + channel.isActive() + ",writable?" + channel.isWritable() + ",isopen?" + channel.isOpen());
        }

        final String remoteAddr = ChannelUtil.getAddressFromChannel(channel);
        doBeforeRpcHooks(remoteAddr, rpcMessage);

        // A failed write means the channel is unusable, so tear it down.
        final ChannelFutureListener destroyOnFailure = writeResult -> {
            if (writeResult.isSuccess()) {
                return;
            }
            destroyChannel(writeResult.channel());
        };
        channel.writeAndFlush(rpcMessage).addListener(destroyOnFailure);
    }

    /**
     * Wraps a payload in a new request message with a freshly generated id.
     *
     * @param msg         the body to carry
     * @param messageType the message type byte to set on the message
     * @return a new message using the configured codec and compressor from
     *         {@link ProtocolConstants}
     */
    protected RpcMessage buildRequestMessage(Object msg, byte messageType) {
        final RpcMessage request = new RpcMessage();
        request.setBody(msg);
        request.setMessageType(messageType);
        request.setCodec(ProtocolConstants.CONFIGURED_CODEC);
        request.setCompressor(ProtocolConstants.CONFIGURED_COMPRESSOR);
        request.setId(getNextMessageId());
        return request;
    }

    /**
     * Builds the response for a request, reusing the request's id, codec and compressor.
     *
     * @param rpcMessage  the request being answered
     * @param msg         the response body
     * @param messageType the message type byte to set on the response
     * @return a new response message
     */
    protected RpcMessage buildResponseMessage(RpcMessage rpcMessage, Object msg, byte messageType) {
        final RpcMessage response = new RpcMessage();
        // Copied from the request so the peer can correlate and decode the reply.
        response.setId(rpcMessage.getId());
        response.setCodec(rpcMessage.getCodec());
        response.setCompressor(rpcMessage.getCompressor());
        response.setMessageType(messageType);
        response.setBody(msg);
        return response;
    }

    /**
     * Dispatches an incoming message to the processor registered for its type code.
     *
     * <p>The body must implement {@link MessageTypeAware}; otherwise an error is logged and
     * the message is dropped. If the registered pair carries an executor, the processor runs
     * on it (with {@link MDC} cleared afterwards); otherwise it runs on the calling thread.
     * Processor failures are logged, never rethrown.
     *
     * <p>When the executor rejects the task, an error is logged and, if
     * {@link #allowDumpStack} is set, a single jstack dump is attempted before the flag is
     * reset.
     *
     * @param ctx        the handler context the message arrived on
     * @param rpcMessage the incoming message
     * @throws Exception declared for subclasses; this implementation logs failures instead
     */
    protected void processMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug(String.format("%s msgId:%s, body:%s", this, rpcMessage.getId(), rpcMessage.getBody()));
        }

        Object body = rpcMessage.getBody();
        if (!(body instanceof MessageTypeAware)) {
            LOGGER.error("This rpcMessage body[{}] is not MessageTypeAware type.", body);
            return;
        }

        // Normalise to int so the lookup boxes to Integer, matching the table's key type
        // whatever numeric type getTypeCode() is declared to return.
        final int typeCode = ((MessageTypeAware) body).getTypeCode();
        final Pair<RemotingProcessor, ExecutorService> pair = this.processorTable.get(typeCode);
        if (pair == null) {
            LOGGER.error("This message type [{}] has no processor.", (Object) typeCode);
            return;
        }

        final ExecutorService executor = pair.getSecond();
        if (executor == null) {
            // No dedicated executor: process inline on the calling thread.
            try {
                pair.getFirst().process(ctx, rpcMessage);
            } catch (Throwable th) {
                LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), (Object) th.getMessage(), (Object) th);
            }
            return;
        }

        try {
            executor.execute(() -> {
                try {
                    pair.getFirst().process(ctx, rpcMessage);
                } catch (Throwable th) {
                    LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), (Object) th.getMessage(), (Object) th);
                } finally {
                    MDC.clear();
                }
            });
        } catch (RejectedExecutionException e) {
            LOGGER.error(FrameworkErrorCode.ThreadPoolFull.getErrCode(), (Object) ("thread pool is full, current max pool size is " + this.messageExecutor.getActiveCount()));
            if (this.allowDumpStack) {
                this.dumpThreadStack();
                this.allowDumpStack = false;
            }
        }
    }

    /**
     * Launches {@code jstack} for the current JVM and sends its output to
     * {@code <currentTimeMillis>.log} in the working directory.
     *
     * <p>The redirect is configured on the {@link ProcessBuilder} because no shell is
     * involved when a process is launched from Java: a {@code >} inside the command string
     * would be handed to jstack as a plain argument and no file would be written.
     */
    private void dumpThreadStack() {
        // The runtime name is split at '@' and the leading part is used as the pid.
        String runtimeName = ManagementFactory.getRuntimeMXBean().getName();
        String pid = runtimeName.split("@")[0];
        String jstackFile = System.currentTimeMillis() + ".log";
        try {
            LOGGER.info("jstack command will dump to " + jstackFile);
            new ProcessBuilder("jstack", pid)
                .redirectErrorStream(true)
                .redirectOutput(new java.io.File(jstackFile))
                .start();
        } catch (IOException exx) {
            LOGGER.error(exx.getMessage());
        }
    }

    /**
     * Resolves the remote address of the channel behind a handler context.
     *
     * @param ctx the handler context
     * @return the address as produced by {@link #getAddressFromChannel(Channel)}
     */
    protected String getAddressFromContext(ChannelHandlerContext ctx) {
        final Channel channel = ctx.channel();
        return getAddressFromChannel(channel);
    }

    /**
     * Renders a channel's remote address as text, without the configured leading marker.
     *
     * @param channel the channel whose remote address is read
     * @return the address string, with the prefix from
     *         {@link NettyClientConfig#getSocketAddressStartChar()} removed when present
     */
    protected String getAddressFromChannel(Channel channel) {
        final String rendered = channel.remoteAddress().toString();
        final String prefix = NettyClientConfig.getSocketAddressStartChar();
        if (!rendered.startsWith(prefix)) {
            return rendered;
        }
        return rendered.substring(prefix.length());
    }

    /**
     * Blocks until the channel is writable, re-checking every
     * {@code NOT_WRITEABLE_CHECK_MILLS} milliseconds while holding {@link #lock}.
     *
     * <p>Once the number of checks exceeds
     * {@link NettyClientConfig#getMaxNotWriteableRetry()} the channel is destroyed and a
     * {@link FrameworkException} is thrown. An interrupt received while waiting does not
     * abort the wait; it is remembered and the thread's interrupt status is restored on
     * exit so callers can still observe it.
     *
     * @param channel the channel to wait on
     * @param msg     the message about to be sent; used only in the exception text
     * @throws FrameworkException with {@link FrameworkErrorCode#ChannelIsNotWritable} when
     *                            the retry limit is exceeded
     */
    private void channelWritableCheck(Channel channel, Object msg) {
        int tryTimes = 0;
        boolean interrupted = false;
        try {
            synchronized (this.lock) {
                while (!channel.isWritable()) {
                    tryTimes++;
                    if (tryTimes > NettyClientConfig.getMaxNotWriteableRetry()) {
                        this.destroyChannel(channel);
                        throw new FrameworkException("msg:" + (msg == null ? "null" : msg.toString()), FrameworkErrorCode.ChannelIsNotWritable);
                    }
                    try {
                        this.lock.wait(NOT_WRITEABLE_CHECK_MILLS);
                    } catch (InterruptedException exx) {
                        // Re-interrupting here would make every later wait() throw at once,
                        // so only record it and restore the flag after the loop.
                        interrupted = true;
                        LOGGER.error(exx.getMessage());
                    }
                }
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Destroys the given channel; the actual teardown is defined by subclasses.
     *
     * <p>Within this class it is invoked by {@link #destroyChannel(Channel)}, which passes
     * the address obtained from {@link #getAddressFromChannel(Channel)} as the first argument.
     *
     * @param var1 address associated with the channel
     * @param var2 the channel to destroy
     */
    public abstract void destroyChannel(String var1, Channel var2);

    /**
     * Notifies every registered {@link RpcHook}, in list order, that a request is about to
     * be sent. An exception thrown by a hook propagates and stops the remaining hooks.
     *
     * @param remoteAddr address of the peer the request goes to
     * @param request    the outgoing request
     */
    protected void doBeforeRpcHooks(String remoteAddr, RpcMessage request) {
        rpcHooks.forEach(hook -> hook.doBeforeRequest(remoteAddr, request));
    }

    /**
     * Notifies every registered {@link RpcHook}, in list order, that a response has been
     * received. An exception thrown by a hook propagates and stops the remaining hooks.
     *
     * @param remoteAddr address of the peer that answered
     * @param request    the request that was sent
     * @param response   the result obtained for the request
     */
    protected void doAfterRpcHooks(String remoteAddr, RpcMessage request, Object response) {
        rpcHooks.forEach(hook -> hook.doAfterResponse(remoteAddr, request, response));
    }
}

