/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iceberg.flink.source;

import java.io.IOException;
import java.util.Queue;
import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.state.JavaSerializer;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.functions.source.legacy.SourceFunction;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
import org.apache.flink.streaming.api.operators.StreamSourceContexts;
import org.apache.flink.streaming.api.operators.legacy.YieldingOperatorFactory;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.flink.source.FlinkInputFormat;
import org.apache.iceberg.flink.source.FlinkInputSplit;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Stream operator that receives {@link FlinkInputSplit}s and emits the {@link RowData} records
 * read from them through a {@link FlinkInputFormat}.
 *
 * <p>Incoming splits are appended to an in-memory queue. Reading is scheduled on the
 * {@link MailboxExecutor}: each scheduled action reads exactly one split to completion and then
 * re-schedules itself while more splits are queued. The splits still waiting in the queue are
 * written to operator list state (named {@code "splits"}) on every snapshot and re-queued on
 * restore.
 */
public class StreamingReaderOperator extends AbstractStreamOperator<RowData>
        implements OneInputStreamOperator<FlinkInputSplit, RowData> {
    private static final Logger LOG = LoggerFactory.getLogger(StreamingReaderOperator.class);

    /** Executor on which the split-reading action is scheduled. */
    private final MailboxExecutor executor;

    /** Format used to read splits; set to {@code null} once the operator has been closed. */
    private FlinkInputFormat format;

    /** Context used to emit records and the final watermark; created in {@link #initializeState}. */
    private transient SourceFunction.SourceContext<RowData> sourceContext;

    /** Checkpointed copy of {@link #splits}. */
    private transient ListState<FlinkInputSplit> inputSplitsState;

    /** Splits received (or restored) but not yet read. */
    private transient Queue<FlinkInputSplit> splits;

    /** Whether a split-reading action is currently scheduled or running. */
    private transient SplitState currentSplitState;

    /**
     * Creates the operator.
     *
     * @param parameters operator parameters forwarded to the base class
     * @param format input format used to read the splits; must not be {@code null}
     * @param timeService processing time service assigned to the inherited field
     * @param mailboxExecutor executor used to schedule split reading; must not be {@code null}
     * @throws NullPointerException if {@code format} or {@code mailboxExecutor} is {@code null}
     */
    private StreamingReaderOperator(
            StreamOperatorParameters<RowData> parameters,
            FlinkInputFormat format,
            ProcessingTimeService timeService,
            MailboxExecutor mailboxExecutor) {
        super(parameters);
        this.format = Preconditions.checkNotNull(format, "The InputFormat should not be null.");
        this.processingTimeService = timeService;
        this.executor =
                Preconditions.checkNotNull(mailboxExecutor, "The mailboxExecutor should not be null.");
    }

    /**
     * Registers the split list state, re-queues any restored splits, creates the source context
     * and schedules reading if there is anything to read.
     *
     * @param context state initialization context supplied by the runtime
     * @throws Exception if the base class or the state store fails
     */
    @Override
    public void initializeState(StateInitializationContext context) throws Exception {
        super.initializeState(context);

        // Splits are stored with Java serialization.
        this.inputSplitsState =
                context.getOperatorStateStore()
                        .getListState(new ListStateDescriptor<>("splits", new JavaSerializer<>()));

        this.currentSplitState = SplitState.IDLE;
        this.splits = Lists.newLinkedList();

        if (context.isRestored()) {
            int subtaskIdx = getRuntimeContext().getTaskInfo().getIndexOfThisSubtask();
            LOG.info("Restoring state for the {} (taskIdx: {}).", getClass().getSimpleName(), subtaskIdx);

            for (FlinkInputSplit split : this.inputSplitsState.get()) {
                this.splits.add(split);
            }
        }

        // A throw-away object is passed as the lock argument; nothing else in this class
        // synchronizes on it.
        this.sourceContext =
                StreamSourceContexts.getSourceContext(
                        getProcessingTimeService(),
                        new Object(),
                        this.output,
                        getExecutionConfig().getAutoWatermarkInterval(),
                        -1L,
                        true);

        // Start working through splits recovered from state, if any.
        enqueueProcessSplits();
    }

    /**
     * Replaces the checkpointed split list with a copy of the currently queued splits.
     *
     * @param context snapshot context supplied by the runtime
     * @throws Exception if the base class or the state update fails
     */
    @Override
    public void snapshotState(StateSnapshotContext context) throws Exception {
        super.snapshotState(context);

        this.inputSplitsState.clear();
        this.inputSplitsState.addAll(Lists.newArrayList(this.splits));
    }

    /**
     * Queues the received split and schedules reading if the reader is idle.
     *
     * @param element record wrapping the split to read
     */
    @Override
    public void processElement(StreamRecord<FlinkInputSplit> element) {
        this.splits.add(element.getValue());
        enqueueProcessSplits();
    }

    /**
     * Schedules {@link #processSplits()} on the mailbox executor when the reader is idle and at
     * least one split is queued; otherwise does nothing.
     */
    private void enqueueProcessSplits() {
        if (this.currentSplitState == SplitState.IDLE && !this.splits.isEmpty()) {
            this.currentSplitState = SplitState.RUNNING;
            this.executor.execute(this::processSplits, getClass().getSimpleName());
        }
    }

    /**
     * Reads the next queued split to its end, emitting every record, then schedules the
     * following split if one is queued.
     *
     * @throws IOException if opening, reading or closing the format fails
     */
    private void processSplits() throws IOException {
        FlinkInputSplit split = this.splits.poll();
        if (split == null) {
            this.currentSplitState = SplitState.IDLE;
            return;
        }

        this.format.open(split);
        try {
            RowData nextElement = null;
            while (!this.format.reachedEnd()) {
                // The previous record is handed back to the format as the reuse argument.
                nextElement = this.format.nextRecord(nextElement);
                this.sourceContext.collect(nextElement);
            }
        } finally {
            // Mark the reader idle before closing so the state is reset even if close() throws.
            this.currentSplitState = SplitState.IDLE;
            this.format.close();
        }

        // Re-schedule to handle the next split, if any.
        enqueueProcessSplits();
    }

    /**
     * Ignores watermarks arriving from upstream.
     *
     * @param mark the upstream watermark (unused)
     */
    @Override
    public void processWatermark(Watermark mark) {
        // Intentionally empty: upstream watermarks are not forwarded.
    }

    /**
     * Closes the base operator and releases the input format.
     *
     * <p>Every cleanup step is attempted even if an earlier one fails, so a failure in
     * {@code super.close()} or {@code format.close()} cannot leave the format unreleased. The
     * first failure is rethrown with any later failures attached as suppressed exceptions.
     *
     * @throws Exception the first failure raised by any cleanup step
     */
    @Override
    public void close() throws Exception {
        Exception failure = null;
        try {
            super.close();
        } catch (Exception e) {
            failure = e;
        }

        // Detach the fields first so a repeated close() does not touch the format again.
        FlinkInputFormat openFormat = this.format;
        this.format = null;
        this.sourceContext = null;

        if (openFormat != null) {
            try {
                openFormat.close();
            } catch (Exception e) {
                failure = firstOrSuppressed(failure, e);
            }
            try {
                openFormat.closeInputFormat();
            } catch (Exception e) {
                failure = firstOrSuppressed(failure, e);
            }
        }

        if (failure != null) {
            throw failure;
        }
    }

    /**
     * Returns {@code first} with {@code later} recorded as suppressed, or {@code later} when
     * there was no earlier failure.
     */
    private static Exception firstOrSuppressed(Exception first, Exception later) {
        if (first == null) {
            return later;
        }
        // Throwable.addSuppressed rejects self-suppression.
        if (first != later) {
            first.addSuppressed(later);
        }
        return first;
    }

    /**
     * Finishes the base operator, closes the output, then emits {@link Watermark#MAX_WATERMARK}
     * and closes the source context if it is still present.
     *
     * @throws Exception if the base class fails
     */
    @Override
    public void finish() throws Exception {
        super.finish();
        // NOTE(review): the output is closed before the final watermark is emitted through the
        // source context; confirm this ordering is intended.
        this.output.close();
        if (this.sourceContext != null) {
            this.sourceContext.emitWatermark(Watermark.MAX_WATERMARK);
            this.sourceContext.close();
            this.sourceContext = null;
        }
    }

    /**
     * Creates a factory that builds {@link StreamingReaderOperator}s reading with the given format.
     *
     * @param format input format handed to every created operator
     * @return a new operator factory
     */
    static OneInputStreamOperatorFactory<FlinkInputSplit, RowData> factory(FlinkInputFormat format) {
        return new OperatorFactory(format);
    }

    /** Scheduling state of the split reader. */
    private enum SplitState {
        /** No reading action is scheduled or running. */
        IDLE,
        /** A reading action has been scheduled or is running. */
        RUNNING
    }

    /** Factory that wires the mailbox executor and processing time service into the operator. */
    private static class OperatorFactory extends AbstractStreamOperatorFactory<RowData>
            implements YieldingOperatorFactory<RowData>,
                    OneInputStreamOperatorFactory<FlinkInputSplit, RowData> {
        private final FlinkInputFormat format;
        private transient MailboxExecutor mailboxExecutor;

        private OperatorFactory(FlinkInputFormat format) {
            this.format = format;
        }

        /** Stores the executor that is later passed to the created operator. */
        @Override
        public void setMailboxExecutor(MailboxExecutor mailboxExecutor) {
            this.mailboxExecutor = mailboxExecutor;
        }

        /**
         * Builds a {@link StreamingReaderOperator} from the stored format, the factory's
         * processing time service and the stored mailbox executor.
         */
        @SuppressWarnings("unchecked")
        @Override
        public <O extends StreamOperator<RowData>> O createStreamOperator(
                StreamOperatorParameters<RowData> parameters) {
            StreamingReaderOperator operator =
                    new StreamingReaderOperator(
                            parameters, this.format, this.processingTimeService, this.mailboxExecutor);
            return (O) operator;
        }

        /** Returns {@code StreamingReaderOperator.class}; the class loader is not used. */
        @SuppressWarnings("rawtypes")
        @Override
        public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) {
            return StreamingReaderOperator.class;
        }
    }
}

