/*
 * Decompiled with CFR 0.152.
 */
package org.fisco.bcos.sdk.channel;

import com.fasterxml.jackson.core.JsonProcessingException;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timeout;
import io.netty.util.Timer;
import io.netty.util.TimerTask;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import org.fisco.bcos.sdk.channel.Channel;
import org.fisco.bcos.sdk.channel.ChannelMsgHandler;
import org.fisco.bcos.sdk.channel.ChannelVersionNegotiation;
import org.fisco.bcos.sdk.channel.PeerSelectRule;
import org.fisco.bcos.sdk.channel.ResponseCallback;
import org.fisco.bcos.sdk.channel.model.ChannelMessageError;
import org.fisco.bcos.sdk.channel.model.ChannelPrococolExceiption;
import org.fisco.bcos.sdk.channel.model.HeartBeatParser;
import org.fisco.bcos.sdk.channel.model.NodeHeartbeat;
import org.fisco.bcos.sdk.channel.model.Options;
import org.fisco.bcos.sdk.config.ConfigOption;
import org.fisco.bcos.sdk.config.exceptions.ConfigException;
import org.fisco.bcos.sdk.model.Message;
import org.fisco.bcos.sdk.model.MsgType;
import org.fisco.bcos.sdk.model.Response;
import org.fisco.bcos.sdk.network.ConnectionInfo;
import org.fisco.bcos.sdk.network.MsgHandler;
import org.fisco.bcos.sdk.network.Network;
import org.fisco.bcos.sdk.network.NetworkException;
import org.fisco.bcos.sdk.network.NetworkImp;
import org.fisco.bcos.sdk.utils.ChannelUtils;
import org.fisco.bcos.sdk.utils.ObjectMapperFactory;
import org.fisco.bcos.sdk.utils.ThreadPoolService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link Channel} implementation that sends {@link Message}s to connected peers through a
 * {@link Network} and dispatches replies through a {@link ChannelMsgHandler}.
 *
 * <p>The synchronous {@code sendTo...WithTimeOut} methods are built on top of the asynchronous
 * ones: they register a {@link Callback} and block until it is completed by a response or by the
 * timeout task scheduled on the internal {@link Timer}.
 */
public class ChannelImp implements Channel {
    private static final Logger logger = LoggerFactory.getLogger(ChannelImp.class);

    /** Upper bound, in seconds, that {@link #start()} waits for a first available peer. */
    private static final int CONNECT_SECONDS = 30;
    /** Poll interval, in milliseconds, while waiting for a first available peer. */
    private static final int CONNECT_SLEEP_PER_MILLIS = 30;
    /** Period, in milliseconds, between two heartbeat broadcasts. */
    private static final long HEARTBEAT_PERIOD_MILLIS = 2000L;
    /** Timeout, in milliseconds, applied by {@link #sendToPeer(Message, String)}. */
    private static final long DEFAULT_SEND_TIMEOUT_MILLIS = 10000L;

    private final ChannelMsgHandler msgHandler;
    private final Network network;
    private final Timer timeoutHandler = new HashedWheelTimer();
    private final ScheduledExecutorService scheduledExecutorService = new ScheduledThreadPoolExecutor(1);

    private boolean running = false;
    private Map<String, List<String>> groupId2PeerIpPortList;

    /**
     * Creates the channel together with its message handler and network.
     *
     * @param configOption configuration handed to the underlying {@link NetworkImp}
     * @throws ConfigException declared for failures while building the network from the configuration
     */
    public ChannelImp(ConfigOption configOption) throws ConfigException {
        this.msgHandler = new ChannelMsgHandler();
        this.network = new NetworkImp(configOption, this.msgHandler);
    }

    @Override
    public Network getNetwork() {
        return network;
    }

