/*
 * Decompiled with CFR 0.152.
 */
package com.caucho.message.nautilus;

import com.caucho.env.actor.AbstractActorProcessor;
import com.caucho.message.journal.JournalResult;
import com.caucho.message.nautilus.NautilusCheckpointPublisher;
import com.caucho.message.nautilus.NautilusQueue;
import com.caucho.message.nautilus.NautilusRingItem;
import java.util.HashMap;
import java.util.logging.Logger;

class NautilusMultiQueueActor
extends AbstractActorProcessor<NautilusRingItem> {
    private static final Logger log = Logger.getLogger(NautilusMultiQueueActor.class.getName());
    private static final long C_DURABLE = 0x200000000000L;
    private static final int C_PRIORITY_OFF = 40;
    private static final long C_PRIORITY_MASK = 15L;
    private static final long C_PRIORITY_DEFAULT = 4L;
    private static final long C_EXPIRE_LOSS_BITS = 6L;
    private static final long C_EXPIRE_LOSS_MASK = 63L;
    private static final long C_EXPIRE_BITS = 26L;
    private static final long C_EXPIRE_MASK = 0x3FFFFFFL;
    private static final int C_EXPIRE_OFF = 6;
    private static final long C_OP = 31L;
    private String _threadName;
    private HashMap<Long, NautilusQueue> _queueMap = new HashMap();
    private NautilusCheckpointPublisher _checkpointPub;
    private long _enqueueCount;
    private long _dequeueCount;
    private long _messageCount;
    private long _lastAddress;
    private long _lastCheckpoint;
    private int _size;

    public NautilusMultiQueueActor() {
        this._threadName = this.toString();
    }

    void setNautilusCheckpointPublisher(NautilusCheckpointPublisher pub) {
        this._checkpointPub = pub;
    }

    public int getSize() {
        return this._size;
    }

    public long getEnqueueCount() {
        return this._enqueueCount;
    }

    public long getDequeueCount() {
        return this._dequeueCount;
    }

    public String getThreadName() {
        return this._threadName;
    }

    public void process(NautilusRingItem entry) throws Exception {
        long qid = entry.getQid();
        NautilusQueue queue = this.getQueue(qid);
        long code = entry.getCode();
        int op = this.decodeOp(code);
        long mid = entry.getMid();
        switch (op) {
            case 2: {
                long count;
                JournalResult result = entry.getResult();
                queue.processData(mid, entry.isInit(), entry.isFin(), result.getBlockStore(), result.getBlockAddr1(), result.getOffset1(), result.getLength1());
                this._lastAddress = result.getBlockAddr1();
                if (result.getLength2() > 0) {
                    queue.processData(mid, false, entry.isFin(), result.getBlockStore(), result.getBlockAddr2(), result.getOffset2(), result.getLength2());
                }
                if (((count = ++this._messageCount) & 0x3FFFL) != 0L) break;
                this.updateCheckpoint(this._lastAddress);
                break;
            }
            case 4: {
                queue.subscribe(entry.getSubscriber());
                break;
            }
            case 5: {
                queue.unsubscribe(entry.getSubscriber());
                break;
            }
            case 6: {
                queue.ack(entry.getMid());
                queue.ack(entry.getSubscriber());
                break;
            }
            case 3: {
                if (!entry.getSubscriber().onFlow(entry.getDeliveryCount(), entry.getCredit())) break;
                queue.deliver();
                break;
            }
            case 1: {
                break;
            }
            default: {
                System.out.println("UNKNOWN: " + Integer.toHexString(op));
                log.warning("Unknown code:  " + Integer.toHexString(op));
            }
        }
    }

    private int decodeOp(long code) {
        return (int)(code & 0x1FL);
    }

    static long encode(int op, boolean isDurable, int priority, long expireTime) {
        long code = op;
        if (isDurable) {
            code |= 0x200000000000L;
        }
        long priorityBits = priority < 0 ? 4L : (15L < (long)priority ? 15L : (long)priority & 0xFL);
        code |= priorityBits << 40;
        long expireBits = expireTime + 63L >> 6;
        return code |= (expireBits &= 0x3FFFFFFL) << 6;
    }

    private void updateCheckpoint(long tailAddress) {
        NautilusCheckpointPublisher checkpointPub;
        long checkpointAddress = tailAddress;
        for (NautilusQueue queue : this._queueMap.values()) {
            if (checkpointAddress <= 0L) continue;
            checkpointAddress = queue.updateCheckpoint(checkpointAddress);
        }
        if (checkpointAddress > 0L && checkpointAddress != this._lastCheckpoint && (checkpointPub = this._checkpointPub) != null && checkpointPub.checkpoint(checkpointAddress)) {
            this._lastCheckpoint = checkpointAddress;
        }
    }

    private NautilusQueue getQueue(Long id) {
        NautilusQueue queue = this._queueMap.get(id);
        if (queue == null) {
            queue = new NautilusQueue(id);
            this._queueMap.put(id, queue);
        }
        return queue;
    }

    public void onProcessComplete() throws Exception {
    }

    public String toString() {
        return ((Object)((Object)this)).getClass().getSimpleName() + "[]";
    }
}

