/*
 * Decompiled with CFR 0.152.
 */
package org.granite.gravity;

import flex.messaging.messages.AsyncMessage;
import flex.messaging.messages.Message;
import java.io.IOException;
import java.io.ObjectOutput;
import java.io.OutputStream;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import javax.servlet.ServletConfig;
import javax.servlet.ServletContext;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.granite.context.AMFContextImpl;
import org.granite.context.GraniteContext;
import org.granite.gravity.AsyncHttpContext;
import org.granite.gravity.AsyncPublishedMessage;
import org.granite.gravity.AsyncPublisher;
import org.granite.gravity.AsyncReceiver;
import org.granite.gravity.Channel;
import org.granite.gravity.Gravity;
import org.granite.gravity.GravityConfig;
import org.granite.gravity.GravityManager;
import org.granite.gravity.MessagePublishingException;
import org.granite.gravity.MessageReceivingException;
import org.granite.gravity.Subscription;
import org.granite.logging.Logger;
import org.granite.messaging.webapp.HttpGraniteContext;

/*
 * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
 */
public abstract class AbstractChannel
implements Channel {
    private static final Logger log = Logger.getLogger(AbstractChannel.class);
    protected final String id;
    protected final ServletConfig servletConfig;
    protected final ConcurrentMap<String, Subscription> subscriptions = new ConcurrentHashMap<String, Subscription>();
    protected LinkedList<AsyncPublishedMessage> publishedQueue = new LinkedList();
    protected final Lock publishedQueueLock = new ReentrantLock();
    protected LinkedList<AsyncMessage> receivedQueue = new LinkedList();
    protected final Lock receivedQueueLock = new ReentrantLock();
    protected final AsyncPublisher publisher;
    protected final AsyncReceiver receiver;

    protected AbstractChannel(ServletConfig servletConfig, GravityConfig gravityConfig, String id) {
        if (id == null) {
            throw new NullPointerException("id cannot be null");
        }
        this.id = id;
        this.servletConfig = servletConfig;
        this.publisher = new AsyncPublisher(this);
        this.receiver = new AsyncReceiver(this);
    }

    protected abstract boolean hasAsyncHttpContext();

    protected abstract AsyncHttpContext acquireAsyncHttpContext();

    protected abstract void releaseAsyncHttpContext(AsyncHttpContext var1);

    @Override
    public String getId() {
        return this.id;
    }

    @Override
    public Gravity getGravity() {
        return GravityManager.getGravity(this.getServletContext());
    }

    @Override
    public Subscription addSubscription(String destination, String subTopicId, String subscriptionId, boolean noLocal) {
        Subscription subscription = new Subscription(this, destination, subTopicId, subscriptionId, noLocal);
        Subscription present = this.subscriptions.putIfAbsent(subscriptionId, subscription);
        return present != null ? present : subscription;
    }

    @Override
    public Collection<Subscription> getSubscriptions() {
        return this.subscriptions.values();
    }

    @Override
    public Subscription removeSubscription(String subscriptionId) {
        return (Subscription)this.subscriptions.remove(subscriptionId);
    }

    @Override
    public void publish(AsyncPublishedMessage message) throws MessagePublishingException {
        if (message == null) {
            throw new NullPointerException("message cannot be null");
        }
        this.publishedQueueLock.lock();
        try {
            this.publishedQueue.add(message);
        }
        finally {
            this.publishedQueueLock.unlock();
        }
        this.publisher.queue(this.getGravity());
    }

    @Override
    public boolean hasPublishedMessage() {
        this.publishedQueueLock.lock();
        try {
            boolean bl = !this.publishedQueue.isEmpty();
            return bl;
        }
        finally {
            this.publishedQueueLock.unlock();
        }
    }

    @Override
    public boolean runPublish() {
        LinkedList<AsyncPublishedMessage> publishedCopy = null;
        this.publishedQueueLock.lock();
        try {
            if (this.publishedQueue.isEmpty()) {
                return false;
            }
            publishedCopy = this.publishedQueue;
            this.publishedQueue = new LinkedList();
        }
        finally {
            this.publishedQueueLock.unlock();
        }
        for (AsyncPublishedMessage message : publishedCopy) {
            try {
                message.publish(this);
            }
            catch (Exception e) {
                log.error(e, "Error while trying to publish message: %s", message);
            }
        }
        return true;
    }

    @Override
    public void receive(AsyncMessage message) throws MessageReceivingException {
        if (message == null) {
            throw new NullPointerException("message cannot be null");
        }
        Gravity gravity = this.getGravity();
        this.receivedQueueLock.lock();
        try {
            if (this.receivedQueue.size() + 1 > gravity.getGravityConfig().getMaxMessagesQueuedPerChannel()) {
                throw new MessageReceivingException((Message)message, "Could not queue message (channel's queue is full) for channel: " + this);
            }
            this.receivedQueue.add(message);
        }
        finally {
            this.receivedQueueLock.unlock();
        }
        if (this.hasAsyncHttpContext()) {
            this.receiver.queue(gravity);
        }
    }

    @Override
    public boolean hasReceivedMessage() {
        this.receivedQueueLock.lock();
        try {
            boolean bl = !this.receivedQueue.isEmpty();
            return bl;
        }
        finally {
            this.receivedQueueLock.unlock();
        }
    }

    @Override
    public boolean runReceive() {
        return this.runReceived(null);
    }

    /*
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    @Override
    public boolean runReceived(AsyncHttpContext asyncHttpContext) {
        boolean httpAsParam = asyncHttpContext != null;
        LinkedList<AsyncMessage> messages = null;
        OutputStream os = null;
        try {
            this.receivedQueueLock.lock();
            try {
                if (this.receivedQueue.isEmpty()) {
                    return false;
                }
                if (asyncHttpContext == null && (asyncHttpContext = this.acquireAsyncHttpContext()) == null) {
                    return false;
                }
                messages = this.receivedQueue;
                this.receivedQueue = new LinkedList();
            }
            finally {
                this.receivedQueueLock.unlock();
            }
            HttpServletRequest request = asyncHttpContext.getRequest();
            HttpServletResponse response = asyncHttpContext.getResponse();
            String correlationId = asyncHttpContext.getConnectMessage().getMessageId();
            AsyncMessage[] messagesArray = new AsyncMessage[messages.size()];
            int i = 0;
            Iterator iterator = messages.iterator();
            while (true) {
                if (!iterator.hasNext()) {
                    Gravity gravity = this.getGravity();
                    HttpGraniteContext context = HttpGraniteContext.createThreadIntance(gravity.getGraniteConfig(), gravity.getServicesConfig(), null, request, response);
                    ((AMFContextImpl)context.getAMFContext()).setCurrentAmf3Message(asyncHttpContext.getConnectMessage());
                    response.flushBuffer();
                    response.setStatus(200);
                    response.setContentType("application/x-amf");
                    response.setDateHeader("Expire", 0L);
                    response.setHeader("Cache-Control", "no-store");
                    os = response.getOutputStream();
                    ObjectOutput amf3Serializer = context.getGraniteConfig().newAMF3Serializer(os);
                    log.debug("<< [MESSAGES for channel=%s] %s", this, messagesArray);
                    amf3Serializer.writeObject(messagesArray);
                    os.flush();
                    response.flushBuffer();
                    return true;
                }
                AsyncMessage message = (AsyncMessage)iterator.next();
                message.setCorrelationId(correlationId);
                messagesArray[i++] = message;
            }
        }
        catch (IOException e) {
            log.warn(e, "Could not send messages to channel: %s (retrying later)", this);
            GravityConfig gravityConfig = this.getGravity().getGravityConfig();
            if (messages == null) return true;
            if (!gravityConfig.isRetryOnError()) return true;
            this.receivedQueueLock.lock();
            try {
                if (this.receivedQueue.size() + messages.size() > gravityConfig.getMaxMessagesQueuedPerChannel()) {
                    log.warn("Channel %s has reached its maximum queue capacity %s (throwing %s messages)", this, gravityConfig.getMaxMessagesQueuedPerChannel(), messages.size());
                    return true;
                }
                this.receivedQueue.addAll(0, messages);
                return true;
            }
            finally {
                this.receivedQueueLock.unlock();
            }
        }
        finally {
            try {
                GraniteContext.release();
            }
            catch (Exception e) {}
            try {
                if (os != null) {
                    try {
                        os.close();
                    }
                    catch (IOException e) {
                        log.warn(e, "Could not close output stream (ignored)", new Object[0]);
                    }
                }
            }
            finally {
                if (!httpAsParam) {
                    this.releaseAsyncHttpContext(asyncHttpContext);
                }
            }
        }
    }

    @Override
    public void destroy() {
        Gravity gravity = this.getGravity();
        gravity.cancel(this.publisher);
        gravity.cancel(this.receiver);
        this.subscriptions.clear();
    }

    protected boolean queueReceiver() {
        if (this.hasReceivedMessage()) {
            this.receiver.queue(this.getGravity());
            return true;
        }
        return false;
    }

    protected ServletConfig getServletConfig() {
        return this.servletConfig;
    }

    protected ServletContext getServletContext() {
        return this.servletConfig.getServletContext();
    }

    public boolean equals(Object obj) {
        return obj instanceof Channel && this.id.equals(((Channel)obj).getId());
    }

    public int hashCode() {
        return this.id.hashCode();
    }

    public String toString() {
        return String.valueOf(this.getClass().getName()) + " {id=" + this.id + ", subscriptions=" + this.subscriptions.values() + "}";
    }
}

