/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.cluster.protocol;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.Objects;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.apache.nifi.cluster.protocol.NodeProtocolSender;
import org.apache.nifi.cluster.protocol.ProtocolContext;
import org.apache.nifi.cluster.protocol.ProtocolException;
import org.apache.nifi.cluster.protocol.ProtocolMessageMarshaller;
import org.apache.nifi.cluster.protocol.ProtocolMessageUnmarshaller;
import org.apache.nifi.cluster.protocol.UnknownServiceAddressException;
import org.apache.nifi.cluster.protocol.message.ClusterWorkloadRequestMessage;
import org.apache.nifi.cluster.protocol.message.ClusterWorkloadResponseMessage;
import org.apache.nifi.cluster.protocol.message.CommsTimingDetails;
import org.apache.nifi.cluster.protocol.message.ConnectionRequestMessage;
import org.apache.nifi.cluster.protocol.message.ConnectionResponseMessage;
import org.apache.nifi.cluster.protocol.message.HeartbeatMessage;
import org.apache.nifi.cluster.protocol.message.HeartbeatResponseMessage;
import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
import org.apache.nifi.io.socket.SocketConfiguration;
import org.apache.nifi.io.socket.SocketUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AbstractNodeProtocolSender
implements NodeProtocolSender {
    private static final Logger logger = LoggerFactory.getLogger(AbstractNodeProtocolSender.class);
    private final SocketConfiguration socketConfiguration;
    private final ProtocolContext<ProtocolMessage> protocolContext;
    private final ProtocolMessageMarshaller<ProtocolMessage> marshaller;
    private final ProtocolMessageUnmarshaller<ProtocolMessage> unmarshaller;

    public AbstractNodeProtocolSender(SocketConfiguration socketConfiguration, ProtocolContext<ProtocolMessage> protocolContext) {
        this.socketConfiguration = socketConfiguration;
        this.protocolContext = protocolContext;
        this.marshaller = protocolContext.createMarshaller();
        this.unmarshaller = protocolContext.createUnmarshaller();
    }

    @Override
    public ConnectionResponseMessage requestConnection(ConnectionRequestMessage msg, boolean allowConnectToSelf) throws ProtocolException, UnknownServiceAddressException {
        ProtocolMessage response;
        block10: {
            ConnectionResponseMessage connectionResponseMessage;
            Socket socket = null;
            try {
                ConnectionResponseMessage connectionResponse;
                InetSocketAddress socketAddress;
                try {
                    socketAddress = this.getServiceAddress();
                }
                catch (IOException e) {
                    throw new ProtocolException("Could not determined address of Cluster Coordinator", e);
                }
                if (!allowConnectToSelf) {
                    this.validateNotConnectingToSelf(msg, socketAddress);
                }
                logger.info("Cluster Coordinator is located at {}. Will send Cluster Connection Request to this address", (Object)socketAddress);
                socket = this.createSocket(socketAddress);
                try {
                    ProtocolMessageMarshaller<ProtocolMessage> marshaller = this.protocolContext.createMarshaller();
                    marshaller.marshal(msg, socket.getOutputStream());
                }
                catch (IOException ioe) {
                    throw new ProtocolException("Failed marshalling '" + String.valueOf((Object)msg.getType()) + "' protocol message due to: " + String.valueOf(ioe), ioe);
                }
                try {
                    ProtocolMessageUnmarshaller<ProtocolMessage> unmarshaller = this.protocolContext.createUnmarshaller();
                    response = unmarshaller.unmarshal(socket.getInputStream());
                }
                catch (IOException ioe) {
                    throw new ProtocolException("Failed unmarshalling '" + String.valueOf((Object)ProtocolMessage.MessageType.CONNECTION_RESPONSE) + "' protocol message from " + String.valueOf(socket.getRemoteSocketAddress()) + " due to: " + String.valueOf(ioe), ioe);
                }
                if (ProtocolMessage.MessageType.CONNECTION_RESPONSE != response.getType()) break block10;
                connectionResponseMessage = connectionResponse = (ConnectionResponseMessage)response;
            }
            catch (Throwable throwable) {
                SocketUtils.closeQuietly(socket);
                throw throwable;
            }
            SocketUtils.closeQuietly(socket);
            return connectionResponseMessage;
        }
        throw new ProtocolException("Expected message type '" + String.valueOf((Object)ProtocolMessage.MessageType.CONNECTION_RESPONSE) + "' but found '" + String.valueOf((Object)response.getType()) + "'");
    }

    private void validateNotConnectingToSelf(ConnectionRequestMessage msg, InetSocketAddress socketAddress) {
        NodeIdentifier localNodeIdentifier = msg.getConnectionRequest().getProposedNodeIdentifier();
        if (localNodeIdentifier == null) {
            return;
        }
        String localAddress = localNodeIdentifier.getSocketAddress();
        int localPort = localNodeIdentifier.getSocketPort();
        if (Objects.equals(localAddress, socketAddress.getHostString()) && localPort == socketAddress.getPort()) {
            throw new UnknownServiceAddressException("Cluster Coordinator is currently " + socketAddress.getHostString() + ":" + socketAddress.getPort() + ", which is this node, but connecting to self is not allowed at this phase of the lifecycle. This node must wait for a new Cluster Coordinator to be elected before connecting to the cluster.");
        }
    }

    @Override
    public HeartbeatResponseMessage heartbeat(HeartbeatMessage msg, String address) throws ProtocolException {
        int port;
        CommsTimingDetails timingDetails = new CommsTimingDetails();
        String[] parts = address.split(":");
        String hostname = parts[0];
        ProtocolMessage responseMessage = this.sendProtocolMessage(msg, hostname, port = Integer.parseInt(parts[1]), timingDetails);
        if (ProtocolMessage.MessageType.HEARTBEAT_RESPONSE == responseMessage.getType()) {
            HeartbeatResponseMessage heartbeatResponseMessage = (HeartbeatResponseMessage)responseMessage;
            heartbeatResponseMessage.setCommsTimingDetails(timingDetails);
            return heartbeatResponseMessage;
        }
        throw new ProtocolException("Expected message type '" + String.valueOf((Object)ProtocolMessage.MessageType.HEARTBEAT_RESPONSE) + "' but found '" + String.valueOf((Object)responseMessage.getType()) + "'");
    }

    @Override
    public ClusterWorkloadResponseMessage clusterWorkload(ClusterWorkloadRequestMessage msg) throws ProtocolException {
        InetSocketAddress serviceAddress;
        try {
            serviceAddress = this.getServiceAddress();
        }
        catch (IOException e) {
            throw new ProtocolException("Failed to getServiceAddress due to " + String.valueOf(e), e);
        }
        ProtocolMessage responseMessage = this.sendProtocolMessage(msg, serviceAddress.getHostName(), serviceAddress.getPort(), new CommsTimingDetails());
        if (ProtocolMessage.MessageType.CLUSTER_WORKLOAD_RESPONSE == responseMessage.getType()) {
            return (ClusterWorkloadResponseMessage)responseMessage;
        }
        throw new ProtocolException("Expected message type '" + String.valueOf((Object)ProtocolMessage.MessageType.CLUSTER_WORKLOAD_RESPONSE) + "' but found '" + String.valueOf((Object)responseMessage.getType()) + "'");
    }

    private Socket createSocket(InetSocketAddress socketAddress) {
        try {
            return SocketUtils.createSocket(socketAddress, this.socketConfiguration);
        }
        catch (IOException ioe) {
            if (socketAddress == null) {
                throw new ProtocolException("Failed to create socket due to: " + String.valueOf(ioe), ioe);
            }
            throw new ProtocolException("Failed to create socket to " + String.valueOf(socketAddress) + " due to: " + String.valueOf(ioe), ioe);
        }
    }

    public SocketConfiguration getSocketConfiguration() {
        return this.socketConfiguration;
    }

    /*
     * Exception decompiling
     */
    private ProtocolMessage sendProtocolMessage(ProtocolMessage msg, String hostname, int port, CommsTimingDetails timingDetails) {
        /*
         * This method has failed to decompile.  When submitting a bug report, please provide this stack trace, and (if you hold appropriate legal rights) the relevant class file.
         * 
         * org.benf.cfr.reader.util.ConfusedCFRException: Started 2 blocks at once
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.getStartingBlocks(Op04StructuredStatement.java:412)
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.buildNestedBlocks(Op04StructuredStatement.java:487)
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op03SimpleStatement.createInitialStructuredBlock(Op03SimpleStatement.java:736)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisInner(CodeAnalyser.java:850)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisOrWrapFail(CodeAnalyser.java:278)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysis(CodeAnalyser.java:201)
         *     at org.benf.cfr.reader.entities.attributes.AttributeCode.analyse(AttributeCode.java:94)
         *     at org.benf.cfr.reader.entities.Method.analyse(Method.java:531)
         *     at org.benf.cfr.reader.entities.ClassFile.analyseMid(ClassFile.java:1055)
         *     at org.benf.cfr.reader.entities.ClassFile.analyseTop(ClassFile.java:942)
         *     at org.benf.cfr.reader.Driver.doJarVersionTypes(Driver.java:257)
         *     at org.benf.cfr.reader.Driver.doJar(Driver.java:139)
         *     at org.benf.cfr.reader.CfrDriverImpl.analyse(CfrDriverImpl.java:76)
         *     at org.benf.cfr.reader.Main.main(Main.java:54)
         */
        throw new IllegalStateException("Decompilation failed");
    }

    protected abstract InetSocketAddress getServiceAddress() throws IOException;
}

