/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.integration.ip.udp;

import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.SocketException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.expression.EvaluationContext;
import org.springframework.expression.Expression;
import org.springframework.integration.context.IntegrationContextUtils;
import org.springframework.integration.ip.AbstractInternetProtocolSendingMessageHandler;
import org.springframework.integration.ip.udp.DatagramPacketMessageMapper;
import org.springframework.integration.ip.udp.SocketCustomizer;
import org.springframework.integration.support.utils.IntegrationUtils;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessagingException;
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;

public class UnicastSendingMessageHandler
extends AbstractInternetProtocolSendingMessageHandler
implements Runnable {
    private static final int DEFAULT_ACK_TIMEOUT = 5000;
    private final DatagramPacketMessageMapper mapper = new DatagramPacketMessageMapper();
    private final Map<String, CountDownLatch> ackControl = Collections.synchronizedMap(new HashMap());
    private final Expression destinationExpression;
    private boolean waitForAck = false;
    private boolean acknowledge = false;
    private String ackHost;
    private int ackPort;
    private int ackTimeout = 5000;
    private int ackCounter = 1;
    private int soReceiveBufferSize = -1;
    private String localAddress;
    private DatagramSocket socket;
    private Executor taskExecutor;
    private boolean taskExecutorSet;
    private Expression socketExpression;
    private EvaluationContext evaluationContext;
    private SocketCustomizer socketCustomizer;
    private volatile CountDownLatch ackLatch;
    private volatile boolean ackThreadRunning;

    public UnicastSendingMessageHandler(String host, int port) {
        super(host, port);
        this.socketCustomizer = aSocket -> {};
        this.mapper.setLengthCheck(false);
        this.mapper.setAcknowledge(false);
        this.destinationExpression = null;
    }

    public UnicastSendingMessageHandler(String destinationExpression) {
        super("", 0);
        this.socketCustomizer = aSocket -> {};
        Assert.hasText((String)destinationExpression, (String)"'destinationExpression' cannot be null or empty");
        this.mapper.setLengthCheck(false);
        this.mapper.setAcknowledge(false);
        this.destinationExpression = EXPRESSION_PARSER.parseExpression(destinationExpression);
    }

    public UnicastSendingMessageHandler(Expression destinationExpression) {
        super("", 0);
        this.socketCustomizer = aSocket -> {};
        Assert.notNull((Object)destinationExpression, (String)"'destinationExpression' cannot be null");
        this.mapper.setLengthCheck(false);
        this.mapper.setAcknowledge(false);
        this.destinationExpression = destinationExpression;
    }

    public UnicastSendingMessageHandler(String host, int port, boolean lengthCheck) {
        super(host, port);
        this.socketCustomizer = aSocket -> {};
        this.mapper.setLengthCheck(lengthCheck);
        this.mapper.setAcknowledge(false);
        this.destinationExpression = null;
    }

    public UnicastSendingMessageHandler(String host, int port, boolean acknowledge, String ackHost, int ackPort, int ackTimeout) {
        super(host, port);
        this.socketCustomizer = aSocket -> {};
        this.destinationExpression = null;
        this.setReliabilityAttributes(false, acknowledge, ackHost, ackPort, ackTimeout);
    }

    public UnicastSendingMessageHandler(String host, int port, boolean lengthCheck, boolean acknowledge, String ackHost, int ackPort, int ackTimeout) {
        super(host, port);
        this.socketCustomizer = aSocket -> {};
        this.destinationExpression = null;
        this.setReliabilityAttributes(lengthCheck, acknowledge, ackHost, ackPort, ackTimeout);
    }

    public void setLengthCheck(boolean lengthCheck) {
        this.mapper.setLengthCheck(lengthCheck);
    }

    public void setSocketCustomizer(SocketCustomizer socketCustomizer) {
        Assert.notNull((Object)socketCustomizer, (String)"'socketCustomizer' cannot be null");
        this.socketCustomizer = socketCustomizer;
    }

    protected final void setReliabilityAttributes(boolean lengthCheck, boolean acknowledge, String ackHost, int ackPort, int ackTimeout) {
        this.mapper.setLengthCheck(lengthCheck);
        this.waitForAck = acknowledge;
        this.mapper.setAcknowledge(acknowledge);
        this.mapper.setAckAddress(ackHost + ":" + ackPort);
        this.ackHost = ackHost;
        this.ackPort = ackPort;
        if (ackTimeout > 0) {
            this.ackTimeout = ackTimeout;
        }
        this.acknowledge = acknowledge;
        if (this.acknowledge) {
            Assert.state((boolean)StringUtils.hasText((String)ackHost), (String)"'ackHost' must not be empty");
        }
    }

    @Override
    public void doStart() {
        if (this.acknowledge) {
            if (this.taskExecutor == null) {
                CustomizableThreadFactory threadFactory = new CustomizableThreadFactory("UDP-Ack-Handler-");
                threadFactory.setDaemon(true);
                this.taskExecutor = Executors.newSingleThreadExecutor((ThreadFactory)threadFactory);
            }
            this.startAckThread();
        }
    }

    @Override
    protected void doStop() {
        this.closeSocketIfNeeded();
        if (!this.taskExecutorSet && this.taskExecutor != null) {
            ((ExecutorService)this.taskExecutor).shutdown();
            this.taskExecutor = null;
        }
    }

    public void handleMessageInternal(Message<?> message) {
        block13: {
            if (this.acknowledge) {
                Assert.state((boolean)this.isRunning(), (String)"When 'acknowledge' is enabled, adapter must be running");
                this.startAckThread();
            }
            CountDownLatch countdownLatch = null;
            UUID id = message.getHeaders().getId();
            if (id == null) {
                id = UUID.randomUUID();
            }
            String messageId = id.toString();
            try {
                boolean waitAck = this.waitForAck;
                if (waitAck) {
                    countdownLatch = new CountDownLatch(this.ackCounter);
                    this.ackControl.put(messageId, countdownLatch);
                }
                this.convertAndSend(message);
                if (!waitAck) break block13;
                try {
                    if (!countdownLatch.await(this.ackTimeout, TimeUnit.MILLISECONDS)) {
                        throw new MessagingException(message, "Failed to receive UDP Ack in " + this.ackTimeout + " millis");
                    }
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
            catch (Exception ex) {
                if (!(ex instanceof MessagingException)) {
                    this.closeSocketIfNeeded();
                }
                throw IntegrationUtils.wrapInHandlingExceptionIfNecessary(message, () -> "Failed to send UDP packet in the [" + this + "]", (Throwable)ex);
            }
            finally {
                if (countdownLatch != null) {
                    this.ackControl.remove(messageId);
                }
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void startAckThread() {
        if (!this.ackThreadRunning) {
            UnicastSendingMessageHandler unicastSendingMessageHandler = this;
            synchronized (unicastSendingMessageHandler) {
                if (!this.ackThreadRunning) {
                    try {
                        this.getSocket();
                    }
                    catch (IOException ex) {
                        this.logger.error((Throwable)ex, (CharSequence)"Error creating socket");
                    }
                    this.ackLatch = new CountDownLatch(1);
                    this.taskExecutor.execute(this);
                    try {
                        this.ackLatch.await(10000L, TimeUnit.MILLISECONDS);
                    }
                    catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            }
        }
    }

    /*
     * Enabled force condition propagation
     * Lifted jumps to return sites
     */
    protected void convertAndSend(Message<?> message) throws IOException, URISyntaxException {
        Object packet;
        SocketAddress destinationAddress;
        DatagramSocket datagramSocket = this.socketExpression != null ? (DatagramSocket)this.socketExpression.getValue(this.evaluationContext, message, DatagramSocket.class) : this.getSocket();
        if (this.destinationExpression != null) {
            Object destination = this.destinationExpression.getValue(this.evaluationContext, message);
            if (destination instanceof String) {
                destination = new URI((String)destination);
            }
            if (destination instanceof URI) {
                URI uri = (URI)destination;
                destination = new InetSocketAddress(uri.getHost(), uri.getPort());
            }
            if (!(destination instanceof SocketAddress)) throw new IllegalStateException("'destinationExpression' must evaluate to String, URI or SocketAddress. Gotten [" + destination + "].");
            destinationAddress = (SocketAddress)destination;
        } else {
            destinationAddress = this.getDestinationAddress();
        }
        if ((packet = this.mapper.fromMessage((Message)message)) != null) {
            ((DatagramPacket)packet).setSocketAddress(destinationAddress);
            datagramSocket.send((DatagramPacket)packet);
            this.logger.debug(() -> UnicastSendingMessageHandler.lambda$convertAndSend$2(message, (DatagramPacket)packet));
            return;
        } else {
            this.logger.debug(() -> "Mapper created no packet for message " + message);
        }
    }

    protected void setSocket(DatagramSocket socket) {
        this.socket = socket;
    }

    @Nullable
    protected DatagramSocket getTheSocket() {
        return this.socket;
    }

    protected synchronized DatagramSocket getSocket() throws IOException {
        if (this.socket == null) {
            if (this.acknowledge) {
                if (this.localAddress == null) {
                    this.socket = this.ackPort == 0 ? new DatagramSocket() : new DatagramSocket(this.ackPort);
                } else {
                    InetAddress whichNic = InetAddress.getByName(this.localAddress);
                    this.socket = new DatagramSocket(new InetSocketAddress(whichNic, this.ackPort));
                }
                if (this.soReceiveBufferSize > 0) {
                    this.socket.setReceiveBufferSize(this.soReceiveBufferSize);
                }
                this.logger.debug(() -> "Listening for acks on port: " + this.getAckPort());
                this.updateAckAddress();
            } else {
                this.socket = new DatagramSocket();
            }
            this.setSocketAttributes(this.socket);
        }
        return this.socket;
    }

    protected void updateAckAddress() {
        this.mapper.setAckAddress(this.ackHost + ":" + this.getAckPort());
    }

    @Override
    public void setSoReceiveBufferSize(int size) {
        this.soReceiveBufferSize = size;
    }

    @Override
    public synchronized void setLocalAddress(String localAddress) {
        this.localAddress = localAddress;
    }

    public void setTaskExecutor(Executor taskExecutor) {
        Assert.notNull((Object)taskExecutor, (String)"'taskExecutor' cannot be null");
        this.taskExecutor = taskExecutor;
        this.taskExecutorSet = true;
    }

    public void setAckCounter(int ackCounter) {
        this.ackCounter = ackCounter;
    }

    public void setSocketExpression(Expression socketExpression) {
        this.socketExpression = socketExpression;
    }

    public void setSocketExpressionString(String socketExpression) {
        this.socketExpression = EXPRESSION_PARSER.parseExpression(socketExpression);
    }

    public String getComponentType() {
        return "ip:udp-outbound-channel-adapter";
    }

    public boolean isAcknowledge() {
        return this.acknowledge;
    }

    public int getAckPort() {
        DatagramSocket datagramSocket = this.socket;
        if (this.ackPort == 0 && datagramSocket != null) {
            return datagramSocket.getLocalPort();
        }
        return this.ackPort;
    }

    public int getSoReceiveBufferSize() {
        return this.soReceiveBufferSize;
    }

    protected void onInit() {
        super.onInit();
        this.mapper.setBeanFactory(this.getBeanFactory());
        this.evaluationContext = IntegrationContextUtils.getEvaluationContext((BeanFactory)this.getBeanFactory());
        if (this.socketExpression != null) {
            Assert.state((!this.acknowledge ? 1 : 0) != 0, (String)"'acknowledge' must be false when using a socket expression");
        }
    }

    protected void setSocketAttributes(DatagramSocket socket) throws SocketException {
        int soSendBufferSize;
        int soTimeout = this.getSoTimeout();
        if (soTimeout >= 0) {
            socket.setSoTimeout(soTimeout);
        }
        if ((soSendBufferSize = this.getSoSendBufferSize()) > 0) {
            socket.setSendBufferSize(soSendBufferSize);
        }
        this.socketCustomizer.configure(socket);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void run() {
        try {
            try {
                this.ackThreadRunning = true;
                this.ackLatch.countDown();
                DatagramPacket ackPack = new DatagramPacket(new byte[100], 100);
                while (true) {
                    this.getSocket().receive(ackPack);
                    String id = new String(ackPack.getData(), ackPack.getOffset(), ackPack.getLength());
                    this.logger.debug(() -> "Received ack for " + id + " from " + ackPack.getAddress().getHostAddress());
                    CountDownLatch latch = this.ackControl.get(id);
                    if (latch == null) continue;
                    latch.countDown();
                }
            }
            catch (IOException ex) {
                if (this.socket != null && !this.socket.isClosed()) {
                    this.logger.error(() -> "Error on UDP Acknowledge thread: " + ex.getMessage());
                }
                this.ackThreadRunning = false;
            }
        }
        catch (Throwable throwable) {
            this.ackThreadRunning = false;
            throw throwable;
        }
    }

    public void restartAckThread() {
        this.taskExecutor.execute(this);
    }

    private void closeSocketIfNeeded() {
        if (this.socket != null) {
            try {
                this.socket.close();
            }
            catch (Exception exception) {
                // empty catch block
            }
            this.socket = null;
        }
    }

    private static /* synthetic */ CharSequence lambda$convertAndSend$2(Message message, DatagramPacket packet) {
        return "Sent packet for message " + message + " to " + packet.getSocketAddress();
    }
}

