/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.direct;

import java.io.IOException;
import java.io.Serializable;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.beam.repackaged.direct_java.runners.core.construction.PTransformReplacements;
import org.apache.beam.repackaged.direct_java.runners.core.construction.ReplacementOutputs;
import org.apache.beam.repackaged.direct_java.runners.core.construction.WriteFilesTranslation;
import org.apache.beam.sdk.io.WriteFiles;
import org.apache.beam.sdk.io.WriteFilesResult;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.runners.PTransformOverrideFactory;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Supplier;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Suppliers;

/**
 * A {@link PTransformOverrideFactory} that replaces a {@link WriteFiles} transform with an
 * equivalent {@link WriteFiles} whose shard count is computed from the number of input records.
 *
 * <p>The replacement reuses the original sink, dynamic-destination side inputs and
 * windowed-writes setting; only the sharding is changed, to {@link LogElementShardsWithDrift}.
 *
 * @param <InputT> element type of the input {@link PCollection}
 * @param <DestinationT> destination type of the resulting {@link WriteFilesResult}
 */
class WriteWithShardingFactory<InputT, DestinationT>
implements PTransformOverrideFactory<PCollection<InputT>, WriteFilesResult<DestinationT>, PTransform<PCollection<InputT>, WriteFilesResult<DestinationT>>> {
    /**
     * Exclusive upper bound for the randomly chosen number of extra shards: the value is drawn
     * from {@code [0, MAX_RANDOM_EXTRA_SHARDS)}.
     */
    static final int MAX_RANDOM_EXTRA_SHARDS = 3;

    /** Lower bound applied to the logarithmic part of the shard count. */
    @VisibleForTesting
    static final int MIN_SHARDS_FOR_LOG = 3;

    WriteWithShardingFactory() {
    }

    /**
     * Builds the replacement {@link WriteFiles} for the given applied transform.
     *
     * @param transform the applied {@link WriteFiles} transform being overridden
     * @return a replacement pairing the original main input with the re-sharded write
     * @throws RuntimeException wrapping the {@link IOException} raised while reading the
     *     original transform's configuration
     */
    @Override
    public PTransformOverrideFactory.PTransformReplacement<PCollection<InputT>, WriteFilesResult<DestinationT>> getReplacementTransform(AppliedPTransform<PCollection<InputT>, WriteFilesResult<DestinationT>, PTransform<PCollection<InputT>, WriteFilesResult<DestinationT>>> transform) {
        try {
            WriteFiles<InputT, DestinationT, ?> replacement =
                WriteFiles.to(WriteFilesTranslation.getSink(transform))
                    .withSideInputs(WriteFilesTranslation.getDynamicDestinationSideInputs(transform))
                    .withSharding(new LogElementShardsWithDrift<InputT>());
            // Carry over the windowed-writes flag so the replacement writes the same way.
            if (WriteFilesTranslation.isWindowedWrites(transform)) {
                replacement = replacement.withWindowedWrites();
            }
            return PTransformOverrideFactory.PTransformReplacement.of(
                PTransformReplacements.getSingletonMainInput(transform), replacement);
        }
        catch (IOException e) {
            // The override interface declares no checked exceptions; keep the cause attached.
            throw new RuntimeException(e);
        }
    }

    /**
     * Maps the outputs of the original transform onto the outputs of the replacement by tag.
     *
     * @param outputs the tagged outputs of the original transform
     * @param newOutput the output produced by the replacement transform
     * @return the mapping from replacement values to their {@code ReplacementOutput}
     */
    @Override
    public Map<PValue, PTransformOverrideFactory.ReplacementOutput> mapOutputs(Map<TupleTag<?>, PValue> outputs, WriteFilesResult<DestinationT> newOutput) {
        return ReplacementOutputs.tagged(outputs, newOutput);
    }

    /** Serializable supplier of a uniformly random int in {@code [0, upperBound)}. */
    private static class BoundedRandomIntSupplier
    implements Supplier<Integer>,
    Serializable {
        /** Exclusive upper bound of the generated values. */
        private final int upperBound;

        private BoundedRandomIntSupplier(int upperBound) {
            this.upperBound = upperBound;
        }

        /** Returns a fresh random value in {@code [0, upperBound)} on every call. */
        @Override
        public Integer get() {
            return ThreadLocalRandom.current().nextInt(0, this.upperBound);
        }
    }

    /**
     * A {@link DoFn} that converts a record count into the number of shards to write.
     *
     * <p>See {@link #calculateShards(long)} for the exact formula.
     */
    @VisibleForTesting
    static class CalculateShardsFn
    extends DoFn<Long, Integer> {
        /** Source of the number of extra shards added on top of the logarithmic count. */
        private final Supplier<Integer> extraShardsSupplier;

        /** Creates a fn that adds a random number of extra shards below {@link #MAX_RANDOM_EXTRA_SHARDS}. */
        public CalculateShardsFn() {
            this(new BoundedRandomIntSupplier(MAX_RANDOM_EXTRA_SHARDS));
        }

        /**
         * Creates a fn that always adds the same number of extra shards.
         *
         * @param constantExtraShards the fixed number of extra shards to add
         */
        @VisibleForTesting
        CalculateShardsFn(int constantExtraShards) {
            this(Suppliers.<Integer>ofInstance(constantExtraShards));
        }

        private CalculateShardsFn(Supplier<Integer> extraShardsSupplier) {
            this.extraShardsSupplier = extraShardsSupplier;
        }

        /** Emits the shard count computed for the incoming record count. */
        @DoFn.ProcessElement
        public void process(DoFn<Long, Integer>.ProcessContext ctxt) {
            ctxt.output(this.calculateShards(ctxt.element()));
        }

        /**
         * Computes the shard count for {@code totalRecords} records.
         *
         * <ul>
         *   <li>No records: one shard.
         *   <li>Fewer records than {@code MIN_SHARDS_FOR_LOG + extraShards}: one shard per record.
         *   <li>Otherwise: {@code max(floor(log10(totalRecords)), MIN_SHARDS_FOR_LOG) + extraShards}.
         * </ul>
         *
         * @param totalRecords the number of records counted
         * @return the number of shards to use
         */
        private int calculateShards(long totalRecords) {
            if (totalRecords == 0L) {
                return 1;
            }
            int extraShards = this.extraShardsSupplier.get();
            if (totalRecords < (long)(MIN_SHARDS_FOR_LOG + extraShards)) {
                // The narrowing cast is safe: the value is below MIN_SHARDS_FOR_LOG + extraShards.
                return (int)totalRecords;
            }
            // For the positive counts that reach this point the int cast truncates towards zero,
            // which is floor(log10(totalRecords)).
            int floorLogRecs = (int)Math.log10(totalRecords);
            return Math.max(floorLogRecs, MIN_SHARDS_FOR_LOG) + extraShards;
        }
    }

    /**
     * Produces a singleton view holding the shard count for a collection of records.
     *
     * <p>The records are re-windowed into the global window, counted, and the count is passed
     * through {@link CalculateShardsFn}.
     *
     * @param <T> element type of the records being written
     */
    private static class LogElementShardsWithDrift<T>
    extends PTransform<PCollection<T>, PCollectionView<Integer>> {
        private LogElementShardsWithDrift() {
        }

        /**
         * Expands into window, count, shard calculation and singleton-view steps.
         *
         * @param records the records that will be written
         * @return a singleton view of the computed shard count
         */
        @Override
        public PCollectionView<Integer> expand(PCollection<T> records) {
            PCollection<T> globallyWindowed = records.apply(Window.<T>into(new GlobalWindows()));
            PCollection<Long> recordCount = globallyWindowed.apply("CountRecords", Count.<T>globally());
            PCollection<Integer> shardCount =
                recordCount.apply("GenerateShardCount", ParDo.of(new CalculateShardsFn()));
            return shardCount.apply(View.<Integer>asSingleton());
        }
    }
}

