/*
 * Decompiled with CFR 0.152.
 */
package com.alipay.sofa.jraft.rpc.impl.core;

import com.alipay.remoting.AsyncContext;
import com.alipay.remoting.BizContext;
import com.alipay.remoting.Connection;
import com.alipay.remoting.ConnectionEventProcessor;
import com.alipay.remoting.rpc.protocol.UserProcessor;
import com.alipay.sofa.jraft.JRaftUtils;
import com.alipay.sofa.jraft.Node;
import com.alipay.sofa.jraft.NodeManager;
import com.alipay.sofa.jraft.entity.PeerId;
import com.alipay.sofa.jraft.rpc.RaftServerService;
import com.alipay.sofa.jraft.rpc.RpcRequestClosure;
import com.alipay.sofa.jraft.rpc.RpcRequests;
import com.alipay.sofa.jraft.rpc.impl.core.NodeRequestProcessor;
import com.alipay.sofa.jraft.util.Utils;
import com.google.protobuf.Message;
import io.netty.util.concurrent.DefaultEventExecutor;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import org.apache.commons.lang.StringUtils;

public class AppendEntriesRequestProcessor
extends NodeRequestProcessor<RpcRequests.AppendEntriesRequest>
implements ConnectionEventProcessor {
    /** Connection attribute key under which the peer id string is stored on a connection. */
    static final String PEER_ATTR = "jraft-peer";
    /** Per-peer request contexts: outer key is the group id, inner key is the peer id string. */
    private final ConcurrentMap<String, ConcurrentMap<String, PeerRequestContext>> peerRequestContexts = new ConcurrentHashMap<String, ConcurrentMap<String, PeerRequestContext>>();
    /** Selector instance handed out by {@code getExecutorSelector()}. */
    private final UserProcessor.ExecutorSelector executorSelector = new PeerExecutorSelector();

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Queues a response for a pipelined request and sends every queued response
     * that is now in order.
     *
     * <p>The response is added to the peer's priority queue (ordered by sequence).
     * While the head of the queue carries the sequence the context expects next,
     * it is removed, sent, and the expected sequence is advanced (even if sending
     * throws). If the queue has grown beyond the context's limit, the connection
     * is closed and the peer's context is removed instead.
     *
     * <p>The sequence counters are read from the context whose queue is locked,
     * rather than looking the context up again on every loop iteration: a repeated
     * lookup can create a brand new context (counters reset to 0, new executor)
     * if the original one was removed concurrently, and the loop would then
     * compare against state unrelated to the locked queue.
     *
     * @param groupId      group id of the request
     * @param peerId       peer id string of the request
     * @param seq          sequence number assigned to the request
     * @param asyncContext context used to send the response
     * @param bizContext   context providing the connection
     * @param msg          response message to send
     */
    void sendSequenceResponse(String groupId, String peerId, int seq, AsyncContext asyncContext, BizContext bizContext, Message msg) {
        Connection connection = bizContext.getConnection();
        PeerRequestContext ctx = this.getPeerRequestContext(groupId, peerId, connection);
        PriorityQueue<SequanceMessage> respQueue = ctx.responseQueue;
        assert (respQueue != null);
        synchronized (Utils.withLockObject(respQueue)) {
            respQueue.add(new SequanceMessage(asyncContext, msg, seq));
            if (ctx.hasTooManyPendingResponses()) {
                LOG.warn("Closed connection to peer {}/{}, because of too many pending responses, queued={}, max={}", new Object[]{ctx.groupId, peerId, respQueue.size(), ctx.maxPendingResponses});
                connection.close();
                this.removePeerRequestContext(groupId, peerId);
                return;
            }
            while (!respQueue.isEmpty()) {
                SequanceMessage queuedPipelinedResponse = respQueue.peek();
                if (queuedPipelinedResponse.sequence != ctx.getNextRequiredSequence()) {
                    // Head of the queue is not the next expected response yet; wait for it.
                    break;
                }
                respQueue.remove();
                try {
                    queuedPipelinedResponse.sendResponse();
                }
                finally {
                    // Advance even when sending fails so later responses are not blocked.
                    ctx.getAndIncrementNextRequiredSequence();
                }
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Returns the request context for the given group and peer, creating it when absent.
     *
     * <p>The per-group map is installed with {@code putIfAbsent}. A missing peer
     * context is created while holding the per-group map's monitor, after checking
     * again that no context exists. When a connection is supplied and has no
     * {@code PEER_ATTR} attribute yet, the peer id is stored on it.
     *
     * @param groupId group id used as the outer key
     * @param peerId  peer id string used as the inner key
     * @param conn    connection to tag with the peer id; may be {@code null}
     * @return the existing or newly created context
     */
    PeerRequestContext getPeerRequestContext(String groupId, String peerId, Connection conn) {
        ConcurrentMap<String, PeerRequestContext> contextsOfGroup = this.peerRequestContexts.get(groupId);
        if (contextsOfGroup == null) {
            ConcurrentMap<String, PeerRequestContext> fresh = new ConcurrentHashMap<String, PeerRequestContext>();
            ConcurrentMap<String, PeerRequestContext> raced = this.peerRequestContexts.putIfAbsent(groupId, fresh);
            // Another thread may have installed the group map first; use its map then.
            contextsOfGroup = raced != null ? raced : fresh;
        }
        PeerRequestContext context = contextsOfGroup.get(peerId);
        if (context == null) {
            synchronized (Utils.withLockObject(contextsOfGroup)) {
                context = contextsOfGroup.get(peerId);
                if (context == null) {
                    PeerId peer = new PeerId();
                    boolean parsed = peer.parse(peerId);
                    assert (parsed);
                    Node node = NodeManager.getInstance().get(groupId, peer);
                    assert (node != null);
                    int maxPending = node.getRaftOptions().getMaxReplicatorInflightMsgs();
                    context = new PeerRequestContext(groupId, peerId, maxPending);
                    contextsOfGroup.put(peerId, context);
                }
            }
        }
        boolean untagged = conn != null && conn.getAttribute(PEER_ATTR) == null;
        if (untagged) {
            conn.setAttribute(PEER_ATTR, (Object)peerId);
        }
        return context;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Removes and destroys the context registered for the given group and peer, if any.
     *
     * @param groupId group id used as the outer key
     * @param peerId  peer id string used as the inner key
     */
    void removePeerRequestContext(String groupId, String peerId) {
        ConcurrentMap<String, PeerRequestContext> contextsOfGroup = this.peerRequestContexts.get(groupId);
        if (contextsOfGroup != null) {
            // Removal happens under the per-group map's monitor.
            synchronized (Utils.withLockObject(contextsOfGroup)) {
                PeerRequestContext removed = contextsOfGroup.remove(peerId);
                if (removed != null) {
                    removed.destroy();
                }
            }
        }
    }

    /**
     * Creates the processor.
     *
     * @param executor executor passed through to the superclass constructor
     */
    public AppendEntriesRequestProcessor(Executor executor) {
        super(executor);
    }

    /**
     * Returns the peer id string carried by the request.
     *
     * @param request the append-entries request
     * @return the value of {@code request.getPeerId()}
     */
    @Override
    protected String getPeerId(RpcRequests.AppendEntriesRequest request) {
        final String peerId = request.getPeerId();
        return peerId;
    }

    /**
     * Returns the group id carried by the request.
     *
     * @param request the append-entries request
     * @return the value of {@code request.getGroupId()}
     */
    @Override
    protected String getGroupId(RpcRequests.AppendEntriesRequest request) {
        final String groupId = request.getGroupId();
        return groupId;
    }

    /**
     * Resolves the peer's context (creating it if needed) and returns its current
     * request sequence, advancing the counter.
     */
    private int getAndIncrementSequence(String groupId, String peerId, Connection conn) {
        PeerRequestContext context = this.getPeerRequestContext(groupId, peerId, conn);
        return context.getAndIncrementSequence();
    }

    /**
     * Resolves the peer's context (creating it if needed) and returns the sequence
     * of the response that is expected to be sent next.
     */
    private int getNextRequiredSequence(String groupId, String peerId, Connection conn) {
        PeerRequestContext context = this.getPeerRequestContext(groupId, peerId, conn);
        return context.getNextRequiredSequence();
    }

    /**
     * Resolves the peer's context (creating it if needed) and returns the sequence
     * of the next expected response, advancing that counter.
     */
    private int getAndIncrementNextRequiredSequence(String groupId, String peerId, Connection conn) {
        PeerRequestContext context = this.getPeerRequestContext(groupId, peerId, conn);
        return context.getAndIncrementNextRequiredSequence();
    }

    /**
     * Handles an append-entries request.
     *
     * <p>When the node's raft options enable the replicator pipeline, the request
     * is given a sequence number and handled with a closure that routes its
     * response through {@code sendSequenceResponse}; a response returned directly
     * by the service is routed the same way and {@code null} is returned.
     * Otherwise the service's result is returned as is.
     *
     * @param service the service handling the request; cast to {@code Node} here
     * @param request the append-entries request
     * @param done    closure carrying the async and biz contexts
     * @return the service's response when pipelining is off, otherwise {@code null}
     */
    @Override
    public Message processRequest0(RaftServerService service, RpcRequests.AppendEntriesRequest request, RpcRequestClosure done) {
        Node node = (Node) service;
        boolean pipelined = node.getRaftOptions().isReplicatorPipeline();
        if (!pipelined) {
            return service.handleAppendEntriesRequest(request, done);
        }
        String groupId = request.getGroupId();
        String peerId = request.getPeerId();
        Connection connection = done.getBizContext().getConnection();
        int reqSequence = this.getAndIncrementSequence(groupId, peerId, connection);
        RpcRequestClosure sequencedDone = new SequenceRpcRequestClosure(done, reqSequence, groupId, peerId);
        Message response = service.handleAppendEntriesRequest(request, sequencedDone);
        if (response != null) {
            this.sendSequenceResponse(groupId, peerId, reqSequence, done.getAsyncContext(), done.getBizContext(), response);
        }
        return null;
    }

    /**
     * Returns the fully qualified class name of {@code RpcRequests.AppendEntriesRequest}.
     */
    public String interest() {
        Class<RpcRequests.AppendEntriesRequest> requestType = RpcRequests.AppendEntriesRequest.class;
        return requestType.getName();
    }

    /**
     * Returns this processor's executor selector.
     */
    public UserProcessor.ExecutorSelector getExecutorSelector() {
        final UserProcessor.ExecutorSelector selector = this.executorSelector;
        return selector;
    }

    /**
     * Destroys every peer request context currently registered, across all groups.
     *
     * <p>The contexts are destroyed in place; the maps themselves are not cleared.
     */
    public void destroy() {
        // The per-group map must be declared with its type arguments: iterating the
        // values of a raw ConcurrentMap yields Object elements, which cannot be
        // bound to a PeerRequestContext loop variable.
        for (ConcurrentMap<String, PeerRequestContext> contextsOfGroup : this.peerRequestContexts.values()) {
            for (PeerRequestContext ctx : contextsOfGroup.values()) {
                ctx.destroy();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Connection event callback.
     *
     * <p>If the connection carries a non-blank {@code PEER_ATTR} attribute that
     * parses as a peer id, the matching context is removed from every group and
     * destroyed. Otherwise only an informational message is logged.
     *
     * @param remoteAddr remote address, used for logging
     * @param conn       the connection the event refers to
     */
    public void onEvent(String remoteAddr, Connection conn) {
        PeerId peer = new PeerId();
        String peerAttr = (String)conn.getAttribute(PEER_ATTR);
        if (StringUtils.isBlank((String)peerAttr) || !peer.parse(peerAttr)) {
            LOG.info("Connection disconnected: {}", (Object)remoteAddr);
            return;
        }
        // NOTE(review): contexts are keyed by the raw peer id string, while removal
        // uses peer.toString(); this assumes parse/toString round-trips to the same
        // text. Confirm against PeerId.
        String contextKey = peer.toString();
        for (ConcurrentMap<String, PeerRequestContext> contextsOfGroup : this.peerRequestContexts.values()) {
            synchronized (Utils.withLockObject(contextsOfGroup)) {
                PeerRequestContext removed = contextsOfGroup.remove(contextKey);
                if (removed != null) {
                    removed.destroy();
                }
            }
        }
    }

    /**
     * State kept for one (group, peer) pair: a dedicated executor, the request
     * sequence counter, the sequence of the next response to send, and the queue
     * of responses waiting to be sent in order.
     *
     * <p>Only {@link #destroy()} is synchronized; the other methods do no locking
     * of their own.
     */
    static class PeerRequestContext {
        private final String groupId;
        private final String peerId;
        /** Set to {@code null} once the context has been destroyed. */
        private DefaultEventExecutor executor;
        private int sequence;
        private int nextRequiredSequence;
        private final PriorityQueue<SequanceMessage> responseQueue;
        private final int maxPendingResponses;

        /**
         * @param groupId             group id, used in the thread name and in logs
         * @param peerId              peer id string, used in the thread name and in logs
         * @param maxPendingResponses queue size above which the context reports too many pending responses
         */
        public PeerRequestContext(String groupId, String peerId, int maxPendingResponses) {
            String threadName = groupId + "/" + peerId + "-AppendEntriesThread";
            this.groupId = groupId;
            this.peerId = peerId;
            this.maxPendingResponses = maxPendingResponses;
            this.sequence = 0;
            this.nextRequiredSequence = 0;
            this.responseQueue = new PriorityQueue<SequanceMessage>(50);
            this.executor = new DefaultEventExecutor(JRaftUtils.createThreadFactory(threadName));
        }

        /** Returns whether more responses are queued than {@code maxPendingResponses} allows. */
        boolean hasTooManyPendingResponses() {
            int pending = this.responseQueue.size();
            return pending > this.maxPendingResponses;
        }

        /** Returns the current request sequence and advances it, wrapping to 0 on overflow. */
        int getAndIncrementSequence() {
            int current = this.sequence;
            int next = current + 1;
            this.sequence = next < 0 ? 0 : next;
            return current;
        }

        /** Shuts the executor down gracefully; later calls do nothing. */
        synchronized void destroy() {
            DefaultEventExecutor current = this.executor;
            if (current == null) {
                return;
            }
            LOG.info("Destroyed peer request context for {}/{}", (Object)this.groupId, (Object)this.peerId);
            current.shutdownGracefully();
            this.executor = null;
        }

        /** Returns the sequence of the response that has to be sent next. */
        int getNextRequiredSequence() {
            return this.nextRequiredSequence;
        }

        /** Returns the next required sequence and advances it, wrapping to 0 on overflow. */
        int getAndIncrementNextRequiredSequence() {
            int current = this.nextRequiredSequence;
            int next = current + 1;
            this.nextRequiredSequence = next < 0 ? 0 : next;
            return current;
        }
    }

    /**
     * A response message paired with its sequence number and the context used to
     * send it. Instances order by ascending sequence.
     */
    static class SequanceMessage
    implements Comparable<SequanceMessage> {
        public final Message msg;
        private final int sequence;
        private final AsyncContext asyncContex;

        /**
         * @param asyncContex context used to send the response
         * @param msg         response message
         * @param sequence    sequence number of the request this response answers
         */
        public SequanceMessage(AsyncContext asyncContex, Message msg, int sequence) {
            this.sequence = sequence;
            this.msg = msg;
            this.asyncContex = asyncContex;
        }

        /** Sends the message through the stored async context. */
        void sendResponse() {
            Object payload = this.msg;
            this.asyncContex.sendResponse(payload);
        }

        /** Compares by sequence number only. */
        @Override
        public int compareTo(SequanceMessage other) {
            int mine = this.sequence;
            int theirs = other.sequence;
            return Integer.compare(mine, theirs);
        }
    }

    /**
     * Closure that remembers the sequence, group id and peer id of a request and
     * sends its response through the enclosing processor's
     * {@code sendSequenceResponse}.
     */
    class SequenceRpcRequestClosure
    extends RpcRequestClosure {
        private final int reqSequence;
        private final String groupId;
        private final String peerId;

        /**
         * @param parent   closure whose biz and async contexts are reused
         * @param sequence sequence number of the request
         * @param groupId  group id of the request
         * @param peerId   peer id string of the request
         */
        public SequenceRpcRequestClosure(RpcRequestClosure parent, int sequence, String groupId, String peerId) {
            super(parent.getBizContext(), parent.getAsyncContext());
            this.groupId = groupId;
            this.peerId = peerId;
            this.reqSequence = sequence;
        }

        /** Hands the response to the enclosing processor together with the stored sequence. */
        @Override
        public void sendResponse(Message msg) {
            AppendEntriesRequestProcessor processor = AppendEntriesRequestProcessor.this;
            processor.sendSequenceResponse(this.groupId, this.peerId, this.reqSequence, this.getAsyncContext(), this.getBizContext(), msg);
        }
    }

    /**
     * Chooses the executor for an append-entries request from its header.
     *
     * <p>The peer context's executor is returned when the header's peer id parses,
     * a node is registered for the group and peer, and that node's raft options
     * enable the replicator pipeline. In every other case the processor's own
     * executor is returned.
     */
    final class PeerExecutorSelector
    implements UserProcessor.ExecutorSelector {
        PeerExecutorSelector() {
        }

        /**
         * @param requestClass  request class name; not used here
         * @param requestHeader header, cast to {@code RpcRequests.AppendEntriesRequestHeader}
         * @return the executor to run the request on
         */
        public Executor select(String requestClass, Object requestHeader) {
            RpcRequests.AppendEntriesRequestHeader header = (RpcRequests.AppendEntriesRequestHeader)requestHeader;
            String groupId = header.getGroupId();
            String peerId = header.getPeerId();
            PeerId peer = new PeerId();
            if (peer.parse(peerId)) {
                Node node = NodeManager.getInstance().get(groupId, peer);
                if (node != null && node.getRaftOptions().isReplicatorPipeline()) {
                    Utils.ensureBoltPipeline();
                    // No connection is available at selection time, so none is tagged here.
                    PeerRequestContext ctx = AppendEntriesRequestProcessor.this.getPeerRequestContext(groupId, peerId, null);
                    return ctx.executor;
                }
            }
            return AppendEntriesRequestProcessor.this.getExecutor();
        }
    }
}