    /**
     * Starts the network, waits for a first peer and schedules the periodic heartbeat.
     *
     * <p>Calling this method on a channel that is already running only logs a warning.
     *
     * @throws ChannelException if the network fails to start; the network is stopped first
     */
    @Override
    public void start() {
        if (running) {
            // Starting twice would start the network again and schedule a second heartbeat task.
            logger.warn("The channel has already been started!");
            return;
        }
        try {
            network.start();
            checkConnectionsToStartPeriodTask();
            running = true;
            logger.debug("====> Start the channel success");
        } catch (NetworkException e) {
            network.stop();
            logger.error("====> init channel network error, {} ", e.getMessage());
            throw new ChannelException("init channel network error!\n" + e.getMessage(), e);
        }
    }

    /**
     * Polls until at least one peer is available or the connect timeout elapses, then starts the
     * heartbeat task.
     *
     * <p>This is best effort: when no peer shows up the failure is logged, no heartbeat task is
     * scheduled, and the method returns normally so that {@link #start()} still completes.
     */
    private void checkConnectionsToStartPeriodTask() {
        try {
            int waitedMillis = 0;
            while (getAvailablePeer().isEmpty() && waitedMillis <= CONNECT_SECONDS * 1000) {
                Thread.sleep(CONNECT_SLEEP_PER_MILLIS);
                waitedMillis += CONNECT_SLEEP_PER_MILLIS;
            }

            // Take one snapshot so that the emptiness check and the logged list agree.
            List<String> peers = getAvailablePeer();
            StringBuilder connectionInfo = new StringBuilder();
            for (String peer : peers) {
                connectionInfo.append(peer).append(", ");
            }
            String baseMessage = " nodes: " + connectionInfo;

            if (peers.isEmpty()) {
                logger.error(" Failed to connect to " + baseMessage);
                return;
            }
            logger.info(" Connect to " + baseMessage);
            startPeriodTask();
        } catch (InterruptedException e) {
            logger.warn(" thread interrupted exception: ", e);
            Thread.currentThread().interrupt();
        } catch (RuntimeException e) {
            logger.error(" service init failed, error message: {}, error: ", e.getMessage(), e);
        }
    }

    /** Schedules {@link #broadcastHeartbeat()} at a fixed rate, starting immediately. */
    private void startPeriodTask() {
        scheduledExecutorService.scheduleAtFixedRate(
                this::broadcastHeartbeat, 0L, HEARTBEAT_PERIOD_MILLIS, TimeUnit.MILLISECONDS);
    }

    /**
     * Stops the timeout timer, the heartbeat executor and the network.
     *
     * <p>When the channel is not running a warning is logged, but the cleanup is still attempted.
     */
    @Override
    public void stop() {
        if (!running) {
            logger.warn("The channel has already been stopped!");
        }
        logger.debug("stop channel...");
        timeoutHandler.stop();
        ThreadPoolService.stopThreadPool(scheduledExecutorService);
        network.stop();
        // NOTE(review): this sets the interrupt flag of the thread that called stop(), so the
        // caller's next blocking call will throw InterruptedException. Kept as-is because the
        // intent is not visible from this file; confirm whether it can be removed.
        Thread.currentThread().interrupt();
        running = false;
        logger.debug("stop channel succ...");
    }

    @Override
    public void addConnectHandler(MsgHandler handler) {
        msgHandler.addConnectHandler(handler);
    }

    @Override
    public void addEstablishHandler(MsgHandler handler) {
        msgHandler.addEstablishHandler(handler);
    }

    @Override
    public void addMessageHandler(MsgType type, MsgHandler handler) {
        msgHandler.addMessageHandler(type, handler);
    }

    @Override
    public void addDisconnectHandler(MsgHandler handler) {
        msgHandler.addDisconnectHandler(handler);
    }

    /**
     * Sets the mapping used by {@link #broadcastToGroup(Message, String)}.
     *
     * @param groupId2PeerIpPortList group id to the list of peer "ip:port" keys of that group
     */
    public void setGroupId2PeerIpPortList(Map<String, List<String>> groupId2PeerIpPortList) {
        this.groupId2PeerIpPortList = groupId2PeerIpPortList;
    }

