/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iceberg.util;

import java.io.Closeable;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import org.apache.iceberg.io.CloseableGroup;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.io.Closer;
import org.apache.iceberg.util.ThreadPools;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ParallelIterable<T>
extends CloseableGroup
implements CloseableIterable<T> {
    /** Logger; used when closing a leftover task continuation fails while an iterator is being closed. */
    private static final Logger LOG = LoggerFactory.getLogger(ParallelIterable.class);
    /** Default approximate queue-size limit; the same value the two-argument constructor uses. */
    private static final int DEFAULT_MAX_QUEUE_SIZE = 30000;
    /** The input iterables; each one is wrapped in its own background task when an iterator is created. */
    private final Iterable<? extends Iterable<T>> iterables;
    /** Executor handed to every iterator, which submits its tasks to it. */
    private final ExecutorService workerPool;
    /**
     * Queue size at or above which a task stops producing and yields. The limit is approximate:
     * tasks check the size before each element without any locking.
     */
    private final int approximateMaxQueueSize;

    /**
     * Creates a parallel iterable that uses the default approximate queue-size limit.
     *
     * @param iterables the iterables to read concurrently; must not be null
     * @param workerPool the executor the reading tasks are submitted to; must not be null
     */
    public ParallelIterable(Iterable<? extends Iterable<T>> iterables, ExecutorService workerPool) {
        this(iterables, workerPool, DEFAULT_MAX_QUEUE_SIZE);
    }

    /**
     * Creates a parallel iterable with an explicit approximate queue-size limit.
     *
     * @param iterables the iterables to read concurrently; must not be null
     * @param workerPool the executor the reading tasks are submitted to; must not be null
     * @param approximateMaxQueueSize queue size at or above which a reading task yields
     * @throws NullPointerException if {@code iterables} or {@code workerPool} is null
     */
    public ParallelIterable(Iterable<? extends Iterable<T>> iterables, ExecutorService workerPool, int approximateMaxQueueSize) {
        // Validate both references up front (same order and messages as before), then assign.
        Preconditions.checkNotNull(iterables, "Input iterables cannot be null");
        Preconditions.checkNotNull(workerPool, "Worker pool cannot be null");
        this.iterables = iterables;
        this.workerPool = workerPool;
        this.approximateMaxQueueSize = approximateMaxQueueSize;
    }

    /**
     * Creates a new iterator over the elements of all input iterables.
     *
     * <p>The iterator is also registered with this group via {@code addCloseable}, in addition
     * to being closeable directly by the caller.
     *
     * @return a closeable iterator backed by tasks submitted to the worker pool
     */
    @Override
    public CloseableIterator<T> iterator() {
        // Parameterized (not raw) so the return needs no unchecked conversion.
        ParallelIterator<T> iter = new ParallelIterator<>(this.iterables, this.workerPool, this.approximateMaxQueueSize);
        this.addCloseable(iter);
        return iter;
    }

    /**
     * A resumable unit of work that copies the elements of one input iterable into a shared queue.
     *
     * <p>{@link #get()} returns the task itself (a "continuation") when it stopped early because
     * the queue was full, and an empty {@link Optional} when the input is exhausted or the shared
     * {@code closed} flag was observed.
     */
    private static class Task<T>
    implements Supplier<Optional<Task<T>>>,
    Closeable {
        private final Iterable<T> input;
        private final ConcurrentLinkedQueue<T> queue;
        private final AtomicBoolean closed;
        private final int approximateMaxQueueSize;
        // Created lazily on the first run and kept across yields so a resumed task continues where it stopped.
        private Iterator<T> iterator = null;

        /**
         * @param input the iterable to read; closed by {@link #close()} if it is {@link Closeable}
         * @param queue shared queue that receives the elements
         * @param closed shared flag; once true, no further elements are enqueued
         * @param approximateMaxQueueSize queue size at or above which the task yields
         */
        Task(Iterable<T> input, ConcurrentLinkedQueue<T> queue, AtomicBoolean closed, int approximateMaxQueueSize) {
            this.input = Preconditions.checkNotNull(input, "input cannot be null");
            this.queue = Preconditions.checkNotNull(queue, "queue cannot be null");
            this.closed = Preconditions.checkNotNull(closed, "closed cannot be null");
            this.approximateMaxQueueSize = approximateMaxQueueSize;
        }

        /**
         * Runs (or resumes) the task.
         *
         * @return this task if it yielded because the queue reached the size limit; otherwise
         *     an empty optional after the task has closed itself
         * @throws UncheckedIOException if closing fails after the task finished normally
         */
        @Override
        public Optional<Task<T>> get() {
            try {
                if (this.iterator == null) {
                    this.iterator = this.input.iterator();
                }
                while (this.iterator.hasNext()) {
                    if (this.queue.size() >= this.approximateMaxQueueSize) {
                        // Queue is full: hand back a continuation instead of blocking a worker thread.
                        return Optional.of(this);
                    }
                    T element = this.iterator.next();
                    if (this.closed.get()) {
                        // Consumer is gone; drop the element and stop producing.
                        break;
                    }
                    this.queue.add(element);
                }
            }
            catch (Throwable failure) {
                this.closeSuppressing(failure);
                throw failure;
            }
            try {
                this.close();
            }
            catch (IOException closeFailure) {
                throw new UncheckedIOException("Close failed", closeFailure);
            }
            // Finished: nothing left to resume.
            return Optional.empty();
        }

        /** Closes this task after a failure, attaching any close error to the primary failure. */
        private void closeSuppressing(Throwable failure) {
            try {
                this.close();
            }
            catch (IOException closeFailure) {
                // Self-suppression is not permitted, so skip it if both are the same instance.
                if (closeFailure != failure) {
                    failure.addSuppressed(closeFailure);
                }
            }
        }

        /**
         * Drops the current iterator and closes the input if it is {@link Closeable}.
         *
         * @throws IOException if closing the input fails
         */
        @Override
        public void close() throws IOException {
            this.iterator = null;
            if (this.input instanceof Closeable) {
                ((Closeable)this.input).close();
            }
        }
    }

    /**
     * Iterator that consumes a shared queue filled by {@link Task}s running on a worker pool.
     *
     * <p>A fixed number of task slots is kept busy. A task that yields because the queue is full
     * is parked in {@code yieldedTasks} and resubmitted before any not-yet-started task.
     */
    private static class ParallelIterator<T>
    implements CloseableIterator<T> {
        private final Iterator<Task<T>> tasks;
        private final Deque<Task<T>> yieldedTasks = new ArrayDeque<>();
        private final ExecutorService workerPool;
        private final CompletableFuture<Optional<Task<T>>>[] taskFutures;
        private final ConcurrentLinkedQueue<T> queue = new ConcurrentLinkedQueue<>();
        private final int maxQueueSize;
        private final AtomicBoolean closed = new AtomicBoolean(false);

        /**
         * @param iterables inputs; each is lazily wrapped in a {@link Task} sharing this iterator's queue and closed flag
         * @param workerPool executor the tasks are submitted to
         * @param maxQueueSize approximate queue-size limit passed to every task
         */
        // Generic array creation is impossible in Java; the raw array is only ever filled by submitNextTask().
        @SuppressWarnings("unchecked")
        private ParallelIterator(Iterable<? extends Iterable<T>> iterables, ExecutorService workerPool, int maxQueueSize) {
            this.tasks = Iterables.transform(iterables, iterable -> new Task<T>(iterable, this.queue, this.closed, maxQueueSize)).iterator();
            this.workerPool = workerPool;
            this.maxQueueSize = maxQueueSize;
            // Two task slots per worker thread.
            this.taskFutures = new CompletableFuture[2 * ThreadPools.WORKER_THREAD_POOL_SIZE];
        }

        /**
         * Closes the iterator: stops task submission, closes parked and completed continuations,
         * cancels outstanding futures and clears the queue.
         *
         * @throws UncheckedIOException if closing a parked task fails
         */
        @Override
        public void close() {
            // Set the flag first so no new task is submitted and running tasks stop enqueueing.
            this.closed.set(true);
            try (Closer closer = Closer.create()) {
                // Only the hand-over of parked tasks needs the lock; it is released before any
                // task close() callback can run, so that code never executes under this monitor.
                synchronized (this) {
                    this.yieldedTasks.forEach(closer::register);
                    this.yieldedTasks.clear();
                }
                for (CompletableFuture<Optional<Task<T>>> taskFuture : this.taskFutures) {
                    if (taskFuture == null) {
                        continue;
                    }
                    taskFuture.cancel(true);
                    // If the future had already completed normally, cancel() was a no-op and this
                    // closes the continuation it returned (if any).
                    taskFuture.thenAccept(continuation -> {
                        if (continuation.isPresent()) {
                            try {
                                continuation.get().close();
                            }
                            catch (IOException e) {
                                LOG.error("Task close failed", (Throwable)e);
                            }
                        }
                    });
                }
                this.queue.clear();
            }
            catch (IOException e) {
                throw new UncheckedIOException("Close failed", e);
            }
        }

        /**
         * Harvests finished task slots, parks their continuations, and refills the slots.
         *
         * @return true if the iterator is still open and there is unstarted or in-flight work
         * @throws IllegalStateException if the iterator is already closed
         * @throws RuntimeException the task's own runtime exception, or a wrapper around a
         *     non-runtime failure of a task
         */
        private synchronized boolean checkTasks() {
            Preconditions.checkState(!this.closed.get(), "Already closed");
            boolean hasRunningTask = false;
            for (int i = 0; i < this.taskFutures.length; ++i) {
                CompletableFuture<Optional<Task<T>>> future = this.taskFutures[i];
                if (future == null || future.isDone()) {
                    if (future != null) {
                        try {
                            Optional<Task<T>> continuation = future.get();
                            continuation.ifPresent(this.yieldedTasks::addLast);
                        }
                        catch (ExecutionException e) {
                            if (e.getCause() instanceof RuntimeException) {
                                throw (RuntimeException)e.getCause();
                            }
                            throw new RuntimeException("Failed while running parallel task", e.getCause());
                        }
                        catch (InterruptedException e) {
                            // Restore the interrupt flag, as hasNext() does, before propagating.
                            Thread.currentThread().interrupt();
                            throw new RuntimeException("Interrupted while running parallel task", e);
                        }
                    }
                    this.taskFutures[i] = this.submitNextTask();
                }
                if (this.taskFutures[i] != null) {
                    hasRunningTask = true;
                }
            }
            return !this.closed.get() && (this.tasks.hasNext() || hasRunningTask);
        }

        /**
         * Submits the next piece of work, preferring parked continuations over unstarted tasks.
         * Called only from {@link #checkTasks()}, which holds the monitor.
         *
         * @return the future of the submitted task, or null if closed or no work is left
         */
        private CompletableFuture<Optional<Task<T>>> submitNextTask() {
            if (!this.closed.get()) {
                if (!this.yieldedTasks.isEmpty()) {
                    return CompletableFuture.supplyAsync(this.yieldedTasks.removeFirst(), this.workerPool);
                }
                if (this.tasks.hasNext()) {
                    return CompletableFuture.supplyAsync(this.tasks.next(), this.workerPool);
                }
            }
            return null;
        }

        /**
         * Reports whether another element is available, waiting for background tasks if needed.
         *
         * @return true if the queue holds an element; false once all tasks are done and the queue is empty
         * @throws IllegalStateException if the iterator is closed
         * @throws RuntimeException if the waiting thread is interrupted or a task failed
         */
        @Override
        public synchronized boolean hasNext() {
            Preconditions.checkState(!this.closed.get(), "Already closed");
            if (!this.queue.isEmpty()) {
                return true;
            }
            // Poll: keep task slots full and re-check the queue every 10 ms until work runs out.
            while (this.checkTasks()) {
                if (!this.queue.isEmpty()) {
                    return true;
                }
                try {
                    Thread.sleep(10L);
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(e);
                }
            }
            // Tasks may have enqueued elements just before finishing.
            return !this.queue.isEmpty();
        }

        /**
         * Returns the next element from the queue.
         *
         * @throws NoSuchElementException if no element is left
         */
        @Override
        public synchronized T next() {
            if (!this.hasNext()) {
                throw new NoSuchElementException();
            }
            return this.queue.poll();
        }
    }
}

