/*
 * Decompiled with CFR 0.152.
 */
package org.apache.camel.processor;

import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.Expression;
import org.apache.camel.PollingConsumer;
import org.apache.camel.Processor;
import org.apache.camel.impl.LoggingExceptionHandler;
import org.apache.camel.impl.ServiceSupport;
import org.apache.camel.spi.ExceptionHandler;
import org.apache.camel.util.ExpressionComparator;
import org.apache.camel.util.ExpressionListComparator;
import org.apache.camel.util.ServiceHelper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/*
 * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
 */
public class Resequencer
extends ServiceSupport
implements Runnable {
    private static final transient Log log = LogFactory.getLog(Resequencer.class);
    private Endpoint endpoint;
    private Processor processor;
    private Set<Exchange> set;
    private long batchTimeout = 1000L;
    private int batchSize = 100;
    private PollingConsumer consumer;
    private ExceptionHandler exceptionHandler;

    public Resequencer(Endpoint endpoint, Processor processor, Expression<Exchange> expression) {
        this(endpoint, processor, Resequencer.createSet(expression));
    }

    public Resequencer(Endpoint endpoint, Processor processor, List<Expression<Exchange>> expressions) {
        this(endpoint, processor, Resequencer.createSet(expressions));
    }

    public Resequencer(Endpoint endpoint, Processor processor, Set<Exchange> set) {
        this.endpoint = endpoint;
        this.processor = processor;
        this.set = set;
    }

    public String toString() {
        return "Resequencer[to: " + this.processor + "]";
    }

    @Override
    public void run() {
        log.debug((Object)("Starting thread for " + this));
        while (!this.isStopped() && !this.isStopping()) {
            try {
                this.processBatch();
            }
            catch (Exception e) {
                this.getExceptionHandler().handleException(e);
            }
        }
        this.set.clear();
    }

    public ExceptionHandler getExceptionHandler() {
        if (this.exceptionHandler == null) {
            this.exceptionHandler = new LoggingExceptionHandler(this.getClass());
        }
        return this.exceptionHandler;
    }

    public void setExceptionHandler(ExceptionHandler exceptionHandler) {
        this.exceptionHandler = exceptionHandler;
    }

    public int getBatchSize() {
        return this.batchSize;
    }

    public void setBatchSize(int batchSize) {
        this.batchSize = batchSize;
    }

    public long getBatchTimeout() {
        return this.batchTimeout;
    }

    public void setBatchTimeout(long batchTimeout) {
        this.batchTimeout = batchTimeout;
    }

    protected synchronized void processBatch() throws Exception {
        long timeout;
        Object exchange;
        long start = System.currentTimeMillis();
        long end = start + this.batchTimeout;
        for (int i = 0; i < this.batchSize && (exchange = this.consumer.receive(timeout = end - System.currentTimeMillis())) != null; ++i) {
            this.set.add((Exchange)exchange);
        }
        if (log.isDebugEnabled()) {
            log.debug((Object)("Finsihed batch size: " + this.batchSize + " timeout: " + this.batchTimeout + " so sending set: " + this.set));
        }
        Iterator<Exchange> iter = this.set.iterator();
        while (iter.hasNext()) {
            Exchange exchange2 = iter.next();
            iter.remove();
            this.processor.process(exchange2);
        }
    }

    @Override
    protected void doStart() throws Exception {
        this.consumer = this.endpoint.createPollingConsumer();
        ServiceHelper.startServices(this.processor, this.consumer);
        Thread thread = new Thread((Runnable)this, this + " Polling Thread");
        thread.start();
    }

    @Override
    protected void doStop() throws Exception {
        ServiceHelper.stopServices(this.consumer, this.processor);
        this.consumer = null;
    }

    protected static Set<Exchange> createSet(Expression<Exchange> expression) {
        return Resequencer.createSet(new ExpressionComparator<Exchange>(expression));
    }

    protected static Set<Exchange> createSet(List<Expression<Exchange>> expressions) {
        if (expressions.size() == 1) {
            return Resequencer.createSet(expressions.get(0));
        }
        return Resequencer.createSet(new ExpressionListComparator(expressions));
    }

    protected static Set<Exchange> createSet(Comparator<? super Exchange> comparator) {
        return new TreeSet<Exchange>(comparator);
    }
}

