/*
 * Decompiled with CFR 0.152.
 */
package com.blacklocus.qs;

import com.blacklocus.misc.ExceptingRunnable;
import com.blacklocus.qs.QueueItemHandler;
import java.util.Collection;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Reads batches of queue items from a provider and submits one task per item to an
 * {@link ExecutorService}, driving each item through the callbacks of a
 * {@link QueueItemHandler}.
 *
 * <p>For every item the submitted task calls {@code convert}, {@code process} and
 * {@code onSuccess} on the handler. Any {@link Throwable} raised along the way is logged,
 * reported through {@code onError} and rethrown wrapped in a {@link RuntimeException};
 * {@code onComplete} is invoked in a {@code finally} block. The future returned by the
 * executor is handed to the handler through {@code withFuture}.
 *
 * <p>When the provider yields an empty batch the reader sleeps for {@code sleepMs}
 * milliseconds before asking for the next one. If the reader thread is interrupted, the
 * interrupt flag is restored and {@link #go()} returns.
 *
 * @param <Q> type of the raw items yielded by the provider
 * @param <T> type the handler converts a raw item into
 * @param <R> type of the result the handler produces for a converted item
 */
public class QueueReader<Q, T, R>
extends ExceptingRunnable {
    private static final Logger LOG = LoggerFactory.getLogger(QueueReader.class);

    /** Sleep, in milliseconds, applied after an empty batch when no explicit value is given. */
    public static final long DEFAULT_SLEEP_MS = 20000L;

    protected Iterable<Collection<Q>> queueItemProvider;
    protected QueueItemHandler<Q, T, R> handler;
    protected ExecutorService executor;
    protected long sleepMs;

    /**
     * Creates a reader that sleeps {@link #DEFAULT_SLEEP_MS} milliseconds after an empty batch.
     *
     * @param provider source of item batches; iterated by {@link #go()}
     * @param handler  callbacks applied to every item
     * @param executor executor the per-item tasks are submitted to
     */
    public QueueReader(Iterable<Collection<Q>> provider, QueueItemHandler<Q, T, R> handler, ExecutorService executor) {
        this(provider, handler, executor, DEFAULT_SLEEP_MS);
    }

    /**
     * Creates a reader with an explicit sleep interval.
     *
     * @param provider source of item batches; iterated by {@link #go()}
     * @param handler  callbacks applied to every item
     * @param executor executor the per-item tasks are submitted to
     * @param sleepMs  milliseconds to sleep after the provider yields an empty batch
     */
    public QueueReader(Iterable<Collection<Q>> provider, QueueItemHandler<Q, T, R> handler, ExecutorService executor, long sleepMs) {
        this.queueItemProvider = provider;
        this.handler = handler;
        this.executor = executor;
        this.sleepMs = sleepMs;
    }

    /**
     * Iterates over the provider's batches, submitting one task per item of each non-empty
     * batch and sleeping after each empty one.
     *
     * <p>Failures other than interruption are logged and the loop moves on to the next
     * batch. On interruption the interrupt flag is restored and the method returns.
     */
    @Override
    public void go() throws Exception {
        for (Collection<Q> queueItems : this.queueItemProvider) {
            try {
                if (queueItems.isEmpty()) {
                    LOG.debug("No items available... sleeping for {} ms", this.sleepMs);
                    Thread.sleep(this.sleepMs);
                    continue;
                }
                for (final Q queueItem : queueItems) {
                    this.handler.withFuture(queueItem, this.executor.submit(this.newTask(queueItem)));
                }
            }
            catch (InterruptedException e) {
                LOG.error("Reader thread interrupted", e);
                Thread.currentThread().interrupt();
                // Stop reading: with the flag restored, every later sleep would throw
                // immediately and an endless provider would turn this loop into a hot
                // spin that can never be shut down by interrupting the thread.
                return;
            }
            catch (Throwable t) {
                // Keep the reader alive: one bad batch must not end the whole loop.
                LOG.error("Runtime error in reader thread", t);
            }
        }
    }

    /**
     * Builds the task that runs a single item through the handler callbacks.
     *
     * @param queueItem the raw item to process
     * @return a callable yielding the item paired with its result
     */
    private Callable<Pair<Q, R>> newTask(final Q queueItem) {
        return new Callable<Pair<Q, R>>() {

            @Override
            public Pair<Q, R> call() throws Exception {
                try {
                    T converted = QueueReader.this.handler.convert(queueItem);
                    R result = QueueReader.this.handler.process(converted);
                    QueueReader.this.handler.onSuccess(queueItem, result);
                    return Pair.of(queueItem, result);
                }
                catch (Throwable t) {
                    // The trailing Throwable is passed after the placeholder argument so the
                    // logger can report it alongside the formatted message.
                    LOG.error("An error occurred while processing item {}", queueItem, t);
                    QueueReader.this.handler.onError(queueItem, t);
                    throw new RuntimeException(t);
                }
                finally {
                    QueueReader.this.handler.onComplete(queueItem);
                }
            }
        };
    }
}

