/*
 * Decompiled with CFR 0.152.
 */
package org.neo4j.unsafe.impl.batchimport.staging;

import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicLong;
import org.neo4j.function.primitive.PrimitiveLongPredicate;
import org.neo4j.unsafe.impl.batchimport.executor.DynamicTaskExecutor;
import org.neo4j.unsafe.impl.batchimport.executor.TaskExecutor;
import org.neo4j.unsafe.impl.batchimport.staging.AbstractStep;
import org.neo4j.unsafe.impl.batchimport.staging.StageControl;

/**
 * A step that hands every received batch to a {@link TaskExecutor} as a task. Each task
 * calls {@link #process(long, Object)} and passes the returned value on via
 * {@code sendDownstream}.
 *
 * <p>Bookkeeping done around each batch (all visible in this class):
 * <ul>
 *   <li>{@code queuedBatches} is incremented when a batch is received and decremented when
 *       its task has sent the result downstream;</li>
 *   <li>{@code doneBatches} is incremented once per successfully processed batch;</li>
 *   <li>the time spent inside {@link #process(long, Object)} is added to
 *       {@code totalProcessingTime};</li>
 *   <li>the time between the queue draining to zero and the next batch arriving is added to
 *       {@code upstreamIdleTime}.</li>
 * </ul>
 *
 * @param <T> type of batch this step receives
 */
public abstract class ExecutorServiceStep<T> extends AbstractStep<T> {
    private final int workAheadSize;
    private final int initialProcessorCount;
    private final boolean allowMultipleProcessors;

    /** Wall-clock millis at which the queue last drained to zero; {@code 0} until that first happens. */
    private final AtomicLong lastBatchEndTime = new AtomicLong();

    /** Accepts a threshold when the number of currently queued batches does not exceed it. */
    private final PrimitiveLongPredicate catchUp = new PrimitiveLongPredicate() {
        @Override
        public boolean accept(long queueSizeThreshold) {
            return queuedBatches.get() <= queueSizeThreshold;
        }
    };

    /** Assigned in {@link #start(boolean)}; {@code null} until then. */
    private TaskExecutor executor;

    /**
     * Convenience constructor: multiple processors are allowed exactly when
     * {@code initialProcessorCount} is greater than one.
     *
     * @param control forwarded to the {@link AbstractStep} constructor
     * @param name forwarded to the {@link AbstractStep} constructor
     * @param workAheadSize queue-size threshold used in {@link #receive(long, Object)} and
     *        passed to the executor created in {@link #start(boolean)}
     * @param movingAverageSize forwarded to the {@link AbstractStep} constructor
     * @param initialProcessorCount processor count the executor is created with
     */
    protected ExecutorServiceStep(StageControl control, String name, int workAheadSize, int movingAverageSize, int initialProcessorCount) {
        this(control, name, workAheadSize, movingAverageSize, initialProcessorCount, initialProcessorCount > 1);
    }

    /**
     * @param control forwarded to the {@link AbstractStep} constructor
     * @param name forwarded to the {@link AbstractStep} constructor
     * @param workAheadSize queue-size threshold used in {@link #receive(long, Object)} and
     *        passed to the executor created in {@link #start(boolean)}
     * @param movingAverageSize forwarded to the {@link AbstractStep} constructor
     * @param initialProcessorCount processor count the executor is created with
     * @param allowMultipleProcessors whether {@link #incrementNumberOfProcessors()} is allowed
     *        to ask the executor for another processor
     */
    protected ExecutorServiceStep(StageControl control, String name, int workAheadSize, int movingAverageSize, int initialProcessorCount, boolean allowMultipleProcessors) {
        super(control, name, movingAverageSize);
        this.allowMultipleProcessors = allowMultipleProcessors;
        this.initialProcessorCount = initialProcessorCount;
        this.workAheadSize = workAheadSize;
    }

    /**
     * Starts the superclass and then creates the {@link DynamicTaskExecutor} that will run
     * the batch tasks, using the configured initial processor count, the work-ahead size,
     * the default park strategy and this step's name.
     */
    @Override
    public void start(boolean orderedTickets) {
        super.start(orderedTickets);
        executor = new DynamicTaskExecutor(initialProcessorCount, workAheadSize, DynamicTaskExecutor.DEFAULT_PARK_STRATEGY, name());
    }

