/*
 * Decompiled with CFR 0.152.
 */
package de.ruedigermoeller.fastcast.control;

import de.ruedigermoeller.fastcast.control.ReceiveBufferDispatcher;
import de.ruedigermoeller.fastcast.packeting.ControlPacket;
import de.ruedigermoeller.fastcast.packeting.DataPacket;
import de.ruedigermoeller.fastcast.packeting.MsgReceiver;
import de.ruedigermoeller.fastcast.packeting.Packet;
import de.ruedigermoeller.fastcast.packeting.PacketReceiveBuffer;
import de.ruedigermoeller.fastcast.packeting.PacketSendBuffer;
import de.ruedigermoeller.fastcast.packeting.RetransPacket;
import de.ruedigermoeller.fastcast.packeting.TopicEntry;
import de.ruedigermoeller.fastcast.remoting.FCRemotingListener;
import de.ruedigermoeller.fastcast.remoting.FCTopicService;
import de.ruedigermoeller.fastcast.remoting.FastCast;
import de.ruedigermoeller.fastcast.transport.Transport;
import de.ruedigermoeller.fastcast.util.FCLog;
import de.ruedigermoeller.heapoff.bytez.Bytez;
import de.ruedigermoeller.heapoff.bytez.onheap.HeapBytez;
import de.ruedigermoeller.heapoff.structs.FSTStruct;
import de.ruedigermoeller.heapoff.structs.FSTStructAllocator;
import de.ruedigermoeller.heapoff.structs.structtypes.StructString;
import java.io.IOException;
import java.net.DatagramPacket;
import java.util.List;

public class FCTransportDispatcher {
    public static int MAX_NUM_TOPICS = 256;
    Transport trans;
    ReceiveBufferDispatcher[] receiver;
    PacketSendBuffer[] sender;
    StructString clusterName;
    StructString nodeId;
    Thread receiverThread;
    Thread calbackCleaner;
    FSTStructAllocator alloc = new FSTStructAllocator(1);
    Bytez heartbeat = new HeapBytez(new byte[]{99});
    Packet receivedPacket;

    public FCTransportDispatcher(Transport trans, String clusterName, String nodeId) {
        this.trans = trans;
        this.nodeId = (StructString)this.alloc.newStruct((FSTStruct)new StructString(nodeId));
        this.clusterName = (StructString)this.alloc.newStruct((FSTStruct)new StructString(clusterName));
        this.receiver = new ReceiveBufferDispatcher[MAX_NUM_TOPICS];
        this.sender = new PacketSendBuffer[MAX_NUM_TOPICS];
        this.receiverThread = new Thread("trans receiver " + trans.getConf().getName()){

            @Override
            public void run() {
                FCTransportDispatcher.this.receiveLoop();
            }
        };
        this.calbackCleaner = new Thread("callback cleaner"){

            @Override
            public void run() {
                FCTransportDispatcher.this.cleanCBLoop();
            }
        };
        this.receiverThread.start();
        this.calbackCleaner.start();
    }

    void cleanCBLoop() {
        while (true) {
            try {
                block3: while (true) {
                    Thread.sleep(100L);
                    long now = System.currentTimeMillis();
                    int i = 0;
                    while (true) {
                        if (i >= this.sender.length) continue block3;
                        PacketSendBuffer packetSendBuffer = this.sender[i];
                        if (packetSendBuffer != null) {
                            packetSendBuffer.getTopicEntry().getCbMap().release(now);
                        }
                        ++i;
                    }
                    break;
                }
            }
            catch (Exception e) {
                FCLog.log(e);
                continue;
            }
            break;
        }
    }

    public void installReceiver(TopicEntry chan, MsgReceiver msgListener) {
        ReceiveBufferDispatcher receiveBufferDispatcher = new ReceiveBufferDispatcher(this.trans.getConf().getDgramsize(), this.clusterName.toString(), this.nodeId.toString(), chan, msgListener);
        if (this.receiver[chan.getTopic()] != null) {
            throw new RuntimeException("double usage of topic " + chan.getTopic() + " on transport " + this.trans.getConf().getName());
        }
        this.receiver[chan.getTopic()] = receiveBufferDispatcher;
    }

