/*
 * Decompiled with CFR 0.152.
 */
package org.neo4j.unsafe.impl.batchimport.staging;

import java.lang.reflect.Array;
import java.util.Arrays;
import org.neo4j.unsafe.impl.batchimport.staging.AbstractStep;
import org.neo4j.unsafe.impl.batchimport.staging.StageControl;

public abstract class ProducerStep<T>
extends AbstractStep<Void> {
    private final int batchSize;
    private final Class<T> itemClass;

    public ProducerStep(StageControl control, String name, int batchSize, int movingAverageSize, Class<T> itemClass) {
        super(control, name, movingAverageSize);
        this.batchSize = batchSize;
        this.itemClass = itemClass;
    }

    protected abstract T nextOrNull();

    @Override
    public long receive(long ticket, Void nothing) {
        new Thread("PRODUCER"){

            @Override
            public void run() {
                ProducerStep.this.assertHealthy();
                try {
                    ProducerStep.this.process();
                    ProducerStep.this.endOfUpstream();
                }
                catch (Throwable e) {
                    ProducerStep.this.issuePanic(e, false);
                }
            }
        }.start();
        return 0L;
    }

    protected void process() {
        T[] batch = this.newBatch();
        int size = 0;
        long startTime = System.currentTimeMillis();
        Object next = null;
        while (true) {
            T t = this.nextOrNull();
            next = t;
            if (t == null) break;
            batch[size++] = next;
            if (size != this.batchSize) continue;
            this.totalProcessingTime.add(System.currentTimeMillis() - startTime);
            this.sendDownstream(this.nextTicket(), this.constructBatch(batch));
            batch = this.newBatch();
            size = 0;
            this.assertHealthy();
            startTime = System.currentTimeMillis();
        }
        if (size > 0) {
            this.totalProcessingTime.add(System.currentTimeMillis() - startTime);
            this.sendDownstream(this.nextTicket(), this.constructBatch(Arrays.copyOf(batch, size)));
        }
    }

    protected Object constructBatch(T[] batch) {
        return batch;
    }

    private T[] newBatch() {
        return (Object[])Array.newInstance(this.itemClass, this.batchSize);
    }

    private long nextTicket() {
        return this.doneBatches.incrementAndGet();
    }
}

