/*
 * Decompiled with CFR 0.152.
 */
package org.neo4j.unsafe.impl.batchimport.store.io;

import java.util.concurrent.TimeUnit;
import org.neo4j.io.fs.StoreChannel;
import org.neo4j.unsafe.impl.batchimport.executor.DynamicTaskExecutor;
import org.neo4j.unsafe.impl.batchimport.executor.TaskExecutor;
import org.neo4j.unsafe.impl.batchimport.store.BatchingPageCache;
import org.neo4j.unsafe.impl.batchimport.store.io.Funnel;
import org.neo4j.unsafe.impl.batchimport.store.io.JobMonitor;
import org.neo4j.unsafe.impl.batchimport.store.io.Monitor;
import org.neo4j.unsafe.impl.batchimport.store.io.WriteQueue;

public class IoQueue
implements BatchingPageCache.WriterFactory {
    private final TaskExecutor<Void> executor;
    private final JobMonitor jobMonitor = new JobMonitor();
    private final BatchingPageCache.WriterFactory delegateFactory;

    public IoQueue(int initialProcessorCount, int maxProcessors, int queueSize, BatchingPageCache.WriterFactory delegateFactory) {
        this(new DynamicTaskExecutor<Void>(initialProcessorCount, maxProcessors, queueSize, DynamicTaskExecutor.DEFAULT_PARK_STRATEGY, "IoQueue I/O thread"), delegateFactory);
    }

    IoQueue(TaskExecutor<Void> executor, BatchingPageCache.WriterFactory delegateFactory) {
        this.executor = executor;
        this.delegateFactory = delegateFactory;
    }

    @Override
    public BatchingPageCache.Writer create(StoreChannel channel, Monitor monitor) {
        BatchingPageCache.Writer writer = this.delegateFactory.create(channel, monitor);
        WriteQueue queue = new WriteQueue(this.executor, this.jobMonitor);
        return new Funnel(writer, queue);
    }

    @Override
    public void awaitEverythingWritten() {
        long endTime = System.currentTimeMillis() + TimeUnit.MINUTES.toMillis(10L);
        while (this.jobMonitor.hasActiveJobs()) {
            this.executor.assertHealthy();
            try {
                Thread.sleep(10L);
            }
            catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            if (System.currentTimeMillis() <= endTime) continue;
            throw new RuntimeException("Didn't finish within designated time");
        }
    }

    @Override
    public void shutdown() {
        this.executor.shutdown(true);
    }

    @Override
    public int numberOfProcessors() {
        return this.executor.numberOfProcessors();
    }

    @Override
    public boolean incrementNumberOfProcessors() {
        return this.executor.incrementNumberOfProcessors();
    }

    @Override
    public boolean decrementNumberOfProcessors() {
        return this.executor.decrementNumberOfProcessors();
    }

    public String toString() {
        return this.getClass().getSimpleName() + "[" + this.delegateFactory + ", threads:" + this.executor.numberOfProcessors() + "]";
    }
}

