/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.cluster.protocol.impl;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketException;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
import org.apache.nifi.cluster.protocol.ClusterCoordinationProtocolSender;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.apache.nifi.cluster.protocol.ProtocolContext;
import org.apache.nifi.cluster.protocol.ProtocolException;
import org.apache.nifi.cluster.protocol.ProtocolMessageMarshaller;
import org.apache.nifi.cluster.protocol.ProtocolMessageUnmarshaller;
import org.apache.nifi.cluster.protocol.message.DisconnectMessage;
import org.apache.nifi.cluster.protocol.message.NodeConnectionStatusRequestMessage;
import org.apache.nifi.cluster.protocol.message.NodeConnectionStatusResponseMessage;
import org.apache.nifi.cluster.protocol.message.NodeStatusChangeMessage;
import org.apache.nifi.cluster.protocol.message.OffloadMessage;
import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
import org.apache.nifi.cluster.protocol.message.ReconnectionRequestMessage;
import org.apache.nifi.cluster.protocol.message.ReconnectionResponseMessage;
import org.apache.nifi.io.socket.SocketConfiguration;
import org.apache.nifi.io.socket.SocketUtils;
import org.apache.nifi.reporting.BulletinRepository;
import org.apache.nifi.util.FormatUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StandardClusterCoordinationProtocolSender
implements ClusterCoordinationProtocolSender {
    private static final Logger logger = LoggerFactory.getLogger(StandardClusterCoordinationProtocolSender.class);
    private final ProtocolContext<ProtocolMessage> protocolContext;
    private final SocketConfiguration socketConfiguration;
    private final int maxThreadsPerRequest;
    private int handshakeTimeoutSeconds;

    public StandardClusterCoordinationProtocolSender(SocketConfiguration socketConfiguration, ProtocolContext<ProtocolMessage> protocolContext, int maxThreadsPerRequest) {
        if (socketConfiguration == null) {
            throw new IllegalArgumentException("Socket configuration may not be null.");
        }
        if (protocolContext == null) {
            throw new IllegalArgumentException("Protocol Context may not be null.");
        }
        this.socketConfiguration = socketConfiguration;
        this.protocolContext = protocolContext;
        this.handshakeTimeoutSeconds = -1;
        this.maxThreadsPerRequest = maxThreadsPerRequest;
    }

    @Override
    public void setBulletinRepository(BulletinRepository bulletinRepository) {
    }

    @Override
    public ReconnectionResponseMessage requestReconnection(ReconnectionRequestMessage msg) throws ProtocolException {
        ProtocolMessage response;
        block7: {
            ReconnectionResponseMessage reconnectionResponseMessage;
            Socket socket = null;
            try {
                socket = this.createSocket(msg.getNodeId(), true);
                try {
                    ProtocolMessageMarshaller<ProtocolMessage> marshaller = this.protocolContext.createMarshaller();
                    marshaller.marshal(msg, socket.getOutputStream());
                }
                catch (IOException ioe) {
                    throw new ProtocolException("Failed marshalling '" + String.valueOf((Object)msg.getType()) + "' protocol message due to: " + String.valueOf(ioe), ioe);
                }
                try {
                    ProtocolMessageUnmarshaller<ProtocolMessage> unmarshaller = this.protocolContext.createUnmarshaller();
                    response = unmarshaller.unmarshal(socket.getInputStream());
                }
                catch (IOException ioe) {
                    throw new ProtocolException("Failed unmarshalling '" + String.valueOf((Object)ProtocolMessage.MessageType.RECONNECTION_RESPONSE) + "' protocol message due to: " + String.valueOf(ioe), ioe);
                }
                if (ProtocolMessage.MessageType.RECONNECTION_RESPONSE != response.getType()) break block7;
                reconnectionResponseMessage = (ReconnectionResponseMessage)response;
            }
            catch (Throwable throwable) {
                SocketUtils.closeQuietly(socket);
                throw throwable;
            }
            SocketUtils.closeQuietly(socket);
            return reconnectionResponseMessage;
        }
        throw new ProtocolException("Expected message type '" + String.valueOf((Object)ProtocolMessage.MessageType.FLOW_RESPONSE) + "' but found '" + String.valueOf((Object)response.getType()) + "'");
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void offload(OffloadMessage msg) throws ProtocolException {
        Socket socket = null;
        try {
            socket = this.createSocket(msg.getNodeId(), true);
            try {
                ProtocolMessageMarshaller<ProtocolMessage> marshaller = this.protocolContext.createMarshaller();
                marshaller.marshal(msg, socket.getOutputStream());
            }
            catch (IOException ioe) {
                throw new ProtocolException("Failed marshalling '" + String.valueOf((Object)msg.getType()) + "' protocol message due to: " + String.valueOf(ioe), ioe);
            }
        }
        catch (Throwable throwable) {
            SocketUtils.closeQuietly(socket);
            throw throwable;
        }
        SocketUtils.closeQuietly(socket);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void disconnect(DisconnectMessage msg) throws ProtocolException {
        Socket socket = null;
        try {
            socket = this.createSocket(msg.getNodeId(), true);
            try {
                ProtocolMessageMarshaller<ProtocolMessage> marshaller = this.protocolContext.createMarshaller();
                marshaller.marshal(msg, socket.getOutputStream());
            }
            catch (IOException ioe) {
                throw new ProtocolException("Failed marshalling '" + String.valueOf((Object)msg.getType()) + "' protocol message due to: " + String.valueOf(ioe), ioe);
            }
        }
        catch (Throwable throwable) {
            SocketUtils.closeQuietly(socket);
            throw throwable;
        }
        SocketUtils.closeQuietly(socket);
    }

    private void setConnectionHandshakeTimeoutOnSocket(Socket socket) throws SocketException {
        if (this.handshakeTimeoutSeconds >= 0) {
            socket.setSoTimeout(this.handshakeTimeoutSeconds * 1000);
        }
    }

    public SocketConfiguration getSocketConfiguration() {
        return this.socketConfiguration;
    }

    public int getHandshakeTimeoutSeconds() {
        return this.handshakeTimeoutSeconds;
    }

    public void setHandshakeTimeout(String handshakeTimeout) {
        this.handshakeTimeoutSeconds = (int)FormatUtils.getTimeDuration((String)handshakeTimeout, (TimeUnit)TimeUnit.SECONDS);
    }

    private Socket createSocket(NodeIdentifier nodeId, boolean applyHandshakeTimeout) {
        return this.createSocket(nodeId.getSocketAddress(), nodeId.getSocketPort(), applyHandshakeTimeout);
    }

    private Socket createSocket(String host, int port, boolean applyHandshakeTimeout) {
        try {
            Socket socket = SocketUtils.createSocket(InetSocketAddress.createUnresolved(host, port), this.socketConfiguration);
            if (applyHandshakeTimeout) {
                this.setConnectionHandshakeTimeoutOnSocket(socket);
            }
            return socket;
        }
        catch (IOException ioe) {
            throw new ProtocolException("Failed to create socket due to: " + String.valueOf(ioe), ioe);
        }
    }

    /*
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    @Override
    public NodeConnectionStatus requestNodeConnectionStatus(String hostname, int port) {
        byte[] msgBytes;
        Objects.requireNonNull(hostname);
        NodeConnectionStatusRequestMessage msg = new NodeConnectionStatusRequestMessage();
        try (ByteArrayOutputStream baos = new ByteArrayOutputStream();){
            ProtocolMessageMarshaller<ProtocolMessage> marshaller = this.protocolContext.createMarshaller();
            marshaller.marshal(msg, baos);
            msgBytes = baos.toByteArray();
        }
        catch (IOException e) {
            throw new ProtocolException("Failed to marshal NodeIdentifierRequestMessage", e);
        }
        try (Socket socket = this.createSocket(hostname, port, true);){
            ProtocolMessage response;
            socket.getOutputStream().write(msgBytes);
            try {
                ProtocolMessageUnmarshaller<ProtocolMessage> unmarshaller = this.protocolContext.createUnmarshaller();
                response = unmarshaller.unmarshal(socket.getInputStream());
            }
            catch (IOException ioe) {
                throw new ProtocolException("Failed unmarshalling '" + String.valueOf((Object)ProtocolMessage.MessageType.RECONNECTION_RESPONSE) + "' protocol message due to: " + String.valueOf(ioe), ioe);
            }
            if (ProtocolMessage.MessageType.NODE_CONNECTION_STATUS_RESPONSE != response.getType()) throw new ProtocolException("Expected message type '" + String.valueOf((Object)ProtocolMessage.MessageType.NODE_CONNECTION_STATUS_RESPONSE) + "' but found '" + String.valueOf((Object)response.getType()) + "'");
            NodeConnectionStatus nodeConnectionStatus = ((NodeConnectionStatusResponseMessage)response).getNodeConnectionStatus();
            return nodeConnectionStatus;
        }
        catch (IOException ioe) {
            throw new ProtocolException("Failed to request Node Identifer from " + hostname + ":" + port, ioe);
        }
    }

    @Override
    public void notifyNodeStatusChange(Set<NodeIdentifier> nodesToNotify, NodeStatusChangeMessage msg) {
        byte[] msgBytes;
        if (nodesToNotify.isEmpty()) {
            return;
        }
        int numThreads = Math.min(nodesToNotify.size(), this.maxThreadsPerRequest);
        try (ByteArrayOutputStream baos = new ByteArrayOutputStream();){
            ProtocolMessageMarshaller<ProtocolMessage> marshaller = this.protocolContext.createMarshaller();
            marshaller.marshal(msg, baos);
            msgBytes = baos.toByteArray();
        }
        catch (IOException e) {
            throw new ProtocolException("Failed to marshal NodeStatusChangeMessage", e);
        }
        ExecutorService executor = Executors.newFixedThreadPool(numThreads, new ThreadFactory(this){
            private final AtomicInteger counter = new AtomicInteger(0);

            @Override
            public Thread newThread(Runnable r) {
                Thread thread = Executors.defaultThreadFactory().newThread(r);
                thread.setDaemon(true);
                thread.setName("Notify Cluster of Node Status Change-" + this.counter.incrementAndGet());
                return thread;
            }
        });
        for (NodeIdentifier nodeId : nodesToNotify) {
            executor.submit(() -> {
                int attempts = 5;
                boolean retrySeconds = true;
                Exception lastException = null;
                for (int i = 0; i < 5; ++i) {
                    try (Socket socket = this.createSocket(nodeId, true);){
                        OutputStream out = socket.getOutputStream();
                        out.write(msgBytes);
                    }
                    catch (Exception e) {
                        if (e instanceof ProtocolException && e.getCause() instanceof ConnectException && nodeId.equals(msg.getNodeId())) {
                            logger.warn("Failed to send Node Status Change message to {} because unable to connect to node. Will not retry.", (Object)nodeId, (Object)e);
                            return;
                        }
                        logger.warn("Failed to send Node Status Change message to {}", (Object)nodeId, (Object)e);
                        lastException = e;
                        try {
                            Thread.sleep(1000L);
                            continue;
                        }
                        catch (InterruptedException ie) {
                            Thread.currentThread().interrupt();
                            return;
                        }
                    }
                    logger.debug("Notified {} of status change {}", (Object)nodeId, (Object)msg);
                    return;
                }
                throw new ProtocolException("Failed to send Node Status Change message to " + String.valueOf(nodeId), lastException);
            });
        }
        executor.shutdown();
        try {
            executor.awaitTermination(10L, TimeUnit.DAYS);
        }
        catch (InterruptedException ie) {
            logger.warn("Interrupted while waiting for other nodes in cluster to be notified of Node Status Change {}", (Object)msg);
            Thread.currentThread().interrupt();
        }
    }
}

