/*
 * Decompiled with CFR 0.152.
 */
package org.neo4j.unsafe.impl.batchimport.staging;

import java.util.concurrent.atomic.AtomicLong;
import org.neo4j.function.Factory;
import org.neo4j.function.primitive.PrimitiveLongPredicate;
import org.neo4j.graphdb.Resource;
import org.neo4j.unsafe.impl.batchimport.executor.DynamicTaskExecutor;
import org.neo4j.unsafe.impl.batchimport.executor.Task;
import org.neo4j.unsafe.impl.batchimport.executor.TaskExecutor;
import org.neo4j.unsafe.impl.batchimport.staging.AbstractStep;
import org.neo4j.unsafe.impl.batchimport.staging.BatchSender;
import org.neo4j.unsafe.impl.batchimport.staging.Configuration;
import org.neo4j.unsafe.impl.batchimport.staging.StageControl;
import org.neo4j.unsafe.impl.batchimport.stats.StatsProvider;

public abstract class ProcessorStep<T>
extends AbstractStep<T> {
    private TaskExecutor<Sender> executor;
    private final int workAheadSize;
    private final int initialProcessorCount = 1;
    private final int maxProcessors;
    private final PrimitiveLongPredicate catchUp = new PrimitiveLongPredicate(){

        public boolean accept(long queueSizeThreshold) {
            return (long)ProcessorStep.this.queuedBatches.get() <= queueSizeThreshold;
        }
    };
    protected final AtomicLong begunBatches = new AtomicLong();
    private final PrimitiveLongPredicate rightBeginTicket = new PrimitiveLongPredicate(){

        public boolean accept(long ticket) {
            return ProcessorStep.this.begunBatches.get() == ticket;
        }
    };
    private final AtomicLong lastBatchEndTime = new AtomicLong();

    protected ProcessorStep(StageControl control, String name, Configuration config, int maxProcessors, StatsProvider ... additionalStatsProviders) {
        super(control, name, config, additionalStatsProviders);
        this.workAheadSize = config.workAheadSize();
        this.maxProcessors = maxProcessors;
    }

    @Override
    public void start(int orderingGuarantees) {
        super.start(orderingGuarantees);
        this.executor = new DynamicTaskExecutor<Sender>(1, this.maxProcessors, this.workAheadSize, DynamicTaskExecutor.DEFAULT_PARK_STRATEGY, this.name(), new Factory<Sender>(){

            public Sender newInstance() {
                return new Sender();
            }
        });
    }

    @Override
    public long receive(final long ticket, final T batch) {
        long idleTime = this.await(this.catchUp, this.workAheadSize);
        this.incrementQueue();
        this.executor.submit(new Task<Sender>(){

            @Override
            public void run(Sender sender) {
                ProcessorStep.this.assertHealthy();
                sender.initialize(ticket);
                try {
                    if (ProcessorStep.this.guarantees(2)) {
                        ProcessorStep.this.await(ProcessorStep.this.rightBeginTicket, ticket);
                    }
                    try (Resource precondition = ProcessorStep.this.permit(batch);){
                        ProcessorStep.this.begunBatches.incrementAndGet();
                        long startTime = System.currentTimeMillis();
                        ProcessorStep.this.process(batch, sender);
                        if (ProcessorStep.this.downstream == null) {
                            ProcessorStep.this.doneBatches.incrementAndGet();
                        }
                        ProcessorStep.this.totalProcessingTime.add(System.currentTimeMillis() - startTime - sender.sendTime);
                    }
                    ProcessorStep.this.decrementQueue();
                    ProcessorStep.this.checkNotifyEndDownstream();
                }
                catch (Throwable e) {
                    ProcessorStep.this.issuePanic(e);
                }
            }
        });
        return idleTime;
    }

    protected Resource permit(T batch) throws Throwable {
        return Resource.EMPTY;
    }

    private void decrementQueue() {
        int queueSizeAfterThisJobDone = this.queuedBatches.decrementAndGet();
        assert (queueSizeAfterThisJobDone >= 0) : "Negative queue size " + queueSizeAfterThisJobDone;
        if (queueSizeAfterThisJobDone == 0) {
            this.lastBatchEndTime.set(System.currentTimeMillis());
        }
    }

    private void incrementQueue() {
        long lastBatchEnd;
        if (this.queuedBatches.getAndIncrement() == 0 && (lastBatchEnd = this.lastBatchEndTime.get()) != 0L) {
            this.upstreamIdleTime.addAndGet(System.currentTimeMillis() - lastBatchEnd);
        }
    }

    protected abstract void process(T var1, BatchSender var2) throws Throwable;

    @Override
    public void close() throws Exception {
        super.close();
        this.executor.shutdown(this.panic == null);
    }

    @Override
    public int numberOfProcessors() {
        return this.executor.numberOfProcessors();
    }

    @Override
    public boolean incrementNumberOfProcessors() {
        return this.executor.incrementNumberOfProcessors();
    }

    @Override
    public boolean decrementNumberOfProcessors() {
        return this.executor.decrementNumberOfProcessors();
    }

    private void sendDownstream(long ticket, Object batch) {
        if (this.guarantees(1)) {
            this.await(this.rightDoneTicket, ticket);
        }
        this.downstreamIdleTime.addAndGet(this.downstream.receive(ticket, batch));
        this.doneBatches.incrementAndGet();
    }

    @Override
    protected void done() {
        this.lastCallForEmittingOutstandingBatches(new Sender());
        super.done();
    }

    protected void lastCallForEmittingOutstandingBatches(BatchSender sender) {
    }

    private class Sender
    implements BatchSender {
        private long sendTime;
        private long ticket;

        private Sender() {
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void send(Object batch) {
            long time = System.currentTimeMillis();
            try {
                ProcessorStep.this.sendDownstream(this.ticket, batch);
            }
            finally {
                this.sendTime += System.currentTimeMillis() - time;
            }
        }

        public void initialize(long ticket) {
            this.ticket = ticket;
            this.sendTime = 0L;
        }
    }
}

