/*
 * Decompiled with CFR 0.152.
 */
package de.caluga.morphium.messaging;

import de.caluga.morphium.Morphium;
import de.caluga.morphium.MorphiumSingleton;
import de.caluga.morphium.messaging.MessageListener;
import de.caluga.morphium.messaging.Msg;
import de.caluga.morphium.messaging.MsgType;
import de.caluga.morphium.query.Query;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Hashtable;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.Vector;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.log4j.Logger;

public class Messaging
extends Thread {
    private static Logger log = Logger.getLogger(Messaging.class);
    private Morphium morphium;
    private boolean running;
    private int pause = 5000;
    private String id;
    private boolean autoAnswer = false;
    private boolean processMultiple = false;
    private List<MessageListener> listeners;
    private Map<String, List<MessageListener>> listenerByName;
    private volatile Vector<Msg> writeBuffer = new Vector();
    private final ScheduledThreadPoolExecutor writer;

    public Messaging(Morphium m, int pause, boolean processMultiple) {
        this.morphium = m;
        this.running = true;
        this.pause = pause;
        this.processMultiple = processMultiple;
        this.id = UUID.randomUUID().toString();
        try {
            m.ensureIndex(Msg.class, Msg.Fields.lockedBy, Msg.Fields.timestamp);
            m.ensureIndex(Msg.class, Msg.Fields.lockedBy, Msg.Fields.processedBy);
            m.ensureIndex(Msg.class, Msg.Fields.timestamp);
        }
        catch (Exception e) {
            log.error((Object)"Could not ensure indices", (Throwable)e);
        }
        this.listeners = new Vector<MessageListener>();
        this.listenerByName = new Hashtable<String, List<MessageListener>>();
        this.writer = new ScheduledThreadPoolExecutor(1);
        this.writer.scheduleAtFixedRate(new Runnable(){

            @Override
            public void run() {
                Vector wb = Messaging.this.writeBuffer;
                Messaging.this.writeBuffer = new Vector();
                Messaging.this.morphium.storeList(wb);
            }
        }, 500L, pause, TimeUnit.MILLISECONDS);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void run() {
        if (log.isDebugEnabled()) {
            log.debug((Object)("Messaging " + this.id + " started"));
        }
        HashMap<String, Object> values = new HashMap<String, Object>();
        while (this.running) {
            try {
                Query<Msg> q = this.morphium.createQueryFor(Msg.class);
                q = q.where("this.ttl<" + System.currentTimeMillis() + "-this.timestamp");
                if (log.isDebugEnabled() && q.countAll() > 0L) {
                    log.info((Object)("Deleting outdate messages: " + q.countAll()));
                }
                this.morphium.delete(q);
                q = q.q();
                q.or(q.q().f(Msg.Fields.sender).ne(this.id).f(Msg.Fields.lockedBy).eq(null).f(Msg.Fields.processedBy).ne(this.id).f(Msg.Fields.to).eq(null), q.q().f(Msg.Fields.sender).ne(this.id).f(Msg.Fields.lockedBy).eq(null).f(Msg.Fields.processedBy).ne(this.id).f(Msg.Fields.to).eq(this.id));
                values.put("locked_by", this.id);
                values.put("locked", System.currentTimeMillis());
                this.morphium.set(q, values, false, this.processMultiple);
                q = q.q();
                q.or(q.q().f(Msg.Fields.lockedBy).eq(this.id), q.q().f(Msg.Fields.lockedBy).eq("ALL").f(Msg.Fields.processedBy).ne(this.id).f(Msg.Fields.to).eq(this.id), q.q().f(Msg.Fields.lockedBy).eq("ALL").f(Msg.Fields.processedBy).ne(this.id).f(Msg.Fields.to).eq(null));
                q.sort(Msg.Fields.timestamp);
                List<Msg> messagesList = q.asList();
                ArrayList<Msg> toStore = new ArrayList<Msg>();
                for (Msg msg : messagesList) {
                    if ((msg = this.morphium.reread(msg)) == null || !msg.getLockedBy().equals(this.id) && !msg.getLockedBy().equals("ALL")) continue;
                    if (msg.getTtl() < System.currentTimeMillis() - msg.getTimestamp()) {
                        log.warn((Object)"Found outdated message - deleting it!");
                        this.morphium.delete(msg);
                        continue;
                    }
                    try {
                        Msg answer;
                        for (MessageListener l : this.listeners) {
                            answer = l.onMessage(this, msg);
                            if (this.autoAnswer && answer == null) {
                                answer = new Msg(msg.getName(), "received", "");
                            }
                            if (answer == null) continue;
                            msg.sendAnswer(this, answer);
                        }
                        if (this.listenerByName.get(msg.getName()) != null) {
                            for (MessageListener l : this.listenerByName.get(msg.getName())) {
                                answer = l.onMessage(this, msg);
                                if (this.autoAnswer && answer == null) {
                                    answer = new Msg(msg.getName(), "received", "");
                                }
                                if (answer == null) continue;
                                msg.sendAnswer(this, answer);
                            }
                        }
                    }
                    catch (Throwable t) {
                        log.error((Object)"Processing failed", t);
                    }
                    if (msg.getType().equals((Object)MsgType.SINGLE)) {
                        this.morphium.delete(msg);
                    }
                    if (msg.getLockedBy().equals("ALL")) {
                        Query<Msg> idq = MorphiumSingleton.get().createQueryFor(Msg.class);
                        idq.f(Msg.Fields.msgId).eq(msg.getMsgId());
                        MorphiumSingleton.get().push(idq, Msg.Fields.processedBy, (Object)this.id);
                        continue;
                    }
                    msg.addProcessedId(this.id);
                    msg.setLockedBy(null);
                    msg.setLocked(0L);
                    toStore.add(msg);
                }
                this.morphium.storeList(toStore);
                while (this.morphium.getWriteBufferCount() > 0) {
                    Thread.sleep(100L);
                }
            }
            catch (Throwable e) {
                log.error((Object)("Unhandled exception " + e.getMessage()), e);
            }
            finally {
                try {
                    Messaging.sleep(this.pause);
                }
                catch (InterruptedException ignored) {}
            }
        }
        if (log.isDebugEnabled()) {
            log.debug((Object)("Messaging " + this.id + " stopped!"));
        }
        if (!this.running) {
            this.listeners.clear();
            this.listenerByName.clear();
            this.writer.shutdown();
        }
    }

    public void addListenerForMessageNamed(String n, MessageListener l) {
        if (this.listenerByName.get(n) == null) {
            this.listenerByName.put(n, new ArrayList());
        }
        this.listenerByName.get(n).add(l);
    }

    public void removeListenerForMessageNamed(String n, MessageListener l) {
        if (this.listenerByName.get(n) == null) {
            return;
        }
        this.listenerByName.get(n).remove(l);
    }

    public String getSenderId() {
        return this.id;
    }

    public void setSenderId(String id) {
        this.id = id;
    }

    public int getPause() {
        return this.pause;
    }

    public void setPause(int pause) {
        this.pause = pause;
    }

    public boolean isRunning() {
        return this.running;
    }

    public void setRunning(boolean running) {
        this.running = running;
    }

    public void addMessageListener(MessageListener l) {
        this.listeners.add(l);
    }

    public void removeMessageListener(MessageListener l) {
        this.listeners.remove(l);
    }

    public void queueMessage(Msg m) {
        if (log.isDebugEnabled()) {
            log.debug((Object)("Queueing message " + m.getMsg()));
        }
        m.setSender(this.id);
        m.addProcessedId(this.id);
        m.setLockedBy(null);
        m.setLocked(0L);
        this.writeBuffer.add(m);
    }

    public void storeMessage(Msg m) {
        m.setSender(this.id);
        m.addProcessedId(this.id);
        m.setLockedBy(null);
        m.setLocked(0L);
        this.morphium.storeNoCache(m);
    }

    public boolean isAutoAnswer() {
        return this.autoAnswer;
    }

    public void setAutoAnswer(boolean autoAnswer) {
        this.autoAnswer = autoAnswer;
    }
}

