package com.caucho.hemp.broker;

import com.caucho.bam.ActorError;
import com.caucho.bam.ActorStream;
import com.caucho.hemp.packet.Message;
import com.caucho.hemp.packet.MessageError;
import com.caucho.hemp.packet.Packet;
import com.caucho.hemp.packet.PacketQueue;
import com.caucho.hemp.packet.QueryError;
import com.caucho.hemp.packet.QueryGet;
import com.caucho.hemp.packet.QueryResult;
import com.caucho.hemp.packet.QuerySet;
import com.caucho.lifecycle.Lifecycle;
import com.caucho.loader.Environment;
import com.caucho.util.Alarm;
import com.caucho.util.L10N;
import java.io.Closeable;
import java.io.Serializable;
import java.util.logging.Level;
import java.util.logging.Logger;

/* loaded from: input_file:com/caucho/hemp/broker/HempMemoryQueue.class */
/**
 * An {@link ActorStream} that buffers incoming packets in an in-memory
 * {@link PacketQueue} and hands them to a fixed set of {@link QueueWorker}s.
 *
 * <p>Every {@code ActorStream} call is wrapped in the matching packet type,
 * enqueued, and a worker is woken. Packets are later delivered through
 * {@link #dispatch(Packet)}, which passes the wrapped actor stream and the
 * link stream to {@link Packet#dispatch}.
 *
 * <p>The instance registers itself with
 * {@link Environment#addCloseListener} at construction time.
 */
public class HempMemoryQueue implements ActorStream, Closeable {
    private static final L10N L = new L10N(HempMemoryQueue.class);
    private static final Logger log = Logger.getLogger(HempMemoryQueue.class.getName());

    /**
     * Upper bound on how long {@link #close()} waits for the queue to drain,
     * in the units of {@link Alarm#getCurrentTimeActual()} (presumably
     * milliseconds -- confirm against Alarm).
     */
    private static final long CLOSE_DRAIN_TIMEOUT = 2000L;

    /** Sleep between queue-empty checks while draining in {@link #close()}, in milliseconds. */
    private static final long CLOSE_POLL_INTERVAL_MS = 100L;

    private final String _name;
    private final ActorStream _linkStream;
    private final ActorStream _actorStream;
    private final QueueWorker[] _workers;
    private final PacketQueue _queue;
    private final Lifecycle _lifecycle = new Lifecycle();

    /**
     * Creates an active queue in front of {@code actorStream}.
     *
     * @param actorStream the stream that queued packets are dispatched to;
     *        its jid (or, if that is null, its simple class name) becomes
     *        this queue's name
     * @param linkStream the stream passed as the second argument of
     *        {@link Packet#dispatch}
     * @param workerCount number of {@link QueueWorker}s to create
     * @throws NullPointerException if either stream is null
     */
    public HempMemoryQueue(ActorStream actorStream, ActorStream linkStream, int workerCount) {
        if (linkStream == null) {
            throw new NullPointerException("linkStream");
        }
        if (actorStream == null) {
            throw new NullPointerException("actorStream");
        }
        this._linkStream = linkStream;
        this._actorStream = actorStream;

        String jid = actorStream.getJid();
        this._name = (jid != null) ? jid : actorStream.getClass().getSimpleName();

        // Build the queue before handing `this` to the workers, so a worker
        // can never observe a queue field that has not been assigned yet.
        this._queue = new PacketQueue(this._name, -1, 1024, -1L);

        this._workers = new QueueWorker[workerCount];
        for (int index = 0; index < workerCount; index++) {
            this._workers[index] = new QueueWorker(this);
        }

        this._lifecycle.toActive();
        Environment.addCloseListener(this);
    }

    /** Returns the jid of the wrapped actor stream. */
    @Override
    public String getJid() {
        return this._actorStream.getJid();
    }

    /** Returns true when the underlying packet queue is not empty. */
    public boolean isPacketAvailable() {
        return !this._queue.isEmpty();
    }

    /** Returns the link stream supplied at construction. */
    public ActorStream getLinkStream() {
        return this._linkStream;
    }

    /** Returns the actor stream that queued packets are dispatched to. */
    protected ActorStream getActorStream() {
        return this._actorStream;
    }

    /** Queues a {@link Message} packet built from the arguments. */
    @Override
    public void message(String str, String str2, Serializable serializable) {
        enqueue(new Message(str, str2, serializable));
    }

