/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.io.IOException;
import java.util.HashSet;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.CompressedSource;
import org.apache.beam.sdk.io.FileBasedSource;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.FileSystem;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.ReadAllViaFileBasedSource;
import org.apache.beam.sdk.io.fs.MatchResult;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Reshuffle;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;

public abstract class ReadAllViaFileBasedSourceTransform<@UnknownKeyFor InT, @UnknownKeyFor T>
extends PTransform<PCollection<FileIO.ReadableFile>, PCollection<T>> {
    public static final @UnknownKeyFor @NonNull @Initialized boolean DEFAULT_USES_RESHUFFLE = true;
    protected final @UnknownKeyFor @NonNull @Initialized long desiredBundleSizeBytes;
    protected final @UnknownKeyFor @NonNull @Initialized SerializableFunction<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized ? extends @UnknownKeyFor @NonNull @Initialized FileBasedSource<InT>> createSource;
    protected final @UnknownKeyFor @NonNull @Initialized Coder<T> coder;
    protected final @UnknownKeyFor @NonNull @Initialized ReadAllViaFileBasedSource.ReadFileRangesFnExceptionHandler exceptionHandler;
    protected final @UnknownKeyFor @NonNull @Initialized boolean usesReshuffle;

    public ReadAllViaFileBasedSourceTransform(@UnknownKeyFor @NonNull @Initialized long desiredBundleSizeBytes, @UnknownKeyFor @NonNull @Initialized SerializableFunction<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized ? extends @UnknownKeyFor @NonNull @Initialized FileBasedSource<InT>> createSource, @UnknownKeyFor @NonNull @Initialized Coder<T> coder) {
        this(desiredBundleSizeBytes, createSource, coder, true, new ReadAllViaFileBasedSource.ReadFileRangesFnExceptionHandler());
    }

    public ReadAllViaFileBasedSourceTransform(@UnknownKeyFor @NonNull @Initialized long desiredBundleSizeBytes, @UnknownKeyFor @NonNull @Initialized SerializableFunction<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized ? extends @UnknownKeyFor @NonNull @Initialized FileBasedSource<InT>> createSource, @UnknownKeyFor @NonNull @Initialized Coder<T> coder, @UnknownKeyFor @NonNull @Initialized boolean usesReshuffle, @UnknownKeyFor @NonNull @Initialized ReadAllViaFileBasedSource.ReadFileRangesFnExceptionHandler exceptionHandler) {
        this.desiredBundleSizeBytes = desiredBundleSizeBytes;
        this.createSource = createSource;
        this.coder = coder;
        this.usesReshuffle = usesReshuffle;
        this.exceptionHandler = exceptionHandler;
    }

    @Override
    public @UnknownKeyFor @NonNull @Initialized PCollection<T> expand(@UnknownKeyFor @NonNull @Initialized PCollection< @UnknownKeyFor @NonNull @Initialized FileIO.ReadableFile> input) {
        PCollection ranges = (PCollection)((Object)input.apply("Split into ranges", ParDo.of(new SplitIntoRangesFn(this.desiredBundleSizeBytes))));
        if (this.usesReshuffle) {
            ranges = (PCollection)ranges.apply("Reshuffle", Reshuffle.viaRandomKey());
        }
        return ((PCollection)ranges.apply("Read ranges", ParDo.of(this.readRangesFn()))).setCoder(this.coder);
    }

    protected abstract @UnknownKeyFor @NonNull @Initialized DoFn<@UnknownKeyFor @NonNull @Initialized KV< @UnknownKeyFor @NonNull @Initialized FileIO.ReadableFile, @UnknownKeyFor @NonNull @Initialized OffsetRange>, T> readRangesFn();

    public static abstract class AbstractReadFileRangesFn<@UnknownKeyFor InT, @UnknownKeyFor T>
    extends DoFn<KV<FileIO.ReadableFile, OffsetRange>, T> {
        private final @UnknownKeyFor @NonNull @Initialized SerializableFunction<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized ? extends @UnknownKeyFor @NonNull @Initialized FileBasedSource<InT>> createSource;
        private final @UnknownKeyFor @NonNull @Initialized ReadAllViaFileBasedSource.ReadFileRangesFnExceptionHandler exceptionHandler;

        public AbstractReadFileRangesFn(@UnknownKeyFor @NonNull @Initialized SerializableFunction<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized ? extends @UnknownKeyFor @NonNull @Initialized FileBasedSource<InT>> createSource, @UnknownKeyFor @NonNull @Initialized ReadAllViaFileBasedSource.ReadFileRangesFnExceptionHandler exceptionHandler) {
            this.createSource = createSource;
            this.exceptionHandler = exceptionHandler;
        }

        protected abstract T makeOutput( @UnknownKeyFor @NonNull @Initialized FileIO.ReadableFile var1, @UnknownKeyFor @NonNull @Initialized OffsetRange var2, @UnknownKeyFor @NonNull @Initialized FileBasedSource<InT> var3,  @UnknownKeyFor @NonNull @Initialized BoundedSource.BoundedReader<InT> var4);

        @DoFn.ProcessElement
        @SuppressFBWarnings(value={"RCN_REDUNDANT_NULLCHECK_WOULD_HAVE_BEEN_A_NPE"}, justification="https://github.com/spotbugs/spotbugs/issues/756")
        public void process(/*
         * Issues handling annotations - annotations may be inaccurate
         */
        @UnknownKeyFor @UnknownKeyFor @UnknownKeyFor @UnknownKeyFor @NonNull @Initialized @NonNull @Initialized @NonNull @Initialized @NonNull @Initialized DoFn. @UnknownKeyFor @NonNull @Initialized ProcessContext c) throws @UnknownKeyFor @NonNull @Initialized IOException {
            block9: {
                FileIO.ReadableFile file = (FileIO.ReadableFile)((KV)c.element()).getKey();
                OffsetRange range = (OffsetRange)((KV)c.element()).getValue();
                ResourceId resourceId = file.getMetadata().resourceId();
                CompressedSource<InT> source = CompressedSource.from(this.createSource.apply(resourceId.toString())).withCompression(file.getCompression());
                try (BoundedSource.BoundedReader reader = ((FileBasedSource)source).createForSubrangeOfFile(file.getMetadata(), range.getFrom(), range.getTo()).createReader(c.getPipelineOptions());){
                    boolean more = reader.start();
                    while (more) {
                        c.output(this.makeOutput(file, range, source, reader));
                        more = reader.advance();
                    }
                }
                catch (RuntimeException e) {
                    if (!this.exceptionHandler.apply(file, range, e)) break block9;
                    throw e;
                }
            }
        }
    }

    public static class SplitIntoRangesFn
    extends DoFn<FileIO.ReadableFile, KV<FileIO.ReadableFile, OffsetRange>> {
        private final @UnknownKeyFor @NonNull @Initialized long desiredBundleSizeBytes;
        private transient @Nullable @UnknownKeyFor @Initialized HashSet<@UnknownKeyFor @NonNull @Initialized ResourceId> uniqueIds;

        public SplitIntoRangesFn(@UnknownKeyFor @NonNull @Initialized long desiredBundleSizeBytes) {
            this.desiredBundleSizeBytes = desiredBundleSizeBytes;
        }

        @DoFn.ProcessElement
        public void process(/*
         * Issues handling annotations - annotations may be inaccurate
         */
        @UnknownKeyFor @UnknownKeyFor @UnknownKeyFor @UnknownKeyFor @UnknownKeyFor @NonNull @Initialized @NonNull @Initialized @NonNull @Initialized @NonNull @Initialized @NonNull @Initialized DoFn. @UnknownKeyFor @NonNull @Initialized ProcessContext c) {
            MatchResult.Metadata metadata = ((FileIO.ReadableFile)c.element()).getMetadata();
            this.reportSourceLineage(metadata.resourceId());
            if (!metadata.isReadSeekEfficient()) {
                c.output(KV.of((FileIO.ReadableFile)c.element(), new OffsetRange(0L, metadata.sizeBytes())));
                return;
            }
            for (OffsetRange range : new OffsetRange(0L, metadata.sizeBytes()).split(this.desiredBundleSizeBytes, 0L)) {
                c.output(KV.of((FileIO.ReadableFile)c.element(), range));
            }
        }

        private void reportSourceLineage(@UnknownKeyFor @NonNull @Initialized ResourceId resourceId) {
            if (this.uniqueIds == null) {
                this.uniqueIds = new HashSet();
            } else if (this.uniqueIds.isEmpty()) {
                FileSystems.reportSourceLineage(resourceId, FileSystem.LineageLevel.TOP_LEVEL);
                return;
            }
            this.uniqueIds.add(resourceId);
            FileSystems.reportSourceLineage(resourceId, FileSystem.LineageLevel.FILE);
            if (this.uniqueIds.size() >= 100) {
                this.uniqueIds.clear();
            }
        }
    }
}

