/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.gcp.pubsub;

import com.google.protobuf.Message;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.BigEndianLongCoder;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.NullableCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.extensions.protobuf.ProtoCoder;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessages;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubOptions;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.metrics.SinkMetrics;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.windowing.AfterFirst;
import org.apache.beam.sdk.transforms.windowing.AfterPane;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Trigger;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.hash.Hashing;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
import org.joda.time.Duration;

public class PubsubUnboundedSink
extends PTransform<PCollection<PubsubMessage>, PDone> {
    /** Batch size (message count) that the shorter public constructors pass as {@code publishBatchSize}. */
    static final @UnknownKeyFor @NonNull @Initialized int DEFAULT_PUBLISH_BATCH_SIZE = 1000;
    /** Batch size (payload bytes) that the shorter public constructors pass as {@code publishBatchBytes}. */
    static final @UnknownKeyFor @NonNull @Initialized int DEFAULT_PUBLISH_BATCH_BYTES = 400000;
    /** Latency (two seconds) that every public constructor passes as {@code maxLatency}. */
    private static final @UnknownKeyFor @NonNull @Initialized Duration DEFAULT_MAX_LATENCY = Duration.standardSeconds((long)2L);
    /** Coder for {@code OutgoingMessage} values; used as the value coder of the sharded key/value collection. */
    @VisibleForTesting
    static final @UnknownKeyFor @NonNull @Initialized Coder<@UnknownKeyFor @NonNull @Initialized PubsubClient.OutgoingMessage> CODER = new OutgoingMessageCoder();
    /** Factory from which the writer obtains a {@code PubsubClient} at the start of each bundle. */
    private final @UnknownKeyFor @NonNull @Initialized PubsubClient.PubsubClientFactory pubsubFactory;
    /** Provider of the topic that messages are published to. */
    private final @UnknownKeyFor @NonNull @Initialized ValueProvider<@UnknownKeyFor @NonNull @Initialized PubsubClient.TopicPath> topic;
    /** Timestamp attribute name handed to the client factory; may be null. */
    private final @Nullable @UnknownKeyFor @Initialized String timestampAttribute;
    /** Id attribute name handed to the client factory; when null, record id generation is disabled. */
    private final @Nullable @UnknownKeyFor @Initialized String idAttribute;
    /** Exclusive upper bound of the random shard key assigned to each message before grouping. */
    private final @UnknownKeyFor @NonNull @Initialized int numShards;
    /** Element count at which the window trigger fires; also the initial capacity of the writer's batch list. */
    private final @UnknownKeyFor @NonNull @Initialized int publishBatchSize;
    /** Payload byte threshold above which the writer splits one grouped pane into several publish calls. */
    private final @UnknownKeyFor @NonNull @Initialized int publishBatchBytes;
    /** Processing-time delay after the first element in a pane at which the window trigger fires. */
    private final @UnknownKeyFor @NonNull @Initialized Duration maxLatency;
    /** How record ids are generated; forced to {@code NONE} by the constructor when {@code idAttribute} is null. */
    private final @UnknownKeyFor @NonNull @Initialized RecordIdMethod recordIdMethod;
    // NOTE(review): annotated @NonNull, yet the public constructors without a root-URL parameter pass null here.
    /** Root URL handed to the client factory when creating a client. */
    private final @UnknownKeyFor @NonNull @Initialized String pubsubRootUrl;

    /**
     * Fully-parameterized constructor that every other constructor delegates to.
     *
     * <p>The requested {@code recordIdMethod} is only honoured when an {@code idAttribute} is
     * supplied; otherwise {@link RecordIdMethod#NONE} is used.
     *
     * @param pubsubFactory factory used by the writer to create Pubsub clients
     * @param topic provider of the destination topic
     * @param timestampAttribute timestamp attribute name passed to the client factory
     * @param idAttribute id attribute name passed to the client factory
     * @param numShards exclusive upper bound for the random shard key
     * @param publishBatchSize element count that fires the batching trigger
     * @param publishBatchBytes byte threshold for a single publish call
     * @param maxLatency processing-time delay that fires the batching trigger
     * @param recordIdMethod how record ids are generated when {@code idAttribute} is set
     * @param pubsubRootUrl root URL passed to the client factory
     */
    @VisibleForTesting
    PubsubUnboundedSink(
            @UnknownKeyFor @NonNull @Initialized PubsubClient.PubsubClientFactory pubsubFactory,
            @UnknownKeyFor @NonNull @Initialized ValueProvider<@UnknownKeyFor @NonNull @Initialized PubsubClient.TopicPath> topic,
            @UnknownKeyFor @NonNull @Initialized String timestampAttribute,
            @UnknownKeyFor @NonNull @Initialized String idAttribute,
            @UnknownKeyFor @NonNull @Initialized int numShards,
            @UnknownKeyFor @NonNull @Initialized int publishBatchSize,
            @UnknownKeyFor @NonNull @Initialized int publishBatchBytes,
            @UnknownKeyFor @NonNull @Initialized Duration maxLatency,
            @UnknownKeyFor @NonNull @Initialized RecordIdMethod recordIdMethod,
            @UnknownKeyFor @NonNull @Initialized String pubsubRootUrl) {
        // NOTE(review): idAttribute is annotated @NonNull but is explicitly null-checked here.
        if (idAttribute == null) {
            // Without an id attribute there is nowhere to put a record id, so none is generated.
            this.recordIdMethod = RecordIdMethod.NONE;
        } else {
            this.recordIdMethod = recordIdMethod;
        }

        // Destination and client configuration.
        this.pubsubFactory = pubsubFactory;
        this.pubsubRootUrl = pubsubRootUrl;
        this.topic = topic;
        this.timestampAttribute = timestampAttribute;
        this.idAttribute = idAttribute;

        // Sharding and batching configuration.
        this.numShards = numShards;
        this.publishBatchSize = publishBatchSize;
        this.publishBatchBytes = publishBatchBytes;
        this.maxLatency = maxLatency;
    }

    /**
     * Creates a sink with the default batch size, batch bytes and maximum latency, random record
     * ids, and no Pubsub root URL.
     */
    public PubsubUnboundedSink(
            @UnknownKeyFor @NonNull @Initialized PubsubClient.PubsubClientFactory pubsubFactory,
            @UnknownKeyFor @NonNull @Initialized ValueProvider<@UnknownKeyFor @NonNull @Initialized PubsubClient.TopicPath> topic,
            @UnknownKeyFor @NonNull @Initialized String timestampAttribute,
            @UnknownKeyFor @NonNull @Initialized String idAttribute,
            @UnknownKeyFor @NonNull @Initialized int numShards) {
        this(
                pubsubFactory,
                topic,
                timestampAttribute,
                idAttribute,
                numShards,
                DEFAULT_PUBLISH_BATCH_SIZE,
                DEFAULT_PUBLISH_BATCH_BYTES,
                DEFAULT_MAX_LATENCY,
                RecordIdMethod.RANDOM,
                null);
    }

    /**
     * Creates a sink with the default batch size, batch bytes and maximum latency, random record
     * ids, and the given Pubsub root URL.
     */
    public PubsubUnboundedSink(
            @UnknownKeyFor @NonNull @Initialized PubsubClient.PubsubClientFactory pubsubFactory,
            @UnknownKeyFor @NonNull @Initialized ValueProvider<@UnknownKeyFor @NonNull @Initialized PubsubClient.TopicPath> topic,
            @UnknownKeyFor @NonNull @Initialized String timestampAttribute,
            @UnknownKeyFor @NonNull @Initialized String idAttribute,
            @UnknownKeyFor @NonNull @Initialized int numShards,
            @UnknownKeyFor @NonNull @Initialized String pubsubRootUrl) {
        this(
                pubsubFactory,
                topic,
                timestampAttribute,
                idAttribute,
                numShards,
                DEFAULT_PUBLISH_BATCH_SIZE,
                DEFAULT_PUBLISH_BATCH_BYTES,
                DEFAULT_MAX_LATENCY,
                RecordIdMethod.RANDOM,
                pubsubRootUrl);
    }

    /**
     * Creates a sink with caller-chosen batch limits, the default maximum latency, random record
     * ids, and no Pubsub root URL.
     */
    public PubsubUnboundedSink(
            @UnknownKeyFor @NonNull @Initialized PubsubClient.PubsubClientFactory pubsubFactory,
            @UnknownKeyFor @NonNull @Initialized ValueProvider<@UnknownKeyFor @NonNull @Initialized PubsubClient.TopicPath> topic,
            @UnknownKeyFor @NonNull @Initialized String timestampAttribute,
            @UnknownKeyFor @NonNull @Initialized String idAttribute,
            @UnknownKeyFor @NonNull @Initialized int numShards,
            @UnknownKeyFor @NonNull @Initialized int publishBatchSize,
            @UnknownKeyFor @NonNull @Initialized int publishBatchBytes) {
        this(
                pubsubFactory,
                topic,
                timestampAttribute,
                idAttribute,
                numShards,
                publishBatchSize,
                publishBatchBytes,
                DEFAULT_MAX_LATENCY,
                RecordIdMethod.RANDOM,
                null);
    }

    /**
     * Creates a sink with caller-chosen batch limits, the default maximum latency, random record
     * ids, and the given Pubsub root URL.
     */
    public PubsubUnboundedSink(
            @UnknownKeyFor @NonNull @Initialized PubsubClient.PubsubClientFactory pubsubFactory,
            @UnknownKeyFor @NonNull @Initialized ValueProvider<@UnknownKeyFor @NonNull @Initialized PubsubClient.TopicPath> topic,
            @UnknownKeyFor @NonNull @Initialized String timestampAttribute,
            @UnknownKeyFor @NonNull @Initialized String idAttribute,
            @UnknownKeyFor @NonNull @Initialized int numShards,
            @UnknownKeyFor @NonNull @Initialized int publishBatchSize,
            @UnknownKeyFor @NonNull @Initialized int publishBatchBytes,
            @UnknownKeyFor @NonNull @Initialized String pubsubRootUrl) {
        this(
                pubsubFactory,
                topic,
                timestampAttribute,
                idAttribute,
                numShards,
                publishBatchSize,
                publishBatchBytes,
                DEFAULT_MAX_LATENCY,
                RecordIdMethod.RANDOM,
                pubsubRootUrl);
    }

    /**
     * Resolves the topic provider and returns the topic this sink publishes to.
     *
     * @return the value currently held by the topic provider
     */
    public @UnknownKeyFor @NonNull @Initialized PubsubClient.TopicPath getTopic() {
        return topic.get();
    }

    /**
     * Returns the topic provider itself, without resolving it.
     *
     * @return the provider supplied at construction time
     */
    public @UnknownKeyFor @NonNull @Initialized ValueProvider<@UnknownKeyFor @NonNull @Initialized PubsubClient.TopicPath> getTopicProvider() {
        return topic;
    }

    /**
     * Returns the timestamp attribute name configured for this sink.
     *
     * @return the attribute name, or {@code null} if none was supplied
     */
    public @Nullable @UnknownKeyFor @Initialized String getTimestampAttribute() {
        return timestampAttribute;
    }

    /**
     * Returns the id attribute name configured for this sink.
     *
     * @return the attribute name, or {@code null} if none was supplied
     */
    public @Nullable @UnknownKeyFor @Initialized String getIdAttribute() {
        return idAttribute;
    }

    /**
     * Serializes each incoming {@link PubsubMessage} to its protobuf byte form and hands the
     * resulting byte arrays to the internal {@link PubsubSink} transform.
     *
     * @param input the messages to publish
     * @return the {@link PDone} produced by the internal sink
     */
    @Override
    public @UnknownKeyFor @NonNull @Initialized PDone expand(@UnknownKeyFor @NonNull @Initialized PCollection<@UnknownKeyFor @NonNull @Initialized PubsubMessage> input) {
        return input
            .apply(
                "Output Serialized PubsubMessage Proto",
                MapElements.into(new TypeDescriptor<byte[]>() {})
                    .via(new PubsubMessages.ParsePayloadAsPubsubMessageProto()))
            // The element type is a plain byte array, so pin the coder explicitly.
            .setCoder(ByteArrayCoder.of())
            .apply(new PubsubSink(this));
    }

    /**
     * Internal transform that takes serialized Pubsub message protos, batches them with a
     * repeating trigger, spreads them over random shard keys, groups by shard, and publishes each
     * group through {@link WriterFn}.
     */
    static class PubsubSink extends PTransform<PCollection<byte[]>, PDone> {
        /** The enclosing sink whose configuration drives this transform. */
        public final @UnknownKeyFor @NonNull @Initialized PubsubUnboundedSink outer;

        PubsubSink(@UnknownKeyFor @NonNull @Initialized PubsubUnboundedSink outer) {
            this.outer = outer;
        }

        /**
         * Builds the window / shard / group / write pipeline.
         *
         * @param input serialized Pubsub message protos
         * @return a {@link PDone} in the input's pipeline
         */
        @Override
        public @UnknownKeyFor @NonNull @Initialized PDone expand(@UnknownKeyFor @NonNull @Initialized PCollection<@UnknownKeyFor @NonNull @Initialized byte @UnknownKeyFor @NonNull @Initialized []> input) {
            // Fire a pane as soon as either enough elements have arrived or the first element
            // in the pane has waited for maxLatency, then start over.
            Trigger batchTrigger =
                Repeatedly.forever(
                    AfterFirst.of(
                        AfterPane.elementCountAtLeast(outer.publishBatchSize),
                        AfterProcessingTime.pastFirstElementInPane().plusDelayOf(outer.maxLatency)));

            WriterFn writer =
                new WriterFn(
                    outer.pubsubFactory,
                    outer.topic,
                    outer.timestampAttribute,
                    outer.idAttribute,
                    outer.publishBatchSize,
                    outer.publishBatchBytes,
                    outer.pubsubRootUrl);

            input
                .apply(
                    "PubsubUnboundedSink.Window",
                    Window.<byte[]>into(new GlobalWindows())
                        .triggering(batchTrigger)
                        .discardingFiredPanes())
                .apply(
                    "PubsubUnboundedSink.Shard",
                    ParDo.of(new ShardFn(outer.numShards, outer.recordIdMethod)))
                .setCoder(KvCoder.of(VarIntCoder.of(), CODER))
                .apply(GroupByKey.create())
                .apply("PubsubUnboundedSink.Writer", ParDo.of(writer));

            return PDone.in(input.getPipeline());
        }
    }

    /**
     * Publishes each grouped shard of outgoing messages to the configured topic.
     *
     * <p>A Pubsub client is created in {@link #startBundle} and closed in {@link #finishBundle}.
     * Within one element the messages are split into several publish calls so that the summed
     * payload size of a call does not exceed {@code publishBatchBytes}, except when a single
     * message alone is larger than that.
     */
    private static class WriterFn
    extends DoFn<KV<Integer, Iterable<PubsubClient.OutgoingMessage>>, Void> {
        private final @UnknownKeyFor @NonNull @Initialized PubsubClient.PubsubClientFactory pubsubFactory;
        private final @UnknownKeyFor @NonNull @Initialized ValueProvider<@UnknownKeyFor @NonNull @Initialized PubsubClient.TopicPath> topic;
        private final @UnknownKeyFor @NonNull @Initialized String timestampAttribute;
        private final @UnknownKeyFor @NonNull @Initialized String idAttribute;
        private final @UnknownKeyFor @NonNull @Initialized int publishBatchSize;
        private final @UnknownKeyFor @NonNull @Initialized int publishBatchBytes;
        private final @UnknownKeyFor @NonNull @Initialized String pubsubRootUrl;
        /** Client for the current bundle; null outside of a bundle. */
        private transient @Nullable @UnknownKeyFor @Initialized PubsubClient pubsubClient;
        private final @UnknownKeyFor @NonNull @Initialized Counter batchCounter = Metrics.counter(WriterFn.class, "batches");
        private final @UnknownKeyFor @NonNull @Initialized Counter elementCounter = SinkMetrics.elementsWritten();
        private final @UnknownKeyFor @NonNull @Initialized Counter byteCounter = SinkMetrics.bytesWritten();

        /** Creates a writer with no Pubsub root URL. */
        WriterFn(@UnknownKeyFor @NonNull @Initialized PubsubClient.PubsubClientFactory pubsubFactory, @UnknownKeyFor @NonNull @Initialized ValueProvider<@UnknownKeyFor @NonNull @Initialized PubsubClient.TopicPath> topic, @UnknownKeyFor @NonNull @Initialized String timestampAttribute, @UnknownKeyFor @NonNull @Initialized String idAttribute, @UnknownKeyFor @NonNull @Initialized int publishBatchSize, @UnknownKeyFor @NonNull @Initialized int publishBatchBytes) {
            this(pubsubFactory, topic, timestampAttribute, idAttribute, publishBatchSize, publishBatchBytes, null);
        }

        /**
         * Creates a writer.
         *
         * @param pubsubFactory factory used to create a client per bundle
         * @param topic provider of the destination topic
         * @param timestampAttribute timestamp attribute name passed to the client factory
         * @param idAttribute id attribute name passed to the client factory
         * @param publishBatchSize initial capacity of the per-element batch list
         * @param publishBatchBytes byte threshold for a single publish call
         * @param pubsubRootUrl root URL passed to the client factory
         */
        WriterFn(@UnknownKeyFor @NonNull @Initialized PubsubClient.PubsubClientFactory pubsubFactory, @UnknownKeyFor @NonNull @Initialized ValueProvider<@UnknownKeyFor @NonNull @Initialized PubsubClient.TopicPath> topic, @UnknownKeyFor @NonNull @Initialized String timestampAttribute, @UnknownKeyFor @NonNull @Initialized String idAttribute, @UnknownKeyFor @NonNull @Initialized int publishBatchSize, @UnknownKeyFor @NonNull @Initialized int publishBatchBytes, @UnknownKeyFor @NonNull @Initialized String pubsubRootUrl) {
            this.pubsubFactory = pubsubFactory;
            this.topic = topic;
            this.timestampAttribute = timestampAttribute;
            this.idAttribute = idAttribute;
            this.publishBatchSize = publishBatchSize;
            this.publishBatchBytes = publishBatchBytes;
            this.pubsubRootUrl = pubsubRootUrl;
        }

        /**
         * Publishes one batch and updates the batch, element and byte counters.
         *
         * @param messages the messages to publish in a single call
         * @param bytes summed payload size of {@code messages}
         * @throws IOException if the client fails to publish
         * @throws IllegalStateException if the client reports fewer published messages than sent
         */
        private void publishBatch(@UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized PubsubClient.OutgoingMessage> messages, @UnknownKeyFor @NonNull @Initialized int bytes) throws @UnknownKeyFor @NonNull @Initialized IOException {
            int published = pubsubClient.publish(topic.get(), messages);
            Preconditions.checkState(
                published == messages.size(),
                "Attempted to publish %s messages but %s were successful",
                messages.size(),
                published);
            batchCounter.inc();
            elementCounter.inc(messages.size());
            byteCounter.inc(bytes);
        }

        /**
         * Creates the Pubsub client used for this bundle.
         *
         * @throws IllegalStateException if a client from a previous bundle is still present
         */
        @DoFn.StartBundle
        public void startBundle(@UnknownKeyFor @NonNull @Initialized StartBundleContext c) throws @UnknownKeyFor @NonNull @Initialized Exception {
            Preconditions.checkState(pubsubClient == null, "startBundle invoked without prior finishBundle");
            pubsubClient =
                pubsubFactory.newClient(
                    timestampAttribute,
                    idAttribute,
                    c.getPipelineOptions().as(PubsubOptions.class),
                    pubsubRootUrl);
        }

        /**
         * Publishes all messages of one grouped shard, splitting them into byte-bounded batches.
         */
        @DoFn.ProcessElement
        public void processElement(@UnknownKeyFor @NonNull @Initialized ProcessContext c) throws @UnknownKeyFor @NonNull @Initialized Exception {
            List<PubsubClient.OutgoingMessage> batch = new ArrayList<>(publishBatchSize);
            int batchBytes = 0;
            for (PubsubClient.OutgoingMessage message : c.element().getValue()) {
                int messageBytes = message.message().getData().size();
                // Flush before adding a message that would push the batch over the byte limit.
                // An empty batch is never flushed, so an oversized message is still sent alone.
                if (!batch.isEmpty() && batchBytes + messageBytes > publishBatchBytes) {
                    publishBatch(batch, batchBytes);
                    batch.clear();
                    batchBytes = 0;
                }
                batch.add(message);
                batchBytes += messageBytes;
            }
            if (!batch.isEmpty()) {
                publishBatch(batch, batchBytes);
            }
        }

        /** Closes the bundle's Pubsub client and forgets it, even if closing fails. */
        @DoFn.FinishBundle
        public void finishBundle() throws @UnknownKeyFor @NonNull @Initialized Exception {
            try {
                pubsubClient.close();
            } finally {
                // Always drop the reference so a failed close cannot leave a stale client behind.
                pubsubClient = null;
            }
        }

        /** Adds the topic, transport kind and (when set) the timestamp and id attributes. */
        @Override
        public void populateDisplayData(@UnknownKeyFor @NonNull @Initialized DisplayData.Builder builder) {
            super.populateDisplayData(builder);
            builder.add(DisplayData.item("topic", topic));
            builder.add(DisplayData.item("transport", pubsubFactory.getKind()));
            builder.addIfNotNull(DisplayData.item("timestampAttribute", timestampAttribute));
            builder.addIfNotNull(DisplayData.item("idAttribute", idAttribute));
        }
    }

    /**
     * Parses each serialized Pubsub message proto, wraps it as an outgoing message with the
     * element's timestamp and an optional record id, and emits it under a random shard key.
     */
    private static class ShardFn
    extends DoFn<byte[], KV<Integer, PubsubClient.OutgoingMessage>> {
        private final @UnknownKeyFor @NonNull @Initialized Counter elementCounter = Metrics.counter(ShardFn.class, "elements");
        /** Exclusive upper bound of the random shard key. */
        private final @UnknownKeyFor @NonNull @Initialized int numShards;
        private final @UnknownKeyFor @NonNull @Initialized RecordIdMethod recordIdMethod;

        ShardFn(@UnknownKeyFor @NonNull @Initialized int numShards, @UnknownKeyFor @NonNull @Initialized RecordIdMethod recordIdMethod) {
            this.numShards = numShards;
            this.recordIdMethod = recordIdMethod;
        }

        /**
         * Converts one serialized proto into a sharded outgoing message.
         *
         * @throws Exception if the element cannot be parsed as a Pubsub message proto
         */
        @DoFn.ProcessElement
        public void processElement(@UnknownKeyFor @NonNull @Initialized ProcessContext c) throws @UnknownKeyFor @NonNull @Initialized Exception {
            elementCounter.inc();
            com.google.pubsub.v1.PubsubMessage message =
                com.google.pubsub.v1.PubsubMessage.parseFrom(c.element());
            byte[] payload = message.getData().toByteArray();
            long timestampMsSinceEpoch = c.timestamp().getMillis();

            @Nullable String recordId = null;
            switch (recordIdMethod) {
                case NONE:
                    // Leave the record id null.
                    break;
                case DETERMINISTIC:
                    // Same payload bytes always yield the same id.
                    recordId = Hashing.murmur3_128().hashBytes(payload).toString();
                    break;
                case RANDOM:
                    recordId = UUID.randomUUID().toString();
                    break;
            }

            int shard = ThreadLocalRandom.current().nextInt(numShards);
            c.output(
                KV.of(shard, PubsubClient.OutgoingMessage.of(message, timestampMsSinceEpoch, recordId)));
        }

        /** Adds the configured shard count to the display data. */
        @Override
        public void populateDisplayData(@UnknownKeyFor @NonNull @Initialized DisplayData.Builder builder) {
            super.populateDisplayData(builder);
            builder.add(DisplayData.item("numShards", numShards));
        }
    }

    @VisibleForTesting
    static enum RecordIdMethod {
        NONE,
        RANDOM,
        DETERMINISTIC;

    }

    /**
     * Coder for {@code OutgoingMessage}. The wire form is, in order: the Pubsub message proto,
     * the timestamp as a big-endian long, and the nullable record id as a UTF-8 string.
     */
    private static class OutgoingMessageCoder
    extends AtomicCoder<PubsubClient.OutgoingMessage> {
        private static final @UnknownKeyFor @NonNull @Initialized NullableCoder<@UnknownKeyFor @NonNull @Initialized String> RECORD_ID_CODER = NullableCoder.of(StringUtf8Coder.of());

        private OutgoingMessageCoder() {
        }

        /**
         * Writes the message proto, the timestamp and the record id to {@code outStream}.
         *
         * @param value the message to encode
         * @param outStream the stream to write to
         */
        @Override
        public void encode(@UnknownKeyFor @NonNull @Initialized PubsubClient.OutgoingMessage value, @UnknownKeyFor @NonNull @Initialized OutputStream outStream) throws @UnknownKeyFor @NonNull @Initialized CoderException, @UnknownKeyFor @NonNull @Initialized IOException {
            ProtoCoder.of(com.google.pubsub.v1.PubsubMessage.class).encode(value.message(), outStream);
            BigEndianLongCoder.of().encode(value.timestampMsSinceEpoch(), outStream);
            RECORD_ID_CODER.encode(value.recordId(), outStream);
        }

        /**
         * Reads the three components back in the order {@link #encode} wrote them.
         *
         * @param inStream the stream to read from
         * @return the reconstructed outgoing message
         */
        @Override
        public @UnknownKeyFor @NonNull @Initialized PubsubClient.OutgoingMessage decode(@UnknownKeyFor @NonNull @Initialized InputStream inStream) throws @UnknownKeyFor @NonNull @Initialized CoderException, @UnknownKeyFor @NonNull @Initialized IOException {
            com.google.pubsub.v1.PubsubMessage message =
                ProtoCoder.of(com.google.pubsub.v1.PubsubMessage.class).decode(inStream);
            long timestampMsSinceEpoch = BigEndianLongCoder.of().decode(inStream);
            @Nullable String recordId = RECORD_ID_CODER.decode(inStream);
            return PubsubClient.OutgoingMessage.of(message, timestampMsSinceEpoch, recordId);
        }
    }
}

