/*
 * Decompiled with CFR 0.152.
 */
package org.apache.geode.distributed.internal.direct;

import java.io.IOException;
import java.io.NotSerializableException;
import java.net.InetAddress;
import java.net.SocketTimeoutException;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import org.apache.geode.CancelCriterion;
import org.apache.geode.CancelException;
import org.apache.geode.InternalGemFireException;
import org.apache.geode.alerting.internal.spi.AlertingAction;
import org.apache.geode.cache.TimeoutException;
import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.distributed.internal.ClusterDistributionManager;
import org.apache.geode.distributed.internal.DMStats;
import org.apache.geode.distributed.internal.DirectReplyProcessor;
import org.apache.geode.distributed.internal.DistributionConfig;
import org.apache.geode.distributed.internal.DistributionManager;
import org.apache.geode.distributed.internal.DistributionMessage;
import org.apache.geode.distributed.internal.ReplyProcessor21;
import org.apache.geode.distributed.internal.direct.ShunnedMemberException;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.distributed.internal.membership.api.MemberIdentifier;
import org.apache.geode.distributed.internal.membership.api.MemberShunnedException;
import org.apache.geode.distributed.internal.membership.api.Membership;
import org.apache.geode.distributed.internal.membership.api.Message;
import org.apache.geode.distributed.internal.membership.api.MessageListener;
import org.apache.geode.internal.cache.DirectReplyMessage;
import org.apache.geode.internal.inet.LocalHostUtil;
import org.apache.geode.internal.logging.log4j.LogMarker;
import org.apache.geode.internal.tcp.BaseMsgStreamer;
import org.apache.geode.internal.tcp.ConnectExceptions;
import org.apache.geode.internal.tcp.Connection;
import org.apache.geode.internal.tcp.ConnectionException;
import org.apache.geode.internal.tcp.MsgStreamer;
import org.apache.geode.internal.tcp.TCPConduit;
import org.apache.geode.internal.util.Breadcrumbs;
import org.apache.geode.logging.internal.log4j.api.LogService;
import org.apache.logging.log4j.Logger;

public class DirectChannel {
    private static final Logger logger = LogService.getLogger();
    // TCP conduit that all sends, endpoint management and shutdown calls are delegated to.
    private final transient TCPConduit conduit;
    // Owning distribution manager; source of config, stats and the membership id set.
    private final ClusterDistributionManager dm;
    // True until the constructor (or setLocalAddr) marks the channel usable; set again by disconnect().
    private volatile boolean disconnected = true;
    // True once disconnect() has finished stopping the conduit; isOpen() is the negation of this flag.
    private volatile boolean disconnectCompleted = true;
    // Listener that receive() hands every incoming message to.
    private final MessageListener receiver;
    // Address computed by initAddress(): the configured bind address, or the local host.
    private final InetAddress address;
    // Identity of this member; stamped on outgoing messages as the sender and skipped as a destination.
    InternalDistributedMember localAddr;

    /**
     * Records the identity of the local member and forwards it to the conduit.
     * If the channel is currently flagged as disconnected, both disconnect flags
     * are cleared so the channel is considered usable again.
     *
     * @param localAddr the member id to use for this channel
     */
    public void setLocalAddr(InternalDistributedMember localAddr) {
        this.localAddr = localAddr;
        conduit.setMemberId(localAddr);
        if (!disconnected) {
            return;
        }
        disconnected = false;
        disconnectCompleted = false;
    }

    /**
     * Returns the cancel criterion exposed by the underlying conduit.
     *
     * @return the conduit's {@link CancelCriterion}
     */
    public CancelCriterion getCancelCriterion() {
        return conduit.getCancelCriterion();
    }

