/*
 * Decompiled with CFR 0.152.
 */
package asia.stampy.server.listener.subscription;

import asia.stampy.client.message.ack.AckHeader;
import asia.stampy.client.message.ack.AckMessage;
import asia.stampy.client.message.nack.NackHeader;
import asia.stampy.client.message.nack.NackMessage;
import asia.stampy.common.StampyLibrary;
import asia.stampy.common.gateway.AbstractStampyMessageGateway;
import asia.stampy.common.gateway.HostPort;
import asia.stampy.common.gateway.StampyMessageListener;
import asia.stampy.common.message.StampyMessage;
import asia.stampy.common.message.StompMessageType;
import asia.stampy.common.message.interceptor.AbstractOutgoingMessageInterceptor;
import asia.stampy.common.message.interceptor.InterceptException;
import asia.stampy.server.listener.subscription.StampyAcknowledgementHandler;
import asia.stampy.server.listener.subscription.UnexpectedAcknowledgementException;
import asia.stampy.server.message.message.MessageHeader;
import asia.stampy.server.message.message.MessageMessage;
import java.util.Map;
import java.util.Queue;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.commons.lang.StringUtils;

/*
 * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
 */
/**
 * Tracks the acknowledgement ids of outgoing MESSAGE frames and matches them
 * against incoming ACK / NACK frames.
 * <p>
 * As an outgoing interceptor, every intercepted MESSAGE has the value of its
 * {@code ack} header queued per {@link HostPort}, and a timeout is scheduled on a
 * daemon {@link Timer}. As a listener, an incoming ACK or NACK whose id is queued
 * for the sending {@link HostPort} is removed from the queue and passed to the
 * {@link StampyAcknowledgementHandler}; an id that is not queued results in an
 * {@link UnexpectedAcknowledgementException}. If the timeout elapses while the id
 * is still queued, the handler's {@code noAcknowledgementReceived} is invoked.
 * <p>
 * An id is claimed by exactly one party (ACK, NACK or timeout): the claim is the
 * removal of the id from the queue, so the handler is notified at most once per
 * queued id.
 *
 * @param <SVR> the gateway type this interceptor is attached to
 */
