/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.utils;

import java.io.IOException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.utils.ExecutorThreadFactory;
import org.apache.paimon.utils.IOExceptionSupplier;

/**
 * A {@link RecordReader} that reads batches from an underlying reader on a background thread
 * and hands them to the consumer through an in-memory queue.
 *
 * <p>The underlying reader is created lazily by the supplied {@link IOExceptionSupplier} on a
 * thread taken from a shared cached thread pool. Every batch it produces is appended to an
 * unbounded {@link LinkedBlockingQueue}; a final marker element signals the end of input.
 *
 * <p>NOTE(review): {@code isEnd} is a plain field that is only touched by {@link #readBatch()},
 * so this class presumably expects a single consumer thread - confirm against callers.
 */
public class AsyncRecordReader<T>
implements RecordReader<T> {
    /** Shared pool that runs the background read task of every instance. */
    private static final ExecutorService ASYNC_EXECUTOR = Executors.newCachedThreadPool(new ExecutorThreadFactory("paimon-reader-async-thread"));

    /** Hand-off queue between the background producer and {@link #readBatch()}. */
    private final BlockingQueue<Element<T>> queue = new LinkedBlockingQueue<>();

    /** Handle of the background read task; used to surface its failure and to cancel it. */
    private final Future<Void> future;

    /** Set once the end marker has been consumed, so later calls return {@code null} directly. */
    private boolean isEnd = false;

    /**
     * Starts the background read immediately.
     *
     * @param supplier creates the underlying reader; it is invoked on the background thread
     */
    public AsyncRecordReader(IOExceptionSupplier<RecordReader<T>> supplier) {
        // Capture the creator's context classloader so the background thread can adopt it.
        ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
        this.future = ASYNC_EXECUTOR.submit(() -> this.asyncRead(supplier, classLoader));
    }

    /**
     * Body of the background task: opens the underlying reader, enqueues every batch it yields
     * and finally enqueues the end marker. The reader is closed when the task finishes,
     * normally or exceptionally.
     *
     * @param supplier creates the underlying reader
     * @param classLoader context classloader to install on the worker thread while reading
     * @return always {@code null}
     * @throws IOException if creating, reading or closing the underlying reader fails
     */
    private Void asyncRead(IOExceptionSupplier<RecordReader<T>> supplier, ClassLoader classLoader) throws IOException {
        Thread worker = Thread.currentThread();
        ClassLoader previousClassLoader = worker.getContextClassLoader();
        worker.setContextClassLoader(classLoader);
        try (RecordReader<T> reader = supplier.get()) {
            RecordReader.RecordIterator<T> batch;
            while ((batch = reader.readBatch()) != null) {
                this.queue.add(new Element<T>(false, batch));
            }
            this.queue.add(new Element<T>(true, null));
            return null;
        } finally {
            // The worker belongs to a shared pool: put its original classloader back so an idle
            // thread does not keep referencing the creator's classloader after this task ends.
            // This runs after try-with-resources has closed the reader.
            worker.setContextClassLoader(previousClassLoader);
        }
    }

    /**
     * Returns the next batch produced by the background task, blocking until one is available.
     *
     * @return the next batch, or {@code null} once the end of input has been reached
     * @throws IOException if the background task failed, or if the calling thread was
     *     interrupted while waiting (the interrupt flag is restored in that case)
     */
    @Override
    @Nullable
    public RecordReader.RecordIterator<T> readBatch() throws IOException {
        if (this.isEnd) {
            return null;
        }
        try {
            Element<T> element;
            do {
                // Wake up periodically so a failure of the background task is noticed even
                // when it never manages to enqueue anything.
                element = this.queue.poll(2L, TimeUnit.SECONDS);
                this.checkException();
            } while (element == null);
            if (element.isEnd) {
                this.isEnd = true;
                return null;
            }
            return element.batch;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IOException(e);
        }
    }

    /**
     * Rethrows the failure of the background task, if it has already finished with one.
     *
     * @throws IOException wrapping the cause of the background task's failure
     * @throws InterruptedException if interrupted while retrieving the task's result
     */
    private void checkException() throws IOException, InterruptedException {
        if (!this.future.isDone()) {
            return;
        }
        try {
            this.future.get();
        } catch (ExecutionException e) {
            throw new IOException(e.getCause());
        }
    }

    /**
     * Cancels the background task, interrupting its thread if it is running. Batches that are
     * still sitting in the queue are not drained here.
     */
    @Override
    public void close() throws IOException {
        this.future.cancel(true);
    }

    /**
     * Queue entry: either a batch or the end-of-input marker.
     *
     * @param <E> record type of the carried batch
     */
    private static final class Element<E> {
        /** {@code true} for the end-of-input marker, in which case {@link #batch} is null. */
        private final boolean isEnd;
        private final RecordReader.RecordIterator<E> batch;

        private Element(boolean isEnd, RecordReader.RecordIterator<E> batch) {
            this.isEnd = isEnd;
            this.batch = batch;
        }
    }
}

