package org.apache.beam.runners.dataflow;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Map;
import java.util.UUID;
import org.apache.beam.runners.core.construction.PTransformReplacements;
import org.apache.beam.runners.core.construction.ReplacementOutputs;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.runners.PTransformOverrideFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.GroupIntoBatches;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.util.ShardedKey;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.POutput;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;

/* loaded from: input_file:org/apache/beam/runners/dataflow/GroupIntoBatchesOverride.class */
public class GroupIntoBatchesOverride {
    private static final UUID workerUuid = UUID.randomUUID();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/beam/runners/dataflow/GroupIntoBatchesOverride$BatchGroupIntoBatches.class */
    public static class BatchGroupIntoBatches<K, V> extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>> {
        private final GroupIntoBatches.BatchingParams<V> batchingParams;

        private BatchGroupIntoBatches(GroupIntoBatches.BatchingParams<V> batchingParams) {
            this.batchingParams = batchingParams;
        }

        public PCollection<KV<K, Iterable<V>>> expand(PCollection<KV<K, V>> pCollection) {
            final SerializableFunction weigher = this.batchingParams.getWeigher((Coder) pCollection.getCoder().getCoderArguments().get(1));
            final long batchSize = this.batchingParams.getBatchSize();
            final long batchSizeBytes = this.batchingParams.getBatchSizeBytes();
            return pCollection.apply("GroupAll", GroupByKey.create()).apply("SplitIntoBatches", ParDo.of(new DoFn<KV<K, Iterable<V>>, KV<K, Iterable<V>>>() { // from class: org.apache.beam.runners.dataflow.GroupIntoBatchesOverride.BatchGroupIntoBatches.1
                @DoFn.ProcessElement
                public void process(DoFn<KV<K, Iterable<V>>, KV<K, Iterable<V>>>.ProcessContext processContext) {
                    ArrayList newArrayList = Lists.newArrayList();
                    long j = 0;
                    for (Object obj : (Iterable) ((KV) processContext.element()).getValue()) {
                        newArrayList.add(obj);
                        if (weigher != null) {
                            j += ((Long) weigher.apply(obj)).longValue();
                        }
                        if (newArrayList.size() == batchSize || (batchSizeBytes != Long.MAX_VALUE && j >= batchSizeBytes)) {
                            processContext.output(KV.of(((KV) processContext.element()).getKey(), newArrayList));
                            newArrayList = Lists.newArrayList();
                            j = 0;
                        }
                    }
                    if (newArrayList.isEmpty()) {
                        return;
                    }
                    processContext.output(KV.of(((KV) processContext.element()).getKey(), newArrayList));
                }
            }));
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/beam/runners/dataflow/GroupIntoBatchesOverride$BatchGroupIntoBatchesOverrideFactory.class */
    public static class BatchGroupIntoBatchesOverrideFactory<K, V> implements PTransformOverrideFactory<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>, GroupIntoBatches<K, V>> {
        public PTransformOverrideFactory.PTransformReplacement<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>> getReplacementTransform(AppliedPTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>, GroupIntoBatches<K, V>> appliedPTransform) {
            return PTransformOverrideFactory.PTransformReplacement.of(PTransformReplacements.getSingletonMainInput(appliedPTransform), new BatchGroupIntoBatches(appliedPTransform.getTransform().getBatchingParams()));
        }

        public Map<PCollection<?>, PTransformOverrideFactory.ReplacementOutput> mapOutputs(Map<TupleTag<?>, PCollection<?>> map, PCollection<KV<K, Iterable<V>>> pCollection) {
            return ReplacementOutputs.singleton(map, pCollection);
        }

        public /* bridge */ /* synthetic */ Map mapOutputs(Map map, POutput pOutput) {
            return mapOutputs((Map<TupleTag<?>, PCollection<?>>) map, (PCollection) pOutput);
        }
    }

    /* loaded from: input_file:org/apache/beam/runners/dataflow/GroupIntoBatchesOverride$BatchGroupIntoBatchesWithShardedKey.class */
    static class BatchGroupIntoBatchesWithShardedKey<K, V> extends PTransform<PCollection<KV<K, V>>, PCollection<KV<ShardedKey<K>, Iterable<V>>>> {
        private final GroupIntoBatches.BatchingParams<V> batchingParams;

        private BatchGroupIntoBatchesWithShardedKey(GroupIntoBatches.BatchingParams<V> batchingParams) {
            this.batchingParams = batchingParams;
        }

        public PCollection<KV<ShardedKey<K>, Iterable<V>>> expand(PCollection<KV<K, V>> pCollection) {
            return GroupIntoBatchesOverride.shardKeys(pCollection).apply(new BatchGroupIntoBatches(this.batchingParams));
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/beam/runners/dataflow/GroupIntoBatchesOverride$BatchGroupIntoBatchesWithShardedKeyOverrideFactory.class */
    public static class BatchGroupIntoBatchesWithShardedKeyOverrideFactory<K, V> implements PTransformOverrideFactory<PCollection<KV<K, V>>, PCollection<KV<ShardedKey<K>, Iterable<V>>>, GroupIntoBatches<K, V>.WithShardedKey> {
        public PTransformOverrideFactory.PTransformReplacement<PCollection<KV<K, V>>, PCollection<KV<ShardedKey<K>, Iterable<V>>>> getReplacementTransform(AppliedPTransform<PCollection<KV<K, V>>, PCollection<KV<ShardedKey<K>, Iterable<V>>>, GroupIntoBatches<K, V>.WithShardedKey> appliedPTransform) {
            return PTransformOverrideFactory.PTransformReplacement.of(PTransformReplacements.getSingletonMainInput(appliedPTransform), new BatchGroupIntoBatchesWithShardedKey(appliedPTransform.getTransform().getBatchingParams()));
        }

        public Map<PCollection<?>, PTransformOverrideFactory.ReplacementOutput> mapOutputs(Map<TupleTag<?>, PCollection<?>> map, PCollection<KV<ShardedKey<K>, Iterable<V>>> pCollection) {
            return ReplacementOutputs.singleton(map, pCollection);
        }

        public /* bridge */ /* synthetic */ Map mapOutputs(Map map, POutput pOutput) {
            return mapOutputs((Map<TupleTag<?>, PCollection<?>>) map, (PCollection) pOutput);
        }
    }

    /* loaded from: input_file:org/apache/beam/runners/dataflow/GroupIntoBatchesOverride$StreamingGroupIntoBatchesWithShardedKey.class */
    static class StreamingGroupIntoBatchesWithShardedKey<K, V> extends PTransform<PCollection<KV<K, V>>, PCollection<KV<ShardedKey<K>, Iterable<V>>>> {
        private final transient DataflowRunner runner;
        private final GroupIntoBatches<K, V>.WithShardedKey originalTransform;
        private final transient PCollection<KV<ShardedKey<K>, Iterable<V>>> originalOutput;

        public StreamingGroupIntoBatchesWithShardedKey(DataflowRunner dataflowRunner, GroupIntoBatches<K, V>.WithShardedKey withShardedKey, PCollection<KV<ShardedKey<K>, Iterable<V>>> pCollection) {
            this.runner = dataflowRunner;
            this.originalTransform = withShardedKey;
            this.originalOutput = pCollection;
        }

        public PCollection<KV<ShardedKey<K>, Iterable<V>>> expand(PCollection<KV<K, V>> pCollection) {
            this.runner.maybeRecordPCollectionWithAutoSharding(this.originalOutput);
            return pCollection.apply(this.originalTransform);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/beam/runners/dataflow/GroupIntoBatchesOverride$StreamingGroupIntoBatchesWithShardedKeyOverrideFactory.class */
    public static class StreamingGroupIntoBatchesWithShardedKeyOverrideFactory<K, V> implements PTransformOverrideFactory<PCollection<KV<K, V>>, PCollection<KV<ShardedKey<K>, Iterable<V>>>, GroupIntoBatches<K, V>.WithShardedKey> {
        private final DataflowRunner runner;

        /* JADX INFO: Access modifiers changed from: package-private */
        public StreamingGroupIntoBatchesWithShardedKeyOverrideFactory(DataflowRunner dataflowRunner) {
            this.runner = dataflowRunner;
        }

        public PTransformOverrideFactory.PTransformReplacement<PCollection<KV<K, V>>, PCollection<KV<ShardedKey<K>, Iterable<V>>>> getReplacementTransform(AppliedPTransform<PCollection<KV<K, V>>, PCollection<KV<ShardedKey<K>, Iterable<V>>>, GroupIntoBatches<K, V>.WithShardedKey> appliedPTransform) {
            return PTransformOverrideFactory.PTransformReplacement.of(PTransformReplacements.getSingletonMainInput(appliedPTransform), new StreamingGroupIntoBatchesWithShardedKey(this.runner, appliedPTransform.getTransform(), PTransformReplacements.getSingletonMainOutput(appliedPTransform)));
        }

        public Map<PCollection<?>, PTransformOverrideFactory.ReplacementOutput> mapOutputs(Map<TupleTag<?>, PCollection<?>> map, PCollection<KV<ShardedKey<K>, Iterable<V>>> pCollection) {
            return ReplacementOutputs.singleton(map, pCollection);
        }

        public /* bridge */ /* synthetic */ Map mapOutputs(Map map, POutput pOutput) {
            return mapOutputs((Map<TupleTag<?>, PCollection<?>>) map, (PCollection) pOutput);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static <K, V> PCollection<KV<ShardedKey<K>, V>> shardKeys(PCollection<KV<K, V>> pCollection) {
        KvCoder coder = pCollection.getCoder();
        Coder coder2 = (Coder) coder.getCoderArguments().get(0);
        return pCollection.apply("Shard Keys", MapElements.via(new SimpleFunction<KV<K, V>, KV<ShardedKey<K>, V>>() { // from class: org.apache.beam.runners.dataflow.GroupIntoBatchesOverride.1
            public KV<ShardedKey<K>, V> apply(KV<K, V> kv) {
                long id = Thread.currentThread().getId();
                ByteBuffer allocate = ByteBuffer.allocate(24);
                allocate.putLong(GroupIntoBatchesOverride.workerUuid.getMostSignificantBits());
                allocate.putLong(GroupIntoBatchesOverride.workerUuid.getLeastSignificantBits());
                allocate.putLong(id);
                return KV.of(ShardedKey.of(kv.getKey(), allocate.array()), kv.getValue());
            }
        })).setCoder(KvCoder.of(ShardedKey.Coder.of(coder2), (Coder) coder.getCoderArguments().get(1)));
    }
}