    /**
     * Builds the channel and creates its {@link TCPConduit}.
     *
     * <p>Before the conduit is created, several JVM-wide system properties are
     * populated from the distribution configuration (socket sharing, TCP buffer
     * size, idle connection timeout and the membership port range). Properties
     * that are only set "if absent" keep any value already present.
     *
     * @param mgr the membership service handed to the conduit
     * @param listener receives every message delivered through {@link #receive}
     * @param dm the owning distribution manager; its configuration is read here
     * @throws ConnectionException if the conduit cannot be created; the failure
     *         is logged at fatal level and rethrown unchanged
     */
    public DirectChannel(Membership<InternalDistributedMember> mgr, MessageListener<InternalDistributedMember> listener, ClusterDistributionManager dm) throws ConnectionException {
        this.receiver = listener;
        this.dm = dm;
        DistributionConfig dc = dm.getConfig();
        this.address = this.initAddress(dc);
        boolean isBindAddress = dc.getBindAddress() != null;
        try {
            // The "tcpServerPort" system property wins; 0 means "use the configured port".
            int overridePort = Integer.getInteger("tcpServerPort", 0);
            int port = overridePort != 0 ? overridePort : dc.getTcpPort();

            Properties props = System.getProperties();
            if (props.getProperty("p2p.shareSockets") == null) {
                props.setProperty("p2p.shareSockets", String.valueOf(dc.getConserveSockets()));
            }
            // NOTE(review): 32768 is presumably the default socket buffer size - confirm.
            int socketBufferSize = dc.getSocketBufferSize();
            if (socketBufferSize != 32768) {
                props.setProperty("p2p.tcpBufferSize", String.valueOf(dc.getSocketBufferSize()));
            }
            if (props.getProperty("p2p.idleConnectionTimeout") == null) {
                props.setProperty("p2p.idleConnectionTimeout", String.valueOf(dc.getSocketLeaseTime()));
            }
            int[] portRange = dc.getMembershipPortRange();
            props.setProperty("membership_port_range_start", String.valueOf(portRange[0]));
            props.setProperty("membership_port_range_end", String.valueOf(portRange[1]));

            this.conduit = new TCPConduit(mgr, port, this.address, isBindAddress, this, props);
            this.disconnected = false;
            this.disconnectCompleted = false;
            logger.info("GemFire P2P Listener started on {}", (Object)this.conduit.getSocketId());
        }
        catch (ConnectionException ce) {
            String failure = String.format("Unable to initialize direct channel because: %s", ce.getMessage());
            logger.fatal(failure, (Throwable)ce);
            throw ce;
        }
    }

    /**
     * Reports whether the calling thread owns its own resources according to
     * the distributed system and is not currently running an alerting action.
     *
     * @return {@code false} when there is no distribution manager; otherwise the
     *         system's answer combined with the "not alerting" check
     */
    boolean threadOwnsResources() {
        return dm != null
                && dm.getSystem().threadOwnsResources()
                && !AlertingAction.isThreadAlerting();
    }