    /**
     * Sends {@code out} to every currently available peer listed for {@code groupId}.
     *
     * <p>Each send goes through {@link #sendToPeer(Message, String)} and therefore blocks until
     * that peer answers or the send times out. When no mapping has been set, or the group is
     * unknown, a warning is logged and nothing is sent.
     *
     * @param out the message to send
     * @param groupId the group whose peers should receive the message
     */
    @Override
    public void broadcastToGroup(Message out, String groupId) {
        List<String> peerIpPortList = null;
        if (groupId2PeerIpPortList != null) {
            peerIpPortList = groupId2PeerIpPortList.get(groupId);
        }
        if (peerIpPortList == null) {
            logger.warn("no peer list found for group {}, message with seq {} is not sent", groupId, out.getSeq());
            return;
        }
        for (String peerIpPort : peerIpPortList) {
            if (msgHandler.getAvailablePeer().containsKey(peerIpPort)) {
                sendToPeer(out, peerIpPort);
            }
        }
    }

    /**
     * Writes {@code out} to every available peer without registering any response callback.
     *
     * @param out the message to send
     */
    @Override
    public void broadcast(Message out) {
        msgHandler.getAvailablePeer().forEach((peer, ctx) -> {
            ctx.writeAndFlush(out);
            logger.trace("send message to {} success ", peer);
        });
    }

    /**
     * Sends {@code out} to one peer and waits for the response, using a 10 second timeout.
     *
     * @param out the message to send
     * @param peerIpPort key of the target peer
     * @return the response delivered to the callback
     */
    @Override
    public Response sendToPeer(Message out, String peerIpPort) {
        Options options = new Options();
        options.setTimeout(DEFAULT_SEND_TIMEOUT_MILLIS);
        return sendToPeerWithTimeOut(out, peerIpPort, options);
    }

    /**
     * Blocks until {@code callback} is completed by a response or a timeout.
     *
     * <p>If the waiting thread is interrupted, the error is logged, the interrupt flag is restored
     * and the method returns without a completed callback.
     *
     * @param callback the callback whose semaphore is awaited
     * @param options currently unused by this method
     */
    public void waitResponse(Callback callback, Options options) {
        try {
            callback.semaphore.acquire();
        } catch (InterruptedException e) {
            logger.error("waitResponse exception, error info: {}", e.getMessage());
            Thread.currentThread().interrupt();
        }
    }

    @Override
    public Response sendToPeerWithTimeOut(Message out, String peerIpPort, Options options) {
        Callback callback = new Callback();
        asyncSendToPeer(out, peerIpPort, callback, options);
        waitResponse(callback, options);
        return callback.retResponse;
    }

    @Override
    public Response sendToRandomWithTimeOut(Message out, Options options) {
        Callback callback = new Callback();
        asyncSendToRandom(out, callback, options);
        waitResponse(callback, options);
        return callback.retResponse;
    }

    @Override
    public Response sendToPeerByRuleWithTimeOut(Message out, PeerSelectRule rule, Options options) {
        Callback callback = new Callback();
        asyncSendToPeerByRule(out, rule, callback, options);
        waitResponse(callback, options);
        return callback.retResponse;
    }

