/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.integration.ip.tcp.connection;

import java.io.EOFException;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.Socket;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationEventPublisherAware;
import org.springframework.core.serializer.Deserializer;
import org.springframework.core.serializer.Serializer;
import org.springframework.integration.context.IntegrationObjectSupport;
import org.springframework.integration.ip.tcp.connection.ConnectionFactory;
import org.springframework.integration.ip.tcp.connection.DefaultTcpSocketSupport;
import org.springframework.integration.ip.tcp.connection.TcpConnection;
import org.springframework.integration.ip.tcp.connection.TcpConnectionInterceptorFactory;
import org.springframework.integration.ip.tcp.connection.TcpConnectionInterceptorFactoryChain;
import org.springframework.integration.ip.tcp.connection.TcpConnectionInterceptorSupport;
import org.springframework.integration.ip.tcp.connection.TcpConnectionSupport;
import org.springframework.integration.ip.tcp.connection.TcpListener;
import org.springframework.integration.ip.tcp.connection.TcpMessageMapper;
import org.springframework.integration.ip.tcp.connection.TcpNioConnection;
import org.springframework.integration.ip.tcp.connection.TcpSender;
import org.springframework.integration.ip.tcp.connection.TcpSocketSupport;
import org.springframework.integration.ip.tcp.serializer.ByteArrayCrLfSerializer;
import org.springframework.lang.Nullable;
import org.springframework.messaging.MessagingException;
import org.springframework.util.Assert;