    /**
     * Single-destination send; simply delegates to {@link #sendToMany}.
     *
     * @param mgr membership service used to validate the destination
     * @param p_destinations the destinations (expected to hold one element)
     * @param msg the message to send
     * @param ackWaitThreshold ack-wait threshold, forwarded unchanged
     * @param ackSAThreshold severe-alert threshold, forwarded unchanged
     * @return the value returned by {@code sendToMany}
     * @throws ConnectExceptions if {@code sendToMany} reports connection failures
     * @throws NotSerializableException if the message cannot be serialized
     */
    private int sendToOne(Membership mgr, InternalDistributedMember[] p_destinations, DistributionMessage msg, long ackWaitThreshold, long ackSAThreshold) throws ConnectExceptions, NotSerializableException {
        int written = sendToMany(mgr, p_destinations, msg, ackWaitThreshold, ackSAThreshold);
        return written;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Sends {@code msg} to every destination over TCP, retrying members whose
     * write or ack-read failed until no failures remain or the send is cancelled.
     *
     * <p>Each pass obtains connections for the current destination set, streams
     * the message to them, and (for direct-reply messages) reads the acks on the
     * same connections. Failures to obtain a connection accumulate in
     * {@code failedCe} and are thrown once sending finishes; failures while
     * writing or reading acks populate {@code retryInfo}, whose members become
     * the destinations of the next pass.
     *
     * <p>Every connection that was written to is released with
     * {@code setInUse(false, ...)} on exit, and a thread interrupt observed
     * during the send is restored before returning.
     *
     * @param mgr membership service used to validate destinations
     * @param p_destinations the members to send to
     * @param msg the message to send
     * @param ackWaitThreshold ack-wait threshold; multiplied by 1000 to obtain a timeout
     * @param ackSAThreshold severe-alert threshold; multiplied by 1000 to obtain a timeout
     * @return the byte count reported by the first successful write, or 0 if nothing was written
     * @throws ConnectExceptions if a connection could not be obtained for one or more members
     * @throws NotSerializableException if the message cannot be serialized
     */
    private int sendToMany(Membership mgr, InternalDistributedMember[] p_destinations, DistributionMessage msg, long ackWaitThreshold, long ackSAThreshold) throws ConnectExceptions, NotSerializableException {
        InternalDistributedMember[] destinations = p_destinations;
        ConnectExceptions failedCe = null;
        ConnectExceptions retryInfo = null;
        int bytesWritten = 0;
        boolean retry = false;
        boolean orderedMsg = msg.orderedDelivery() || Connection.isDominoThread();
        // Every connection written to across all passes; released in the finally block.
        List<Object> totalSentCons = new ArrayList<Object>(destinations.length);
        boolean interrupted = false;
        long ackTimeout = 0L;
        long ackSDTimeout = 0L;
        long startTime = 0L;
        DirectReplyMessage directMsg = msg instanceof DirectReplyMessage ? (DirectReplyMessage)msg : null;
        if (directMsg != null || msg.getProcessorId() > 0) {
            // NOTE(review): the int narrowing truncates very large thresholds; kept as-is - confirm intent.
            ackTimeout = (int)(ackWaitThreshold * 1000L);
            if (msg.isSevereAlertCompatible() || ReplyProcessor21.isSevereAlertProcessingForced()) {
                ackSDTimeout = (int)(ackSAThreshold * 1000L);
                if (ReplyProcessor21.getShortSevereAlertProcessing()) {
                    ackSDTimeout = (int)(ReplyProcessor21.PR_SEVERE_ALERT_RATIO * (double)ackSDTimeout);
                }
            }
        }
        // Direct reply (acks read on the sending connection) requires a capable
        // message and a thread that owns its resources.
        boolean directReply = directMsg != null && directMsg.supportsDirectAck() && this.threadOwnsResources();
        if (!directReply && directMsg != null) {
            directMsg.registerProcessor();
        }
        if (logger.isDebugEnabled()) {
            logger.debug("Sending ({}) to {} peers ({}) via tcp/ip", (Object)msg, (Object)p_destinations.length, (Object)Arrays.toString(p_destinations));
        }
        try {
            do {
                // Thread.interrupted() clears the flag; remember it so it can be restored on exit.
                interrupted = Thread.interrupted() || interrupted;
                if (retryInfo != null) {
                    // Only the members that failed during the previous pass are retried.
                    List<InternalDistributedMember> retryMembers = retryInfo.getMembers();
                    destinations = retryMembers.toArray(new InternalDistributedMember[retryMembers.size()]);
                    retryInfo = null;
                    retry = true;
                }
                List<Object> cons = new ArrayList<Object>(destinations.length);
                ConnectExceptions ce = this.getConnections(mgr, msg, destinations, orderedMsg, retry, ackTimeout, ackSDTimeout, cons);
                // getConnections registers a reply processor when a shared connection is used;
                // once the message carries a processor id, direct reply is no longer used.
                if (directReply && msg.getProcessorId() > 0) {
                    directReply = false;
                }
                if (ce != null) {
                    if (failedCe != null) {
                        failedCe.getMembers().addAll(ce.getMembers());
                        failedCe.getCauses().addAll(ce.getCauses());
                    } else {
                        failedCe = ce;
                    }
                    ce = null;
                }
                if (cons.isEmpty()) {
                    if (failedCe != null) {
                        throw failedCe;
                    }
                    return bytesWritten;
                }
                if (retry && logger.isDebugEnabled()) {
                    // One placeholder per argument (the previous pattern had a stray "{}").
                    logger.debug("Retrying send ({}) to {} peers ({}) via tcp/ip", (Object)msg, (Object)cons.size(), (Object)cons);
                }
                DMStats stats = this.getDMStats();
                BaseMsgStreamer ms = MsgStreamer.create(cons, msg, directReply, stats, this.getConduit().getBufferPool());
                List<?> sentCons;
                try {
                    startTime = 0L;
                    if (ackTimeout > 0L) {
                        startTime = System.currentTimeMillis();
                    }
                    ms.reserveConnections(startTime, ackTimeout, ackSDTimeout);
                    int result = ms.writeMessage();
                    if (bytesWritten == 0) {
                        bytesWritten = result;
                    }
                    ce = ms.getConnectExceptions();
                    sentCons = ms.getSentConnections();
                    totalSentCons.addAll(sentCons);
                }
                catch (NotSerializableException e) {
                    throw e;
                }
                catch (IOException ex) {
                    throw new InternalGemFireException("Unknown error serializing message", ex);
                }
                finally {
                    try {
                        ms.close();
                    }
                    catch (IOException e) {
                        throw new InternalGemFireException("Unknown error serializing message", e);
                    }
                }
                if (ce != null) {
                    retryInfo = ce;
                    ce = null;
                }
                if (directReply && !sentCons.isEmpty()) {
                    long readAckStart = 0L;
                    if (stats != null) {
                        readAckStart = stats.startReplyWait();
                    }
                    try {
                        ce = this.readAcks(sentCons, startTime, ackTimeout, ackSDTimeout, ce, directMsg.getDirectReplyProcessor());
                    }
                    finally {
                        if (stats != null) {
                            stats.endReplyWait(readAckStart, startTime);
                        }
                    }
                }
                if (ce != null) {
                    if (retryInfo != null) {
                        retryInfo.getMembers().addAll(ce.getMembers());
                        retryInfo.getCauses().addAll(ce.getCauses());
                    } else {
                        retryInfo = ce;
                    }
                    ce = null;
                }
                if (retryInfo != null) {
                    // Do not start another pass if cancellation is in progress.
                    this.conduit.getCancelCriterion().checkCancelInProgress(null);
                }
            } while (retryInfo != null);
        }
        finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
            for (Object sent : totalSentCons) {
                ((Connection)sent).setInUse(false, 0L, 0L, 0L, null);
            }
        }
        if (failedCe != null) {
            throw failedCe;
        }
        return bytesWritten;
    }

