/*
 * Decompiled with CFR 0.152.
 */
package org.apache.camel.component.couchbase;

import com.couchbase.client.core.deps.com.fasterxml.jackson.databind.JsonNode;
import com.couchbase.client.java.Bucket;
import com.couchbase.client.java.Collection;
import com.couchbase.client.java.Scope;
import com.couchbase.client.java.view.ViewOptions;
import com.couchbase.client.java.view.ViewOrdering;
import com.couchbase.client.java.view.ViewResult;
import com.couchbase.client.java.view.ViewRow;
import java.util.ArrayDeque;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.camel.CamelContext;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangePropertyKey;
import org.apache.camel.Processor;
import org.apache.camel.component.couchbase.CouchbaseCollectionOperation;
import org.apache.camel.component.couchbase.CouchbaseEndpoint;
import org.apache.camel.resume.ResumeAware;
import org.apache.camel.resume.ResumeStrategy;
import org.apache.camel.support.ScheduledBatchPollingConsumer;
import org.apache.camel.support.resume.ResumeStrategyHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Scheduled batch polling consumer that runs a Couchbase view query on every poll and
 * turns each returned row into an {@link Exchange}.
 *
 * <p>Each exchange carries the row (or the full document, when the endpoint asks for it)
 * as its body, plus the headers {@code CCB_ID}, {@code CCB_KEY}, {@code CCB_DDN} and
 * {@code CCB_VN}. A poll is executed while holding an internal lock.
 */
