/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.networking;

import java.net.InetSocketAddress;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import org.apache.flink.shaded.testutils.org.jboss.netty.bootstrap.ClientBootstrap;
import org.apache.flink.shaded.testutils.org.jboss.netty.buffer.ChannelBuffer;
import org.apache.flink.shaded.testutils.org.jboss.netty.buffer.ChannelBuffers;
import org.apache.flink.shaded.testutils.org.jboss.netty.channel.Channel;
import org.apache.flink.shaded.testutils.org.jboss.netty.channel.ChannelFuture;
import org.apache.flink.shaded.testutils.org.jboss.netty.channel.ChannelFutureListener;
import org.apache.flink.shaded.testutils.org.jboss.netty.channel.ChannelHandlerContext;
import org.apache.flink.shaded.testutils.org.jboss.netty.channel.ChannelStateEvent;
import org.apache.flink.shaded.testutils.org.jboss.netty.channel.ExceptionEvent;
import org.apache.flink.shaded.testutils.org.jboss.netty.channel.MessageEvent;
import org.apache.flink.shaded.testutils.org.jboss.netty.channel.SimpleChannelUpstreamHandler;
import org.apache.flink.shaded.testutils.org.jboss.netty.channel.socket.ClientSocketChannelFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class NetworkFailureHandler
extends SimpleChannelUpstreamHandler {
    private static final Logger LOG = LoggerFactory.getLogger(NetworkFailureHandler.class);
    private static final String TARGET_CHANNEL_HANDLER_NAME = "target_channel_handler";
    private final Map<Channel, Channel> sourceToTargetChannels = new ConcurrentHashMap<Channel, Channel>();
    private final Consumer<NetworkFailureHandler> onClose;
    private final ClientSocketChannelFactory channelFactory;
    private final String remoteHost;
    private final int remotePort;
    private final AtomicBoolean blocked;
    private static final Set<Channel> channelsBeingClosed = ConcurrentHashMap.newKeySet();
    private static final ChannelFutureListener CLOSE_WITH_BOOKKEEPING = new ChannelFutureListener(){

        @Override
        public void operationComplete(ChannelFuture future) {
            future.getChannel().close().addListener(channelFuture -> channelsBeingClosed.remove(channelFuture.getChannel()));
        }
    };

    public NetworkFailureHandler(AtomicBoolean blocked, Consumer<NetworkFailureHandler> onClose, ClientSocketChannelFactory channelFactory, String remoteHost, int remotePort) {
        this.blocked = blocked;
        this.onClose = onClose;
        this.channelFactory = channelFactory;
        this.remoteHost = remoteHost;
        this.remotePort = remotePort;
    }

    static void closeOnFlush(Channel channel) {
        if (channel.isConnected() && !channelsBeingClosed.contains(channel)) {
            channelsBeingClosed.add(channel);
            channel.write(ChannelBuffers.EMPTY_BUFFER).addListener(CLOSE_WITH_BOOKKEEPING);
        }
    }

    public void closeConnections() {
        for (Map.Entry<Channel, Channel> entry : this.sourceToTargetChannels.entrySet()) {
            entry.getKey().close();
        }
    }

    @Override
    public void channelOpen(ChannelHandlerContext context, ChannelStateEvent event) throws Exception {
        Channel sourceChannel = event.getChannel();
        sourceChannel.setReadable(false);
        boolean isBlocked = this.blocked.get();
        LOG.debug("Attempt to open proxy channel from [{}] to [{}:{}] in state [blocked = {}]", new Object[]{sourceChannel.getLocalAddress(), this.remoteHost, this.remotePort, isBlocked});
        if (isBlocked) {
            sourceChannel.close();
            return;
        }
        ClientBootstrap targetConnectionBootstrap = new ClientBootstrap(this.channelFactory);
        targetConnectionBootstrap.getPipeline().addLast(TARGET_CHANNEL_HANDLER_NAME, new TargetChannelHandler(event.getChannel(), this.blocked));
        ChannelFuture connectFuture = targetConnectionBootstrap.connect(new InetSocketAddress(this.remoteHost, this.remotePort));
        this.sourceToTargetChannels.put(sourceChannel, connectFuture.getChannel());
        connectFuture.addListener(future -> {
            if (future.isSuccess()) {
                sourceChannel.setReadable(true);
            } else {
                sourceChannel.close();
            }
        });
    }

    @Override
    public void messageReceived(ChannelHandlerContext context, MessageEvent event) throws Exception {
        if (this.blocked.get()) {
            return;
        }
        ChannelBuffer msg = (ChannelBuffer)event.getMessage();
        Channel targetChannel = this.sourceToTargetChannels.get(event.getChannel());
        if (targetChannel == null) {
            throw new IllegalStateException("Could not find a target channel for the source channel");
        }
        targetChannel.write(msg);
    }

    @Override
    public void channelClosed(ChannelHandlerContext context, ChannelStateEvent event) throws Exception {
        Channel targetChannel = this.sourceToTargetChannels.get(event.getChannel());
        if (targetChannel == null) {
            return;
        }
        NetworkFailureHandler.closeOnFlush(targetChannel);
        this.sourceToTargetChannels.remove(event.getChannel());
        this.onClose.accept(this);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext context, ExceptionEvent event) throws Exception {
        LOG.error("Closing communication channel because of an exception", event.getCause());
        NetworkFailureHandler.closeOnFlush(event.getChannel());
    }

    private static class TargetChannelHandler
    extends SimpleChannelUpstreamHandler {
        private final Channel sourceChannel;
        private final AtomicBoolean blocked;

        TargetChannelHandler(Channel sourceChannel, AtomicBoolean blocked) {
            this.sourceChannel = sourceChannel;
            this.blocked = blocked;
        }

        @Override
        public void messageReceived(ChannelHandlerContext context, MessageEvent event) throws Exception {
            if (this.blocked.get()) {
                return;
            }
            ChannelBuffer msg = (ChannelBuffer)event.getMessage();
            this.sourceChannel.write(msg);
        }

        @Override
        public void channelClosed(ChannelHandlerContext context, ChannelStateEvent event) throws Exception {
            NetworkFailureHandler.closeOnFlush(this.sourceChannel);
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext context, ExceptionEvent event) throws Exception {
            LOG.error("Closing communication channel because of an exception", event.getCause());
            NetworkFailureHandler.closeOnFlush(event.getChannel());
        }
    }
}