    /**
     * Queues {@code batch} for processing on the executor.
     *
     * <p>Before submitting, this calls {@code await} with a predicate that holds once the
     * number of queued batches is at most {@code workAheadSize} (presumably blocking until
     * then; {@code await} is defined in the superclass).
     *
     * @param ticket ticket associated with the batch
     * @param batch the batch to process
     * @return the value returned by that {@code await} call (presumably the time spent waiting)
     */
    @Override
    public long receive(final long ticket, final T batch) {
        final long idleTime = await(catchUp, workAheadSize);
        receivedBatch();
        executor.submit(new Callable<Void>() {
            @Override
            public Void call() {
                processAndForward(ticket, batch);
                return null;
            }
        });
        return idleTime;
    }

    /**
     * Body of the task submitted for each batch: processes it, sends the result downstream
     * and updates the counters. Anything thrown after the initial health check is reported
     * through {@code issuePanic}.
     */
    private void processAndForward(long ticket, T batch) {
        // Deliberately outside the try: a failure here propagates to the caller instead of
        // being reported through issuePanic.
        assertHealthy();
        final long startedAt = System.currentTimeMillis();
        try {
            final Object result = process(ticket, batch);
            totalProcessingTime.add(System.currentTimeMillis() - startedAt);

            // With ordered tickets, wait on the rightTicket predicate for this ticket
            // before handing the result on.
            if (orderedTickets) {
                await(rightTicket, ticket);
            }
            sendDownstream(ticket, result);

            final long completed = doneBatches.incrementAndGet();
            assert !orderedTickets || completed == ticket : "Unexpected ticket " + ticket + ", expected " + completed;

            final int stillQueued = queuedBatches.decrementAndGet();
            assert stillQueued >= 0 : "Negative queue size " + stillQueued;
            if (stillQueued == 0) {
                // Queue just drained: remember when, so the next arrival can measure the gap.
                lastBatchEndTime.set(System.currentTimeMillis());
            }
            checkNotifyEndDownstream();
        } catch (Throwable failure) {
            issuePanic(failure);
        }
    }

    /**
     * Counts a newly received batch. If the queue was empty and a previous drain time has
     * been recorded, the time elapsed since that drain is added to {@code upstreamIdleTime}.
     */
    private void receivedBatch() {
        final boolean queueWasEmpty = queuedBatches.getAndIncrement() == 0;
        if (!queueWasEmpty) {
            return;
        }
        final long previousDrain = lastBatchEndTime.get();
        if (previousDrain != 0L) {
            upstreamIdleTime.addAndGet(System.currentTimeMillis() - previousDrain);
        }
    }

    /**
     * Processes one batch. Invoked from a task running on the executor.
     *
     * @param ticket ticket associated with the batch
     * @param batch the batch to process
     * @return the value to pass to {@code sendDownstream} for this ticket
     */
    protected abstract Object process(long ticket, T batch);

    /**
     * Closes the superclass and then shuts the executor down with the flag {@code true}
     * (presumably "wait for outstanding tasks"; see {@link TaskExecutor}).
     */
    @Override
    public void close() {
        super.close();
        executor.shutdown(true);
    }

    /** @return the processor count reported by the executor */
    @Override
    public int numberOfProcessors() {
        return executor.numberOfProcessors();
    }

    /**
     * Asks the executor for one more processor, but only if this step was configured to
     * allow multiple processors.
     *
     * @return {@code false} if multiple processors are not allowed, otherwise whatever the
     *         executor returns
     */
    @Override
    public boolean incrementNumberOfProcessors() {
        return allowMultipleProcessors && executor.incrementNumberOfProcessors();
    }

    /**
     * Asks the executor to drop one processor. Unlike incrementing, this is not gated by
     * the allow-multiple-processors setting.
     *
     * @return whatever the executor returns
     */
    @Override
    public boolean decrementNumberOfProcessors() {
        return executor.decrementNumberOfProcessors();
    }
}