public class CouchbaseConsumer
extends ScheduledBatchPollingConsumer
implements ResumeAware<ResumeStrategy> {
    private static final Logger LOG = LoggerFactory.getLogger(CouchbaseConsumer.class);

    /** Held for the whole duration of {@link #poll()}. */
    private final Lock lock = new ReentrantLock();
    private final CouchbaseEndpoint endpoint;
    private final Bucket bucket;
    /** Collection used for document fetch/remove; resolved in {@link #doInit()}. */
    private Collection collection;
    /** View query options derived from the endpoint configuration in {@link #doInit()}. */
    private ViewOptions viewOptions;
    private ResumeStrategy resumeStrategy;

    /**
     * Creates the consumer.
     *
     * @param endpoint  the endpoint supplying the view/query configuration
     * @param client    the bucket the view query is run against
     * @param processor the processor every created exchange is handed to
     */
    public CouchbaseConsumer(CouchbaseEndpoint endpoint, Bucket client, Processor processor) {
        super(endpoint, processor);
        this.bucket = client;
        this.endpoint = endpoint;
    }

    /**
     * Resolves the target collection and builds the {@link ViewOptions} from the endpoint
     * settings (limit, skip, ordering and key range).
     *
     * <p>Limit and skip are only applied when positive. The key range is only applied when
     * <em>both</em> the start key and the end key are non-empty; otherwise no range is set.
     *
     * @throws Exception if the parent initialisation fails
     */
    protected void doInit() throws Exception {
        super.doInit();

        Scope scope = endpoint.getScope() != null
                ? bucket.scope(endpoint.getScope())
                : bucket.defaultScope();
        collection = endpoint.getCollection() != null
                ? scope.collection(endpoint.getCollection())
                : bucket.defaultCollection();

        viewOptions = ViewOptions.viewOptions();

        int limit = endpoint.getLimit();
        if (limit > 0) {
            viewOptions.limit(limit);
        }

        int skip = endpoint.getSkip();
        if (skip > 0) {
            viewOptions.skip(skip);
        }

        if (endpoint.isDescending()) {
            viewOptions.order(ViewOrdering.DESCENDING);
        }

        String rangeStartKey = endpoint.getRangeStartKey();
        String rangeEndKey = endpoint.getRangeEndKey();
        boolean hasStartKey = rangeStartKey != null && !rangeStartKey.isEmpty();
        boolean hasEndKey = rangeEndKey != null && !rangeEndKey.isEmpty();
        if (hasStartKey && hasEndKey) {
            viewOptions.startKey(rangeStartKey).endKey(rangeEndKey);
        }
    }

    /**
     * Starts the consumer and then hands this consumer and its resume strategy to
     * {@link ResumeStrategyHelper#resume}.
     *
     * @throws Exception if the parent start or the resume call fails
     */
    protected void doStart() throws Exception {
        super.doStart();
        // NOTE(review): the action name below looks like it was borrowed from another
        // component ("Cql"); it is kept verbatim because it is runtime-visible - confirm.
        ResumeStrategyHelper.resume(getEndpoint().getCamelContext(), this, resumeStrategy, "CamelCqlResumeQuery");
    }

    /**
     * Stops the consumer and shuts down the core of the bucket, if a bucket was supplied.
     *
     * @throws Exception if the parent stop fails
     */
    protected void doStop() throws Exception {
        super.doStop();
        if (bucket != null) {
            bucket.core().shutdown();
        }
    }

    /**
     * Runs the configured view query, converts every row into an exchange and processes
     * the resulting batch.
     *
     * @return the value returned by {@link #processBatch(Queue)} for the rows of this poll
     * @throws Exception if querying, exchange creation or processing fails
     */
    protected int poll() throws Exception {
        lock.lock();
        try {
            ViewResult result = bucket.viewQuery(endpoint.getDesignDocumentName(), endpoint.getViewName(), viewOptions);
            forceConsumerAsReady();
            LOG.trace("ViewResponse: {}", result);

            String consumerProcessedStrategy = endpoint.getConsumerProcessedStrategy();
            Queue<Object> exchanges = new ArrayDeque<>();
            for (ViewRow row : result.rows()) {
                exchanges.add(buildExchange(row, consumerProcessedStrategy));
            }
            return processBatch(exchanges);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Builds the exchange for a single view row and applies the "already processed"
     * strategy to the underlying document.
     *
     * @param row                       the view row to convert
     * @param consumerProcessedStrategy the endpoint's processed strategy; {@code "delete"}
     *                                  removes the document, {@code "filter"} only logs,
     *                                  anything else logs a duplicate warning at trace level
     * @return the populated exchange
     * @throws Exception if fetching or removing the document fails
     */
    private Exchange buildExchange(ViewRow row, String consumerProcessedStrategy) throws Exception {
        // NOTE(review): id() and keyAs() return Optionals; get() throws
        // NoSuchElementException when the value is absent - confirm id-less rows cannot occur.
        String id = row.id().get();
        // Either the full document or the row value; typed as Object because the two
        // branches yield unrelated types and the body accepts any object.
        Object doc = endpoint.isFullDocument()
                ? CouchbaseCollectionOperation.getDocument(collection, id, endpoint.getQueryTimeout(), endpoint.getConsumerRetryPause())
                : row.valueAs(Object.class);
        String key = row.keyAs(JsonNode.class).get().asText();
        String designDocumentName = endpoint.getDesignDocumentName();
        String viewName = endpoint.getViewName();

        Exchange exchange = createExchange(true);
        exchange.getIn().setBody(doc);
        exchange.getIn().setHeader("CCB_ID", id);
        exchange.getIn().setHeader("CCB_KEY", key);
        exchange.getIn().setHeader("CCB_DDN", designDocumentName);
        exchange.getIn().setHeader("CCB_VN", viewName);

        if ("delete".equalsIgnoreCase(consumerProcessedStrategy)) {
            LOG.trace("Deleting doc with ID {}", id);
            CouchbaseCollectionOperation.removeDocument(collection, id, endpoint.getWriteQueryTimeout(), endpoint.getConsumerRetryPause());
        } else if ("filter".equalsIgnoreCase(consumerProcessedStrategy)) {
            LOG.trace("Filtering out ID {}", id);
        } else {
            LOG.trace("No strategy set for already processed docs, beware of duplicates!");
        }

        logDetails(id, doc, key, designDocumentName, viewName, exchange);
        return exchange;
    }

    /**
     * Processes the queued exchanges one by one, capped at {@code maxMessagesPerPoll} when
     * that limit is positive, and stops early once {@code isBatchAllowed()} returns false.
     *
     * <p>Every processed exchange gets the {@code BATCH_INDEX}, {@code BATCH_SIZE} and
     * {@code BATCH_COMPLETE} properties.
     *
     * @param exchanges the queue of {@link Exchange} instances to process
     * @return the number of exchanges that were in the queue on entry (before any cap)
     * @throws Exception if the processor fails on an exchange
     */
    public int processBatch(Queue<Object> exchanges) throws Exception {
        int total = exchanges.size();
        int answer = total;

        if (maxMessagesPerPoll > 0 && total > maxMessagesPerPoll) {
            LOG.debug("Limiting to maximum messages to poll {} as there were {} messages in this poll.", maxMessagesPerPoll, total);
            total = maxMessagesPerPoll;
        }

        for (int index = 0; index < total && isBatchAllowed(); ++index) {
            Exchange exchange = (Exchange) exchanges.poll();
            exchange.setProperty(ExchangePropertyKey.BATCH_INDEX, index);
            exchange.setProperty(ExchangePropertyKey.BATCH_SIZE, total);
            // Stored as a Boolean flag (true only for the last exchange of the batch),
            // not as an Integer 1/0.
            exchange.setProperty(ExchangePropertyKey.BATCH_COMPLETE, index == total - 1);
            // Number of exchanges of this batch still waiting after the current one.
            pendingExchanges = total - index - 1;
            getProcessor().process(exchange);
        }
        return answer;
    }

    /**
     * Writes the exchange, body and header values of a freshly built exchange to the trace log.
     */
    private void logDetails(String id, Object doc, String key, String designDocumentName, String viewName, Exchange exchange) {
        if (!LOG.isTraceEnabled()) {
            return;
        }
        LOG.trace("Created exchange = {}", exchange);
        LOG.trace("Added Document in body = {}", doc);
        LOG.trace("Adding to Header");
        LOG.trace("ID = {}", id);
        LOG.trace("Key = {}", key);
        LOG.trace("Design Document Name = {}", designDocumentName);
        LOG.trace("View Name = {}", viewName);
    }

    /**
     * @return the resume strategy currently set on this consumer, possibly {@code null}
     */
    public ResumeStrategy getResumeStrategy() {
        return resumeStrategy;
    }

    /**
     * @param resumeStrategy the resume strategy passed to the resume helper on start
     */
    public void setResumeStrategy(ResumeStrategy resumeStrategy) {
        this.resumeStrategy = resumeStrategy;
    }
}