public abstract class AbstractConnectionFactory
extends IntegrationObjectSupport
implements ConnectionFactory,
ApplicationEventPublisherAware {
    private static final String UNUSED = "unused";
    protected static final int DEFAULT_REPLY_TIMEOUT = 10000;
    private static final int DEFAULT_NIO_HARVEST_INTERVAL = 2000;
    private static final int DEFAULT_READ_DELAY = 100;
    protected final Object lifecycleMonitor = new Object();
    private final Map<String, TcpConnectionSupport> connections = new ConcurrentHashMap<String, TcpConnectionSupport>();
    private final BlockingQueue<PendingIO> delayedReads = new LinkedBlockingQueue<PendingIO>();
    private final List<TcpSender> senders = Collections.synchronizedList(new ArrayList());
    private String host;
    private int port;
    private TcpListener listener;
    private int soTimeout = -1;
    private int soSendBufferSize;
    private int soReceiveBufferSize;
    private boolean soTcpNoDelay;
    private int soLinger = -1;
    private boolean soKeepAlive;
    private int soTrafficClass = -1;
    private Executor taskExecutor;
    private boolean privateExecutor;
    private Deserializer<?> deserializer = new ByteArrayCrLfSerializer();
    private boolean deserializerSet;
    private Serializer<?> serializer = new ByteArrayCrLfSerializer();
    private TcpMessageMapper mapper = new TcpMessageMapper();
    private boolean mapperSet;
    private boolean singleUse;
    private TcpConnectionInterceptorFactoryChain interceptorFactoryChain;
    private boolean lookupHost = true;
    private TcpSocketSupport tcpSocketSupport = new DefaultTcpSocketSupport();
    private long nextCheckForClosedNioConnections;
    private int nioHarvestInterval = 2000;
    private ApplicationEventPublisher applicationEventPublisher;
    private long readDelay = 100L;
    private Integer sslHandshakeTimeout;
    private volatile boolean active;

    public AbstractConnectionFactory(int port) {
        this.port = port;
    }

    public AbstractConnectionFactory(String host, int port) {
        Assert.notNull((Object)host, (String)"host must not be null");
        this.host = host;
        this.port = port;
    }

    public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
        this.applicationEventPublisher = applicationEventPublisher;
        if (!this.deserializerSet && this.deserializer instanceof ApplicationEventPublisherAware) {
            ((ApplicationEventPublisherAware)this.deserializer).setApplicationEventPublisher(applicationEventPublisher);
        }
    }

    @Nullable
    public ApplicationEventPublisher getApplicationEventPublisher() {
        return this.applicationEventPublisher;
    }

    protected void setSocketAttributes(Socket socket) throws SocketException {
        if (this.soTimeout >= 0) {
            socket.setSoTimeout(this.soTimeout);
        }
        if (this.soSendBufferSize > 0) {
            socket.setSendBufferSize(this.soSendBufferSize);
        }
        if (this.soReceiveBufferSize > 0) {
            socket.setReceiveBufferSize(this.soReceiveBufferSize);
        }
        socket.setTcpNoDelay(this.soTcpNoDelay);
        if (this.soLinger >= 0) {
            socket.setSoLinger(true, this.soLinger);
        }
        if (this.soTrafficClass >= 0) {
            socket.setTrafficClass(this.soTrafficClass);
        }
        socket.setKeepAlive(this.soKeepAlive);
        this.tcpSocketSupport.postProcessSocket(socket);
    }

    public int getSoTimeout() {
        return this.soTimeout;
    }

    public void setSoTimeout(int soTimeout) {
        this.soTimeout = soTimeout;
    }

    public int getSoReceiveBufferSize() {
        return this.soReceiveBufferSize;
    }

    public void setSoReceiveBufferSize(int soReceiveBufferSize) {
        this.soReceiveBufferSize = soReceiveBufferSize;
    }

    public int getSoSendBufferSize() {
        return this.soSendBufferSize;
    }

    public void setSoSendBufferSize(int soSendBufferSize) {
        this.soSendBufferSize = soSendBufferSize;
    }

    public boolean isSoTcpNoDelay() {
        return this.soTcpNoDelay;
    }

    public void setSoTcpNoDelay(boolean soTcpNoDelay) {
        this.soTcpNoDelay = soTcpNoDelay;
    }

    public int getSoLinger() {
        return this.soLinger;
    }

    public void setSoLinger(int soLinger) {
        this.soLinger = soLinger;
    }

    public boolean isSoKeepAlive() {
        return this.soKeepAlive;
    }

    public void setSoKeepAlive(boolean soKeepAlive) {
        this.soKeepAlive = soKeepAlive;
    }

    public int getSoTrafficClass() {
        return this.soTrafficClass;
    }

    public void setSoTrafficClass(int soTrafficClass) {
        this.soTrafficClass = soTrafficClass;
    }

    public void setHost(String host) {
        Assert.state((!this.isRunning() ? 1 : 0) != 0, (String)"Cannot change the host while running");
        this.host = host;
    }

    public String getHost() {
        return this.host;
    }

    public void setPort(int port) {
        Assert.state((!this.isRunning() ? 1 : 0) != 0, (String)"Cannot change the port while running");
        this.port = port;
    }

    public int getPort() {
        return this.port;
    }

    @Nullable
    public TcpListener getListener() {
        return this.listener;
    }

    @Nullable
    public TcpSender getSender() {
        return this.senders.size() > 0 ? this.senders.get(0) : null;
    }

    public List<TcpSender> getSenders() {
        return Collections.unmodifiableList(this.senders);
    }

    public Serializer<?> getSerializer() {
        return this.serializer;
    }

    public Deserializer<?> getDeserializer() {
        return this.deserializer;
    }

    public TcpMessageMapper getMapper() {
        return this.mapper;
    }

    public void registerListener(TcpListener listenerToRegister) {
        Assert.isNull((Object)this.listener, (String)(this.getClass().getName() + " may only be used by one inbound adapter"));
        this.listener = listenerToRegister;
    }

    public void registerSender(TcpSender senderToRegister) {
        this.senders.add(senderToRegister);
    }

    public boolean unregisterSender(TcpSender sender) {
        return this.senders.remove(sender);
    }

    public void setTaskExecutor(Executor taskExecutor) {
        this.taskExecutor = taskExecutor;
    }

    public void setDeserializer(Deserializer<?> deserializer) {
        this.deserializer = deserializer;
        this.deserializerSet = true;
    }

    public void setSerializer(Serializer<?> serializer) {
        this.serializer = serializer;
    }

    public void setMapper(TcpMessageMapper mapper) {
        this.mapper = mapper;
        this.mapperSet = true;
    }

    public boolean isSingleUse() {
        return this.singleUse;
    }

    public void setSingleUse(boolean singleUse) {
        this.singleUse = singleUse;
    }

    public void setLeaveOpen(boolean leaveOpen) {
        this.singleUse = !leaveOpen;
    }

    public void setInterceptorFactoryChain(TcpConnectionInterceptorFactoryChain interceptorFactoryChain) {
        this.interceptorFactoryChain = interceptorFactoryChain;
    }

    public void setLookupHost(boolean lookupHost) {
        this.lookupHost = lookupHost;
    }

    public boolean isLookupHost() {
        return this.lookupHost;
    }

    public void setNioHarvestInterval(int nioHarvestInterval) {
        Assert.isTrue((nioHarvestInterval > 0 ? 1 : 0) != 0, (String)"NIO Harvest interval must be > 0");
        this.nioHarvestInterval = nioHarvestInterval;
    }

    public void setSslHandshakeTimeout(int sslHandshakeTimeout) {
        this.sslHandshakeTimeout = sslHandshakeTimeout;
    }

    @Nullable
    protected Integer getSslHandshakeTimeout() {
        return this.sslHandshakeTimeout;
    }

    protected BlockingQueue<PendingIO> getDelayedReads() {
        return this.delayedReads;
    }

    protected long getReadDelay() {
        return this.readDelay;
    }

    public void setReadDelay(long readDelay) {
        Assert.isTrue((readDelay > 0L ? 1 : 0) != 0, (String)"'readDelay' must be positive");
        this.readDelay = readDelay;
    }

    protected Object getLifecycleMonitor() {
        return this.lifecycleMonitor;
    }

    protected void onInit() {
        super.onInit();
        if (!this.mapperSet) {
            this.mapper.setBeanFactory(this.getBeanFactory());
        }
    }

    public void start() {
        if (this.logger.isInfoEnabled()) {
            this.logger.info((Object)("started " + this));
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected Executor getTaskExecutor() {
        if (!this.active) {
            throw new MessagingException("Connection Factory not started");
        }
        Object object = this.lifecycleMonitor;
        synchronized (object) {
            if (this.taskExecutor == null) {
                this.privateExecutor = true;
                this.taskExecutor = Executors.newCachedThreadPool();
            }
            return this.taskExecutor;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void stop() {
        this.active = false;
        Object object = this.connections;
        synchronized (object) {
            Iterator<Map.Entry<String, TcpConnectionSupport>> iterator = this.connections.entrySet().iterator();
            while (iterator.hasNext()) {
                TcpConnection connection = iterator.next().getValue();
                connection.close();
                iterator.remove();
            }
        }
        object = this.lifecycleMonitor;
        synchronized (object) {
            if (this.privateExecutor) {
                ExecutorService executorService = (ExecutorService)this.taskExecutor;
                executorService.shutdown();
                try {
                    if (!executorService.awaitTermination(10L, TimeUnit.SECONDS)) {
                        this.logger.debug((Object)"Forcing executor shutdown");
                        executorService.shutdownNow();
                        if (!executorService.awaitTermination(10L, TimeUnit.SECONDS)) {
                            this.logger.debug((Object)"Executor failed to shutdown");
                        }
                    }
                }
                catch (InterruptedException e) {
                    executorService.shutdownNow();
                    Thread.currentThread().interrupt();
                }
                finally {
                    this.taskExecutor = null;
                    this.privateExecutor = false;
                }
            }
        }
        if (this.logger.isInfoEnabled()) {
            this.logger.info((Object)("stopped " + this));
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected TcpConnectionSupport wrapConnection(TcpConnectionSupport connectionArg) {
        TcpConnectionSupport connection = connectionArg;
        try {
            if (this.interceptorFactoryChain == null) {
                TcpConnectionSupport tcpConnectionSupport = connection;
                return tcpConnectionSupport;
            }
            TcpConnectionInterceptorFactory[] interceptorFactories = this.interceptorFactoryChain.getInterceptorFactories();
            if (interceptorFactories == null) {
                TcpConnectionSupport tcpConnectionSupport = connection;
                return tcpConnectionSupport;
            }
            for (TcpConnectionInterceptorFactory factory : interceptorFactories) {
                TcpConnectionInterceptorSupport wrapper = factory.getInterceptor();
                wrapper.setTheConnection(connection);
                if (this.listener == null) {
                    connection.registerListener(wrapper);
                }
                if (this.senders.size() == 0) {
                    connection.registerSender(wrapper);
                }
                connection = wrapper;
            }
            TcpConnectionSupport tcpConnectionSupport = connection;
            return tcpConnectionSupport;
        }
        finally {
            this.addConnection(connection);
        }
    }

    protected void processNioSelections(int selectionCount, Selector selector, @Nullable ServerSocketChannel server, Map<SocketChannel, TcpNioConnection> connectionMap) {
        long now = System.currentTimeMillis();
        this.rescheduleDelayedReads(selector, now);
        if (this.soTimeout > 0 || now >= this.nextCheckForClosedNioConnections || selectionCount == 0) {
            this.nextCheckForClosedNioConnections = now + (long)this.nioHarvestInterval;
            Iterator<Map.Entry<SocketChannel, TcpNioConnection>> it = connectionMap.entrySet().iterator();
            while (it.hasNext()) {
                TcpNioConnection connection;
                SocketChannel channel = it.next().getKey();
                if (!channel.isOpen()) {
                    this.logger.debug((Object)"Removing closed channel");
                    it.remove();
                    continue;
                }
                if (this.soTimeout <= 0 || now - (connection = connectionMap.get(channel)).getLastRead() < (long)this.soTimeout) continue;
                if (!connection.isServer() && now - connection.getLastSend() < (long)this.soTimeout && now - connection.getLastRead() < (long)(this.soTimeout * 2)) {
                    if (!this.logger.isDebugEnabled()) continue;
                    this.logger.debug((Object)("Skipping a connection timeout because we have a recent send " + connection.getConnectionId()));
                    continue;
                }
                if (this.logger.isWarnEnabled()) {
                    this.logger.warn((Object)("Timing out TcpNioConnection " + connection.getConnectionId()));
                }
                SocketTimeoutException exception = new SocketTimeoutException("Timing out connection");
                connection.publishConnectionExceptionEvent(exception);
                connection.timeout();
                connection.sendExceptionToListener(exception);
            }
        }
        this.harvestClosedConnections();
        if (this.logger.isTraceEnabled()) {
            if (this.host == null) {
                this.logger.trace((Object)("Port " + this.port + " SelectionCount: " + selectionCount));
            } else {
                this.logger.trace((Object)("Host " + this.host + " port " + this.port + " SelectionCount: " + selectionCount));
            }
        }
        if (selectionCount > 0) {
            Set<SelectionKey> keys = selector.selectedKeys();
            Iterator<SelectionKey> iterator = keys.iterator();
            while (iterator.hasNext()) {
                SelectionKey key = iterator.next();
                iterator.remove();
                try {
                    if (!key.isValid()) {
                        this.logger.debug((Object)"Selection key no longer valid");
                        continue;
                    }
                    if (key.isReadable()) {
                        key.interestOps(key.interestOps() - 1);
                        TcpNioConnection connection = (TcpNioConnection)key.attachment();
                        connection.setLastRead(System.currentTimeMillis());
                        try {
                            this.taskExecutor.execute(() -> {
                                boolean delayed = false;
                                try {
                                    connection.readPacket();
                                }
                                catch (RejectedExecutionException e1) {
                                    this.delayRead(selector, now, key);
                                    delayed = true;
                                }
                                catch (Exception e2) {
                                    if (connection.isOpen()) {
                                        this.logger.error((Object)("Exception on read " + connection.getConnectionId() + " " + e2.getMessage()));
                                        connection.close();
                                    }
                                    this.logger.debug((Object)"Connection closed");
                                }
                                if (!delayed) {
                                    if (key.channel().isOpen()) {
                                        key.interestOps(1);
                                        selector.wakeup();
                                    } else {
                                        connection.sendExceptionToListener(new EOFException("Connection is closed"));
                                    }
                                }
                            });
                        }
                        catch (RejectedExecutionException e) {
                            this.delayRead(selector, now, key);
                        }
                        continue;
                    }
                    if (key.isAcceptable()) {
                        try {
                            this.doAccept(selector, server, now);
                        }
                        catch (Exception e) {
                            this.logger.error((Object)"Exception accepting new connection(s)", (Throwable)e);
                        }
                        continue;
                    }
                    this.logger.error((Object)("Unexpected key: " + key));
                }
                catch (CancelledKeyException e) {
                    if (!this.logger.isDebugEnabled()) continue;
                    this.logger.debug((Object)("Selection key " + key + " cancelled"));
                }
                catch (Exception e) {
                    this.logger.error((Object)("Exception on selection key " + key), (Throwable)e);
                }
            }
        }
    }

    protected void delayRead(Selector selector, long now, SelectionKey key) {
        TcpNioConnection connection = (TcpNioConnection)key.attachment();
        if (!this.delayedReads.add(new PendingIO(now, key))) {
            this.logger.error((Object)("Failed to delay read; closing " + connection.getConnectionId()));
            connection.close();
        } else {
            if (this.logger.isDebugEnabled()) {
                this.logger.debug((Object)("No threads available, delaying read for " + connection.getConnectionId()));
            }
            selector.wakeup();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void rescheduleDelayedReads(Selector selector, long now) {
        boolean wakeSelector = false;
        try {
            while (this.delayedReads.size() > 0 && ((PendingIO)this.delayedReads.peek()).failedAt + this.readDelay < now) {
                PendingIO pendingRead = this.delayedReads.take();
                if (pendingRead.key.channel().isOpen()) {
                    pendingRead.key.interestOps(1);
                    wakeSelector = true;
                    if (!this.logger.isDebugEnabled()) continue;
                    this.logger.debug((Object)("Rescheduling delayed read for " + ((TcpNioConnection)pendingRead.key.attachment()).getConnectionId()));
                    continue;
                }
                ((TcpNioConnection)pendingRead.key.attachment()).sendExceptionToListener(new EOFException("Connection is closed"));
            }
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        finally {
            if (wakeSelector) {
                selector.wakeup();
            }
        }
    }

    protected void doAccept(Selector selector, ServerSocketChannel server, long now) {
        throw new UnsupportedOperationException("Nio server factory must override this method");
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void addConnection(TcpConnectionSupport connection) {
        Map<String, TcpConnectionSupport> map = this.connections;
        synchronized (map) {
            if (!this.active) {
                connection.close();
                return;
            }
            this.connections.put(connection.getConnectionId(), connection);
            if (this.logger.isDebugEnabled()) {
                this.logger.debug((Object)(this.getComponentName() + ": Added new connection: " + connection.getConnectionId()));
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private List<String> removeClosedConnectionsAndReturnOpenConnectionIds() {
        Map<String, TcpConnectionSupport> map = this.connections;
        synchronized (map) {
            ArrayList<String> openConnectionIds = new ArrayList<String>();
            Iterator<Map.Entry<String, TcpConnectionSupport>> iterator = this.connections.entrySet().iterator();
            while (iterator.hasNext()) {
                Map.Entry<String, TcpConnectionSupport> entry = iterator.next();
                TcpConnectionSupport connection = entry.getValue();
                if (!connection.isOpen()) {
                    iterator.remove();
                    if (!this.logger.isDebugEnabled()) continue;
                    this.logger.debug((Object)(this.getComponentName() + ": Removed closed connection: " + connection.getConnectionId()));
                    continue;
                }
                openConnectionIds.add(entry.getKey());
                if (!this.logger.isTraceEnabled()) continue;
                this.logger.trace((Object)(this.getComponentName() + ": Connection is open: " + connection.getConnectionId()));
            }
            return openConnectionIds;
        }
    }

    protected void harvestClosedConnections() {
        this.removeClosedConnectionsAndReturnOpenConnectionIds();
    }

    public boolean isRunning() {
        return this.active;
    }

    protected boolean isActive() {
        return this.active;
    }

    protected void setActive(boolean active) {
        this.active = active;
    }

    protected void checkActive() {
        if (!this.isActive()) {
            throw new UncheckedIOException(new IOException(this + " connection factory has not been started"));
        }
    }

    protected TcpSocketSupport getTcpSocketSupport() {
        return this.tcpSocketSupport;
    }

    public void setTcpSocketSupport(TcpSocketSupport tcpSocketSupport) {
        Assert.notNull((Object)tcpSocketSupport, (String)"TcpSocketSupport must not be null");
        this.tcpSocketSupport = tcpSocketSupport;
    }

    public List<String> getOpenConnectionIds() {
        return Collections.unmodifiableList(this.removeClosedConnectionsAndReturnOpenConnectionIds());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public boolean closeConnection(String connectionId) {
        Assert.notNull((Object)connectionId, (String)"'connectionId' to close must not be null");
        Map<String, TcpConnectionSupport> map = this.connections;
        synchronized (map) {
            boolean closed = false;
            TcpConnectionSupport connection = this.connections.get(connectionId);
            if (connection != null) {
                try {
                    connection.close();
                    closed = true;
                }
                catch (Exception e) {
                    if (this.logger.isDebugEnabled()) {
                        this.logger.debug((Object)("Failed to close connection " + connectionId), (Throwable)e);
                    }
                    connection.publishConnectionExceptionEvent(e);
                }
            }
            return closed;
        }
    }

    public String toString() {
        return super.toString() + (this.host != null ? ", host=" + this.host : "") + ", port=" + this.getPort();
    }

    private static final class PendingIO {
        private final long failedAt;
        private final SelectionKey key;

        private PendingIO(long failedAt, SelectionKey key) {
            this.failedAt = failedAt;
            this.key = key;
        }
    }
}