    /**
     * Sends {@code out} to the given peer and completes {@code callback} with the outcome.
     *
     * <p>When the peer is not available the callback (if any) immediately receives a response
     * carrying {@link ChannelMessageError#CONNECTION_INVALID}. Otherwise the callback is registered
     * under the message sequence before the message is written, and a timeout task is scheduled
     * when {@code options} carries a positive timeout.
     *
     * @param out the message to send
     * @param peerIpPort key of the target peer; {@code null} is treated as an unavailable peer
     * @param callback receives the response or timeout; may be {@code null} for fire-and-forget
     * @param options send options; {@code null} or a non-positive timeout means no timeout task
     */
    @Override
    public void asyncSendToPeer(final Message out, String peerIpPort, final ResponseCallback callback, Options options) {
        Map<String, ChannelHandlerContext> peers = msgHandler.getAvailablePeer();
        ChannelHandlerContext ctx = null;
        if (peers != null && peerIpPort != null) {
            ctx = peers.get(peerIpPort);
        }

        if (ctx == null) {
            logger.warn("send message with seq {} to {} failed ", out.getSeq(), peerIpPort);
            String errorContent = "Send message " + peerIpPort
                    + " failed for connect failed, current available peers: " + getAvailablePeer().toString();
            notifyConnectionInvalid(out, callback, errorContent);
            return;
        }

        if (callback == null) {
            ctx.writeAndFlush(out);
            return;
        }

        msgHandler.addSeq2CallBack(out.getSeq(), callback);
        if (options != null && options.getTimeout() > 0L) {
            callback.setTimeout(timeoutHandler.newTimeout(timeout -> {
                callback.onTimeout();
                msgHandler.removeSeq(out.getSeq());
            }, options.getTimeout(), TimeUnit.MILLISECONDS));
        }
        ctx.writeAndFlush(out);
        logger.trace("send message {} to {} success ", out.getSeq(), peerIpPort);
    }

    /**
     * Sends {@code out} to a randomly chosen available peer.
     *
     * <p>When no peer is available the callback (if any) receives a response carrying
     * {@link ChannelMessageError#CONNECTION_INVALID}; without that notification a synchronous
     * caller waiting on the callback would never be released.
     *
     * @param out the message to send
     * @param callback receives the response or timeout; may be {@code null}
     * @param options send options forwarded to {@link #asyncSendToPeer}
     */
    @Override
    public void asyncSendToRandom(Message out, ResponseCallback callback, Options options) {
        List<String> peerList = getAvailablePeer();
        if (peerList.isEmpty()) {
            logger.warn("no available peer to send to, seq: {}, type: {}", out.getSeq(), out.getType());
            notifyConnectionInvalid(out, callback, "Send message failed for no available peer to send to");
            return;
        }
        int random = (int) (Math.random() * (double) peerList.size());
        String peerIpPort = peerList.get(random);
        logger.trace("send message to random peer {} ", peerIpPort);
        asyncSendToPeer(out, peerIpPort, callback, options);
    }

    /**
     * Sends {@code out} to the peer chosen by {@code rule} from the current connection info.
     *
     * @param out the message to send
     * @param rule selects the target peer key
     * @param callback receives the response or timeout; may be {@code null}
     * @param options send options forwarded to {@link #asyncSendToPeer}
     */
    @Override
    public void asyncSendToPeerByRule(Message out, PeerSelectRule rule, ResponseCallback callback, Options options) {
        String target = rule.select(getConnectionInfo());
        asyncSendToPeer(out, target, callback, options);
    }

    @Override
    public List<ConnectionInfo> getConnectionInfo() {
        return network.getConnectionInfo();
    }

    /**
     * Returns a fresh list holding the keys of the currently available peers.
     *
     * @return a new, possibly empty list; never {@code null}
     */
    @Override
    public List<String> getAvailablePeer() {
        Map<String, ChannelHandlerContext> peers = msgHandler.getAvailablePeer();
        if (peers == null) {
            return new ArrayList<String>();
        }
        return new ArrayList<String>(peers.keySet());
    }

    /** Completes {@code callback}, when present, with a {@code CONNECTION_INVALID} response. */
    private void notifyConnectionInvalid(Message out, ResponseCallback callback, String errorContent) {
        if (callback == null) {
            return;
        }
        Response response = new Response();
        response.setErrorCode(ChannelMessageError.CONNECTION_INVALID.getError());
        response.setErrorMessage(errorContent);
        response.setContent(errorContent);
        response.setMessageID(out.getSeq());
        callback.onResponse(response);
    }

