/*
 * Decompiled with CFR 0.152.
 */
package org.apache.uima.camel;

import java.util.HashMap;
import java.util.Map;
import org.apache.camel.AsyncCallback;
import org.apache.camel.AsyncProcessor;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.impl.DefaultProducer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.uima.aae.client.UimaASProcessStatusImpl;
import org.apache.uima.aae.client.UimaAsBaseCallbackListener;
import org.apache.uima.aae.client.UimaAsynchronousEngine;
import org.apache.uima.adapter.jms.client.BaseUIMAAsynchronousEngine_impl;
import org.apache.uima.cas.CAS;
import org.apache.uima.collection.EntityProcessStatus;
import org.apache.uima.jms.error.handler.BrokerConnectionException;

public class UimaAsProducer
extends DefaultProducer
implements AsyncProcessor {
    private static final Log LOG = LogFactory.getLog(UimaAsProducer.class);
    private UimaAsynchronousEngine uimaAsEngine;
    private final Map<String, ExchangeAsyncCallbackPair> intermediateMap = new HashMap<String, ExchangeAsyncCallbackPair>();

    public UimaAsProducer(String brokerAddress, String queue, Integer casPoolSize, Integer timeout, Endpoint endpoint) throws Exception {
        super(endpoint);
        this.uimaAsEngine = new BaseUIMAAsynchronousEngine_impl();
        this.uimaAsEngine.addStatusCallbackListener((UimaAsBaseCallbackListener)new UimaStatusCallbackListener(this.intermediateMap));
        HashMap<String, Object> appCtx = new HashMap<String, Object>();
        appCtx.put("ServerURI", brokerAddress);
        appCtx.put("Endpoint", queue);
        if (casPoolSize != null) {
            appCtx.put("CasPoolSize", casPoolSize);
        }
        if (timeout != null) {
            appCtx.put("Timeout", timeout);
        }
        this.uimaAsEngine.initialize(appCtx);
    }

    public void process(Exchange exchange) throws Exception {
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public boolean process(Exchange exchange, AsyncCallback callback) {
        String rowId = (String)exchange.getIn().getBody(String.class);
        try {
            CAS cas = this.uimaAsEngine.getCAS();
            cas.setDocumentText(rowId);
            Map<String, ExchangeAsyncCallbackPair> map = this.intermediateMap;
            synchronized (map) {
                String refernceId = this.uimaAsEngine.sendCAS(cas);
                ExchangeAsyncCallbackPair exchangeCallback = new ExchangeAsyncCallbackPair();
                exchangeCallback.exchange = exchange;
                exchangeCallback.callback = callback;
                this.intermediateMap.put(refernceId, exchangeCallback);
            }
        }
        catch (Exception e) {
            LOG.warn((Object)"Failed to send CAS", (Throwable)e);
            exchange.setException((Throwable)e);
            callback.done(true);
            return true;
        }
        return false;
    }

    /*
     * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
     */
    private static class UimaStatusCallbackListener
    extends UimaAsBaseCallbackListener {
        private final Map<String, ExchangeAsyncCallbackPair> intermediateMap;

        private UimaStatusCallbackListener(Map<String, ExchangeAsyncCallbackPair> intermediateMap) {
            this.intermediateMap = intermediateMap;
        }

        public void initializationComplete(EntityProcessStatus aStatus) {
            if (aStatus != null && aStatus.isException()) {
                LOG.warn((Object)("Error on initializing: " + aStatus.getStatusMessage()));
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void entityProcessComplete(CAS aCas, EntityProcessStatus aStatus) {
            ExchangeAsyncCallbackPair exchangeCallbackPair;
            UimaASProcessStatusImpl statusImpl = (UimaASProcessStatusImpl)aStatus;
            String referenceId = statusImpl.getCasReferenceId();
            Map<String, ExchangeAsyncCallbackPair> map = this.intermediateMap;
            synchronized (map) {
                exchangeCallbackPair = this.intermediateMap.remove(referenceId);
            }
            if (exchangeCallbackPair != null) {
                if (aStatus.isException()) {
                    for (Exception e : aStatus.getExceptions()) {
                        LOG.warn((Object)e);
                    }
                    exchangeCallbackPair.exchange.setException((Throwable)new Exception(aStatus.getStatusMessage()));
                }
                exchangeCallbackPair.callback.done(false);
            } else {
                if (aStatus.isException()) {
                    for (Exception exception : aStatus.getExceptions()) {
                        if (!(exception instanceof BrokerConnectionException)) continue;
                        LOG.warn((Object)"Connection to broker lost, report all outstanding messages as failed!");
                        for (ExchangeAsyncCallbackPair callback : this.intermediateMap.values()) {
                            callback.exchange.setException((Throwable)exception);
                            callback.callback.done(false);
                        }
                        return;
                    }
                }
                LOG.warn((Object)("Could not find callback for CAS id: " + referenceId));
            }
        }

        public void collectionProcessComplete(EntityProcessStatus aStatus) {
            if (aStatus != null && aStatus.isException()) {
                LOG.warn((Object)("Error on collection process complete: " + aStatus.getStatusMessage()));
            }
        }
    }

    private static class ExchangeAsyncCallbackPair {
        Exchange exchange;
        AsyncCallback callback;

        private ExchangeAsyncCallbackPair() {
        }
    }
}