@StampyLibrary(libraryName="stampy-client-server")
public abstract class AbstractAcknowledgementListenerAndInterceptor<SVR extends AbstractStampyMessageGateway>
extends AbstractOutgoingMessageInterceptor<SVR>
implements StampyMessageListener {
    private static final StompMessageType[] TYPES = new StompMessageType[]{StompMessageType.ACK, StompMessageType.NACK, StompMessageType.MESSAGE};

    /** Default time to wait for an ACK / NACK before reporting that none arrived. */
    private static final long DEFAULT_ACK_TIMEOUT_MILLIS = 60000L;

    // volatile: assigned through the setter, read on the timer thread.
    private volatile StampyAcknowledgementHandler handler;

    /** Outstanding acknowledgement ids, keyed by the connection they were sent to. */
    protected Map<HostPort, Queue<String>> messages = new ConcurrentHashMap<HostPort, Queue<String>>();

    /** Guards creation of a per-connection queue so concurrent senders never replace each other's queue. */
    private final Object queueCreationLock = new Object();

    private Timer ackTimer = new Timer("Stampy Acknowledgement Timer", true);

    // volatile: assigned through the setter, may be read from any sending thread.
    private volatile long ackTimeoutMillis = DEFAULT_ACK_TIMEOUT_MILLIS;

    /**
     * @return the message types handled by this class: ACK, NACK and MESSAGE
     */
    @Override
    public StompMessageType[] getMessageTypes() {
        return TYPES;
    }

    /**
     * Decides whether the given message is of interest.
     *
     * @param message the message to inspect
     * @return {@code true} for every ACK and NACK, and for a MESSAGE whose
     *         {@code ack} header is not empty; {@code false} otherwise
     */
    @Override
    public boolean isForMessage(StampyMessage<?> message) {
        switch (message.getMessageType()) {
            case MESSAGE: {
                String ack = ((MessageHeader)((MessageMessage)message).getHeader()).getAck();
                return StringUtils.isNotEmpty(ack);
            }
            case ACK:
            case NACK: {
                return true;
            }
        }
        return false;
    }

    /**
     * Handles an incoming ACK or NACK; any other message type is ignored.
     *
     * @param message  the received message
     * @param hostPort the connection the message arrived on
     * @throws Exception if the acknowledgement id is not outstanding for
     *                   {@code hostPort}, or if the handler fails
     */
    @Override
    public void messageReceived(StampyMessage<?> message, HostPort hostPort) throws Exception {
        switch (message.getMessageType()) {
            case ACK: {
                this.evaluateAck((AckHeader)((AckMessage)message).getHeader(), hostPort);
                break;
            }
            case NACK: {
                this.evaluateNack((NackHeader)((NackMessage)message).getHeader(), hostPort);
                break;
            }
        }
    }

    /**
     * Records the {@code ack} header of an outgoing MESSAGE as outstanding for
     * {@code hostPort} and schedules its timeout.
     *
     * @param message  the outgoing message; cast to {@link MessageMessage}
     * @param hostPort the connection the message is being sent to
     * @throws InterceptException declared by the interceptor contract
     */
    @Override
    public void interceptMessage(StampyMessage<?> message, HostPort hostPort) throws InterceptException {
        MessageMessage msg = (MessageMessage)message;
        String ack = ((MessageHeader)msg.getHeader()).getAck();
        this.pendingQueueFor(hostPort).add(ack);
        this.startTimerTask(hostPort, ack);
    }

    /**
     * Returns the queue of outstanding ids for {@code hostPort}, creating it if
     * necessary. Creation re-checks under a lock so that two threads sending the
     * first message to the same connection end up sharing a single queue.
     */
    private Queue<String> pendingQueueFor(HostPort hostPort) {
        Queue<String> pending = this.messages.get(hostPort);
        if (pending != null) {
            return pending;
        }
        synchronized (this.queueCreationLock) {
            pending = this.messages.get(hostPort);
            if (pending == null) {
                pending = new ConcurrentLinkedQueue<String>();
                this.messages.put(hostPort, pending);
            }
            return pending;
        }
    }

    /**
     * Schedules a one-shot task that fires after {@link #getAckTimeoutMillis()}
     * and notifies the handler if {@code ack} is still outstanding at that time.
     */
    private void startTimerTask(final HostPort hostPort, final String ack) {
        TimerTask task = new TimerTask(){

            @Override
            public void run() {
                Queue<String> pending = AbstractAcknowledgementListenerAndInterceptor.this.messages.get(hostPort);
                // remove() is the atomic claim: if it returns false an ACK/NACK
                // (or connection cleanup) already dealt with this id.
                if (pending == null || !pending.remove(ack)) {
                    return;
                }
                try {
                    AbstractAcknowledgementListenerAndInterceptor.this.getHandler().noAcknowledgementReceived(ack);
                } catch (RuntimeException e) {
                    // An exception escaping run() would terminate the Timer thread and make
                    // every later schedule() call fail. Report it the same way an uncaught
                    // exception would be reported, but keep the timer alive.
                    Thread current = Thread.currentThread();
                    current.getUncaughtExceptionHandler().uncaughtException(current, e);
                }
            }
        };
        this.ackTimer.schedule(task, this.getAckTimeoutMillis());
    }

    /**
     * Processes a NACK: claims its id and forwards it to the handler.
     *
     * @throws Exception if the id is not outstanding for {@code hostPort}, or if the handler fails
     */
    private void evaluateNack(NackHeader header, HostPort hostPort) throws Exception {
        String id = header.getId();
        this.claimOutstanding("NACK", id, hostPort);
        this.getHandler().nackReceived(id, header.getReceipt(), header.getTransaction());
    }

    /**
     * Processes an ACK: claims its id and forwards it to the handler.
     *
     * @throws Exception if the id is not outstanding for {@code hostPort}, or if the handler fails
     */
    private void evaluateAck(AckHeader header, HostPort hostPort) throws Exception {
        String id = header.getId();
        this.claimOutstanding("ACK", id, hostPort);
        this.getHandler().ackReceived(id, header.getReceipt(), header.getTransaction());
    }

    /**
     * Removes {@code id} from the outstanding ids of {@code hostPort}. The check
     * and the removal are a single queue operation, so a concurrently running
     * timeout task cannot claim the same id.
     *
     * @param frameName "ACK" or "NACK", used in the exception message
     * @throws UnexpectedAcknowledgementException if {@code id} is not outstanding
     */
    private void claimOutstanding(String frameName, String id, HostPort hostPort) throws UnexpectedAcknowledgementException {
        Queue<String> pending = this.messages.get(hostPort);
        if (pending == null || !pending.remove(id)) {
            throw new UnexpectedAcknowledgementException("No " + frameName + " message expected, yet received id " + id + " from " + hostPort);
        }
    }

    /**
     * @return the time in milliseconds to wait for an acknowledgement before the
     *         handler is told that none was received
     */
    public long getAckTimeoutMillis() {
        return this.ackTimeoutMillis;
    }

    /**
     * Sets the acknowledgement timeout. The value is used as the delay passed to
     * {@link Timer#schedule(TimerTask, long)}, which rejects negative delays.
     *
     * @param ackTimeoutMillis the timeout in milliseconds
     */
    public void setAckTimeoutMillis(long ackTimeoutMillis) {
        this.ackTimeoutMillis = ackTimeoutMillis;
    }

    /**
     * Sets the gateway and then invokes {@link #ensureCleanup()}.
     *
     * @param gateway the gateway this interceptor is attached to
     */
    @Override
    public void setGateway(SVR gateway) {
        super.setGateway(gateway);
        this.ensureCleanup();
    }

    /**
     * Hook invoked after the gateway has been set.
     * NOTE(review): presumably implementations arrange for per-connection entries
     * in {@link #messages} to be removed when a connection goes away - confirm
     * against the concrete subclasses.
     */
    protected abstract void ensureCleanup();

    /**
     * @return the handler notified of ACKs, NACKs and acknowledgement timeouts
     */
    public StampyAcknowledgementHandler getHandler() {
        return this.handler;
    }

    /**
     * @param handler the handler to notify of ACKs, NACKs and acknowledgement timeouts
     */
    public void setHandler(StampyAcknowledgementHandler handler) {
        this.handler = handler;
    }
}