    /**
     * Sends one heartbeat to every available peer.
     *
     * <p>All exceptions are caught and logged: an exception escaping a task scheduled with
     * {@code scheduleAtFixedRate} would suppress every subsequent execution.
     */
    private void broadcastHeartbeat() {
        try {
            msgHandler.getAvailablePeer().forEach((peer, ctx) -> {
                sendHeartbeatMessage(ctx);
                logger.trace("broadcastHeartbeat to {} success ", peer);
            });
        } catch (Exception e) {
            logger.error("broadcastHeartbeat failed, error info: {}", e.getMessage(), e);
        }
    }

    /**
     * Builds a client heartbeat message and sends it on {@code ctx}.
     *
     * <p>The registered callback removes the connection of the peer when the heartbeat response
     * carries a non-zero error code or cannot be parsed as a {@link NodeHeartbeat}.
     *
     * @param ctx the connection to probe
     */
    public void sendHeartbeatMessage(final ChannelHandlerContext ctx) {
        final String seq = ChannelUtils.newSeq();
        Message message = new Message();
        try {
            message.setSeq(seq);
            message.setResult(0);
            message.setType((short) MsgType.CLIENT_HEARTBEAT.getType());
            HeartBeatParser heartBeatParser = new HeartBeatParser(ChannelVersionNegotiation.getProtocolVersion(ctx));
            message.setData(heartBeatParser.encode("0"));
            logger.trace("encodeHeartbeatToMessage, seq: {}, content: {}, messageType: {}",
                    message.getSeq(), heartBeatParser.toString(), message.getType());
        } catch (JsonProcessingException e) {
            logger.error("sendHeartbeatMessage failed for encode the message exception, errorMessage: {}", e.getMessage());
            return;
        }

        ResponseCallback callback = new ResponseCallback() {

            @Override
            public void onResponse(Response response) {
                boolean disconnect = true;
                try {
                    if (response.getErrorCode() != 0) {
                        logger.error(" channel protocol heartbeat request failed, code: {}, message: {}",
                                response.getErrorCode(), response.getErrorMessage());
                        throw new ChannelPrococolExceiption(" channel protocol heartbeat request failed, code: "
                                + response.getErrorCode() + ", message: " + response.getErrorMessage());
                    }
                    NodeHeartbeat nodeHeartbeat =
                            ObjectMapperFactory.getObjectMapper().readValue(response.getContent(), NodeHeartbeat.class);
                    int heartBeat = nodeHeartbeat.getHeartBeat();
                    logger.trace(" heartbeat packet, heartbeat is {} ", heartBeat);
                    disconnect = false;
                } catch (Exception e) {
                    logger.error(" channel protocol heartbeat failed, exception: {}", e.getMessage());
                }
                if (disconnect) {
                    String host = ChannelVersionNegotiation.getPeerHost(ctx);
                    network.removeConnection(host);
                }
            }
        };

        // Register the callback before writing so that a fast reply cannot arrive unmatched.
        // NOTE(review): no timeout is scheduled for this callback; confirm that ChannelMsgHandler
        // evicts callbacks whose response never arrives.
        msgHandler.addSeq2CallBack(seq, callback);
        ctx.writeAndFlush(message);
    }

    @Override
    public void setThreadPool(ExecutorService threadPool) {
        network.setMsgHandleThreadPool(threadPool);
    }

    /**
     * Callback used by the synchronous send methods: it stores the response and releases a
     * semaphore that {@link ChannelImp#waitResponse(Callback, Options)} is blocked on.
     */
    class Callback extends ResponseCallback {
        /** The response handed to {@link #onResponse(Response)}; {@code null} until then. */
        public transient Response retResponse;
        /** Starts with no permit, so a waiter blocks until a response or timeout releases one. */
        public transient Semaphore semaphore = new Semaphore(0, true);

        @Override
        public void onTimeout() {
            super.onTimeout();
            semaphore.release();
        }

        @Override
        public void onResponse(Response response) {
            retResponse = response;
            if (retResponse != null && retResponse.getContent() != null) {
                logger.trace("response: {}", retResponse.getContent());
            } else {
                logger.error("response is null");
            }
            semaphore.release();
        }
    }
}

