/*
 * Decompiled with CFR 0.152.
 */
package org.nd4j.parameterserver.distributed.transport;

import io.aeron.Aeron;
import io.aeron.ConcurrentPublication;
import io.aeron.FragmentAssembler;
import io.aeron.Publication;
import io.aeron.driver.MediaDriver;
import io.aeron.logbuffer.Header;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.LockSupport;
import lombok.NonNull;
import org.agrona.CloseHelper;
import org.agrona.DirectBuffer;
import org.agrona.concurrent.UnsafeBuffer;
import org.nd4j.aeron.ipc.AeronUtil;
import org.nd4j.linalg.exception.ND4JIllegalStateException;
import org.nd4j.linalg.util.HashUtil;
import org.nd4j.parameterserver.distributed.conf.VoidConfiguration;
import org.nd4j.parameterserver.distributed.enums.NodeRole;
import org.nd4j.parameterserver.distributed.logic.ClientRouter;
import org.nd4j.parameterserver.distributed.logic.RetransmissionHandler;
import org.nd4j.parameterserver.distributed.logic.completion.Clipboard;
import org.nd4j.parameterserver.distributed.logic.routing.InterleavedRouter;
import org.nd4j.parameterserver.distributed.messages.DistributedMessage;
import org.nd4j.parameterserver.distributed.messages.Frame;
import org.nd4j.parameterserver.distributed.messages.MeaningfulMessage;
import org.nd4j.parameterserver.distributed.messages.RequestMessage;
import org.nd4j.parameterserver.distributed.messages.TrainingMessage;
import org.nd4j.parameterserver.distributed.messages.VoidAggregation;
import org.nd4j.parameterserver.distributed.messages.VoidMessage;
import org.nd4j.parameterserver.distributed.messages.requests.IntroductionRequestMessage;
import org.nd4j.parameterserver.distributed.transport.BaseTransport;
import org.nd4j.parameterserver.distributed.transport.Transport;
import org.nd4j.shade.guava.math.IntMath;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Deprecated
public class RoutedTransport
extends BaseTransport {
    /** Class-wide SLF4J logger. */
    private static final Logger log = LoggerFactory.getLogger(RoutedTransport.class);
    /** Fallback value (2^25 = 33,554,432) for the {@code aeron.term.buffer.length} system property; {@code init} applies it only when that property is unset. */
    private static final long DEFAULT_TERM_BUFFER_PROP = IntMath.pow((int)2, (int)25);
    /** Connections to Shard nodes; populated from the configured shard addresses in {@code init} and by {@code addShard}. */
    protected List<RemoteConnection> shards = new ArrayList<RemoteConnection>();
    /** Known client connections, keyed by the long hash of their {@code "ip:port"} string as computed in {@code addClient}. */
    protected Map<Long, RemoteConnection> clients = new ConcurrentHashMap<Long, RemoteConnection>();
    /** Chooses the target shard for outgoing messages; {@code init} installs an {@link InterleavedRouter} when none has been set. */
    protected ClientRouter router;

    /**
     * Initializes this transport: stores the configuration, starts an embedded Aeron
     * media driver plus client, subscribes to the local unicast endpoint, opens one
     * publication per configured shard address and finally initializes the router.
     *
     * @param voidConfiguration configuration (stream id, shard addresses, controller port); must not be null
     * @param clipboard         clipboard handed to the base transport; must not be null
     * @param role              role of this node; must not be null
     * @param localIp           local address, used only when no port has been assigned yet; must not be null
     * @param localPort         local port, used only when no port has been assigned yet
     * @param shardIndex        index of this shard
     * @throws NullPointerException      if any non-null parameter is null
     * @throws ND4JIllegalStateException if the node role is not one of MASTER, BACKUP, SHARD or CLIENT
     */
    @Override
    public void init(@NonNull VoidConfiguration voidConfiguration, @NonNull Clipboard clipboard, @NonNull NodeRole role, @NonNull String localIp, int localPort, short shardIndex) {
        // Explicit guards for the @NonNull contract, checked in parameter order.
        if (voidConfiguration == null) {
            throw new NullPointerException("voidConfiguration is marked non-null but is null");
        }
        if (clipboard == null) {
            throw new NullPointerException("clipboard is marked non-null but is null");
        }
        if (role == null) {
            throw new NullPointerException("role is marked non-null but is null");
        }
        if (localIp == null) {
            throw new NullPointerException("localIp is marked non-null but is null");
        }

        this.nodeRole = role;
        this.clipboard = clipboard;
        this.voidConfiguration = voidConfiguration;
        this.shardIndex = shardIndex;
        this.messages = new LinkedBlockingQueue();
        super.init(voidConfiguration, clipboard, role, localIp, localPort, shardIndex);

        // Aeron tuning through system properties; a user-provided term buffer length wins.
        System.setProperty("aeron.client.liveness.timeout", "30000000000");
        if (System.getProperty("aeron.term.buffer.length") == null) {
            System.setProperty("aeron.term.buffer.length", String.valueOf(DEFAULT_TERM_BUFFER_PROP));
        }

        // Embedded media driver + client, both switched to daemon thread factories.
        this.context = new Aeron.Context().driverTimeoutMs(30000L).keepAliveIntervalNs(100000000L);
        AeronUtil.setDaemonizedThreadFactories((Aeron.Context)this.context);
        MediaDriver.Context driverContext = new MediaDriver.Context();
        AeronUtil.setDaemonizedThreadFactories(driverContext);
        this.driver = MediaDriver.launchEmbedded(driverContext);
        this.context.aeronDirectoryName(this.driver.aeronDirectoryName());
        this.aeron = Aeron.connect((Aeron.Context)this.context);

        if (this.router == null) {
            this.router = new InterleavedRouter();
        }

        // Fall back to the supplied local endpoint only when no port has been assigned yet.
        if (this.port == 0) {
            this.ip = localIp;
            this.port = localPort;
        }
        this.unicastChannelUri = "aeron:udp?endpoint=" + this.ip + ":" + this.port;
        this.subscriptionForClients = this.aeron.addSubscription(this.unicastChannelUri, voidConfiguration.getStreamId());

        Thread closer = new Thread(() -> {
            CloseHelper.quietClose((AutoCloseable)this.aeron);
            CloseHelper.quietClose((AutoCloseable)this.driver);
            CloseHelper.quietClose((AutoCloseable)this.subscriptionForClients);
        });
        Runtime.getRuntime().addShutdownHook(closer);

        this.messageHandlerForClients = new FragmentAssembler(this::jointMessageHandler);

        // One publication per configured shard; an address is either "host:port" or a bare host.
        for (String address : voidConfiguration.getShardAddresses()) {
            String endpointUri;
            String shardHost;
            int shardPort;
            if (address.contains(":")) {
                endpointUri = "aeron:udp?endpoint=" + address;
                String[] parts = address.split(":");
                shardHost = parts[0];
                shardPort = Integer.parseInt(parts[1]);
            } else {
                // Bare host: use the configured unicast controller port.
                endpointUri = "aeron:udp?endpoint=" + address + ":" + voidConfiguration.getUnicastControllerPort();
                shardHost = address;
                shardPort = voidConfiguration.getUnicastControllerPort();
            }
            ConcurrentPublication shardPublication = this.aeron.addPublication(endpointUri, voidConfiguration.getStreamId());
            RemoteConnection shardConnection = RemoteConnection.builder()
                    .ip(shardHost)
                    .port(shardPort)
                    .publication((Publication)shardPublication)
                    .locker(new Object())
                    .build();
            this.shards.add(shardConnection);
        }

        if (this.nodeRole != NodeRole.SHARD) {
            log.info("Initialized as [{}]; Own endpoint: [{}]", (Object)this.nodeRole, (Object)this.unicastChannelUri);
        } else {
            log.info("Initialized as [{}]; ShardIndex: [{}]; Own endpoint: [{}]", new Object[]{this.nodeRole, shardIndex, this.unicastChannelUri});
        }

        switch (this.nodeRole) {
            case MASTER:
            case BACKUP:
            case SHARD: {
                // Server-side roles register their own endpoint as a known client.
                this.addClient(this.ip, this.port);
                break;
            }
            case CLIENT: {
                break;
            }
            default: {
                throw new ND4JIllegalStateException("Unknown NodeRole being passed: " + this.nodeRole);
            }
        }

        this.router.init(voidConfiguration, this);
        this.originatorId = HashUtil.getLongHash(this.getIp() + ":" + this.getPort());
    }

    /**
     * Broadcasts {@code message} to every known client except this node itself,
     * connections whose hash is 0, and any client whose hash is listed in {@code exclusions}.
     *
     * @param message    message to broadcast; serialized once and shared by all sends
     * @param exclusions client hashes that must not receive the message; may be null
     * @throws ND4JIllegalStateException if this node is not a SHARD
     */
    @Override
    public void sendMessageToAllClients(VoidMessage message, Long ... exclusions) {
        if (this.nodeRole != NodeRole.SHARD) {
            throw new ND4JIllegalStateException("Only SHARD allowed to send messages to all Clients");
        }
        UnsafeBuffer serialized = message.asUnsafeBuffer();
        // Number of exclusions already matched; once all are matched the list is no longer scanned.
        AtomicInteger matched = new AtomicInteger(0);
        this.clients.values().parallelStream()
                .filter(client -> {
                    long clientHash = client.getLongHash();
                    if (clientHash == this.originatorId || clientHash == 0L) {
                        return false;
                    }
                    if (exclusions == null || matched.get() >= exclusions.length) {
                        return true;
                    }
                    for (Long excluded : exclusions) {
                        if (excluded.longValue() == clientHash) {
                            matched.incrementAndGet();
                            return false;
                        }
                    }
                    return true;
                })
                .forEach(client -> this.lambda$sendMessageToAllClients$3(serialized, client));
    }

    /**
     * Sends a coordination message to all shards after stamping it with this node's
     * originator id. When this node is a SHARD and the configuration reports exactly
     * one shard, the message is queued locally instead of being published.
     *
     * @param message coordination message to distribute
     * @throws RuntimeException wrapping any exception raised while queueing locally
     */
    @Override
    protected void sendCoordinationCommand(VoidMessage message) {
        message.setOriginatorId(this.originatorId);

        boolean onlyShard = this.nodeRole == NodeRole.SHARD && this.voidConfiguration.getNumberOfShards() == 1;
        if (onlyShard) {
            try {
                this.messages.put(message);
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
            return;
        }

        // Serialize once; the same buffer is offered to every shard.
        UnsafeBuffer serialized = message.asUnsafeBuffer();
        this.shards.parallelStream().forEach(shard -> this.lambda$sendCoordinationCommand$4(message, serialized, shard));
    }

    /**
     * Sends a response back to the node that originated {@code message}.
     *
     * <p>If the originator is this node, the message is stored directly in the
     * {@code completed} map. Otherwise it is offered to the originator's publication
     * and retried on back-pressure / admin action. Delivery is best effort: when the
     * publication reports NOT_CONNECTED the method waits one retransmit timeout and
     * then gives up, dropping the message. If the calling thread is interrupted while
     * waiting, the interrupt flag is restored and the message is dropped as well.
     *
     * @param message response whose originator id identifies the target client
     * @throws RuntimeException if no connection is known for the originator id
     */
    @Override
    protected void sendFeedbackToClient(VoidMessage message) {
        long targetAddress = message.getOriginatorId();
        if (targetAddress == this.originatorId) {
            // Loopback: the requester is this very node, no network round trip needed.
            this.completed.put(message.getTaskId(), (MeaningfulMessage)message);
            return;
        }
        RemoteConnection connection = this.clients.get(targetAddress);
        if (connection == null) {
            log.info("Can't get client with address [{}]", (Object)targetAddress);
            log.info("Known clients: {}", this.clients.keySet());
            throw new RuntimeException("Can't get client with address [" + targetAddress + "]");
        }
        // Serialize once instead of on every retry.
        DirectBuffer payload = message.asUnsafeBuffer();
        boolean delivered = false;
        while (!delivered) {
            RetransmissionHandler.TransmissionStatus result;
            synchronized (connection.locker) {
                result = RetransmissionHandler.getTransmissionStatus(connection.getPublication().offer(payload));
            }
            switch (result) {
                case ADMIN_ACTION:
                case BACKPRESSURE: {
                    try {
                        Thread.sleep(this.voidConfiguration.getRetransmitTimeout());
                    }
                    catch (InterruptedException e) {
                        // Keep the interrupt visible to the caller and stop retrying.
                        Thread.currentThread().interrupt();
                        return;
                    }
                    break;
                }
                case NOT_CONNECTED: {
                    try {
                        Thread.sleep(this.voidConfiguration.getRetransmitTimeout());
                    }
                    catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                    // Intentional fall-through: the client is unreachable, so the message is given up on.
                }
                case MESSAGE_SENT: {
                    delivered = true;
                    break;
                }
                default: {
                    break;
                }
            }
        }
    }

    /**
     * @return number of entries currently held in the {@code clients} map
     */
    @Override
    public int numberOfKnownClients() {
        final int knownClients = clients.size();
        return knownClients;
    }

    /**
     * @return number of entries currently held in the {@code shards} list
     */
    @Override
    public int numberOfKnownShards() {
        final int knownShards = shards.size();
        return knownShards;
    }

    /**
     * Releases every Aeron resource held by this transport: shard publications,
     * client publications, the client subscription, the Aeron client, its context
     * and finally the embedded media driver, in that order.
     *
     * <p>Each resource is closed independently. A failure while closing one of them
     * (including a resource that was never initialized) is logged and does not
     * prevent the remaining resources from being released.
     */
    @Override
    protected void shutdownSilent() {
        // Collect the close actions first so that one failure cannot skip the rest.
        List<Runnable> closers = new ArrayList<Runnable>();
        for (RemoteConnection shard : this.shards) {
            closers.add(() -> shard.getPublication().close());
        }
        for (RemoteConnection client : this.clients.values()) {
            closers.add(() -> client.getPublication().close());
        }
        closers.add(() -> this.subscriptionForClients.close());
        closers.add(() -> this.aeron.close());
        closers.add(() -> this.context.close());
        closers.add(() -> this.driver.close());

        for (Runnable closer : closers) {
            try {
                closer.run();
            }
            catch (RuntimeException e) {
                log.warn("Failed to release a transport resource during shutdown", e);
            }
        }
    }

    /**
     * Stops the transport: clears the {@code runner} flag, interrupts the worker
     * threads that exist ({@code threadB} first, then {@code threadA}) and releases
     * all resources via {@link #shutdownSilent()}.
     */
    @Override
    public void shutdown() {
        runner.set(false);

        if (threadB != null) {
            threadB.interrupt();
        }

        if (threadA != null) {
            threadA.interrupt();
        }

        shutdownSilent();
    }

    /**
     * Routes {@code message} to a shard.
     *
     * <p>When this node is itself a SHARD and the message is a {@link TrainingMessage},
     * the message is stamped by the router, targeted at this shard and queued locally.
     * Otherwise the router picks a target shard and the message is offered to that
     * shard's publication until it is accepted. Back-pressure, admin actions and a
     * not-yet-activated connection are retried after the configured retransmit timeout.
     *
     * @param message message to deliver
     * @throws ND4JIllegalStateException if a previously activated shard connection reports NOT_CONNECTED
     * @throws RuntimeException          if local queueing fails, or if the calling thread is
     *                                   interrupted while waiting (the interrupt flag is restored first)
     */
    @Override
    protected void sendCommandToShard(VoidMessage message) {
        if (this.nodeRole == NodeRole.SHARD && message instanceof TrainingMessage) {
            this.router.setOriginator(message);
            message.setTargetId(this.getShardIndex());
            try {
                this.messages.put(message);
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
            return;
        }
        int targetShard = this.router.assignTarget(message);
        RemoteConnection connection = this.shards.get(targetShard);
        // Serialize once, after the router has assigned the target, instead of on every retry.
        DirectBuffer payload = message.asUnsafeBuffer();
        boolean delivered = false;
        while (!delivered) {
            RetransmissionHandler.TransmissionStatus result;
            synchronized (connection.locker) {
                result = RetransmissionHandler.getTransmissionStatus(connection.getPublication().offer(payload));
            }
            switch (result) {
                case NOT_CONNECTED: {
                    if (connection.getActivated().get()) {
                        // The shard was reachable before and is gone now.
                        throw new ND4JIllegalStateException("Shards reassignment is to be implemented yet");
                    }
                    // Intentional fall-through: never connected so far, wait and retry.
                }
                case ADMIN_ACTION:
                case BACKPRESSURE: {
                    try {
                        Thread.sleep(this.voidConfiguration.getRetransmitTimeout());
                    }
                    catch (InterruptedException e) {
                        // Do not lose the interrupt and do not keep spinning on an interrupted thread.
                        Thread.currentThread().interrupt();
                        throw new RuntimeException("Interrupted while waiting to resend message to shard", e);
                    }
                    break;
                }
                case MESSAGE_SENT: {
                    delivered = true;
                    connection.getActivated().set(true);
                    break;
                }
                default: {
                    break;
                }
            }
        }
    }

    /**
     * Fragment handler for the client subscription: copies the received bytes out of
     * the Aeron buffer, deserializes them and dispatches the resulting message.
     *
     * <p>A {@link MeaningfulMessage} is stored in the {@code completed} map under its
     * task id. Request, distributed, training, aggregation and frame messages are put
     * on the {@code messages} queue. Anything else is logged and ignored.
     *
     * @param buffer buffer holding the received message
     * @param offset position of the first message byte within {@code buffer}
     * @param length number of message bytes
     * @param header Aeron fragment header (not used)
     */
    protected void jointMessageHandler(DirectBuffer buffer, int offset, int length, Header header) {
        byte[] data = new byte[length];
        buffer.getBytes(offset, data);
        VoidMessage message = VoidMessage.fromBytes(data);

        if (message instanceof MeaningfulMessage) {
            this.completed.put(message.getTaskId(), (MeaningfulMessage)message);
            return;
        }

        // All of these message kinds are handled the same way: hand them to the processing queue.
        boolean queueable = message instanceof RequestMessage
                || message instanceof DistributedMessage
                || message instanceof TrainingMessage
                || message instanceof VoidAggregation
                || message instanceof Frame;
        if (!queueable) {
            String typeName = message == null ? "null" : message.getClass().getSimpleName();
            log.info("Unknown message: {}", (Object)typeName);
            return;
        }

        try {
            this.messages.put(message);
        }
        catch (InterruptedException e) {
            // The message is dropped, but the interrupt stays visible to the polling thread.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Starts the transport with the given threading model and then announces this
     * node to the shards by sending an {@link IntroductionRequestMessage} carrying
     * the local ip and port, with target id {@code -1}.
     *
     * @param threading threading model passed on to the base transport; must not be null
     * @throws NullPointerException if {@code threading} is null
     */
    @Override
    public void launch(@NonNull Transport.ThreadingModel threading) {
        if (threading == null) {
            throw new NullPointerException("threading is marked non-null but is null");
        }
        super.launch(threading);

        IntroductionRequestMessage introduction = new IntroductionRequestMessage(getIp(), getPort());
        introduction.setTargetId((short)-1);
        sendCoordinationCommand(introduction);
    }

    /**
     * Opens a publication to the given endpoint and appends it to the shard list.
     * The new connection starts out as not activated.
     *
     * @param ip   shard host
     * @param port shard port
     */
    @Override
    public synchronized void addShard(String ip, int port) {
        long endpointHash = HashUtil.getLongHash(ip + ":" + port);
        Publication shardPublication = this.aeron.addPublication("aeron:udp?endpoint=" + ip + ":" + port, this.voidConfiguration.getStreamId());
        RemoteConnection shardConnection = RemoteConnection.builder()
                .ip(ip)
                .port(port)
                .publication(shardPublication)
                .longHash(endpointHash)
                .locker(new Object())
                .activated(new AtomicBoolean(false))
                .build();
        log.info("sI_{} {}: Adding SHARD: [{}] to {}:{}", this.shardIndex, this.nodeRole, endpointHash, ip, port);
        this.shards.add(shardConnection);
    }

    /**
     * Registers a client endpoint. Nothing happens if a client with the same
     * {@code "ip:port"} hash is already known; otherwise a publication is opened and
     * the connection is stored under that hash, initially not activated.
     *
     * @param ip   client host
     * @param port client port
     */
    @Override
    public synchronized void addClient(String ip, int port) {
        long endpointHash = HashUtil.getLongHash(ip + ":" + port);
        if (!this.clients.containsKey(endpointHash)) {
            Publication clientPublication = this.aeron.addPublication("aeron:udp?endpoint=" + ip + ":" + port, this.voidConfiguration.getStreamId());
            RemoteConnection clientConnection = RemoteConnection.builder()
                    .ip(ip)
                    .port(port)
                    .publication(clientPublication)
                    .longHash(endpointHash)
                    .locker(new Object())
                    .activated(new AtomicBoolean(false))
                    .build();
            log.info("sI_{} {}: Adding connection: [{}] to {}:{}", this.shardIndex, this.nodeRole, endpointHash, ip, port);
            this.clients.put(endpointHash, clientConnection);
            log.info("sI_{} {}: Known clients: {}", this.shardIndex, this.nodeRole, this.clients.keySet());
        }
    }

    /**
     * @return the router currently used by this transport; may be null before {@code init}
     */
    public ClientRouter getRouter() {
        final ClientRouter current = router;
        return current;
    }

    /**
     * Replaces the router used by this transport. {@code init} only installs its
     * default router when none has been set.
     *
     * @param router router to use from now on
     */
    public void setRouter(final ClientRouter router) {
        this.router = router;
    }

    /**
     * Delivers one coordination message to one shard; invoked per shard from
     * {@code sendCoordinationCommand}.
     *
     * <p>If the shard's {@code "ip:port"} hash equals this node's originator id the
     * message is queued locally. Otherwise the pre-serialized buffer is offered until
     * accepted, sleeping for the retransmit timeout between attempts. A connection
     * that has never been activated may report NOT_CONNECTED at most 20 times.
     *
     * @param message original message, used for local queueing
     * @param buffer  serialized form of {@code message}
     * @param rc      target shard connection
     * @throws ND4JIllegalStateException if the shard cannot be reached within the retry limit,
     *                                   or an already activated shard reports NOT_CONNECTED
     * @throws RuntimeException          wrapping any exception raised while queueing locally
     */
    private /* synthetic */ void lambda$sendCoordinationCommand$4(VoidMessage message, DirectBuffer buffer, RemoteConnection rc) {
        long shardAddress = HashUtil.getLongHash(rc.getIp() + ":" + rc.getPort());
        if (shardAddress == this.originatorId) {
            // The target shard is this node: bypass the network.
            try {
                this.messages.put(message);
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
            return;
        }

        long failedConnects = 0L;
        boolean sent = false;
        while (!sent) {
            RetransmissionHandler.TransmissionStatus status;
            synchronized (rc.locker) {
                status = RetransmissionHandler.getTransmissionStatus(rc.getPublication().offer(buffer));
            }
            switch (status) {
                case NOT_CONNECTED: {
                    if (rc.getActivated().get()) {
                        throw new ND4JIllegalStateException("Shards reassignment is to be implemented yet");
                    }
                    if (++failedConnects > 20L) {
                        throw new ND4JIllegalStateException("Can't connect to Shard: [" + rc.getPublication().channel() + "]");
                    }
                    try {
                        Thread.sleep(this.voidConfiguration.getRetransmitTimeout());
                    }
                    catch (Exception ignored) {
                        // sleep failures are ignored; the send is simply retried
                    }
                    break;
                }
                case ADMIN_ACTION:
                case BACKPRESSURE: {
                    try {
                        Thread.sleep(this.voidConfiguration.getRetransmitTimeout());
                    }
                    catch (Exception ignored) {
                        // sleep failures are ignored; the send is simply retried
                    }
                    break;
                }
                case MESSAGE_SENT: {
                    sent = true;
                    rc.getActivated().set(true);
                    break;
                }
            }
            if (!sent) {
                log.info("Attempting to resend message");
            }
        }
    }

    /**
     * Delivers one broadcast buffer to one client; invoked per client from
     * {@code sendMessageToAllClients}.
     *
     * <p>The buffer is offered until accepted, parking for the retransmit timeout
     * (converted from milliseconds to nanoseconds) between attempts. A connection
     * that has never been activated may report NOT_CONNECTED at most 20 times.
     *
     * <p>NOTE(review): the exception texts below speak of "Shard" although the target
     * is a client connection; wording kept unchanged.
     *
     * @param buffer serialized message
     * @param rc     target client connection
     * @throws ND4JIllegalStateException if the target cannot be reached within the retry limit,
     *                                   or an already activated connection reports NOT_CONNECTED
     */
    private /* synthetic */ void lambda$sendMessageToAllClients$3(DirectBuffer buffer, RemoteConnection rc) {
        long failedConnects = 0L;
        boolean sent = false;
        while (!sent) {
            RetransmissionHandler.TransmissionStatus status;
            synchronized (rc.locker) {
                status = RetransmissionHandler.getTransmissionStatus(rc.getPublication().offer(buffer));
            }
            switch (status) {
                case NOT_CONNECTED: {
                    if (rc.getActivated().get()) {
                        throw new ND4JIllegalStateException("Shards reassignment is to be implemented yet");
                    }
                    if (++failedConnects > 20L) {
                        throw new ND4JIllegalStateException("Can't connect to Shard: [" + rc.getPublication().channel() + "]");
                    }
                    try {
                        LockSupport.parkNanos(this.voidConfiguration.getRetransmitTimeout() * 1000000L);
                    }
                    catch (Exception e) {
                        throw new RuntimeException(e);
                    }
                    break;
                }
                case ADMIN_ACTION:
                case BACKPRESSURE: {
                    try {
                        LockSupport.parkNanos(this.voidConfiguration.getRetransmitTimeout() * 1000000L);
                    }
                    catch (Exception e) {
                        throw new RuntimeException(e);
                    }
                    break;
                }
                case MESSAGE_SENT: {
                    sent = true;
                    rc.getActivated().set(true);
                    break;
                }
            }
        }
    }

    /**
     * Mutable descriptor of a remote endpoint: address, the Aeron publication used to
     * reach it, the monitor object guarding offers on that publication, an
     * "activated" flag and the long hash identifying the endpoint.
     *
     * <p>{@code equals} and {@code hashCode} take all six properties into account.
     */
    public static class RemoteConnection {
        private String ip;
        private int port;
        private Publication publication;
        private Object locker;
        private AtomicBoolean activated;
        protected long longHash;

        /** Full-argument constructor; instances are normally created through {@link #builder()}. */
        RemoteConnection(String ip, int port, Publication publication, Object locker, AtomicBoolean activated, long longHash) {
            this.ip = ip;
            this.port = port;
            this.publication = publication;
            this.locker = locker;
            this.activated = activated;
            this.longHash = longHash;
        }

        /** @return a fresh builder with a default locker and a default (false) activation flag */
        public static RemoteConnectionBuilder builder() {
            return new RemoteConnectionBuilder();
        }

        // ---- accessors ----

        public String getIp() {
            return ip;
        }

        public void setIp(String ip) {
            this.ip = ip;
        }

        public int getPort() {
            return port;
        }

        public void setPort(int port) {
            this.port = port;
        }

        public Publication getPublication() {
            return publication;
        }

        public void setPublication(Publication publication) {
            this.publication = publication;
        }

        public Object getLocker() {
            return locker;
        }

        public void setLocker(Object locker) {
            this.locker = locker;
        }

        public AtomicBoolean getActivated() {
            return activated;
        }

        public void setActivated(AtomicBoolean activated) {
            this.activated = activated;
        }

        public long getLongHash() {
            return longHash;
        }

        public void setLongHash(long longHash) {
            this.longHash = longHash;
        }

        // ---- value semantics ----

        /** Two connections are equal when port, hash, ip, publication, locker and activation flag are all equal. */
        @Override
        public boolean equals(Object o) {
            if (o == this) {
                return true;
            }
            if (!(o instanceof RemoteConnection)) {
                return false;
            }
            RemoteConnection that = (RemoteConnection)o;
            return that.canEqual(this)
                    && this.getPort() == that.getPort()
                    && this.getLongHash() == that.getLongHash()
                    && java.util.Objects.equals(this.getIp(), that.getIp())
                    && java.util.Objects.equals(this.getPublication(), that.getPublication())
                    && java.util.Objects.equals(this.getLocker(), that.getLocker())
                    && java.util.Objects.equals(this.getActivated(), that.getActivated());
        }

        /** Hook that lets subclasses opt out of equality with plain {@code RemoteConnection} instances. */
        protected boolean canEqual(Object other) {
            return other instanceof RemoteConnection;
        }

        @Override
        public int hashCode() {
            final int prime = 59;
            final int nullHash = 43;
            int hash = 1;
            hash = hash * prime + getPort();
            hash = hash * prime + Long.hashCode(getLongHash());
            String ipValue = getIp();
            hash = hash * prime + (ipValue == null ? nullHash : ipValue.hashCode());
            Publication publicationValue = getPublication();
            hash = hash * prime + (publicationValue == null ? nullHash : publicationValue.hashCode());
            Object lockerValue = getLocker();
            hash = hash * prime + (lockerValue == null ? nullHash : lockerValue.hashCode());
            AtomicBoolean activatedValue = getActivated();
            hash = hash * prime + (activatedValue == null ? nullHash : activatedValue.hashCode());
            return hash;
        }

        @Override
        public String toString() {
            StringBuilder text = new StringBuilder("RoutedTransport.RemoteConnection(ip=");
            text.append(getIp());
            text.append(", port=").append(getPort());
            text.append(", publication=").append(getPublication());
            text.append(", locker=").append(getLocker());
            text.append(", activated=").append(getActivated());
            text.append(", longHash=").append(getLongHash());
            text.append(")");
            return text.toString();
        }

        /**
         * Fluent builder for {@link RemoteConnection}. Unless overridden, the built
         * connection gets a new locker object and a new activation flag.
         */
        public static class RemoteConnectionBuilder {
            private String ip;
            private int port;
            private Publication publication;
            private long longHash;
            private Object locker = new Object();
            private AtomicBoolean activated = new AtomicBoolean();

            RemoteConnectionBuilder() {
            }

            public RemoteConnectionBuilder ip(String ip) {
                this.ip = ip;
                return this;
            }

            public RemoteConnectionBuilder port(int port) {
                this.port = port;
                return this;
            }

            public RemoteConnectionBuilder publication(Publication publication) {
                this.publication = publication;
                return this;
            }

            public RemoteConnectionBuilder locker(Object locker) {
                this.locker = locker;
                return this;
            }

            public RemoteConnectionBuilder activated(AtomicBoolean activated) {
                this.activated = activated;
                return this;
            }

            public RemoteConnectionBuilder longHash(long longHash) {
                this.longHash = longHash;
                return this;
            }

            /** @return a new connection populated from the values collected so far */
            public RemoteConnection build() {
                return new RemoteConnection(ip, port, publication, locker, activated, longHash);
            }

            @Override
            public String toString() {
                StringBuilder text = new StringBuilder("RoutedTransport.RemoteConnection.RemoteConnectionBuilder(ip=");
                text.append(ip);
                text.append(", port=").append(port);
                text.append(", publication=").append(publication);
                text.append(", locker=").append(locker);
                text.append(", activated=").append(activated);
                text.append(", longHash=").append(longHash);
                text.append(")");
                return text.toString();
            }
        }
    }
}

