/*
 * Decompiled with CFR 0.152.
 */
package de.caluga.morphium.messaging;

import de.caluga.morphium.Morphium;
import de.caluga.morphium.ShutdownListener;
import de.caluga.morphium.async.AsyncOperationCallback;
import de.caluga.morphium.async.AsyncOperationType;
import de.caluga.morphium.changestream.ChangeStreamMonitor;
import de.caluga.morphium.driver.MorphiumId;
import de.caluga.morphium.messaging.MessageListener;
import de.caluga.morphium.messaging.MessageRejectedException;
import de.caluga.morphium.messaging.Msg;
import de.caluga.morphium.query.MorphiumIterator;
import de.caluga.morphium.query.Query;
import java.lang.reflect.Field;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.Vector;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Messaging
extends Thread
implements ShutdownListener {
    /** Class-wide SLF4J logger. */
    private static final Logger log = LoggerFactory.getLogger(Messaging.class);
    /** Morphium instance used for all database access. */
    private Morphium morphium;
    // Written by terminate()/onShutdown()/setRunning() and read by the polling loop and the
    // change stream callback, i.e. by different threads - volatile guarantees visibility.
    private volatile boolean running;
    /** Sleep time in milliseconds between two polling rounds. */
    private int pause = 5000;
    /** Sender id of this instance; a random UUID unless overridden via setSenderId. */
    private String id;
    /** When true, a generic "received" answer is sent if a listener returns no answer. */
    private boolean autoAnswer = false;
    /** Host name written into outgoing messages. */
    private String hostname;
    /** Passed to the lookup of the polling loop as its "multiple" flag. */
    private boolean processMultiple = false;
    /** Listeners that are called for every message. */
    private List<MessageListener> listeners;
    /** Message name -> timestamp (ms) at which processing of that name was paused. */
    private Map<String, Long> pauseMessages = new ConcurrentHashMap<String, Long>();
    // Replaced as a whole by the add/remove methods while message threads read it,
    // so the reference itself has to be published safely.
    private volatile Map<String, List<MessageListener>> listenerByName;
    /** Optional queue name; determines the collection name, see getCollectionName(). */
    private String queueName;
    /** Worker pool; only created when multithreadded is true. */
    private ThreadPoolExecutor threadPool;
    /** Schedules the delayed removal of message ids from {@link #processing}. */
    private ScheduledThreadPoolExecutor decouplePool;
    private boolean multithreadded = false;
    /** Iterator window size; also the upper bound for active workers in queueOrRun. */
    private int windowSize = 1000;
    private boolean useChangeStream = false;
    // Created on the messaging thread in run(), read by isRunning()/terminate() on other threads.
    private volatile ChangeStreamMonitor changeStreamMonitor;
    /** Answers received so far, keyed by the id of the message they answer. */
    private Map<MorphiumId, Msg> waitingForAnswers = new ConcurrentHashMap<MorphiumId, Msg>();
    /** Ids of messages that are currently being handled by this instance. */
    private List<MorphiumId> processing = new Vector<MorphiumId>();

    /**
     * Creates a messaging instance without a queue name (collection "msg").
     * Delegates to the queue-name constructor with {@code queueName == null}.
     *
     * @param m               Morphium instance to use
     * @param pause           sleep time between polling rounds in milliseconds
     * @param processMultiple "multiple" flag used by the polling loop
     */
    public Messaging(Morphium m, int pause, boolean processMultiple) {
        this(m, null, pause, processMultiple);
    }

    /**
     * Creates a messaging instance with defaults: no queue name, 500 ms pause,
     * processMultiple off, single-threaded, window size 100.
     *
     * @param m Morphium instance to use
     */
    public Messaging(Morphium m) {
        this(m, null, 500, false, false, 100);
    }

    /**
     * Creates a messaging instance without a queue name (collection "msg").
     *
     * @param m               Morphium instance to use
     * @param pause           sleep time between polling rounds in milliseconds
     * @param processMultiple "multiple" flag used by the polling loop
     * @param multithreadded  whether messages are handed to a worker pool
     * @param windowSize      iterator window size / limit of active workers
     */
    public Messaging(Morphium m, int pause, boolean processMultiple, boolean multithreadded, int windowSize) {
        this(m, null, pause, processMultiple, multithreadded, windowSize);
    }

    /**
     * Creates a single-threaded messaging instance with window size 1000.
     *
     * @param m               Morphium instance to use
     * @param queueName       queue name, may be null (see getCollectionName())
     * @param pause           sleep time between polling rounds in milliseconds
     * @param processMultiple "multiple" flag used by the polling loop
     */
    public Messaging(Morphium m, String queueName, int pause, boolean processMultiple) {
        this(m, queueName, pause, processMultiple, false, 1000);
    }

    /**
     * Creates a messaging instance that uses a change stream exactly when
     * {@code m.isReplicaSet()} returns true.
     *
     * @param m               Morphium instance to use
     * @param queueName       queue name, may be null (see getCollectionName())
     * @param pause           sleep time between polling rounds in milliseconds
     * @param processMultiple "multiple" flag used by the polling loop
     * @param multithreadded  whether messages are handed to a worker pool
     * @param windowSize      iterator window size / limit of active workers
     */
    public Messaging(Morphium m, String queueName, int pause, boolean processMultiple, boolean multithreadded, int windowSize) {
        this(m, queueName, pause, processMultiple, multithreadded, windowSize, m.isReplicaSet());
    }

    /**
     * Creates a messaging instance with every option given explicitly.
     * Sets up the worker pool (only if {@code multithreadded}), the single-threaded
     * scheduler used for delayed clean-up, a random sender id, the host name and the
     * indices for the message collection. The thread is not started here.
     *
     * @param m               Morphium instance to use
     * @param queueName       queue name, may be null (see getCollectionName())
     * @param pause           sleep time between polling rounds in milliseconds
     * @param processMultiple "multiple" flag used by the polling loop
     * @param multithreadded  whether messages are handed to a worker pool
     * @param windowSize      iterator window size / limit of active workers
     * @param useChangeStream whether run() uses a change stream monitor instead of polling
     */
    public Messaging(Morphium m, String queueName, int pause, boolean processMultiple, boolean multithreadded, int windowSize, boolean useChangeStream) {
        this.multithreadded = multithreadded;
        this.windowSize = windowSize;
        this.morphium = m;
        this.useChangeStream = useChangeStream;
        if (multithreadded) {
            BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>() {
                @Override
                public boolean offer(Runnable e) {
                    // Returning false makes the executor start a new thread instead of queueing;
                    // only queue when the pool is at its maximum or has idle threads.
                    int poolSize = Messaging.this.threadPool.getPoolSize();
                    int maximumPoolSize = Messaging.this.threadPool.getMaximumPoolSize();
                    if (poolSize >= maximumPoolSize || poolSize > Messaging.this.threadPool.getActiveCount()) {
                        return super.offer(e);
                    }
                    return false;
                }
            };
            this.threadPool = new ThreadPoolExecutor(
                    this.morphium.getConfig().getThreadPoolMessagingCoreSize(),
                    this.morphium.getConfig().getThreadPoolMessagingMaxSize(),
                    this.morphium.getConfig().getThreadPoolMessagingKeepAliveTime(),
                    TimeUnit.MILLISECONDS,
                    queue);
            this.threadPool.setRejectedExecutionHandler(new RejectedExecutionHandler() {
                @Override
                public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                    try {
                        // put() bypasses the overridden offer() and really enqueues the task
                        executor.getQueue().put(r);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            });
            this.threadPool.setThreadFactory(new ThreadFactory() {
                private final AtomicInteger num = new AtomicInteger(1);

                @Override
                public Thread newThread(Runnable r) {
                    // getAndIncrement is atomic; the former set(get() + 1) could hand out duplicates
                    Thread ret = new Thread(r, "messaging " + this.num.getAndIncrement());
                    ret.setDaemon(true);
                    return ret;
                }
            });
        }
        this.decouplePool = new ScheduledThreadPoolExecutor(1);
        this.decouplePool.setThreadFactory(new ThreadFactory() {
            private final AtomicInteger num = new AtomicInteger(1);

            @Override
            public Thread newThread(Runnable r) {
                Thread ret = new Thread(r, "decouple_thr_" + this.num.getAndIncrement());
                ret.setDaemon(true);
                return ret;
            }
        });
        this.queueName = queueName;
        this.running = true;
        this.pause = pause;
        this.processMultiple = processMultiple;
        this.id = UUID.randomUUID().toString();
        this.hostname = System.getenv("HOSTNAME");
        if (this.hostname == null) {
            try {
                this.hostname = InetAddress.getLocalHost().getHostName();
            } catch (UnknownHostException e) {
                log.debug("Could not determine local host name", e);
            }
        }
        if (this.hostname == null) {
            this.hostname = "unknown host";
        }
        this.listeners = new CopyOnWriteArrayList<MessageListener>();
        this.listenerByName = new HashMap<String, List<MessageListener>>();
        m.ensureIndicesFor(Msg.class, this.getCollectionName());
        // Register last so that a half-initialised instance is never reachable through Morphium
        this.morphium.addShutdownListener(this);
    }

    /**
     * Counts all messages in this instance's message collection.
     *
     * @return number of documents reported by the query's countAll()
     */
    public long getMessageCount() {
        Query<Msg> all = morphium.createQueryFor(Msg.class).setCollectionName(getCollectionName());
        return all.countAll();
    }

    /**
     * Deletes the given message from this instance's message collection.
     *
     * @param m message to delete
     */
    public void removeMessage(Msg m) {
        String collection = getCollectionName();
        morphium.delete(m, collection);
    }

    /**
     * Runs the given query against this instance's message collection.
     * The query is cloned first so that the caller's instance is left untouched;
     * if cloning fails the passed instance is used (and modified) as a fallback.
     *
     * @param q query to execute
     * @return the result of {@code asList()} on the (cloned) query
     */
    public List<Msg> findMessages(Query<Msg> q) {
        try {
            q = q.clone();
        } catch (CloneNotSupportedException e) {
            // Keep the old best-effort behaviour, but no longer hide the side effect on the caller's query
            log.warn("Could not clone query - the collection name of the passed query will be changed", e);
        }
        q.setCollectionName(getCollectionName());
        return q.asList();
    }

    /**
     * Main loop of the messaging thread.
     * <p>
     * With a change stream: processes the messages that already exist once, then
     * installs a {@link ChangeStreamMonitor} whose callback reacts to "insert" and
     * "update" events; this method returns right after starting the monitor.
     * <p>
     * Without a change stream: polls for messages every {@code pause} milliseconds
     * until {@code running} becomes false, then clears all listeners.
     */
    @Override
    public void run() {
        setName("Msg " + id);
        if (log.isDebugEnabled()) {
            log.debug("Messaging " + id + " started");
        }
        if (useChangeStream) {
            log.debug("Before running the changestream monitor - check of already existing messages");
            try {
                findAndProcessMessages(true);
                if (multithreadded) {
                    // wait until the backlog has been worked off by the pool
                    while (threadPool.getActiveCount() > 0) {
                        Thread.yield();
                    }
                }
            } catch (Exception e) {
                log.error("Error processing existing messages in queue", (Throwable) e);
            }
            log.debug("init Messaging  using changestream monitor");
            changeStreamMonitor = new ChangeStreamMonitor(morphium, getCollectionName(), true);
            changeStreamMonitor.addListener(evt -> {
                if (evt == null || evt.getOperationType() == null) {
                    return running;
                }
                if (evt.getOperationType().equals("insert")) {
                    Msg inserted = morphium.getMapper().deserialize(Msg.class, evt.getFullDocument());
                    // own message, already processed, or addressed to somebody else
                    // (id.equals(..) so that a message without sender does not cause an NPE)
                    if (id.equals(inserted.getSender())
                            || inserted.getProcessedBy() != null && inserted.getProcessedBy().contains(id)
                            || inserted.getRecipient() != null && !inserted.getRecipient().equals(id)) {
                        return running;
                    }
                    if (pauseMessages.containsKey(inserted.getName())) {
                        log.info("Not processing message - processing paused for " + inserted.getName());
                        return running;
                    }
                    boolean forMe = inserted.getRecipient() == null || inserted.getRecipient().equals(id);
                    boolean notProcessedByMe = inserted.getProcessedBy() == null || !inserted.getProcessedBy().contains(id);
                    if (inserted.isExclusive() && inserted.getLockedBy() == null && forMe && notProcessedByMe) {
                        log.debug("trying to lock exclusive message");
                        lockAndProcess(inserted);
                    } else if (!inserted.isExclusive() || inserted.getRecipient() != null && inserted.getRecipient().equals(id)) {
                        if (processing.contains(inserted.getMsgId())) {
                            return running;
                        }
                        List<Msg> lst = new ArrayList<Msg>();
                        lst.add(inserted);
                        try {
                            processMessages(lst, false);
                        } catch (Exception e) {
                            log.error("Error during message processing ", (Throwable) e);
                        }
                    } else {
                        log.debug("Message is not for me");
                    }
                } else if (evt.getOperationType().equals("update") && evt.getFullDocument() != null) {
                    Object rawId = evt.getFullDocument().get("_id");
                    if (rawId == null) {
                        return running;
                    }
                    // Look the message up in THIS queue's collection. A plain findById(Msg.class, id)
                    // has no collection information and therefore misses messages of named queues.
                    Query<Msg> byId = morphium.createQueryFor(Msg.class);
                    byId.setCollectionName(getCollectionName());
                    byId.f("_id").eq(new MorphiumId(rawId.toString()));
                    List<Msg> found = byId.asList();
                    Msg updated = found == null || found.isEmpty() ? null : found.get(0);
                    if (updated != null && updated.isExclusive() && updated.getLockedBy() == null
                            && (updated.getRecipient() == null || updated.getRecipient().equals(id))) {
                        log.debug("Update of msg - trying to lock");
                        lockAndProcess(updated);
                    }
                }
                return running;
            });
            changeStreamMonitor.start();
        } else {
            while (running) {
                try {
                    findAndProcessMessages(processMultiple);
                } catch (Throwable e) {
                    log.error("Unhandled exception " + e.getMessage(), e);
                } finally {
                    try {
                        Messaging.sleep(pause);
                    } catch (InterruptedException ignored) {
                        // terminate() clears "running" before interrupting, so the loop condition
                        // ends the thread; re-setting the flag here would make every later sleep
                        // fail immediately should the thread have to keep running.
                    }
                }
            }
            if (log.isDebugEnabled()) {
                log.debug("Messaging " + id + " stopped!");
            }
            if (!running) {
                listeners.clear();
                listenerByName.clear();
            }
        }
    }

    /**
     * Marks messages with the given name as paused and remembers when the pause
     * started. A pause that is already active keeps its original timestamp.
     *
     * @param name message name to pause
     */
    public void pauseProcessingOfMessagesNamed(String name) {
        long pausedAt = System.currentTimeMillis();
        pauseMessages.putIfAbsent(name, pausedAt);
    }

    /**
     * Processes the pending messages with the given name (even though they are still
     * marked as paused) and then lifts the pause.
     *
     * @param name message name to unpause
     * @return how long the name was paused in milliseconds, or null if it was not paused
     */
    public Long unpauseProcessingOfMessagesNamed(String name) {
        findAndProcessPendingMessages(name, true);
        Long pausedSince = pauseMessages.remove(name);
        if (pausedSince == null) {
            return null;
        }
        return System.currentTimeMillis() - pausedSince;
    }

    /**
     * Looks up the pending messages with the given name and hands them to the listeners.
     *
     * @param name           message name to look for
     * @param forceListeners when true, messages are processed even if their name is paused
     */
    public void findAndProcessPendingMessages(String name, boolean forceListeners) {
        processMessages(findMessages(name, true), forceListeners);
    }

    /**
     * Locks the messages this instance may process and returns an iterator over them.
     * <p>
     * Step 1 sets {@code locked_by}/{@code locked} on unlocked messages that were not
     * sent or processed by this instance and are addressed to nobody or to this
     * instance (optionally restricted to one name). Step 2 queries everything that is
     * locked by this instance, plus messages locked by "ALL" that this instance has
     * not processed yet.
     *
     * @param name     message name to restrict the locking to, or null for all names
     * @param multiple whether the locking update may modify more than one message
     * @return iterator over the messages to process, using the configured window size
     */
    private MorphiumIterator<Msg> findMessages(String name, boolean multiple) {
        HashMap<String, Object> values = new HashMap<String, Object>();
        Query<Msg> q = this.morphium.createQueryFor(Msg.class);
        q.setCollectionName(this.getCollectionName());
        // broadcast messages (no recipient)
        Query<Msg> or1 = q.q().f((Enum)Msg.Fields.sender).ne(this.id).f((Enum)Msg.Fields.lockedBy).eq(null).f((Enum)Msg.Fields.processedBy).ne(this.id).f((Enum)Msg.Fields.recipient).eq(null);
        if (name != null) {
            or1.f((Enum)Msg.Fields.name).eq(name);
        }
        // messages addressed directly to this instance
        Query<Msg> or2 = q.q().f((Enum)Msg.Fields.sender).ne(this.id).f((Enum)Msg.Fields.lockedBy).eq(null).f((Enum)Msg.Fields.processedBy).ne(this.id).f((Enum)Msg.Fields.recipient).eq(this.id);
        if (name != null) {
            or2.f((Enum)Msg.Fields.name).eq(name);
        }
        q.f("_id").nin(this.processing);
        q.or(or1, or2);
        q.sort(Msg.Fields.priority, Msg.Fields.timestamp);
        values.put("locked_by", this.id);
        values.put("locked", System.currentTimeMillis());
        // Honour the caller's choice: this flag used to be hard-coded to true, which made
        // the parameter (and with it processMultiple) ineffective.
        this.morphium.set(q, values, false, multiple);
        q = q.q();
        q.or(q.q().f((Enum)Msg.Fields.lockedBy).eq(this.id), q.q().f((Enum)Msg.Fields.lockedBy).eq("ALL").f((Enum)Msg.Fields.processedBy).ne(this.id).f((Enum)Msg.Fields.recipient).eq(this.id), q.q().f((Enum)Msg.Fields.lockedBy).eq("ALL").f((Enum)Msg.Fields.processedBy).ne(this.id).f((Enum)Msg.Fields.recipient).eq(null));
        q.sort(Msg.Fields.priority, Msg.Fields.timestamp);
        MorphiumIterator<Msg> it = q.asIterable(this.windowSize);
        it.setMultithreaddedAccess(this.multithreadded);
        return it;
    }

    /**
     * Looks up pending messages of any name and hands them to the listeners;
     * paused message names are respected.
     *
     * @param multiple forwarded to the message lookup
     */
    private void findAndProcessMessages(boolean multiple) {
        processMessages(findMessages(null, multiple), false);
    }

    /**
     * Tries to lock a single message for this instance and processes it if the lock
     * was obtained. After the update the message is re-read; only if it is then
     * locked by this instance it is passed on to the listeners.
     *
     * @param obj the message to lock
     */
    private void lockAndProcess(Msg obj) {
        Query<Msg> q = this.morphium.createQueryFor(Msg.class);
        q.setCollectionName(this.getCollectionName());
        q.f("_id").eq(obj.getMsgId());
        q.f((Enum)Msg.Fields.processedBy).ne(this.id);
        // Only take the lock if nobody holds it. Without this condition a second instance
        // overwrites the first one's lock and both may process the exclusive message.
        q.f((Enum)Msg.Fields.lockedBy).eq(null);
        HashMap<String, Object> values = new HashMap<String, Object>();
        values.put("locked_by", this.id);
        values.put("locked", System.currentTimeMillis());
        this.morphium.set(q, values, false, false);
        try {
            // short grace period before re-reading the lock state
            Thread.sleep(10L);
        } catch (InterruptedException ignored) {
            // deliberately ignored: the lock may already be ours, so the message is still handled
        }
        obj = this.morphium.reread(obj, this.getCollectionName());
        if (obj != null && obj.getLockedBy() != null && obj.getLockedBy().equals(this.id)) {
            ArrayList<Msg> lst = new ArrayList<Msg>();
            lst.add(obj);
            try {
                this.processMessages(lst, false);
            } catch (Exception e) {
                log.error("Error during message processing ", (Throwable)e);
            }
        }
    }

    /**
     * Hands every message that is not yet being processed to {@link #handleMessage},
     * either inline or on the worker pool (see queueOrRun). Does nothing when no
     * listener is registered.
     *
     * @param messages      messages to process; null elements are skipped
     * @param forceIfPaused when true, messages are processed even if their name is paused
     */
    private void processMessages(Iterable<Msg> messages, boolean forceIfPaused) {
        if (listeners.isEmpty() && listenerByName.isEmpty()) {
            return;
        }
        // Only filled in single-threaded mode, where every handler has finished before the
        // batch delete below. Pool workers delete their message themselves, because they may
        // finish after this method has already issued the delete.
        final List<Msg> toRemove = new ArrayList<Msg>();
        for (Msg m : messages) {
            if (m == null || processing.contains(m.getMsgId())) {
                continue;
            }
            processing.add(m.getMsgId());
            queueOrRun(() -> handleMessage(m, forceIfPaused, toRemove));
        }
        morphium.delete(toRemove, getCollectionName());
        while (morphium.getWriteBufferCount() > 0) {
            Thread.yield();
        }
    }

    /**
     * Processes one message: re-reads it, checks that it still has to be handled by
     * this instance, calls the listeners and finally marks it as processed (messages
     * locked by "ALL") or removes it (messages locked by this instance).
     *
     * @param m             message as found by the lookup
     * @param forceIfPaused when true, a pause for the message name is ignored
     * @param toRemove      collects messages for the caller's batch delete (single-threaded mode only)
     */
    private void handleMessage(Msg m, boolean forceIfPaused, List<Msg> toRemove) {
        if (m.getProcessedBy() != null && m.getProcessedBy().contains(id)) {
            // also release the id here - this exit used to leave it in "processing" forever
            processing.remove(m.getMsgId());
            return;
        }
        Msg msg = morphium.reread(m, getCollectionName());
        if (msg == null) {
            processing.remove(m.getMsgId());
            return;
        }
        if (!forceIfPaused && pauseMessages.containsKey(msg.getName())) {
            log.info("Not processing msg - paused " + msg.getName());
            processing.remove(m.getMsgId());
            return;
        }
        if (msg.getProcessedBy() != null && msg.getProcessedBy().contains(id)) {
            processing.remove(m.getMsgId());
            return;
        }
        // null-safe: a message without any lock is simply not ours
        if (!id.equals(msg.getLockedBy()) && !"ALL".equals(msg.getLockedBy())) {
            processing.remove(m.getMsgId());
            return;
        }
        if (msg.getTtl() < System.currentTimeMillis() - msg.getTimestamp()) {
            log.debug("Found outdated message - deleting it!");
            morphium.delete(msg, getCollectionName());
            processing.remove(m.getMsgId());
            return;
        }
        try {
            Msg answer;
            for (MessageListener l : listeners) {
                answer = l.onMessage(this, msg);
                if (autoAnswer && answer == null) {
                    answer = new Msg(msg.getName(), "received", "");
                }
                if (answer == null) {
                    continue;
                }
                msg.sendAnswer(this, answer);
            }
            List<MessageListener> named = listenerByName.get(msg.getName());
            if (named != null) {
                for (MessageListener l : named) {
                    answer = l.onMessage(this, msg);
                    if (autoAnswer && answer == null) {
                        answer = new Msg(msg.getName(), "received", "");
                    }
                    if (answer == null) {
                        continue;
                    }
                    msg.sendAnswer(this, answer);
                }
            }
        } catch (MessageRejectedException mre) {
            log.error("Message rejected by listener: " + mre.getMessage());
            if (mre.isSendAnswer()) {
                Msg answer = new Msg(msg.getName(), "message rejected by listener", mre.getMessage());
                msg.sendAnswer(this, answer);
            }
            if (mre.isContinueProcessing()) {
                // keep the message in the queue: release the lock and mark it as seen by us
                updateProcessedByAndReleaseLock(msg);
                processing.remove(m.getMsgId());
                return;
            }
        } catch (Throwable t) {
            log.error("Processing failed", t);
        }
        if ("ALL".equals(msg.getLockedBy())) {
            updateProcessedByAndReleaseLock(msg);
        } else if (multithreadded) {
            morphium.delete(msg, getCollectionName());
        } else {
            toRemove.add(msg);
        }
        scheduleProcessingRelease(m);
    }

    /**
     * Schedules the removal of the message id from {@code processing} after the
     * message's TTL; retries while the scheduler rejects the task and gives up once
     * the scheduler is shut down.
     *
     * @param m message whose id is to be released
     */
    private void scheduleProcessingRelease(Msg m) {
        Runnable release = () -> processing.remove(m.getMsgId());
        while (true) {
            try {
                if (decouplePool.isTerminated() || decouplePool.isTerminating() || decouplePool.isShutdown()) {
                    break;
                }
                decouplePool.schedule(release, m.getTtl(), TimeUnit.MILLISECONDS);
            } catch (RejectedExecutionException ex) {
                try {
                    Thread.sleep(pause);
                } catch (InterruptedException ignored) {
                    // keep trying; the shutdown check above ends the loop
                }
                continue;
            }
            break;
        }
    }

    /**
     * Adds this instance's id to the message's processedBy list and, if the message
     * is locked by this instance, clears the lock first.
     *
     * @param msg message to update
     */
    private void updateProcessedByAndReleaseLock(Msg msg) {
        Query<Msg> byId = morphium.createQueryFor(Msg.class);
        byId.setCollectionName(getCollectionName());
        byId.f((Enum)Msg.Fields.msgId).eq(msg.getMsgId());
        boolean lockedByMe = msg.getLockedBy().equals(id);
        if (lockedByMe) {
            morphium.set(byId, (Enum)Msg.Fields.lockedBy, null);
        }
        morphium.push(byId, Msg.Fields.processedBy, (Object)id);
    }

    /**
     * Runs the task inline (single-threaded mode) or submits it to the worker pool.
     * After submitting, the caller is throttled until no more than {@code windowSize}
     * workers are active. Tasks are dropped when the pool is gone or shut down.
     *
     * @param r task to run
     */
    private void queueOrRun(Runnable r) {
        if (!multithreadded) {
            r.run();
            return;
        }
        // Work on a local reference: the field may be cleared by a concurrent shutdown.
        ThreadPoolExecutor pool = threadPool;
        if (pool == null || pool.isShutdown()) {
            // The former "retry on any Throwable" loop spun forever in this situation.
            log.debug("Worker pool not available - task dropped");
            return;
        }
        boolean queued = false;
        while (!queued && running && !pool.isShutdown()) {
            try {
                pool.execute(r);
                queued = true;
            } catch (RejectedExecutionException e) {
                // pool is saturated - give the workers a chance and try again
                Thread.yield();
            }
        }
        while (!pool.isShutdown() && pool.getActiveCount() > windowSize) {
            Thread.yield();
        }
    }

    /**
     * Returns the name of the collection holding this instance's messages.
     *
     * @return "msg" when no queue name is set, otherwise "mmsg_" followed by the queue name
     */
    public String getCollectionName() {
        boolean unnamed = queueName == null || queueName.isEmpty();
        return unnamed ? "msg" : "mmsg_" + queueName;
    }

    /**
     * Registers a listener that is only called for messages with the given name.
     * Map and per-name list are both replaced by modified copies, so a thread that is
     * currently iterating the old list is not disturbed.
     *
     * @param n message name
     * @param l listener to add
     */
    public void addListenerForMessageNamed(String n, MessageListener l) {
        Map<String, List<MessageListener>> copy = new HashMap<String, List<MessageListener>>(this.listenerByName);
        List<MessageListener> existing = copy.get(n);
        List<MessageListener> updated = existing == null
                ? new ArrayList<MessageListener>()
                : new ArrayList<MessageListener>(existing);
        updated.add(l);
        copy.put(n, updated);
        this.listenerByName = copy;
    }

    /**
     * Removes a listener that was registered for the given message name. Does nothing
     * if no listener list exists for that name. Map and per-name list are replaced by
     * modified copies, so a thread iterating the old list is not disturbed.
     *
     * @param n message name
     * @param l listener to remove
     */
    public void removeListenerForMessageNamed(String n, MessageListener l) {
        List<MessageListener> existing = this.listenerByName.get(n);
        if (existing == null) {
            return;
        }
        List<MessageListener> updated = new ArrayList<MessageListener>(existing);
        updated.remove(l);
        Map<String, List<MessageListener>> copy = new HashMap<String, List<MessageListener>>(this.listenerByName);
        copy.put(n, updated);
        this.listenerByName = copy;
    }

    /**
     * @return the id this instance uses as sender of its messages
     */
    public String getSenderId() {
        return id;
    }

    /**
     * Replaces the sender id of this instance.
     *
     * @param id new sender id
     */
    public void setSenderId(String id) {
        this.id = id;
    }

    /**
     * @return the sleep time between polling rounds in milliseconds
     */
    public int getPause() {
        return pause;
    }

    /**
     * Sets the sleep time between polling rounds.
     *
     * @param pause pause in milliseconds
     */
    public void setPause(int pause) {
        this.pause = pause;
    }

    /**
     * Tells whether messaging is active.
     *
     * @return in change stream mode: whether a monitor exists and reports running;
     *         otherwise the value of the internal running flag
     */
    public boolean isRunning() {
        if (!useChangeStream) {
            return running;
        }
        return changeStreamMonitor != null && changeStreamMonitor.isRunning();
    }

    /**
     * Sets the running flag; when set to false an existing change stream monitor is
     * stopped first.
     *
     * @param running new value of the running flag
     */
    @Deprecated
    public void setRunning(boolean running) {
        boolean stopMonitor = !running && changeStreamMonitor != null;
        if (stopMonitor) {
            changeStreamMonitor.stop();
        }
        this.running = running;
    }

    /**
     * Stops messaging: clears the running flag, shuts down both executors, stops the
     * change stream monitor, stores a final "going down" message addressed to this
     * instance and then interrupts the messaging thread and waits briefly for it to end.
     * <p>
     * The thread is no longer killed with {@code Thread.stop()}: that method is unsafe
     * and throws {@code UnsupportedOperationException} on current JDKs. The polling
     * loop ends on its own once the running flag is cleared.
     */
    public void terminate() {
        final long joinTimeoutMs = 1000L;
        running = false;
        if (decouplePool != null) {
            int sz = decouplePool.shutdownNow().size();
            log.debug("Shutting down with " + sz + " runnables still scheduled");
        }
        ThreadPoolExecutor pool = threadPool;
        if (pool != null) {
            int sz = pool.shutdownNow().size();
            log.debug("Shutting down with " + sz + " runnables still pending in pool");
        }
        if (changeStreamMonitor != null) {
            changeStreamMonitor.stop();
        }
        sendMessageToSelf(new Msg("info", "going down", "now"));
        // Never join on ourselves (terminate() may be called from a listener).
        if (isAlive() && Thread.currentThread() != this) {
            interrupt();
            try {
                join(joinTimeoutMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            if (isAlive()) {
                log.warn("Messaging " + id + " did not stop within " + joinTimeoutMs + "ms");
            }
        }
    }

    /**
     * Registers a listener that is called for every message.
     *
     * @param l listener to add
     */
    public void addMessageListener(MessageListener l) {
        listeners.add(l);
    }

    /**
     * Removes a listener that was registered for every message.
     *
     * @param l listener to remove
     */
    public void removeMessageListener(MessageListener l) {
        listeners.remove(l);
    }

    /**
     * Stores the message with the async flag set, i.e. with an error-logging callback
     * passed to the store call.
     *
     * @param m message to send
     */
    public void queueMessage(Msg m) {
        final boolean async = true;
        storeMsg(m, async);
    }

    /**
     * Starts the messaging thread. In change stream mode the caller is additionally
     * held back for 500 ms after the thread has been started.
     */
    @Override
    public synchronized void start() {
        super.start();
        if (this.useChangeStream) {
            try {
                Thread.sleep(500L);
            } catch (InterruptedException e) {
                log.error("error:" + e.getMessage());
                // hand the interruption on to the caller instead of swallowing it
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Stores the message without a callback (async flag not set).
     *
     * @param m message to send
     */
    public void storeMessage(Msg m) {
        final boolean async = false;
        storeMsg(m, async);
    }

    /**
     * Counts all messages in this instance's message collection.
     *
     * @return number of documents reported by the query's countAll()
     */
    public long getNumberOfMessages() {
        return morphium.createQueryFor(Msg.class).setCollectionName(getCollectionName()).countAll();
    }

    /**
     * Stamps the message with this instance's sender id and host name and stores it in
     * the message collection. If the message has a non-empty "to" list, one copy per
     * recipient is stored instead (fields copied reflectively, message id reset).
     *
     * @param m     message to store
     * @param async when true, a callback that logs store errors is passed to the store call
     * @throws RuntimeException if a per-recipient copy cannot be created or stored
     */
    private void storeMsg(Msg m, boolean async) {
        AsyncOperationCallback cb = null;
        if (async) {
            cb = new AsyncOperationCallback(){

                public void onOperationSucceeded(AsyncOperationType type, Query q, long duration, List result, Object entity, Object ... param) {
                }

                public void onOperationError(AsyncOperationType type, Query q, long duration, String error, Throwable t, Object entity, Object ... param) {
                    log.error("Error storing msg", t);
                }
            };
        }
        m.setSender(this.id);
        m.addProcessedId(this.id);
        m.setSenderHost(this.hostname);
        if (m.getTo() != null && !m.getTo().isEmpty()) {
            for (String recipient : m.getTo()) {
                try {
                    // Class.newInstance() is deprecated; go through the no-arg constructor explicitly
                    Msg msg = (Msg)m.getClass().getDeclaredConstructor().newInstance();
                    List<Field> flds = this.morphium.getARHelper().getAllFields(m.getClass());
                    for (Field f : flds) {
                        f.setAccessible(true);
                        f.set(msg, f.get(m));
                    }
                    // every copy becomes a message of its own
                    msg.setMsgId(null);
                    msg.setRecipient(recipient);
                    this.morphium.storeNoCache(msg, this.getCollectionName(), cb);
                }
                catch (Exception e) {
                    throw new RuntimeException("Sending of message to recipient " + recipient + " failed", e);
                }
            }
        } else {
            this.morphium.storeNoCache(m, this.getCollectionName(), cb);
        }
    }

    /**
     * Stores a message that is addressed to this instance itself (async flag not set).
     *
     * @param m message to send
     */
    public void sendMessageToSelf(Msg m) {
        final boolean async = false;
        sendMessageToSelf(m, async);
    }

    /**
     * Stores a message that is addressed to this instance itself, with the async flag set.
     *
     * @param m message to send
     */
    public void queueMessagetoSelf(Msg m) {
        final boolean async = true;
        sendMessageToSelf(m, async);
    }

    /**
     * Stores a message with sender "self" and this instance as recipient in this
     * instance's message collection.
     *
     * @param m     message to store
     * @param async when true, a callback that logs store errors is passed to the store call
     */
    private void sendMessageToSelf(Msg m, boolean async) {
        AsyncOperationCallback cb = null;
        if (async) {
            cb = new AsyncOperationCallback(){

                public void onOperationSucceeded(AsyncOperationType type, Query q, long duration, List result, Object entity, Object ... param) {
                }

                public void onOperationError(AsyncOperationType type, Query q, long duration, String error, Throwable t, Object entity, Object ... param) {
                    log.error("Error storing msg", t);
                }
            };
        }
        m.setSender("self");
        m.setRecipient(this.id);
        m.setSenderHost(this.hostname);
        // Store into this queue's collection and pass the callback, exactly like storeMsg does.
        // The former storeNoCache(m) ignored both, so named queues never saw the message.
        this.morphium.storeNoCache(m, this.getCollectionName(), cb);
    }

    /**
     * @return whether a generic "received" answer is sent when a listener returns none
     */
    public boolean isAutoAnswer() {
        return autoAnswer;
    }

    /**
     * Switches automatic answers on or off.
     *
     * @param autoAnswer when true, a generic "received" answer is sent if a listener returns no answer
     */
    public void setAutoAnswer(boolean autoAnswer) {
        this.autoAnswer = autoAnswer;
    }

    /**
     * Shutdown hook called by Morphium: clears the running flag, shuts down both
     * executors and stops the change stream monitor. Problems are logged and never
     * propagated to the caller.
     *
     * @param m the Morphium instance that is shutting down
     */
    @Override
    public void onShutdown(Morphium m) {
        this.running = false;
        try {
            // The pool reference is kept on purpose: other threads may still be using it,
            // and clearing the field made them fail with a NullPointerException.
            ThreadPoolExecutor pool = this.threadPool;
            if (pool != null) {
                pool.shutdownNow();
            }
            if (this.decouplePool != null) {
                this.decouplePool.shutdownNow();
            }
            if (this.changeStreamMonitor != null) {
                this.changeStreamMonitor.stop();
            }
        } catch (Exception e) {
            log.warn("Error while shutting down messaging " + this.id, e);
        }
    }

    /**
     * Sends the message and blocks until the first answer to it has arrived.
     *
     * @param theMEssage  message to send
     * @param timeoutInMs maximum time to wait in milliseconds
     * @return the first answer received
     * @throws RuntimeException if no answer arrived within the timeout
     */
    public Msg sendAndAwaitFirstAnswer(Msg theMEssage, long timeoutInMs) {
        MessageListener answerListener = this.getAnswerListener(theMEssage);
        this.addMessageListener(answerListener);
        try {
            this.storeMessage(theMEssage);
            long start = System.currentTimeMillis();
            while (!this.waitingForAnswers.containsKey(theMEssage.getMsgId())) {
                if (System.currentTimeMillis() - start > timeoutInMs) {
                    log.error("Did not receive Answer in timee");
                    throw new RuntimeException("Did not receive answer in time!");
                }
                Thread.yield();
            }
            return this.waitingForAnswers.remove(theMEssage.getMsgId());
        } finally {
            // On timeout (or any failure) the listener used to stay registered forever.
            // Removing it again after it removed itself is a harmless no-op.
            this.removeMessageListener(answerListener);
        }
    }

    /**
     * Sends the message and collects answers to it until either the requested number
     * of answers has arrived or the timeout has elapsed.
     *
     * @param theMessage      message to send
     * @param numberOfAnswers number of answers to wait for; a negative value means
     *                        "collect everything that arrives until the timeout"
     * @param timeout         maximum time to wait in milliseconds
     * @return the answers received, possibly fewer than requested
     */
    public List<Msg> sendAndAwaitAnswers(Msg theMessage, int numberOfAnswers, long timeout) {
        final List<Msg> received = new CopyOnWriteArrayList<Msg>();
        // A dedicated collecting listener: the single-answer listener removes itself after
        // the first answer, so more than one answer could never be gathered with it.
        MessageListener collector = new MessageListener(){

            public Msg onMessage(Messaging msg, Msg m) {
                if (m.getInAnswerTo() != null && m.getInAnswerTo().equals(theMessage.getMsgId())) {
                    received.add(m);
                }
                return null;
            }
        };
        this.addMessageListener(collector);
        try {
            this.storeMessage(theMessage);
            long start = System.currentTimeMillis();
            while (true) {
                boolean enough = numberOfAnswers >= 0 && received.size() >= numberOfAnswers;
                if (enough || System.currentTimeMillis() - start > timeout) {
                    break;
                }
                Thread.yield();
            }
        } finally {
            this.removeMessageListener(collector);
        }
        List<Msg> answers = new ArrayList<Msg>(received);
        if (numberOfAnswers > 0 && answers.size() > numberOfAnswers) {
            // answers may arrive in a burst - never hand out more than were asked for
            answers = new ArrayList<Msg>(answers.subList(0, numberOfAnswers));
        }
        return answers;
    }

    /**
     * Creates a one-shot listener for answers to the given message: the first matching
     * answer is stored in {@code waitingForAnswers} under the id of the original
     * message, after which the listener unregisters itself. It never sends an answer.
     *
     * @param theMessage the message whose answer is awaited
     * @return the listener; it still has to be registered by the caller
     */
    private MessageListener getAnswerListener(final Msg theMessage) {
        return new MessageListener(){

            public Msg onMessage(Messaging msg, Msg m) {
                boolean isAnswer = m.getInAnswerTo() != null && m.getInAnswerTo().equals(theMessage.getMsgId());
                if (!isAnswer) {
                    return null;
                }
                Messaging.this.waitingForAnswers.put(theMessage.getMsgId(), m);
                Messaging.this.removeMessageListener(this);
                return null;
            }
        };
    }
}