    /**
     * Reads the acknowledgement from every non-shared connection in
     * {@code sentCons}. Shared connections are skipped.
     *
     * <p>A {@link SocketTimeoutException} while reading is passed to
     * {@link #handleAckTimeout}; a {@link ConnectionException} raised by either
     * the read or the timeout handling is recorded against the connection's
     * remote address instead of being propagated.
     *
     * @param sentCons the connections the message was written to; each element
     *        must be a {@link Connection}
     * @param startTime unused by this method
     * @param ackTimeout ack timeout forwarded to the timeout handler
     * @param ackSDTimeout severe-alert timeout forwarded to the timeout handler
     * @param cumulativeExceptions failures collected so far, or {@code null}
     * @param processor the reply processor that consumes the acks
     * @return {@code cumulativeExceptions} with any new failures added (created
     *         on demand), or {@code null} if there were none at all
     */
    private ConnectExceptions readAcks(List sentCons, long startTime, long ackTimeout, long ackSDTimeout, ConnectExceptions cumulativeExceptions, DirectReplyProcessor processor) {
        ConnectExceptions ce = cumulativeExceptions;
        // The list is raw, so iterate as Object and cast explicitly.
        for (Object element : sentCons) {
            Connection con = (Connection)element;
            if (con.isSharedResource()) {
                continue;
            }
            try {
                // The nesting matters: handleAckTimeout may itself throw a
                // ConnectionException, which the outer catch must also collect.
                try {
                    con.readAck(processor);
                }
                catch (SocketTimeoutException ex) {
                    this.handleAckTimeout(ackTimeout, ackSDTimeout, con, processor);
                }
            }
            catch (ConnectionException conEx) {
                if (ce == null) {
                    ce = new ConnectExceptions();
                }
                ce.addFailure(con.getRemoteAddress(), conEx);
            }
        }
        return ce;
    }

