/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.functions.source.legacy;

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.source.legacy.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.legacy.SourceFunction;
import org.apache.flink.util.Preconditions;

@Internal
public class StatefulSequenceSource
extends RichParallelSourceFunction<Long>
implements CheckpointedFunction {
    private static final long serialVersionUID = 1L;
    private final long start;
    private final long end;
    private volatile boolean isRunning = true;
    private transient Deque<Long> valuesToEmit;
    private transient ListState<Long> checkpointedState;

    public StatefulSequenceSource(long start, long end) {
        this.start = start;
        this.end = end;
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        Preconditions.checkState(this.checkpointedState == null, "The " + this.getClass().getSimpleName() + " has already been initialized.");
        this.checkpointedState = context.getOperatorStateStore().getListState(new ListStateDescriptor<Long>("stateful-sequence-source-state", LongSerializer.INSTANCE));
        this.valuesToEmit = new ArrayDeque<Long>();
        if (context.isRestored()) {
            for (Long v : (Iterable)this.checkpointedState.get()) {
                this.valuesToEmit.add(v);
            }
        } else {
            int stepSize = this.getRuntimeContext().getTaskInfo().getNumberOfParallelSubtasks();
            int taskIdx = this.getRuntimeContext().getTaskInfo().getIndexOfThisSubtask();
            long congruence = this.start + (long)taskIdx;
            long totalNoOfElements = Math.abs(this.end - this.start + 1L);
            int baseSize = StatefulSequenceSource.safeDivide(totalNoOfElements, stepSize);
            int toCollect = totalNoOfElements % (long)stepSize > (long)taskIdx ? baseSize + 1 : baseSize;
            for (long collected = 0L; collected < (long)toCollect; ++collected) {
                this.valuesToEmit.add(collected * (long)stepSize + congruence);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void run(SourceFunction.SourceContext<Long> ctx) throws Exception {
        while (this.isRunning && !this.valuesToEmit.isEmpty()) {
            Object object = ctx.getCheckpointLock();
            synchronized (object) {
                ctx.collect(this.valuesToEmit.poll());
            }
        }
    }

    @Override
    public void cancel() {
        this.isRunning = false;
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        Preconditions.checkState(this.checkpointedState != null, "The " + this.getClass().getSimpleName() + " state has not been properly initialized.");
        this.checkpointedState.update(new ArrayList<Long>(this.valuesToEmit));
    }

    private static int safeDivide(long left, long right) {
        Preconditions.checkArgument(right > 0L);
        Preconditions.checkArgument(left >= 0L);
        Preconditions.checkArgument(left <= Integer.MAX_VALUE * right);
        return (int)(left / right);
    }
}

