/*
 * Decompiled with CFR 0.152.
 */
package org.eclipse.californium.elements.tcp;

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.pool.AbstractChannelPoolHandler;
import io.netty.channel.pool.AbstractChannelPoolMap;
import io.netty.channel.pool.ChannelPool;
import io.netty.channel.pool.ChannelPoolHandler;
import io.netty.channel.pool.FixedChannelPool;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.URI;
import java.util.concurrent.CancellationException;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.eclipse.californium.elements.Connector;
import org.eclipse.californium.elements.CorrelationContext;
import org.eclipse.californium.elements.CorrelationContextMatcher;
import org.eclipse.californium.elements.CorrelationMismatchException;
import org.eclipse.californium.elements.RawData;
import org.eclipse.californium.elements.RawDataChannel;
import org.eclipse.californium.elements.tcp.CloseOnErrorHandler;
import org.eclipse.californium.elements.tcp.CloseOnIdleHandler;
import org.eclipse.californium.elements.tcp.DatagramFramer;
import org.eclipse.californium.elements.tcp.DispatchHandler;
import org.eclipse.californium.elements.tcp.NettyContextUtils;

public class TcpClientConnector
implements Connector {
    private static final Logger LOGGER = Logger.getLogger(TcpClientConnector.class.getName());
    private final URI listenUri;
    private final int numberOfThreads;
    private final int connectionIdleTimeoutSeconds;
    private final int connectTimeoutMillis;
    private final InetSocketAddress localSocketAddress = new InetSocketAddress(0);
    private CorrelationContextMatcher correlationContextMatcher;
    private EventLoopGroup workerGroup;
    private RawDataChannel rawDataChannel;
    private AbstractChannelPoolMap<SocketAddress, ChannelPool> poolMap;

    public TcpClientConnector(int numberOfThreads, int connectTimeoutMillis, int idleTimeout) {
        this.numberOfThreads = numberOfThreads;
        this.connectionIdleTimeoutSeconds = idleTimeout;
        this.connectTimeoutMillis = connectTimeoutMillis;
        this.listenUri = URI.create(String.format("%s://%s:%d", this.getSupportedScheme(), this.localSocketAddress.getHostString(), this.localSocketAddress.getPort()));
    }

    @Override
    public synchronized void start() throws IOException {
        if (this.rawDataChannel == null) {
            throw new IllegalStateException("Cannot start without message handler.");
        }
        if (this.workerGroup != null) {
            throw new IllegalStateException("Connector already started");
        }
        this.workerGroup = new NioEventLoopGroup(this.numberOfThreads);
        this.poolMap = new AbstractChannelPoolMap<SocketAddress, ChannelPool>(){

            protected ChannelPool newPool(SocketAddress key) {
                Bootstrap bootstrap = ((Bootstrap)((Bootstrap)((Bootstrap)((Bootstrap)((Bootstrap)new Bootstrap().group(TcpClientConnector.this.workerGroup)).channel(NioSocketChannel.class)).option(ChannelOption.SO_KEEPALIVE, (Object)true)).option(ChannelOption.AUTO_READ, (Object)true)).option(ChannelOption.CONNECT_TIMEOUT_MILLIS, (Object)TcpClientConnector.this.connectTimeoutMillis)).remoteAddress(key);
                return new FixedChannelPool(bootstrap, (ChannelPoolHandler)new MyChannelPoolHandler(key), 1);
            }
        };
    }

    @Override
    public synchronized void stop() {
        if (null != this.workerGroup) {
            this.workerGroup.shutdownGracefully(0L, 1L, TimeUnit.SECONDS).syncUninterruptibly();
            this.workerGroup = null;
        }
    }

    @Override
    public void destroy() {
        this.stop();
    }

    @Override
    public void send(final RawData msg) {
        InetSocketAddress addressKey = new InetSocketAddress(msg.getAddress(), msg.getPort());
        final CorrelationContextMatcher correlationMatcher = this.getCorrelationContextMatcher();
        if (null != correlationMatcher && !this.poolMap.contains((Object)addressKey) && !correlationMatcher.isToBeSent(msg.getCorrelationContext(), null)) {
            if (LOGGER.isLoggable(Level.WARNING)) {
                LOGGER.log(Level.WARNING, "TcpConnector (drops {0} bytes to {1}:{2}", new Object[]{msg.getSize(), msg.getAddress(), msg.getPort()});
            }
            msg.onError(new CorrelationMismatchException());
            return;
        }
        final ChannelPool channelPool = this.poolMap.get((Object)addressKey);
        Future acquire = channelPool.acquire();
        acquire.addListener((GenericFutureListener)new GenericFutureListener<Future<Channel>>(){

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            public void operationComplete(Future<Channel> future) throws Exception {
                if (future.isSuccess()) {
                    Channel channel = (Channel)future.getNow();
                    CorrelationContext context = NettyContextUtils.buildCorrelationContext(channel);
                    try {
                        if (null != correlationMatcher && !correlationMatcher.isToBeSent(msg.getCorrelationContext(), context)) {
                            if (LOGGER.isLoggable(Level.WARNING)) {
                                LOGGER.log(Level.WARNING, "TcpConnector (drops {0} bytes to {1}:{2}", new Object[]{msg.getSize(), msg.getAddress(), msg.getPort()});
                            }
                            msg.onError(new CorrelationMismatchException());
                            return;
                        }
                        msg.onContextEstablished(context);
                        ChannelFuture channelFuture = channel.writeAndFlush((Object)Unpooled.wrappedBuffer((byte[])msg.getBytes()));
                        channelFuture.addListener((GenericFutureListener)new GenericFutureListener<ChannelFuture>(){

                            public void operationComplete(ChannelFuture future) throws Exception {
                                if (future.isSuccess()) {
                                    msg.onSent();
                                } else if (future.isCancelled()) {
                                    msg.onError(new CancellationException());
                                } else {
                                    msg.onError(future.cause());
                                }
                            }
                        });
                    }
                    finally {
                        channelPool.release(channel);
                    }
                } else {
                    LOGGER.log(Level.WARNING, "Unable to open connection to " + msg.getAddress(), future.cause());
                }
            }
        });
    }

    @Override
    public void setRawDataReceiver(RawDataChannel messageHandler) {
        if (this.rawDataChannel != null) {
            throw new IllegalStateException("Raw data channel already set.");
        }
        this.rawDataChannel = messageHandler;
    }

    @Override
    public synchronized void setCorrelationContextMatcher(CorrelationContextMatcher matcher) {
        this.correlationContextMatcher = matcher;
    }

    private synchronized CorrelationContextMatcher getCorrelationContextMatcher() {
        return this.correlationContextMatcher;
    }

    @Override
    public InetSocketAddress getAddress() {
        return this.localSocketAddress;
    }

    protected void onNewChannelCreated(SocketAddress remote, Channel ch) {
    }

    protected String getSupportedScheme() {
        return "coap+tcp";
    }

    @Override
    public final boolean isSchemeSupported(String scheme) {
        return this.getSupportedScheme().equals(scheme);
    }

    @Override
    public final URI getUri() {
        return this.listenUri;
    }

    private class RemoveEmptyPoolHandler
    extends ChannelDuplexHandler {
        private final SocketAddress key;

        RemoveEmptyPoolHandler(SocketAddress key) {
            this.key = key;
        }

        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
            TcpClientConnector.this.poolMap.remove((Object)this.key);
        }
    }

    private class MyChannelPoolHandler
    extends AbstractChannelPoolHandler {
        private final SocketAddress key;

        MyChannelPoolHandler(SocketAddress key) {
            this.key = key;
        }

        public void channelCreated(Channel ch) throws Exception {
            TcpClientConnector.this.onNewChannelCreated(this.key, ch);
            ch.pipeline().addLast(new ChannelHandler[]{new IdleStateHandler(0, 0, TcpClientConnector.this.connectionIdleTimeoutSeconds)});
            ch.pipeline().addLast(new ChannelHandler[]{new CloseOnIdleHandler()});
            ch.pipeline().addLast(new ChannelHandler[]{new RemoveEmptyPoolHandler(this.key)});
            ch.pipeline().addLast(new ChannelHandler[]{new DatagramFramer()});
            ch.pipeline().addLast(new ChannelHandler[]{new DispatchHandler(TcpClientConnector.this.rawDataChannel)});
            ch.pipeline().addLast(new ChannelHandler[]{new CloseOnErrorHandler()});
        }
    }
}