    /**
     * Obtains a connection for each usable destination and appends it to
     * {@code cons}, marking it in use.
     *
     * <p>{@code null} entries and the local member are skipped silently.
     * Destinations that are not members, are shunned, or are requested while
     * shutdown is in progress are recorded as failures, as is any
     * {@link IOException} raised while getting the connection. When a shared
     * connection is returned for a {@link DirectReplyMessage}, the message's
     * reply processor is registered.
     *
     * @param mgr membership service used to validate destinations
     * @param msg the message about to be sent
     * @param destinations candidate destinations; may contain {@code null}
     * @param preserveOrder forwarded to the conduit when getting a connection
     * @param retry forwarded to the conduit when getting a connection
     * @param ackTimeout ack timeout; a positive value causes a start time to be taken
     * @param ackSDTimeout severe-alert timeout forwarded to the conduit
     * @param cons output list that receives the obtained connections
     * @return the collected failures, or {@code null} if every destination succeeded
     */
    private ConnectExceptions getConnections(Membership mgr, DistributionMessage msg, InternalDistributedMember[] destinations, boolean preserveOrder, boolean retry, long ackTimeout, long ackSDTimeout, List cons) {
        ConnectExceptions failures = null;
        for (InternalDistributedMember destination : destinations) {
            if (destination == null || this.localAddr.equals(destination)) {
                continue;
            }
            boolean unusable = !mgr.memberExists((MemberIdentifier)destination)
                    || mgr.shutdownInProgress()
                    || mgr.isShunned((MemberIdentifier)destination);
            if (unusable) {
                if (logger.isTraceEnabled(LogMarker.DM_VERBOSE)) {
                    logger.trace(LogMarker.DM_VERBOSE, "Not a member: {}", (Object)destination);
                }
                if (failures == null) {
                    failures = new ConnectExceptions();
                }
                failures.addFailure(destination, new ShunnedMemberException(String.format("Member is being shunned: %s", destination)));
                continue;
            }
            try {
                long startTime = ackTimeout > 0L ? System.currentTimeMillis() : 0L;
                Connection con = this.conduit.getConnection(destination, preserveOrder, retry, startTime, ackTimeout, ackSDTimeout);
                con.setInUse(true, startTime, 0L, 0L, null);
                cons.add(con);
                if (con.isSharedResource() && msg instanceof DirectReplyMessage) {
                    ((DirectReplyMessage)((Object)msg)).registerProcessor();
                }
            }
            catch (IOException ex) {
                if (failures == null) {
                    failures = new ConnectExceptions();
                }
                failures.addFailure(destination, ex);
            }
        }
        return failures;
    }

    /**
     * Sends a message to the given destinations.
     *
     * <p>Nothing is sent, and 0 is returned, when the channel is disconnected or
     * when {@code destinations} is {@code null} or empty. Otherwise the local
     * member is set as the message's sender and the send is dispatched.
     *
     * @param mgr membership service used to validate destinations
     * @param destinations the members to send to
     * @param msg the message to send
     * @param ackWaitThreshold ack-wait threshold passed through to the sender
     * @param ackSAThreshold severe-alert threshold passed through to the sender
     * @return the value returned by the underlying send, or 0 if nothing was sent
     * @throws ConnectExceptions if connections to one or more members failed
     * @throws NotSerializableException if the message cannot be serialized
     */
    public int send(Membership mgr, InternalDistributedMember[] destinations, DistributionMessage msg, long ackWaitThreshold, long ackSAThreshold) throws ConnectExceptions, NotSerializableException {
        final boolean debug;
        if (this.disconnected) {
            debug = logger.isDebugEnabled();
            if (debug) {
                logger.debug("Returning from DirectChannel send because channel is disconnected: {}", (Object)msg);
            }
            return 0;
        }
        if (destinations == null) {
            debug = logger.isDebugEnabled();
            if (debug) {
                logger.debug("Returning from DirectChannel send because null set passed in: {}", (Object)msg);
            }
            return 0;
        }
        if (destinations.length == 0) {
            debug = logger.isDebugEnabled();
            if (debug) {
                logger.debug("Returning from DirectChannel send because empty destinations passed in {}", (Object)msg);
            }
            return 0;
        }
        msg.setSender(this.localAddr);
        return destinations.length == 1
                ? this.sendToOne(mgr, destinations, msg, ackWaitThreshold, ackSAThreshold)
                : this.sendToMany(mgr, destinations, msg, ackWaitThreshold, ackSAThreshold);
    }

    /**
     * Returns the statistics object of the distribution manager.
     *
     * @return the manager's stats, or {@code null} if there is no manager
     */
    public DMStats getDMStats() {
        return dm == null ? null : dm.getStats();
    }

    /**
     * Returns the configuration of the distribution manager.
     *
     * @return the manager's config, or {@code null} if there is no manager
     */
    public DistributionConfig getDMConfig() {
        return dm == null ? null : dm.getConfig();
    }

    /**
     * Returns the distribution manager this channel was created with.
     *
     * @return the owning distribution manager
     */
    public DistributionManager getDM() {
        return dm;
    }

