/*
 * Decompiled with CFR 0.152.
 */
package com.caucho.message.nautilus;

import com.caucho.db.block.BlockStore;
import com.caucho.message.journal.JournalFile;
import com.caucho.message.nautilus.MessageDataNode;
import com.caucho.message.nautilus.NautilusBrokerSubscriber;
import com.caucho.util.ConcurrentArrayList;

class NautilusQueue {
    private final long _qid;
    private QueueEntry _head;
    private QueueEntry _tail;
    private ConcurrentArrayList<NautilusBrokerSubscriber> _subscriberList = new ConcurrentArrayList(NautilusBrokerSubscriber.class);
    private long _enqueueCount;
    private long _dequeueCount;
    private int _size;

    NautilusQueue(long qid) {
        this._qid = qid;
    }

    public int getSize() {
        return this._size;
    }

    public long getEnqueueCount() {
        return this._enqueueCount;
    }

    public long getDequeueCount() {
        return this._dequeueCount;
    }

    void processData(long mid, boolean isInit, boolean isFinal, BlockStore blockStore, long blockAddr, int offset, int length) {
        QueueEntry entry = new QueueEntry(mid);
        entry.addData(blockStore, blockAddr, offset, length);
        if (this._tail != null) {
            this._tail.setNext(entry);
        } else {
            this._head = entry;
        }
        this._tail = entry;
        ++this._size;
        ++this._enqueueCount;
        this.deliver();
    }

    void subscribe(NautilusBrokerSubscriber subscriber) {
        this._subscriberList.add((Object)subscriber);
        this.deliver();
    }

    void unsubscribe(NautilusBrokerSubscriber subscriber) {
        this._subscriberList.remove((Object)subscriber);
    }

    void ack(long sequence) {
        QueueEntry prev = null;
        for (QueueEntry entry = this._head; entry != null; entry = entry.getNext()) {
            if (entry.getSequence() != sequence) continue;
            if (prev != null) {
                prev.setNext(entry.getNext());
            } else {
                this._head = entry.getNext();
                if (this._head == null) {
                    this._tail = null;
                }
            }
            return;
        }
    }

    void ack(NautilusBrokerSubscriber subscriber) {
    }

    void deliver() {
        NautilusBrokerSubscriber[] subList = (NautilusBrokerSubscriber[])this._subscriberList.toArray();
        while (this._head != null) {
            boolean isDeliver = false;
            for (NautilusBrokerSubscriber sub : subList) {
                if (!sub.isAvailable()) continue;
                QueueEntry entry = this._head;
                this._head = this._head.getNext();
                if (this._head == null) {
                    this._tail = null;
                }
                --this._size;
                ++this._dequeueCount;
                sub.onTransfer(entry);
                isDeliver = true;
            }
            if (isDeliver) continue;
            return;
        }
    }

    public long updateCheckpoint(long tailAddress) {
        QueueEntry head = this._head;
        if (head == null) {
            return tailAddress;
        }
        long queueHeadAddress = head.getDataHead().getBlockAddress();
        if (!JournalFile.isSamePage(queueHeadAddress, tailAddress)) {
            return -1L;
        }
        if (queueHeadAddress < tailAddress) {
            return queueHeadAddress;
        }
        return tailAddress;
    }

    public String toString() {
        return this.getClass().getSimpleName() + "[" + this._qid + "]";
    }

    public static class QueueEntry {
        private final long _mid;
        private QueueEntry _next;
        private MessageDataNode _head;
        private MessageDataNode _tail;

        QueueEntry(long sequence) {
            this._mid = sequence;
        }

        long getSequence() {
            return this._mid;
        }

        QueueEntry getNext() {
            return this._next;
        }

        void setNext(QueueEntry next) {
            this._next = next;
        }

        MessageDataNode getDataHead() {
            return this._head;
        }

        void addData(BlockStore blockStore, long blockAddress, int offset, int length) {
            MessageDataNode dataNode = new MessageDataNode(blockStore, blockAddress, offset, length);
            if (this._tail != null) {
                this._tail.setNext(dataNode);
            } else {
                this._head = dataNode;
                this._tail = dataNode;
            }
        }
    }
}

