/*
 * Decompiled with CFR 0.152.
 */
package com.caucho.amp.impl;

import com.caucho.amp.actor.AmpActorContext;
import com.caucho.amp.actor.AmpActorRef;
import com.caucho.amp.impl.AbstractActorWorker;
import com.caucho.amp.mailbox.AbstractAmpMailbox;
import com.caucho.amp.stream.AmpEncoder;
import com.caucho.amp.stream.AmpError;
import com.caucho.amp.stream.AmpStream;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;

public class QueueMailbox
extends AbstractAmpMailbox {
    private final LinkedBlockingQueue<Message> _queue = new LinkedBlockingQueue();
    private final AmpActorContext _actor;
    private final QueueWorker _worker;

    public QueueMailbox(AmpActorContext actor, Executor executor) {
        this._actor = actor;
        ClassLoader loader = Thread.currentThread().getContextClassLoader();
        this._worker = new QueueWorker(actor.getAddress(), executor, loader);
    }

    @Override
    public AmpStream getActorStream() {
        return this._actor.getStream();
    }

    @Override
    public AmpActorContext getActorContext() {
        return this._actor;
    }

    @Override
    public void send(AmpActorRef to, AmpActorRef from, AmpEncoder encoder, String methodName, Object ... args) {
        this._queue.offer(new SendMessage(to, from, encoder, methodName, args));
        this._worker.wake();
    }

    @Override
    public void query(long id, AmpActorRef to, AmpActorRef from, AmpEncoder encoder, String methodName, Object ... args) {
        this._queue.offer(new QueryMessage(id, to, from, encoder, methodName, args));
        this._worker.wake();
    }

    @Override
    public void queryResult(long id, AmpActorRef to, AmpActorRef from, AmpEncoder encoder, Object result) {
        this._queue.offer(new QueryReply(id, to, from, encoder, result));
        this._worker.wake();
    }

    @Override
    public void queryError(long id, AmpActorRef to, AmpActorRef from, AmpEncoder encoder, AmpError error) {
        this._queue.offer(new QueryError(id, to, from, encoder, error));
    }

    @Override
    public void error(AmpActorRef to, AmpActorRef from, AmpEncoder encoder, AmpError error) {
        this._queue.offer(new ErrorMessage(to, from, encoder, error));
    }

    @Override
    public void close() {
    }

    public String toString() {
        return this.getClass().getSimpleName() + "[" + this._actor + "]";
    }

    static class ErrorMessage
    extends Message {
        private final AmpActorRef _to;
        private final AmpActorRef _from;
        private final AmpEncoder _encoder;
        private final AmpError _error;

        ErrorMessage(AmpActorRef to, AmpActorRef from, AmpEncoder encoder, AmpError error) {
            this._to = to;
            this._from = from;
            this._encoder = encoder;
            this._error = error;
        }

        @Override
        void invoke(AmpStream stream) {
            stream.error(this._to, this._from, this._encoder, this._error);
        }
    }

    static class QueryError
    extends Message {
        private final long _id;
        private final AmpActorRef _to;
        private final AmpActorRef _from;
        private final AmpEncoder _encoder;
        private final AmpError _error;

        QueryError(long id, AmpActorRef to, AmpActorRef from, AmpEncoder encoder, AmpError error) {
            this._id = id;
            this._to = to;
            this._from = from;
            this._encoder = encoder;
            this._error = error;
        }

        @Override
        void invoke(AmpStream stream) {
            stream.queryResult(this._id, this._to, this._from, this._encoder, this._error);
        }
    }

    static class QueryReply
    extends Message {
        private final long _id;
        private final AmpActorRef _to;
        private final AmpActorRef _from;
        private final AmpEncoder _encoder;
        private final Object _result;

        QueryReply(long id, AmpActorRef to, AmpActorRef from, AmpEncoder encoder, Object result) {
            this._id = id;
            this._to = to;
            this._from = from;
            this._encoder = encoder;
            this._result = result;
        }

        @Override
        void invoke(AmpStream stream) {
            stream.queryResult(this._id, this._to, this._from, this._encoder, this._result);
        }
    }

    static class QueryMessage
    extends Message {
        private final long _id;
        private final AmpActorRef _to;
        private final AmpActorRef _from;
        private final AmpEncoder _encoder;
        private final String _methodName;
        private final Object[] _args;

        QueryMessage(long id, AmpActorRef to, AmpActorRef from, AmpEncoder encoder, String methodName, Object ... args) {
            this._id = id;
            this._to = to;
            this._from = from;
            this._encoder = encoder;
            this._methodName = methodName;
            this._args = args;
        }

        @Override
        void invoke(AmpStream stream) {
            stream.query(this._id, this._to, this._from, this._encoder, this._methodName, this._args);
        }
    }

    static class SendMessage
    extends Message {
        private final AmpActorRef _to;
        private final AmpActorRef _from;
        private final AmpEncoder _encoder;
        private final String _methodName;
        private final Object[] _args;

        SendMessage(AmpActorRef to, AmpActorRef from, AmpEncoder encoder, String methodName, Object ... args) {
            this._to = to;
            this._from = from;
            this._encoder = encoder;
            this._methodName = methodName;
            this._args = args;
        }

        @Override
        void invoke(AmpStream stream) {
            stream.send(this._to, this._from, this._encoder, this._methodName, this._args);
        }
    }

    static class Message {
        Message() {
        }

        void invoke(AmpStream stream) {
        }
    }

    class QueueWorker
    extends AbstractActorWorker {
        QueueWorker(String name, Executor executor, ClassLoader loader) {
            super(name, executor, loader);
        }

        private void processQueue() {
            Message msg;
            AmpStream stream = QueueMailbox.this.getActorStream();
            while ((msg = (Message)QueueMailbox.this._queue.poll()) != null) {
                msg.invoke(stream);
            }
        }

        @Override
        public void runTask() {
            AmpActorContext prev = QueueMailbox.this._actor.beginCurrentActor();
            try {
                this.processQueue();
            }
            finally {
                QueueMailbox.this._actor.endCurrentActor(prev);
            }
        }
    }
}