    /**
     * Reacts to a timeout while reading an ack from {@code c}.
     *
     * <p>The timeout is counted in the stats, logged as a warning and recorded
     * as a breadcrumb (with the membership list replaced by "(omitted)"). If
     * {@code ReplyProcessor21.THROW_EXCEPTION_ON_TIMEOUT} is set, an
     * {@link InternalGemFireException} is thrown. Otherwise:
     * <ul>
     * <li>if the remote member is no longer in the membership id set, the
     * processor is told the member departed;</li>
     * <li>if it is still a member, the ack is read again. With a positive
     * severe-alert timeout there is one intermediate attempt whose timeout is
     * logged at fatal level before the final attempt.</li>
     * </ul>
     *
     * @param ackTimeout ack timeout; divided by 1000 for the log message
     * @param ackSATimeout severe-alert timeout; 0 or less disables the extra attempt
     * @param c the connection whose ack read timed out
     * @param processor the reply processor waiting for the ack
     * @throws ConnectionException declared for the repeated ack reads
     */
    private void handleAckTimeout(long ackTimeout, long ackSATimeout, Connection c, DirectReplyProcessor processor) throws ConnectionException {
        Set<InternalDistributedMember> activeMembers = dm.getDistributionManagerIds();
        dm.getStats().incReplyTimeouts();

        String msg = "%s seconds have elapsed while waiting for reply from %s on %s whose current membership list is: [%s]";
        Object[] msgArgs = new Object[]{ackTimeout / 1000L, c.getRemoteAddress(), dm.getId(), activeMembers};
        logger.warn(String.format(msg, msgArgs));
        // The breadcrumb (and any exception text) omits the membership list.
        msgArgs[3] = "(omitted)";
        Breadcrumbs.setProblem(msg, msgArgs);

        if (ReplyProcessor21.THROW_EXCEPTION_ON_TIMEOUT) {
            TimeoutException cause = new TimeoutException("Timed out waiting for ACKS.");
            throw new InternalGemFireException(String.format(msg, msgArgs), cause);
        }

        if (!activeMembers.contains(c.getRemoteAddress())) {
            logger.warn("View no longer has {} as an active member, so we will no longer wait for it.", (Object)c.getRemoteAddress());
            processor.memberDeparted(getDM(), c.getRemoteAddress(), true);
            return;
        }

        if (ackSATimeout > 0L) {
            try {
                c.readAck(processor);
                return;
            }
            catch (SocketTimeoutException e) {
                Object[] args = new Object[]{(ackSATimeout + ackTimeout) / 1000L, c.getRemoteAddress(), dm.getId(), activeMembers};
                logger.fatal("{} seconds have elapsed while waiting for reply from {} on {} whose currentFull membership list is: [{}]", args);
            }
        }
        // Final attempt; a further timeout is only logged.
        try {
            c.readAck(processor);
        }
        catch (SocketTimeoutException ex) {
            logger.error(String.format("Unexpected timeout while waiting for ack from %s", c.getRemoteAddress()), (Throwable)ex);
        }
    }

    /**
     * Hands an incoming message to the registered listener.
     *
     * <p>Messages are dropped while the channel is disconnected. A
     * {@link MemberShunnedException} from the listener is propagated, a
     * {@link CancelException} is ignored, and any other exception is logged at
     * fatal level unless cancellation is in progress.
     *
     * @param msg the received message
     * @param bytesRead not used by this method
     * @throws MemberShunnedException if the listener rejects the message for that reason
     */
    public void receive(DistributionMessage msg, int bytesRead) throws MemberShunnedException {
        if (disconnected) {
            return;
        }
        try {
            receiver.messageReceived((Message)msg);
        }
        catch (MemberShunnedException e) {
            throw e;
        }
        catch (CancelException e) {
            // Ignored on purpose: nothing is done when the listener reports cancellation.
        }
        catch (Exception ex) {
            boolean cancelling = conduit.getCancelCriterion().isCancelInProgress();
            if (!cancelling) {
                logger.fatal("While pulling a message", (Throwable)ex);
            }
        }
    }

    /**
     * Forwards an emergency-close request to the conduit.
     */
    public void emergencyClose() {
        conduit.emergencyClose();
    }

