/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.source;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.NoSuchElementException;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.api.connector.source.lib.util.IteratorSourceEnumerator;
import org.apache.flink.api.connector.source.lib.util.IteratorSourceReader;
import org.apache.flink.api.connector.source.lib.util.IteratorSourceSplit;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputSerializer;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SplittableIterator;

public class NumberSequenceRowSource
implements Source<RowData, NumberSequenceSplit, Collection<NumberSequenceSplit>>,
ResultTypeQueryable<RowData> {
    private static final long serialVersionUID = 1L;
    private final long from;
    private final long to;

    /**
     * Creates a source over the inclusive number range {@code [from, to]}.
     *
     * @param from first number of the sequence
     * @param to last number of the sequence
     * @throws IllegalArgumentException if {@code from} is greater than {@code to}
     */
    public NumberSequenceRowSource(long from, long to) {
        boolean validRange = from <= to;
        Preconditions.checkArgument(validRange, "'from' must be <= 'to'");
        this.to = to;
        this.from = from;
    }

    /**
     * Returns the lower bound this source was constructed with.
     *
     * @return the first number of the sequence
     */
    public long getFrom() {
        return from;
    }

    /**
     * Returns the upper bound this source was constructed with.
     *
     * @return the last number of the sequence
     */
    public long getTo() {
        return to;
    }

    /**
     * Describes the rows produced by this source: a row type with a single
     * {@code BigIntType(false)} field.
     *
     * @return type information for the produced {@link RowData}
     */
    @Override
    public TypeInformation<RowData> getProducedType() {
        LogicalType[] fieldTypes = new LogicalType[] {new BigIntType(false)};
        RowType rowType = RowType.of(fieldTypes);
        return InternalTypeInfo.of(rowType);
    }

    /**
     * Reports this source as bounded.
     *
     * @return always {@link Boundedness#BOUNDED}
     */
    @Override
    public Boundedness getBoundedness() {
        return Boundedness.BOUNDED;
    }

    /**
     * Creates the reader for this source, an {@link IteratorSourceReader} built from the given
     * context.
     *
     * @param readerContext context handed to the new reader
     * @return a new reader instance
     */
    @Override
    public SourceReader<RowData, NumberSequenceSplit> createReader(SourceReaderContext readerContext) {
        // Diamond instead of the raw type; the erased constructor call is the same.
        return new IteratorSourceReader<>(readerContext);
    }

    /**
     * Creates a fresh enumerator: the configured range is cut into as many splits as the
     * enumerator context reports as its current parallelism.
     *
     * @param enumContext context of the new enumerator
     * @return an enumerator over the initial splits
     */
    @Override
    public SplitEnumerator<NumberSequenceSplit, Collection<NumberSequenceSplit>> createEnumerator(SplitEnumeratorContext<NumberSequenceSplit> enumContext) {
        int parallelism = enumContext.currentParallelism();
        List<NumberSequenceSplit> initialSplits = splitNumberRange(from, to, parallelism);
        return new IteratorSourceEnumerator<>(enumContext, initialSplits);
    }

    /**
     * Re-creates an enumerator from checkpointed state; the checkpointed splits are passed to
     * the enumerator as-is.
     *
     * @param enumContext context of the restored enumerator
     * @param checkpoint splits stored in the checkpoint
     * @return an enumerator over the checkpointed splits
     */
    @Override
    public SplitEnumerator<NumberSequenceSplit, Collection<NumberSequenceSplit>> restoreEnumerator(SplitEnumeratorContext<NumberSequenceSplit> enumContext, Collection<NumberSequenceSplit> checkpoint) {
        Collection<NumberSequenceSplit> remainingSplits = checkpoint;
        return new IteratorSourceEnumerator<>(enumContext, remainingSplits);
    }

    /**
     * Provides the serializer for individual splits.
     *
     * @return a new {@code SplitSerializer}
     */
    @Override
    public SimpleVersionedSerializer<NumberSequenceSplit> getSplitSerializer() {
        SimpleVersionedSerializer<NumberSequenceSplit> serializer = new SplitSerializer();
        return serializer;
    }

    /**
     * Provides the serializer for the enumerator checkpoint, which is a collection of splits.
     *
     * @return a new {@code CheckpointSerializer}
     */
    @Override
    public SimpleVersionedSerializer<Collection<NumberSequenceSplit>> getEnumeratorCheckpointSerializer() {
        SimpleVersionedSerializer<Collection<NumberSequenceSplit>> serializer = new CheckpointSerializer();
        return serializer;
    }

    /**
     * Cuts the inclusive range {@code [from, to]} into at most {@code numSplits} splits.
     *
     * <p>Sub-ranges that contain no element are dropped, so fewer than {@code numSplits} splits
     * may be returned. Split ids are consecutive decimal strings starting at {@code "1"},
     * assigned only to the splits that are kept.
     *
     * @param from first number of the range
     * @param to last number of the range
     * @param numSplits number of sub-ranges to cut the range into
     * @return the non-empty splits, in ascending range order
     */
    protected List<NumberSequenceSplit> splitNumberRange(long from, long to, int numSplits) {
        NumberSequenceIterator wholeRange = new NumberSequenceIterator(from, to);
        NumberSequenceIterator[] parts = wholeRange.split(numSplits);
        List<NumberSequenceSplit> result = new ArrayList<>(parts.length);
        int nextId = 1;
        for (int i = 0; i < parts.length; i++) {
            NumberSequenceIterator part = parts[i];
            if (part.hasNext()) {
                String id = String.valueOf(nextId);
                result.add(new NumberSequenceSplit(id, part.getCurrent(), part.getTo()));
                nextId++;
            }
        }
        return result;
    }

    /**
     * Iterator over the inclusive number range {@code [current, to]} that wraps every number in
     * a single-field {@link RowData} and can be split into consecutive sub-ranges.
     *
     * <p>The iterator terminates correctly even when the upper bound is
     * {@code Long.MAX_VALUE}: instead of incrementing the cursor past the largest {@code long}
     * (which would wrap around to {@code Long.MIN_VALUE} and make the sequence endless), it
     * records exhaustion in a separate flag.
     */
    private static class NumberSequenceIterator
    extends SplittableIterator<RowData> {
        private static final long serialVersionUID = 1L;

        /** Last number of the sequence (inclusive). */
        private final long to;

        /** Next number to be returned, unless {@link #exhausted} is set. */
        private long current;

        /**
         * Set once {@code Long.MAX_VALUE} has been returned. In that situation the cursor cannot
         * be advanced without overflowing, so {@code current <= to} alone can no longer signal
         * the end of the sequence.
         */
        private boolean exhausted;

        /**
         * Creates an iterator over {@code [from, to]}.
         *
         * @param from first number to return
         * @param to last number to return
         * @throws IllegalArgumentException if {@code from} is greater than {@code to}
         */
        public NumberSequenceIterator(long from, long to) {
            if (from > to) {
                throw new IllegalArgumentException("The 'to' value must not be smaller than the 'from' value.");
            }
            this.current = from;
            this.to = to;
        }

        /**
         * Non-validating constructor used while splitting, where a sub-range may legitimately be
         * empty ({@code from > to}).
         *
         * @param from first number to return
         * @param to last number to return
         * @param unused only distinguishes this overload from the validating constructor
         */
        private NumberSequenceIterator(long from, long to, boolean unused) {
            this.current = from;
            this.to = to;
        }

        /** Returns the current cursor position. */
        public long getCurrent() {
            return this.current;
        }

        /** Returns the inclusive upper bound of the sequence. */
        public long getTo() {
            return this.to;
        }

        /** Returns {@code true} while at least one more number is left in the range. */
        public boolean hasNext() {
            return !this.exhausted && this.current <= this.to;
        }

        /**
         * Returns the next number wrapped in a one-field row and advances the cursor.
         *
         * @throws NoSuchElementException if the sequence is exhausted
         */
        public RowData next() {
            if (!this.hasNext()) {
                throw new NoSuchElementException();
            }
            long value = this.current;
            if (value == Long.MAX_VALUE) {
                // Incrementing would wrap to Long.MIN_VALUE and restart the sequence.
                this.exhausted = true;
            } else {
                this.current = value + 1L;
            }
            return GenericRowData.of((Object[])new Object[]{value});
        }

        /** Removal is not supported. */
        public void remove() {
            throw new UnsupportedOperationException();
        }

        /**
         * Splits the remaining range into {@code numPartitions} consecutive sub-ranges whose
         * sizes differ by at most one; the larger sub-ranges come first. Sub-ranges may be empty
         * when fewer elements than partitions remain.
         *
         * @param numPartitions number of iterators to produce, at least 1
         * @return exactly {@code numPartitions} iterators covering the remaining range
         * @throws IllegalArgumentException if {@code numPartitions} is smaller than 1
         */
        public NumberSequenceIterator[] split(int numPartitions) {
            if (numPartitions < 1) {
                throw new IllegalArgumentException("The number of partitions must be at least 1.");
            }
            if (this.exhausted) {
                // Long.MAX_VALUE was already returned, so nothing is left to hand out.
                NumberSequenceIterator[] empty = new NumberSequenceIterator[numPartitions];
                for (int idx = 0; idx < numPartitions; ++idx) {
                    NumberSequenceIterator done = new NumberSequenceIterator(this.current, this.to, true);
                    done.exhausted = true;
                    empty[idx] = done;
                }
                return empty;
            }
            if (numPartitions == 1) {
                return new NumberSequenceIterator[]{new NumberSequenceIterator(this.current, this.to)};
            }

            // From here on numPartitions >= 2.
            long elementsPerSplit;
            if (this.to - this.current + 1L >= 0L) {
                elementsPerSplit = (this.to - this.current + 1L) / (long)numPartitions;
            } else {
                // The element count overflowed long. Work on half the distance instead and
                // double the per-split size afterwards.
                long halfDiff;
                if (this.current == Long.MIN_VALUE) {
                    // -current is not representable; 0x4000000000000000L is 2^62.
                    halfDiff = 0x4000000000000000L + this.to / 2L;
                } else {
                    long posFrom = -this.current;
                    halfDiff = posFrom > this.to
                            ? this.to + (posFrom - this.to) / 2L
                            : posFrom + (this.to - posFrom) / 2L;
                }
                elementsPerSplit = halfDiff / (long)numPartitions * 2L;
            }

            // Number of splits that receive one element more than elementsPerSplit.
            long numWithExtra = -(elementsPerSplit * (long)numPartitions) + this.to - this.current + 1L;
            if (numWithExtra > (long)numPartitions) {
                // Rounding in the halved computation may have lost one element per split.
                ++elementsPerSplit;
                numWithExtra -= (long)numPartitions;
                if (numWithExtra > (long)numPartitions) {
                    throw new RuntimeException("Bug in splitting logic. Too much rounding loss.");
                }
            }

            NumberSequenceIterator[] iters = new NumberSequenceIterator[numPartitions];
            long curr = this.current;
            int i = 0;
            for (; (long)i < numWithExtra; ++i) {
                long next = curr + elementsPerSplit + 1L;
                iters[i] = new NumberSequenceIterator(curr, next - 1L);
                curr = next;
            }
            for (; i < numPartitions; ++i) {
                long next = curr + elementsPerSplit;
                // May be an empty range, hence the non-validating constructor.
                iters[i] = new NumberSequenceIterator(curr, next - 1L, true);
                curr = next;
            }
            return iters;
        }

        /**
         * Returns the number of remaining elements, capped at {@code Integer.MAX_VALUE}. The cap
         * is also returned whenever a bound lies outside the {@code int} range.
         */
        public int getMaximumNumberOfSplits() {
            if (this.exhausted) {
                return 0;
            }
            if (this.to >= Integer.MAX_VALUE || this.current <= Integer.MIN_VALUE || this.to - this.current + 1L >= Integer.MAX_VALUE) {
                return Integer.MAX_VALUE;
            }
            return (int)(this.to - this.current + 1L);
        }
    }

    /**
     * Serializer for the enumerator checkpoint. The binary layout is an {@code int} split count
     * followed by that many splits in the {@code SplitSerializer} V1 format.
     */
    private static final class CheckpointSerializer
    implements SimpleVersionedSerializer<Collection<NumberSequenceSplit>> {
        private static final int CURRENT_VERSION = 1;

        private CheckpointSerializer() {
        }

        /** Returns the version of the format written by {@link #serialize}. */
        public int getVersion() {
            return CURRENT_VERSION;
        }

        /**
         * Writes the split count followed by every split.
         *
         * @param checkpoint splits to serialize
         * @return the serialized bytes
         * @throws IOException if writing to the buffer fails
         */
        public byte[] serialize(Collection<NumberSequenceSplit> checkpoint) throws IOException {
            // Initial buffer size: 4 bytes for the count plus 22 bytes per split.
            // NOTE(review): 22 looks like a per-split size estimate - confirm against upstream.
            int initialSize = checkpoint.size() * 22 + 4;
            DataOutputSerializer buffer = new DataOutputSerializer(initialSize);
            buffer.writeInt(checkpoint.size());
            DataOutputView view = buffer;
            for (NumberSequenceSplit pending : checkpoint) {
                SplitSerializer.serializeV1(view, pending);
            }
            return buffer.getCopyOfBuffer();
        }

        /**
         * Reads back a collection written by {@link #serialize}.
         *
         * @param version format version of {@code serialized}
         * @param serialized bytes to decode
         * @return the decoded splits, in the order they were written
         * @throws IOException if the version is not supported or the bytes cannot be read
         */
        public Collection<NumberSequenceSplit> deserialize(int version, byte[] serialized) throws IOException {
            if (version != CURRENT_VERSION) {
                throw new IOException("Unrecognized version: " + version);
            }
            DataInputDeserializer source = new DataInputDeserializer(serialized);
            DataInputView view = source;
            int count = source.readInt();
            ArrayList<NumberSequenceSplit> splits = new ArrayList<NumberSequenceSplit>(count);
            for (int read = 0; read < count; ++read) {
                splits.add(SplitSerializer.deserializeV1(view));
            }
            return splits;
        }
    }

    /**
     * Serializer for a single {@code NumberSequenceSplit}. The V1 layout is the split id as a
     * UTF string followed by the {@code from} and {@code to} bounds as two longs.
     */
    private static final class SplitSerializer
    implements SimpleVersionedSerializer<NumberSequenceSplit> {
        private static final int CURRENT_VERSION = 1;

        private SplitSerializer() {
        }

        /** Returns the version of the format written by {@link #serialize}. */
        public int getVersion() {
            return CURRENT_VERSION;
        }

        /**
         * Serializes one split in the V1 layout.
         *
         * @param split split to serialize; must be exactly a {@code NumberSequenceSplit}
         * @return the serialized bytes
         * @throws IllegalArgumentException if {@code split} is an instance of a subclass
         * @throws IOException if writing to the buffer fails
         */
        public byte[] serialize(NumberSequenceSplit split) throws IOException {
            boolean exactType = split.getClass() == NumberSequenceSplit.class;
            Preconditions.checkArgument(exactType, "cannot serialize subclasses");
            // Initial buffer size: id length plus 18 bytes (two longs and the UTF length prefix).
            int initialSize = split.splitId().length() + 18;
            DataOutputSerializer buffer = new DataOutputSerializer(initialSize);
            serializeV1(buffer, split);
            return buffer.getCopyOfBuffer();
        }

        /**
         * Reads back a split written by {@link #serialize}.
         *
         * @param version format version of {@code serialized}
         * @param serialized bytes to decode
         * @return the decoded split
         * @throws IOException if the version is not supported or the bytes cannot be read
         */
        public NumberSequenceSplit deserialize(int version, byte[] serialized) throws IOException {
            if (version != CURRENT_VERSION) {
                throw new IOException("Unrecognized version: " + version);
            }
            return deserializeV1(new DataInputDeserializer(serialized));
        }

        /** Writes the id, then {@code from}, then {@code to} of the split to {@code out}. */
        static void serializeV1(DataOutputView out, NumberSequenceSplit split) throws IOException {
            String id = split.splitId();
            long lower = split.from();
            long upper = split.to();
            out.writeUTF(id);
            out.writeLong(lower);
            out.writeLong(upper);
        }

        /** Reads the id, then {@code from}, then {@code to} from {@code in} and builds the split. */
        static NumberSequenceSplit deserializeV1(DataInputView in) throws IOException {
            String id = in.readUTF();
            long lower = in.readLong();
            long upper = in.readLong();
            return new NumberSequenceSplit(id, lower, upper);
        }
    }

    /**
     * A split of the source: an id together with the inclusive number range {@code [from, to]}
     * that the split covers.
     */
    public static class NumberSequenceSplit
    implements IteratorSourceSplit<RowData, NumberSequenceIterator> {
        private final String splitId;
        private final long from;
        private final long to;

        /**
         * Creates a split over {@code [from, to]}.
         *
         * @param splitId id of the split, must not be {@code null}
         * @param from first number of the split
         * @param to last number of the split
         * @throws IllegalArgumentException if {@code from} is greater than {@code to}
         * @throws NullPointerException if {@code splitId} is {@code null}
         */
        public NumberSequenceSplit(String splitId, long from, long to) {
            // The range is validated before the id, as before.
            boolean validRange = from <= to;
            Preconditions.checkArgument(validRange, "'from' must be <= 'to'");
            this.splitId = Preconditions.checkNotNull(splitId);
            this.from = from;
            this.to = to;
        }

        /** Returns the id of this split. */
        public String splitId() {
            return splitId;
        }

        /** Returns the first number of this split. */
        public long from() {
            return from;
        }

        /** Returns the last number of this split. */
        public long to() {
            return to;
        }

        /** Creates a new iterator positioned at the start of this split's range. */
        public NumberSequenceIterator getIterator() {
            return new NumberSequenceIterator(from, to);
        }

        /**
         * Builds a split with the same id that covers what the given iterator has left, i.e.
         * from the iterator's current position up to its upper bound.
         *
         * @param iterator iterator whose progress should be captured
         * @return a split describing the iterator's remaining range
         */
        public IteratorSourceSplit<RowData, NumberSequenceIterator> getUpdatedSplitForIterator(NumberSequenceIterator iterator) {
            long resumeFrom = iterator.getCurrent();
            long upper = iterator.getTo();
            return new NumberSequenceSplit(splitId, resumeFrom, upper);
        }

        @Override
        public String toString() {
            return String.format("NumberSequenceSplit [%d, %d] (%s)", from, to, splitId);
        }
    }
}

