/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.netty;

import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.util.concurrent.GenericFutureListener;
import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.flink.runtime.io.network.ConnectionID;
import org.apache.flink.runtime.io.network.netty.NettyClient;
import org.apache.flink.runtime.io.network.netty.PartitionRequestClient;
import org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler;
import org.apache.flink.runtime.io.network.netty.exception.LocalTransportException;
import org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException;

class PartitionRequestClientFactory {
    private final NettyClient nettyClient;
    private final ConcurrentMap<ConnectionID, Object> clients = new ConcurrentHashMap<ConnectionID, Object>();

    PartitionRequestClientFactory(NettyClient nettyClient) {
        this.nettyClient = nettyClient;
    }

    PartitionRequestClient createPartitionRequestClient(ConnectionID connectionId) throws IOException, InterruptedException {
        PartitionRequestClient client = null;
        while (client == null) {
            Object entry = this.clients.get(connectionId);
            if (entry != null) {
                if (entry instanceof PartitionRequestClient) {
                    client = (PartitionRequestClient)entry;
                } else {
                    ConnectingChannel future = (ConnectingChannel)entry;
                    client = future.waitForChannel();
                    this.clients.replace(connectionId, future, client);
                }
            } else {
                ConnectingChannel connectingChannel = new ConnectingChannel(connectionId, this);
                Object old = this.clients.putIfAbsent(connectionId, connectingChannel);
                if (old == null) {
                    this.nettyClient.connect(connectionId.getAddress()).addListener((GenericFutureListener)connectingChannel);
                    client = connectingChannel.waitForChannel();
                    this.clients.replace(connectionId, connectingChannel, client);
                } else if (old instanceof ConnectingChannel) {
                    client = ((ConnectingChannel)old).waitForChannel();
                    this.clients.replace(connectionId, old, client);
                } else {
                    client = (PartitionRequestClient)old;
                }
            }
            if (client.incrementReferenceCounter()) continue;
            this.destroyPartitionRequestClient(connectionId, client);
            client = null;
        }
        return client;
    }

    public void closeOpenChannelConnections(ConnectionID connectionId) {
        ConnectingChannel channel;
        Object entry = this.clients.get(connectionId);
        if (entry instanceof ConnectingChannel && (channel = (ConnectingChannel)entry).dispose()) {
            this.clients.remove(connectionId, channel);
        }
    }

    int getNumberOfActiveClients() {
        return this.clients.size();
    }

    void destroyPartitionRequestClient(ConnectionID connectionId, PartitionRequestClient client) {
        this.clients.remove(connectionId, client);
    }

    private static final class ConnectingChannel
    implements ChannelFutureListener {
        private final Object connectLock = new Object();
        private final ConnectionID connectionId;
        private final PartitionRequestClientFactory clientFactory;
        private boolean disposeRequestClient = false;
        private volatile PartitionRequestClient partitionRequestClient;
        private volatile Throwable error;

        public ConnectingChannel(ConnectionID connectionId, PartitionRequestClientFactory clientFactory) {
            this.connectionId = connectionId;
            this.clientFactory = clientFactory;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private boolean dispose() {
            boolean result;
            Object object = this.connectLock;
            synchronized (object) {
                if (this.partitionRequestClient != null) {
                    result = this.partitionRequestClient.disposeIfNotUsed();
                } else {
                    this.disposeRequestClient = true;
                    result = true;
                }
                this.connectLock.notifyAll();
            }
            return result;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void handInChannel(Channel channel) {
            Object object = this.connectLock;
            synchronized (object) {
                try {
                    PartitionRequestClientHandler requestHandler = (PartitionRequestClientHandler)channel.pipeline().get(PartitionRequestClientHandler.class);
                    this.partitionRequestClient = new PartitionRequestClient(channel, requestHandler, this.connectionId, this.clientFactory);
                    if (this.disposeRequestClient) {
                        this.partitionRequestClient.disposeIfNotUsed();
                    }
                    this.connectLock.notifyAll();
                }
                catch (Throwable t) {
                    this.notifyOfError(t);
                }
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private PartitionRequestClient waitForChannel() throws IOException, InterruptedException {
            Object object = this.connectLock;
            synchronized (object) {
                while (this.error == null && this.partitionRequestClient == null) {
                    this.connectLock.wait(2000L);
                }
            }
            if (this.error != null) {
                throw new IOException("Connecting the channel failed: " + this.error.getMessage(), this.error);
            }
            return this.partitionRequestClient;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void notifyOfError(Throwable error) {
            Object object = this.connectLock;
            synchronized (object) {
                this.error = error;
                this.connectLock.notifyAll();
            }
        }

        public void operationComplete(ChannelFuture future) throws Exception {
            if (future.isSuccess()) {
                this.handInChannel(future.channel());
            } else if (future.cause() != null) {
                this.notifyOfError(new RemoteTransportException("Connecting to remote task manager + '" + this.connectionId.getAddress() + "' has failed. This might indicate that the remote task manager has been lost.", this.connectionId.getAddress(), future.cause()));
            } else {
                this.notifyOfError(new LocalTransportException("Connecting to remote task manager + '" + this.connectionId.getAddress() + "' has been cancelled.", null));
            }
        }
    }
}

