/*
 * Decompiled with CFR 0.152.
 */
package com.caucho.message.local;

import com.caucho.message.DistributionMode;
import com.caucho.message.MessageDecoder;
import com.caucho.message.broker.BrokerReceiver;
import com.caucho.message.broker.EnvironmentMessageBroker;
import com.caucho.message.broker.ReceiverMessageHandler;
import com.caucho.message.common.AbstractMessageReceiver;
import com.caucho.message.local.LocalReceiverFactory;
import com.caucho.util.L10N;
import java.io.IOException;
import java.io.InputStream;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * Message receiver that obtains its messages from the broker returned by
 * {@link EnvironmentMessageBroker#getCurrent()}.
 *
 * <p>Messages delivered by the broker are buffered in an in-memory queue by
 * {@link LocalMessageHandler} and decoded on demand in
 * {@link #pollMicros(long)}. The receiver keeps a local credit counter that
 * starts at the configured prefetch value and asks the broker for more
 * messages (via {@code BrokerReceiver.flow}) once that counter has dropped to
 * a quarter of the prefetch value or below.
 *
 * <p>NOTE(review): {@code _linkCredit} is updated without synchronization, so
 * this presumably expects a single polling thread -- confirm against callers.
 *
 * @param <T> the decoded message type
 */
public class LocalReceiver<T>
extends AbstractMessageReceiver<T> {
    private static final L10N L = new L10N(LocalReceiver.class);
    private static final Logger log = Logger.getLogger(LocalReceiver.class.getName());
    private final String _address;
    private final int _prefetch;
    private int _linkCredit;
    private final MessageDecoder<T> _decoder;
    // Initialized at declaration so it exists before the handler is handed
    // to the broker in the constructor.
    private final LinkedBlockingQueue<QueueEntry> _queue = new LinkedBlockingQueue<QueueEntry>();
    private final BrokerReceiver _sub;

    /**
     * Creates a receiver for the address configured on the factory and
     * registers it with the current environment broker.
     *
     * @param factory supplies the address, prefetch count, message decoder
     *        and distribution mode
     * @throws IllegalArgumentException if the broker returns no receiver for
     *         the configured address
     */
    LocalReceiver(LocalReceiverFactory factory) {
        _address = factory.getAddress();
        _prefetch = factory.getPrefetch();
        _decoder = factory.getMessageDecoder();

        EnvironmentMessageBroker broker = EnvironmentMessageBroker.getCurrent();
        LocalMessageHandler handler = new LocalMessageHandler();
        DistributionMode distMode = factory.getDistributionMode();
        // No node properties are supplied; the typed local keeps the call
        // unambiguous.
        Map<String, Object> nodeProperties = null;

        _sub = broker.createReceiver(_address, distMode, nodeProperties, handler);
        if (_sub == null) {
            throw new IllegalArgumentException(L.l("'{0}' is an unknown queue", (Object)_address));
        }

        _linkCredit = _prefetch;
        if (_prefetch > 0) {
            // Request the initial batch of messages from the broker.
            _sub.flow(-1L, _prefetch);
        }
    }

    /**
     * Returns the address this receiver was created for.
     *
     * @return the address obtained from the factory
     */
    public String getAddress() {
        return _address;
    }

    /**
     * Waits up to the given timeout for a buffered message and decodes it.
     *
     * <p>Whenever an entry was taken from the queue -- even if decoding it
     * then fails -- one unit of credit is consumed and the broker is asked
     * for more messages if the remaining credit is low.
     *
     * @param timeoutMicros maximum time to wait, in microseconds
     * @return the decoded message, or {@code null} if no message arrived in
     *         time, decoding failed with an {@link IOException}, or the
     *         waiting thread was interrupted (in which case the thread's
     *         interrupt status is restored)
     */
    @Override
    protected T pollMicros(long timeoutMicros) {
        boolean isConsumed = false;
        try {
            QueueEntry entry = _queue.poll(timeoutMicros, TimeUnit.MICROSECONDS);
            if (entry == null) {
                return null;
            }
            isConsumed = true;
            return _decoder.decode(entry.getInputStream());
        }
        catch (IOException e) {
            log.log(Level.FINE, e.toString(), e);
            return null;
        }
        catch (InterruptedException e) {
            // Restore the interrupt status so callers further up the stack
            // can still observe that the thread was interrupted.
            Thread.currentThread().interrupt();
            log.log(Level.FINE, e.toString(), e);
            return null;
        }
        finally {
            if (isConsumed) {
                consumeCredit();
            }
        }
    }

    /**
     * Accounts for one consumed message and requests more messages from the
     * broker once the remaining credit is at or below a quarter of the
     * prefetch value.
     */
    private void consumeCredit() {
        --_linkCredit;
        if (_linkCredit <= _prefetch >> 2) {
            _sub.flow(-1L, _prefetch);
            _linkCredit = _prefetch;
        }
    }

    @Override
    public String toString() {
        return getClass().getSimpleName() + "[" + getAddress() + "]";
    }

    /**
     * A message buffered between delivery by the broker and decoding by the
     * receiver: the message id together with the stream holding its body.
     */
    static class QueueEntry {
        // Stored but not read anywhere in this class.
        private final long _mid;
        private final InputStream _is;

        QueueEntry(long mid, InputStream is) {
            _mid = mid;
            _is = is;
        }

        /**
         * Returns the stream holding the message body.
         *
         * @return the stream passed to the constructor
         */
        public InputStream getInputStream() {
            return _is;
        }
    }

    /**
     * Broker callback that appends each delivered message to the enclosing
     * receiver's queue.
     */
    class LocalMessageHandler
    implements ReceiverMessageHandler {
        LocalMessageHandler() {
        }

        /**
         * Buffers the delivered message for a later {@code pollMicros} call.
         * The content length is not used.
         */
        @Override
        public void onMessage(long messageId, InputStream bodyIs, long contentLength) throws IOException {
            _queue.add(new QueueEntry(messageId, bodyIs));
        }
    }
}

