/*
 * Decompiled with CFR 0.152.
 */
package org.neo4j.unsafe.impl.batchimport.staging;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
import org.neo4j.helpers.NamedThreadFactory;
import org.neo4j.unsafe.impl.batchimport.staging.AbstractStep;
import org.neo4j.unsafe.impl.batchimport.staging.StageControl;

public abstract class ExecutorServiceStep<T>
extends AbstractStep<T> {
    private final ExecutorService executor;
    private final int workAheadSize;
    private final AtomicLong lastBatchEndTime = new AtomicLong();

    protected ExecutorServiceStep(StageControl control, String name, int workAheadSize, int numberOfExecutors) {
        super(control, name);
        this.workAheadSize = workAheadSize;
        NamedThreadFactory threadFactory = new NamedThreadFactory(name);
        this.executor = numberOfExecutors == 1 ? Executors.newSingleThreadExecutor(threadFactory) : Executors.newFixedThreadPool(numberOfExecutors, threadFactory);
    }

    @Override
    public long receive(final long ticket, final T batch) {
        long idleTime = this.awaitDownstreamToCatchUp(this.workAheadSize) + this.awaitTicket(ticket);
        this.executor.submit(new Runnable(){

            @Override
            public void run() {
                ExecutorServiceStep.this.assertHealthy();
                long startTime = ExecutorServiceStep.this.startProcessingTimer();
                try {
                    Object result = ExecutorServiceStep.this.process(ticket, batch);
                    ExecutorServiceStep.this.endProcessingTimer(startTime);
                    ExecutorServiceStep.this.doneBatches.incrementAndGet();
                    ExecutorServiceStep.this.sendDownstream(ticket, result);
                }
                catch (Throwable e) {
                    ExecutorServiceStep.this.issuePanic(e);
                }
            }
        });
        this.ticketReceived(ticket);
        return idleTime;
    }

    private long startProcessingTimer() {
        long startTime = System.currentTimeMillis();
        this.updateUpstreamIdleTime(startTime);
        return startTime;
    }

    private void endProcessingTimer(long startTime) {
        long endTime = System.currentTimeMillis();
        this.totalProcessingTime.addAndGet(endTime - startTime);
        this.lastBatchEndTime.set(endTime);
    }

    private void updateUpstreamIdleTime(long startTime) {
        if (this.lastBatchEndTime.get() != 0L) {
            this.upstreamIdleTime.addAndGet(startTime - this.lastBatchEndTime.get());
        }
    }

    private long awaitDownstreamToCatchUp(int queueSizeThreshold) {
        if (this.receivedBatches.get() - this.doneBatches.get() > (long)queueSizeThreshold) {
            long startTime = System.currentTimeMillis();
            while (this.receivedBatches.get() - this.doneBatches.get() > (long)queueSizeThreshold) {
                this.waitSome();
            }
            return System.currentTimeMillis() - startTime;
        }
        return 0L;
    }

    protected abstract Object process(long var1, T var3);

    @Override
    protected void done() {
        this.executor.shutdown();
    }
}

