/*
 * Decompiled with CFR 0.152.
 */
package org.apache.camel.processor;

import java.util.Collection;
import java.util.Iterator;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.PollingConsumer;
import org.apache.camel.Processor;
import org.apache.camel.impl.LoggingExceptionHandler;
import org.apache.camel.impl.ServiceSupport;
import org.apache.camel.processor.Resequencer;
import org.apache.camel.spi.ExceptionHandler;
import org.apache.camel.util.ServiceHelper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/*
 * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
 */
/**
 * Polls exchanges from an {@link Endpoint} in batches, buffers them in a
 * caller-supplied {@link Collection}, and then forwards every buffered
 * exchange to a delegate {@link Processor}.
 *
 * <p>A batch ends when {@link #getBatchSize() batchSize} exchanges have been
 * received, when the consumer returns {@code null}, or when
 * {@link #getBatchTimeout() batchTimeout} milliseconds have elapsed since the
 * batch started, whichever happens first. The order in which buffered
 * exchanges are forwarded is the iteration order of the supplied collection.
 *
 * <p>Polling runs on a dedicated thread that is created in {@link #doStart()}
 * and loops until the service reports that it is stopping or stopped.
 */
public class BatchProcessor
extends ServiceSupport
implements Runnable {
    private static final Log LOG = LogFactory.getLog(BatchProcessor.class);

    /** Endpoint the polling consumer is created from. */
    private final Endpoint endpoint;
    /** Delegate that receives every exchange of a completed batch. */
    private final Processor processor;
    /** Buffer holding the exchanges of the batch currently being built. */
    private final Collection<Exchange> collection;
    /** Maximum time, in milliseconds, spent collecting one batch. */
    private long batchTimeout = 1000L;
    /** Maximum number of exchanges collected into one batch. */
    private int batchSize = 100;
    /** Created in {@link #doStart()}; {@code null} until the service is started. */
    private PollingConsumer consumer;
    /** Lazily defaulted by {@link #getExceptionHandler()}. */
    private ExceptionHandler exceptionHandler;

    /**
     * Creates a batch processor.
     *
     * @param endpoint   the endpoint to poll exchanges from
     * @param processor  the processor each batched exchange is handed to
     * @param collection the buffer used to hold (and thereby order or
     *                   de-duplicate) the exchanges of a batch
     */
    public BatchProcessor(Endpoint endpoint, Processor processor, Collection<Exchange> collection) {
        this.endpoint = endpoint;
        this.processor = processor;
        this.collection = collection;
    }

    @Override
    public String toString() {
        return "BatchProcessor[to: " + this.processor + "]";
    }

    /**
     * Polling loop: processes one batch after another until the service is
     * stopping or stopped. Exceptions thrown while processing a batch are
     * passed to the {@link #getExceptionHandler() exception handler} and do
     * not end the loop. The buffer is cleared when the loop exits.
     */
    @Override
    public void run() {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Starting thread for " + this);
        }
        while (!this.isStopped() && !this.isStopping()) {
            try {
                this.processBatch();
            }
            catch (Exception e) {
                this.getExceptionHandler().handleException(e);
            }
        }
        this.collection.clear();
    }

    /**
     * Returns the handler used for exceptions raised while processing a
     * batch, creating a {@link LoggingExceptionHandler} on first use if none
     * has been set.
     *
     * @return the exception handler, never {@code null}
     */
    public ExceptionHandler getExceptionHandler() {
        if (this.exceptionHandler == null) {
            this.exceptionHandler = new LoggingExceptionHandler(this.getClass());
        }
        return this.exceptionHandler;
    }

    /**
     * @param exceptionHandler the handler to use for exceptions raised while
     *                         processing a batch
     */
    public void setExceptionHandler(ExceptionHandler exceptionHandler) {
        this.exceptionHandler = exceptionHandler;
    }

    /** @return the maximum number of exchanges collected into one batch */
    public int getBatchSize() {
        return this.batchSize;
    }

    /** @param batchSize the maximum number of exchanges collected into one batch */
    public void setBatchSize(int batchSize) {
        this.batchSize = batchSize;
    }

    /** @return the maximum time, in milliseconds, spent collecting one batch */
    public long getBatchTimeout() {
        return this.batchTimeout;
    }

    /** @param batchTimeout the maximum time, in milliseconds, spent collecting one batch */
    public void setBatchTimeout(long batchTimeout) {
        this.batchTimeout = batchTimeout;
    }

    /** @return the endpoint exchanges are polled from */
    public Endpoint getEndpoint() {
        return this.endpoint;
    }

    /** @return the processor each batched exchange is handed to */
    public Processor getProcessor() {
        return this.processor;
    }

    /**
     * Collects a single batch from the consumer and forwards it.
     *
     * <p>Receiving stops once {@code batchSize} exchanges were read, the
     * consumer returned {@code null}, or the batch deadline has passed. Each
     * buffered exchange is removed from the buffer before it is processed, so
     * an exchange whose processing fails is not retried; exchanges not yet
     * reached stay in the buffer.
     *
     * @throws Exception if receiving or processing an exchange fails
     */
    protected synchronized void processBatch() throws Exception {
        long deadline = System.currentTimeMillis() + this.batchTimeout;
        for (int i = 0; i < this.batchSize; ++i) {
            long remaining = deadline - System.currentTimeMillis();
            if (remaining < 0L) {
                // Deadline already passed: end the batch instead of handing the
                // consumer a negative timeout, whose meaning is up to the consumer.
                break;
            }
            // The cast is a no-op if receive() is already typed as Exchange; it is
            // kept because the consumer field is declared as a raw type.
            Exchange received = (Exchange)this.consumer.receive(remaining);
            if (received == null) {
                break;
            }
            this.collection.add(received);
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("Finsihed batch size: " + this.batchSize + " timeout: " + this.batchTimeout + " so sending set: " + this.collection);
        }
        Iterator<Exchange> iter = this.collection.iterator();
        while (iter.hasNext()) {
            Exchange buffered = iter.next();
            // Remove first so a failing exchange is not left behind for the next batch.
            iter.remove();
            this.processExchange(buffered);
        }
    }

    /**
     * Hands one exchange of a completed batch to the delegate processor.
     *
     * @param exchange the exchange to process
     * @throws Exception if the delegate processor fails
     */
    protected void processExchange(Exchange exchange) throws Exception {
        this.processor.process(exchange);
    }

    /**
     * Creates the polling consumer, starts the delegate processor and the
     * consumer, and launches the polling thread.
     *
     * @throws Exception if the consumer cannot be created or a service fails to start
     */
    @Override
    protected void doStart() throws Exception {
        this.consumer = this.endpoint.createPollingConsumer();
        ServiceHelper.startServices(this.processor, this.consumer);
        Thread thread = new Thread(this, this + " Polling Thread");
        thread.start();
    }

    /**
     * Stops the consumer and the delegate processor and discards any
     * exchanges still held in the buffer.
     *
     * @throws Exception if a service fails to stop
     */
    @Override
    protected void doStop() throws Exception {
        ServiceHelper.stopServices(this.consumer, this.processor);
        this.collection.clear();
    }

    /** @return the buffer holding the exchanges of the current batch */
    protected Collection<Exchange> getCollection() {
        return this.collection;
    }
}