    /** Queues a {@link MessageError} packet built from the arguments. */
    @Override
    public void messageError(String str, String str2, Serializable serializable, ActorError actorError) {
        enqueue(new MessageError(str, str2, serializable, actorError));
    }

    /** Queues a {@link QueryGet} packet built from the arguments. */
    @Override
    public void queryGet(long j, String str, String str2, Serializable serializable) {
        enqueue(new QueryGet(j, str, str2, serializable));
    }

    /** Queues a {@link QuerySet} packet built from the arguments. */
    @Override
    public void querySet(long j, String str, String str2, Serializable serializable) {
        enqueue(new QuerySet(j, str, str2, serializable));
    }

    /** Queues a {@link QueryResult} packet built from the arguments. */
    @Override
    public void queryResult(long j, String str, String str2, Serializable serializable) {
        enqueue(new QueryResult(j, str, str2, serializable));
    }

    /** Queues a {@link QueryError} packet built from the arguments. */
    @Override
    public void queryError(long j, String str, String str2, Serializable serializable, ActorError actorError) {
        enqueue(new QueryError(j, str, str2, serializable, actorError));
    }

    /**
     * Adds a packet to the queue and wakes a worker to process it.
     *
     * @param packet the packet to queue
     * @throws IllegalStateException if the lifecycle is no longer active
     */
    protected final void enqueue(Packet packet) {
        if (!this._lifecycle.isActive()) {
            throw new IllegalStateException(L.l("{0} cannot accept packets because it's no longer active", this));
        }
        if (log.isLoggable(Level.FINEST)) {
            log.finest(this + " enqueue(" + this._queue.getSize() + ") " + packet);
        }
        this._queue.enqueue(packet);
        wakeConsumer(packet);
    }

    /**
     * Wakes workers in array order, stopping after the first one that was
     * not already running when checked. If every worker reports running,
     * all of them are woken.
     *
     * @param packet the packet that was just queued (currently unused)
     */
    private void wakeConsumer(Packet packet) {
        for (QueueWorker worker : this._workers) {
            // Sample the state before waking: wake() may change it.
            boolean wasRunning = worker.isRunning();
            worker.wake();
            if (!wasRunning) {
                return;
            }
        }
    }

    /**
     * Delivers a packet to the actor stream, passing the link stream along.
     *
     * <p>Declared public in this source; a decompiler note on the original
     * indicates the access modifier was changed from protected.
     */
    public void dispatch(Packet packet) {
        packet.dispatch(getActorStream(), this._linkStream);
    }

    /**
     * Removes and returns the next packet from the underlying queue, with
     * whatever empty-queue behavior {@link PacketQueue#dequeue()} defines.
     *
     * <p>Declared public in this source; a decompiler note on the original
     * indicates the access modifier was changed from protected.
     */
    public Packet dequeue() {
        return this._queue.dequeue();
    }

    /**
     * Shuts the queue down: moves the lifecycle to stop (so
     * {@link #enqueue(Packet)} rejects new packets), wakes every worker,
     * waits up to {@link #CLOSE_DRAIN_TIMEOUT} for the queue to empty,
     * then destroys the workers and moves the lifecycle to destroy.
     *
     * <p>If the calling thread is interrupted while waiting, the wait is
     * abandoned, the interrupt status is restored, and shutdown continues.
     */
    @Override
    public void close() {
        this._lifecycle.toStop();
        for (QueueWorker worker : this._workers) {
            worker.wake();
        }

        long deadline = Alarm.getCurrentTimeActual() + CLOSE_DRAIN_TIMEOUT;
        while (!this._queue.isEmpty() && Alarm.getCurrentTimeActual() < deadline) {
            try {
                Thread.sleep(CLOSE_POLL_INTERVAL_MS);
            } catch (InterruptedException e) {
                // Preserve the interrupt for the caller and stop waiting;
                // the workers are still destroyed below.
                Thread.currentThread().interrupt();
                if (log.isLoggable(Level.FINER)) {
                    log.log(Level.FINER, this + " interrupted while draining on close", e);
                }
                break;
            }
        }

        for (QueueWorker worker : this._workers) {
            worker.destroy();
        }
        this._lifecycle.toDestroy();
    }

    /**
     * Returns true when this queue's lifecycle reports destroying or the
     * link stream reports closed.
     */
    @Override
    public boolean isClosed() {
        return this._lifecycle.isDestroying() || this._linkStream.isClosed();
    }

    /** Returns {@code SimpleClassName[name]}. */
    @Override
    public String toString() {
        return getClass().getSimpleName() + "[" + this._name + "]";
    }
}