    /**
     * Marks the channel as disconnected and stops the conduit.
     *
     * <p>{@code disconnectCompleted} is {@code false} while the conduit is being
     * stopped and becomes {@code true} once {@code stop} has returned.
     *
     * @param cause the reason for the disconnect, handed to the conduit
     */
    public synchronized void disconnect(Exception cause) {
        disconnected = true;
        disconnectCompleted = false;
        conduit.stop(cause);
        disconnectCompleted = true;
    }

    /**
     * Reports whether the channel has not yet completed a disconnect.
     *
     * @return {@code true} unless {@code disconnectCompleted} is set
     */
    public boolean isOpen() {
        boolean closed = disconnectCompleted;
        return !closed;
    }

    /**
     * Returns the listener that incoming messages are delivered to.
     *
     * @return the message listener supplied at construction
     */
    protected MessageListener getReceiver() {
        return receiver;
    }

    /**
     * Returns the port reported by the conduit.
     *
     * @return the conduit's port
     */
    public int getPort() {
        return conduit.getPort();
    }

    /**
     * Returns the TCP conduit backing this channel.
     *
     * @return the conduit created by the constructor
     */
    public TCPConduit getConduit() {
        return conduit;
    }

    /**
     * Resolves the address for this channel: the configured bind address when
     * one is set and non-empty, otherwise the local host.
     *
     * @param dc configuration providing the bind address
     * @return the resolved address
     * @throws RuntimeException wrapping the {@link UnknownHostException} if
     *         resolution fails
     */
    private InetAddress initAddress(DistributionConfig dc) {
        String bindAddress = dc.getBindAddress();
        boolean hasBindAddress = bindAddress != null && !bindAddress.isEmpty();
        try {
            return hasBindAddress ? InetAddress.getByName(bindAddress) : LocalHostUtil.getLocalHost();
        }
        catch (UnknownHostException unhe) {
            throw new RuntimeException(unhe);
        }
    }

    /**
     * Closes the endpoint for {@code member}, with disconnect notification enabled.
     *
     * @param member the member whose endpoint is removed
     * @param reason the reason passed along to the conduit
     */
    public void closeEndpoint(InternalDistributedMember member, String reason) {
        final boolean notifyDisconnect = true;
        closeEndpoint(member, reason, notifyDisconnect);
    }

    /**
     * Asks the conduit to remove the endpoint for {@code member}. Does nothing
     * if there is no conduit.
     *
     * @param member the member whose endpoint is removed
     * @param reason the reason passed along to the conduit
     * @param notifyDisconnect forwarded unchanged to the conduit
     */
    public void closeEndpoint(InternalDistributedMember member, String reason, boolean notifyDisconnect) {
        TCPConduit current = conduit;
        if (current == null) {
            return;
        }
        current.removeEndpoint(member, reason, notifyDisconnect);
    }

    /**
     * Asks the conduit for the thread-owned ordered connection state for
     * {@code member}, passing {@code result} for it to fill in. Does nothing if
     * there is no conduit.
     *
     * @param member the member whose connection state is requested
     * @param result map handed to the conduit to receive the state
     */
    public void getChannelStates(DistributedMember member, Map result) {
        TCPConduit current = conduit;
        if (current == null) {
            return;
        }
        current.getThreadOwnedOrderedConnectionState(member, result);
    }

    /**
     * Delegates to the conduit to wait for the given thread-owned ordered
     * connection state of {@code member}. Does nothing if there is no conduit.
     *
     * @param member the member whose connection state is awaited
     * @param channelState the state map handed to the conduit
     * @throws InterruptedException if the calling thread is already interrupted
     *         on entry (the interrupt flag is cleared by the check), or if the
     *         conduit's wait is interrupted
     */
    public void waitForChannelState(DistributedMember member, Map channelState) throws InterruptedException {
        boolean alreadyInterrupted = Thread.interrupted();
        if (alreadyInterrupted) {
            throw new InterruptedException();
        }
        TCPConduit current = conduit;
        if (current == null) {
            return;
        }
        current.waitForThreadOwnedOrderedConnectionState(member, channelState);
    }

    /**
     * Asks the conduit whether it has receivers for the given member.
     *
     * @param mbr the member to check
     * @return the conduit's answer
     */
    public boolean hasReceiversFor(DistributedMember mbr) {
        boolean present = conduit.hasReceiversFor(mbr);
        return present;
    }
}