    public boolean hasReceiver(TopicEntry chan) {
        return this.receiver[chan.getTopic()] != null;
    }

    public boolean hasSender(TopicEntry chan) {
        return this.sender[chan.getTopic()] != null;
    }

    public void installSender(final TopicEntry topicEntry) {
        PacketSendBuffer packetSendBuffer;
        if (this.sender[topicEntry.getTopic()] != null) {
            return;
        }
        topicEntry.setTrans(this.trans);
        this.sender[topicEntry.getTopic()] = packetSendBuffer = new PacketSendBuffer(this.trans.getConf().getDgramsize(), this.clusterName.toString(), this.nodeId.toString(), topicEntry);
        topicEntry.setSender(packetSendBuffer);
        if (topicEntry.getConf().getMaxSendPacketQueueSize() != 0) {
            Thread senderThread = new Thread("trans sender " + this.trans.getConf().getName() + " " + topicEntry.getName()){

                @Override
                public void run() {
                    FCTransportDispatcher.this.sendLoop(topicEntry);
                }
            };
            senderThread.start();
        }
    }

    public PacketSendBuffer getSender(int topic) {
        return this.sender[topic];
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void sendLoop(TopicEntry topic) {
        while (this.trans == null || this.sender[topic.getTopic()] == null) {
            try {
                Thread.sleep(200L);
            }
            catch (InterruptedException e) {
                FCLog.log(e);
            }
        }
        PacketSendBuffer packetSendBuffer = this.sender[topic.getTopic()];
        long lastStat = System.currentTimeMillis();
        long lastHB = System.currentTimeMillis();
        int count = 0;
        long flowControlInterval = topic.getConf().getFlowControlInterval();
        long heartBeatInterval = topic.getConf().getHeartbeatInterval();
        int nothingSentCount = 0;
        while (true) {
            try {
                while (true) {
                    boolean succ;
                    boolean anyThingSent;
                    nothingSentCount = (anyThingSent = packetSendBuffer.send(this.trans)) ? 0 : ++nothingSentCount;
                    if (!packetSendBuffer.useSpinLock() && nothingSentCount > 1000) {
                        Object sendWakeupLock = packetSendBuffer.getSendWakeupLock();
                        nothingSentCount = 0;
                        Object object = sendWakeupLock;
                        synchronized (object) {
                            sendWakeupLock.wait(0L, 10);
                        }
                    }
                    if (++count % 100 != 0) continue;
                    long now = System.currentTimeMillis();
                    if (now - lastStat > flowControlInterval) {
                        packetSendBuffer.doFlowControl();
                        lastStat = now;
                    }
                    if (now - lastHB <= heartBeatInterval || !(succ = this.putHeartbeat(packetSendBuffer))) continue;
                    lastHB = now;
                }
            }
            catch (Exception e) {
                FCLog.log(e);
                continue;
            }
            break;
        }
    }

    public boolean putHeartbeat(PacketSendBuffer packetSendBuffer) {
        return packetSendBuffer.putMessage(-1, this.heartbeat, 0, 1, true);
    }

    void receiveLoop() {
        byte[] receiveBuf = new byte[this.trans.getConf().getDgramsize()];
        DatagramPacket p = new DatagramPacket(receiveBuf, receiveBuf.length);
        this.receivedPacket = (Packet)this.alloc.newStruct((FSTStruct)new Packet());
        while (true) {
            try {
                while (true) {
                    this.receiveDatagram(p);
                }
            }
            catch (IOException e) {
                FCLog.log(e);
                continue;
            }
            break;
        }
    }

    private void receiveDatagram(DatagramPacket p) throws IOException {
        if (this.trans.receive(p)) {
            this.receivedPacket.baseOn(p.getData(), p.getOffset());
            boolean sameCluster = this.receivedPacket.getCluster().equals((Object)this.clusterName);
            boolean selfSent = this.receivedPacket.getSender().equals((Object)this.nodeId);
            if (sameCluster && !selfSent) {
                ReceiveBufferDispatcher receiveBufferDispatcher;
                ControlPacket control;
                int topic = this.receivedPacket.getTopic();
                if (topic > MAX_NUM_TOPICS || topic < 0) {
                    FCLog.get().warn("foreign traffic");
                    return;
                }
                if (this.receiver[topic] == null && this.sender[topic] == null) {
                    return;
                }
                Class type = this.receivedPacket.getPointedClass();
                StructString receivedPacketReceiver = this.receivedPacket.getReceiver();
                if (type == DataPacket.class) {
                    if (this.receiver[topic] == null) {
                        return;
                    }
                    if (receivedPacketReceiver == null || receivedPacketReceiver.getLen() == 0 || receivedPacketReceiver.equals((Object)this.nodeId)) {
                        this.dispatchDataPacket(this.receivedPacket, topic);
                    }
                } else if (type == RetransPacket.class) {
                    if (this.sender[topic] == null) {
                        return;
                    }
                    if (receivedPacketReceiver.equals((Object)this.nodeId)) {
                        this.dispatchRetransmissionRequest(this.receivedPacket, topic);
                    }
                } else if (type == ControlPacket.class && (control = (ControlPacket)this.receivedPacket.cast()).getType() == ControlPacket.DROPPED && receivedPacketReceiver.equals((Object)this.nodeId) && (receiveBufferDispatcher = this.receiver[topic]) != null) {
                    FCRemotingListener remotingListener;
                    FCLog.get().warn(this.nodeId + " has been dropped by " + this.receivedPacket.getSender() + " on service " + receiveBufferDispatcher.getTopicEntry().getName());
                    FCTopicService service = receiveBufferDispatcher.getTopicEntry().getService();
                    if (service != null) {
                        service.droppedFromReceiving();
                    }
                    if ((remotingListener = FastCast.getRemoting().getRemotingListener()) != null) {
                        remotingListener.droppedFromTopic(receiveBufferDispatcher.getTopicEntry().getTopic(), receiveBufferDispatcher.getTopicEntry().getName());
                    }
                    this.receiver[topic] = null;
                }
            }
        }
    }

    private void dispatchDataPacket(Packet receivedPacket, int topic) throws IOException {
        DataPacket p;
        PacketReceiveBuffer buffer = this.receiver[topic].getBuffer(receivedPacket.getSender());
        RetransPacket retransPacket = buffer.receivePacket(p = (DataPacket)receivedPacket.cast().detach());
        if (retransPacket != null) {
            this.trans.send(new DatagramPacket(retransPacket.getBase().toBytes((int)retransPacket.getOffset(), retransPacket.getByteSize()), 0, retransPacket.getByteSize()));
        }
    }

    private void dispatchRetransmissionRequest(Packet receivedPacket, int topic) throws IOException {
        RetransPacket retransPacket = (RetransPacket)receivedPacket.cast().detach();
        this.sender[topic].addRetransmissionRequest(retransPacket, this.trans);
    }

    public void startListening(TopicEntry topic) {
        this.installReceiver(topic, topic.getMsgReceiver());
    }

    public void stopListening(TopicEntry topic) {
        this.receiver[topic.getTopic()] = null;
    }

    public void cleanup(List<String> timedOutSenders, int topic) {
        for (int i = 0; i < timedOutSenders.size(); ++i) {
            String s = timedOutSenders.get(i);
            ReceiveBufferDispatcher receiveBufferDispatcher = this.receiver[topic];
            FCLog.get().cluster("stopped receiving heartbeats from " + s);
            if (receiveBufferDispatcher == null) continue;
            receiveBufferDispatcher.cleanup(s);
        }
    }
}

