/*
 * Decompiled with CFR 0.152.
 */
package org.tron.common.overlay.server;

import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.concurrent.GenericFutureListener;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
import org.tron.common.overlay.message.Message;
import org.tron.common.overlay.message.PingMessage;
import org.tron.common.overlay.message.PongMessage;
import org.tron.common.overlay.server.Channel;
import org.tron.common.overlay.server.MessageRoundtrip;
import org.tron.core.net.message.InventoryMessage;
import org.tron.core.net.message.TransactionsMessage;
import org.tron.protos.Protocol;

@Component
@Scope(value="prototype")
public class MessageQueue {
    private static final Logger logger = LoggerFactory.getLogger((String)"MessageQueue");
    private volatile boolean sendMsgFlag = false;
    private volatile long sendTime;
    private Thread sendMsgThread;
    private Channel channel;
    private ChannelHandlerContext ctx = null;
    private Queue<MessageRoundtrip> requestQueue = new ConcurrentLinkedQueue<MessageRoundtrip>();
    private BlockingQueue<Message> msgQueue = new LinkedBlockingQueue<Message>();
    private static ScheduledExecutorService sendTimer = Executors.newSingleThreadScheduledExecutor(r -> new Thread(r, "sendTimer"));
    private ScheduledFuture<?> sendTask;

    public void activate(ChannelHandlerContext ctx) {
        this.ctx = ctx;
        this.sendMsgFlag = true;
        this.sendTask = sendTimer.scheduleAtFixedRate(() -> {
            try {
                if (this.sendMsgFlag) {
                    this.send();
                }
            }
            catch (Exception e) {
                logger.error("Unhandled exception", (Throwable)e);
            }
        }, 10L, 10L, TimeUnit.MILLISECONDS);
        this.sendMsgThread = new Thread(() -> {
            while (this.sendMsgFlag) {
                try {
                    if (this.msgQueue.isEmpty()) {
                        Thread.sleep(10L);
                        continue;
                    }
                    Message msg = this.msgQueue.take();
                    ctx.writeAndFlush((Object)msg.getSendData()).addListener((GenericFutureListener)((ChannelFutureListener)future -> {
                        if (!future.isSuccess()) {
                            logger.error("Fail send to {}, {}", (Object)ctx.channel().remoteAddress(), (Object)msg);
                        }
                    }));
                }
                catch (Exception e) {
                    logger.error("Fail send to {}, error info: {}", (Object)ctx.channel().remoteAddress(), (Object)e.getMessage());
                }
            }
        });
        this.sendMsgThread.setName("sendMsgThread-" + ctx.channel().remoteAddress());
        this.sendMsgThread.start();
    }

    public void setChannel(Channel channel) {
        this.channel = channel;
    }

    public boolean sendMessage(Message msg) {
        if (msg instanceof PingMessage && this.sendTime > System.currentTimeMillis() - 10000L) {
            return false;
        }
        if (this.needToLog(msg)) {
            logger.info("Send to {}, {} ", (Object)this.ctx.channel().remoteAddress(), (Object)msg);
        }
        this.channel.getNodeStatistics().messageStatistics.addTcpOutMessage(msg);
        this.sendTime = System.currentTimeMillis();
        if (msg.getAnswerMessage() != null) {
            this.requestQueue.add(new MessageRoundtrip(msg));
        } else {
            this.msgQueue.offer(msg);
        }
        return true;
    }

    public void receivedMessage(Message msg) {
        if (this.needToLog(msg)) {
            logger.info("Receive from {}, {}", (Object)this.ctx.channel().remoteAddress(), (Object)msg);
        }
        this.channel.getNodeStatistics().messageStatistics.addTcpInMessage(msg);
        MessageRoundtrip messageRoundtrip = this.requestQueue.peek();
        if (messageRoundtrip != null && messageRoundtrip.getMsg().getAnswerMessage() == msg.getClass()) {
            this.requestQueue.remove();
        }
    }

    public void close() {
        this.sendMsgFlag = false;
        if (this.sendTask != null && !this.sendTask.isCancelled()) {
            this.sendTask.cancel(false);
            this.sendTask = null;
        }
        if (this.sendMsgThread != null) {
            try {
                this.sendMsgThread.join(20L);
                this.sendMsgThread = null;
            }
            catch (Exception e) {
                logger.warn("Join send thread failed, peer {}", (Object)this.ctx.channel().remoteAddress());
            }
        }
    }

    private boolean needToLog(Message msg) {
        if (msg instanceof PingMessage || msg instanceof PongMessage || msg instanceof TransactionsMessage) {
            return false;
        }
        return !(msg instanceof InventoryMessage) || !((InventoryMessage)msg).getInventoryType().equals((Object)Protocol.Inventory.InventoryType.TRX);
    }

    private void send() {
        MessageRoundtrip messageRoundtrip = this.requestQueue.peek();
        if (!this.sendMsgFlag || messageRoundtrip == null) {
            return;
        }
        if (messageRoundtrip.getRetryTimes() > 0L && !messageRoundtrip.hasToRetry()) {
            return;
        }
        if (messageRoundtrip.getRetryTimes() > 0L) {
            this.channel.getNodeStatistics().nodeDisconnectedLocal(Protocol.ReasonCode.PING_TIMEOUT);
            logger.warn("Wait {} timeout. close channel {}.", messageRoundtrip.getMsg().getAnswerMessage(), (Object)this.ctx.channel().remoteAddress());
            this.channel.close();
            return;
        }
        Message msg = messageRoundtrip.getMsg();
        this.ctx.writeAndFlush((Object)msg.getSendData()).addListener((GenericFutureListener)((ChannelFutureListener)future -> {
            if (!future.isSuccess()) {
                logger.error("Fail send to {}, {}", (Object)this.ctx.channel().remoteAddress(), (Object)msg);
            }
        }));
        messageRoundtrip.incRetryTimes();
        messageRoundtrip.saveTime();
    }
}

