/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.gcp.pubsub;

import com.google.api.client.util.Clock;
import com.google.auto.value.AutoValue;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.Message;
import com.google.pubsub.v1.PubsubMessage;
import java.io.IOException;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import javax.annotation.Nullable;
import javax.naming.SizeLimitExceededException;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.reflect.ReflectData;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.expansion.ExternalTransformRegistrar;
import org.apache.beam.sdk.extensions.protobuf.ProtoCoder;
import org.apache.beam.sdk.io.gcp.pubsub.AutoValue_PubsubIO_Read;
import org.apache.beam.sdk.io.gcp.pubsub.AutoValue_PubsubIO_Write;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubJsonClient;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessagePayloadOnlyCoder;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageWithAttributesAndMessageIdCoder;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageWithAttributesCoder;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageWithMessageIdCoder;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubOptions;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubUnboundedSink;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubUnboundedSource;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.utils.AvroUtils;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ExternalTransformBuilder;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.display.HasDisplayData;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PubsubIO {
    private static final Logger LOG = LoggerFactory.getLogger(PubsubIO.class);
    private static final PubsubClient.PubsubClientFactory FACTORY = PubsubJsonClient.FACTORY;
    private static final Pattern PROJECT_ID_REGEXP = Pattern.compile("[a-z][-a-z0-9:.]{4,61}[a-z0-9]");
    private static final Pattern SUBSCRIPTION_REGEXP = Pattern.compile("projects/([^/]+)/subscriptions/(.+)");
    private static final Pattern TOPIC_REGEXP = Pattern.compile("projects/([^/]+)/topics/(.+)");
    private static final Pattern V1BETA1_SUBSCRIPTION_REGEXP = Pattern.compile("/subscriptions/([^/]+)/(.+)");
    private static final Pattern V1BETA1_TOPIC_REGEXP = Pattern.compile("/topics/([^/]+)/(.+)");
    private static final Pattern PUBSUB_NAME_REGEXP = Pattern.compile("[a-zA-Z][-._~%+a-zA-Z0-9]+");
    private static final int PUBSUB_NAME_MIN_LENGTH = 3;
    private static final int PUBSUB_NAME_MAX_LENGTH = 255;
    private static final String SUBSCRIPTION_RANDOM_TEST_PREFIX = "_random/";
    private static final String SUBSCRIPTION_STARTING_SIGNAL = "_starting_signal/";
    private static final String TOPIC_DEV_NULL_TEST_NAME = "/topics/dev/null";

    private static void validateProjectName(String project) {
        Matcher match = PROJECT_ID_REGEXP.matcher(project);
        if (!match.matches()) {
            throw new IllegalArgumentException("Illegal project name specified in Pubsub subscription: " + project);
        }
    }

    private static void validatePubsubName(String name) {
        if (name.length() < 3) {
            throw new IllegalArgumentException("Pubsub object name is shorter than 3 characters: " + name);
        }
        if (name.length() > 255) {
            throw new IllegalArgumentException("Pubsub object name is longer than 255 characters: " + name);
        }
        if (name.startsWith("goog")) {
            throw new IllegalArgumentException("Pubsub object name cannot start with goog: " + name);
        }
        Matcher match = PUBSUB_NAME_REGEXP.matcher(name);
        if (!match.matches()) {
            throw new IllegalArgumentException("Illegal Pubsub object name specified: " + name + " Please see Javadoc for naming rules.");
        }
    }

    private static void populateCommonDisplayData(DisplayData.Builder builder, String timestampAttribute, String idAttribute, ValueProvider<PubsubTopic> topic) {
        builder.addIfNotNull(DisplayData.item((String)"timestampAttribute", (String)timestampAttribute).withLabel("Timestamp Attribute")).addIfNotNull(DisplayData.item((String)"idAttribute", (String)idAttribute).withLabel("ID Attribute")).addIfNotNull(DisplayData.item((String)"topic", topic).withLabel("Pubsub Topic"));
    }

    private static <T> Read<T> read() {
        return new AutoValue_PubsubIO_Read.Builder().setNeedsAttributes(false).setNeedsMessageId(false).setPubsubClientFactory(FACTORY).build();
    }

    public static Read<PubsubMessage> readMessages() {
        return new AutoValue_PubsubIO_Read.Builder().setPubsubClientFactory(FACTORY).setCoder(PubsubMessagePayloadOnlyCoder.of()).setParseFn(new IdentityMessageFn()).setNeedsAttributes(false).setNeedsMessageId(false).build();
    }

    public static Read<PubsubMessage> readMessagesWithMessageId() {
        return new AutoValue_PubsubIO_Read.Builder().setPubsubClientFactory(FACTORY).setCoder(PubsubMessageWithMessageIdCoder.of()).setParseFn(new IdentityMessageFn()).setNeedsAttributes(false).setNeedsMessageId(true).build();
    }

    public static Read<PubsubMessage> readMessagesWithAttributes() {
        return new AutoValue_PubsubIO_Read.Builder().setPubsubClientFactory(FACTORY).setCoder(PubsubMessageWithAttributesCoder.of()).setParseFn(new IdentityMessageFn()).setNeedsAttributes(true).setNeedsMessageId(false).build();
    }

    public static Read<PubsubMessage> readMessagesWithAttributesAndMessageId() {
        return new AutoValue_PubsubIO_Read.Builder().setPubsubClientFactory(FACTORY).setCoder(PubsubMessageWithAttributesAndMessageIdCoder.of()).setParseFn(new IdentityMessageFn()).setNeedsAttributes(true).setNeedsMessageId(true).build();
    }

    public static Read<String> readStrings() {
        return new AutoValue_PubsubIO_Read.Builder().setNeedsAttributes(false).setNeedsMessageId(false).setPubsubClientFactory(FACTORY).setCoder(StringUtf8Coder.of()).setParseFn(new ParsePayloadAsUtf8()).build();
    }

    public static <T extends Message> Read<T> readProtos(Class<T> messageClass) {
        ProtoCoder coder = ProtoCoder.of(messageClass);
        return new AutoValue_PubsubIO_Read.Builder().setNeedsAttributes(false).setNeedsMessageId(false).setPubsubClientFactory(FACTORY).setCoder(coder).setParseFn(new ParsePayloadUsingCoder(coder)).build();
    }

    public static <T> Read<T> readAvros(Class<T> clazz) {
        AvroCoder coder = AvroCoder.of(clazz);
        return new AutoValue_PubsubIO_Read.Builder().setNeedsAttributes(false).setNeedsMessageId(false).setPubsubClientFactory(FACTORY).setCoder(coder).setParseFn(new ParsePayloadUsingCoder(coder)).build();
    }

    public static <T> Read<T> readMessagesWithCoderAndParseFn(Coder<T> coder, SimpleFunction<PubsubMessage, T> parseFn) {
        return new AutoValue_PubsubIO_Read.Builder().setNeedsAttributes(false).setNeedsMessageId(false).setPubsubClientFactory(FACTORY).setCoder(coder).setParseFn(parseFn).build();
    }

    @Experimental(value=Experimental.Kind.SCHEMAS)
    public static Read<GenericRecord> readAvroGenericRecords(org.apache.avro.Schema avroSchema) {
        Schema schema = AvroUtils.getSchema(GenericRecord.class, (org.apache.avro.Schema)avroSchema);
        AvroCoder coder = AvroCoder.of(GenericRecord.class, (org.apache.avro.Schema)avroSchema);
        return new AutoValue_PubsubIO_Read.Builder().setNeedsAttributes(false).setNeedsMessageId(false).setPubsubClientFactory(FACTORY).setBeamSchema(schema).setTypeDescriptor(TypeDescriptor.of(GenericRecord.class)).setToRowFn(AvroUtils.getToRowFunction(GenericRecord.class, (org.apache.avro.Schema)avroSchema)).setFromRowFn(AvroUtils.getFromRowFunction(GenericRecord.class)).setParseFn(new ParsePayloadUsingCoder(coder)).build();
    }

    @Experimental(value=Experimental.Kind.SCHEMAS)
    public static <T> Read<T> readAvrosWithBeamSchema(Class<T> clazz) {
        if (clazz.equals(GenericRecord.class)) {
            throw new IllegalArgumentException("For GenericRecord, please call readAvroGenericRecords");
        }
        org.apache.avro.Schema avroSchema = ReflectData.get().getSchema(clazz);
        AvroCoder coder = AvroCoder.of(clazz);
        Schema schema = AvroUtils.getSchema(clazz, null);
        return new AutoValue_PubsubIO_Read.Builder().setNeedsAttributes(false).setNeedsMessageId(false).setPubsubClientFactory(FACTORY).setBeamSchema(schema).setTypeDescriptor(TypeDescriptor.of(clazz)).setToRowFn(AvroUtils.getToRowFunction(clazz, (org.apache.avro.Schema)avroSchema)).setFromRowFn(AvroUtils.getFromRowFunction(clazz)).setParseFn(new ParsePayloadUsingCoder(coder)).build();
    }

    private static <T> Write<T> write() {
        return new AutoValue_PubsubIO_Write.Builder().build();
    }

    public static Write<PubsubMessage> writeMessages() {
        return PubsubIO.write().withFormatFn(new IdentityMessageFn());
    }

    public static Write<String> writeStrings() {
        return PubsubIO.write().withFormatFn(new FormatPayloadAsUtf8());
    }

    public static <T extends Message> Write<T> writeProtos(Class<T> messageClass) {
        return ((Write)PubsubIO.write()).withFormatFn(new FormatPayloadUsingCoder(ProtoCoder.of(messageClass)));
    }

    public static <T> Write<T> writeAvros(Class<T> clazz) {
        return ((Write)PubsubIO.write()).withFormatFn(new FormatPayloadUsingCoder(AvroCoder.of(clazz)));
    }

    private PubsubIO() {
    }

    private static class IdentityMessageFn
    extends SimpleFunction<PubsubMessage, PubsubMessage> {
        private IdentityMessageFn() {
        }

        public PubsubMessage apply(PubsubMessage input) {
            return input;
        }
    }

    private static class FormatPayloadFromPubsubMessageProto
    extends SimpleFunction<byte[], PubsubMessage> {
        private FormatPayloadFromPubsubMessageProto() {
        }

        public PubsubMessage apply(byte[] input) {
            try {
                com.google.pubsub.v1.PubsubMessage message = com.google.pubsub.v1.PubsubMessage.parseFrom((byte[])input);
                return new PubsubMessage(message.getData().toByteArray(), message.getAttributesMap());
            }
            catch (InvalidProtocolBufferException e) {
                throw new RuntimeException("Could not decode Pubsub message", e);
            }
        }
    }

    private static class FormatPayloadUsingCoder<T>
    extends SimpleFunction<T, PubsubMessage> {
        private Coder<T> coder;

        public FormatPayloadUsingCoder(Coder<T> coder) {
            this.coder = coder;
        }

        public PubsubMessage apply(T input) {
            try {
                return new PubsubMessage(CoderUtils.encodeToByteArray(this.coder, input), (Map<String, String>)ImmutableMap.of());
            }
            catch (CoderException e) {
                throw new RuntimeException("Could not decode Pubsub message", e);
            }
        }
    }

    private static class FormatPayloadAsUtf8
    extends SimpleFunction<String, PubsubMessage> {
        private FormatPayloadAsUtf8() {
        }

        public PubsubMessage apply(String input) {
            return new PubsubMessage(input.getBytes(StandardCharsets.UTF_8), (Map<String, String>)ImmutableMap.of());
        }
    }

    private static class ParsePayloadAsPubsubMessageProto
    extends SimpleFunction<PubsubMessage, byte[]> {
        private ParsePayloadAsPubsubMessageProto() {
        }

        public byte[] apply(PubsubMessage input) {
            Map<String, String> attributes = input.getAttributeMap();
            PubsubMessage.Builder message = com.google.pubsub.v1.PubsubMessage.newBuilder().setData(ByteString.copyFrom((byte[])input.getPayload()));
            if (attributes != null) {
                message.putAllAttributes(attributes);
            }
            return message.build().toByteArray();
        }
    }

    private static class ParsePayloadUsingCoder<T>
    extends SimpleFunction<PubsubMessage, T> {
        private Coder<T> coder;

        public ParsePayloadUsingCoder(Coder<T> coder) {
            this.coder = coder;
        }

        public T apply(PubsubMessage input) {
            try {
                return (T)CoderUtils.decodeFromByteArray(this.coder, (byte[])input.getPayload());
            }
            catch (CoderException e) {
                throw new RuntimeException("Could not decode Pubsub message", e);
            }
        }
    }

    private static class ParsePayloadAsUtf8
    extends SimpleFunction<PubsubMessage, String> {
        private ParsePayloadAsUtf8() {
        }

        public String apply(PubsubMessage input) {
            return new String(input.getPayload(), StandardCharsets.UTF_8);
        }
    }

    @AutoValue
    public static abstract class Write<T>
    extends PTransform<PCollection<T>, PDone> {
        private static final int MAX_PUBLISH_BATCH_BYTE_SIZE_DEFAULT = 7500000;
        private static final int MAX_PUBLISH_BATCH_SIZE = 100;

        @Nullable
        abstract ValueProvider<PubsubTopic> getTopicProvider();

        @Nullable
        abstract PubsubClient.PubsubClientFactory getPubsubClientFactory();

        @Nullable
        abstract Integer getMaxBatchSize();

        @Nullable
        abstract Integer getMaxBatchBytesSize();

        @Nullable
        abstract String getTimestampAttribute();

        @Nullable
        abstract String getIdAttribute();

        @Nullable
        abstract SimpleFunction<T, PubsubMessage> getFormatFn();

        abstract Builder<T> toBuilder();

        public Write<T> to(String topic) {
            return this.to((ValueProvider<String>)ValueProvider.StaticValueProvider.of((Object)topic));
        }

        public Write<T> to(ValueProvider<String> topic) {
            return this.toBuilder().setTopicProvider((ValueProvider<PubsubTopic>)ValueProvider.NestedValueProvider.of(topic, (SerializableFunction)new TopicTranslator())).build();
        }

        public Write<T> withClientFactory(PubsubClient.PubsubClientFactory factory) {
            return this.toBuilder().setPubsubClientFactory(factory).build();
        }

        public Write<T> withMaxBatchSize(int batchSize) {
            return this.toBuilder().setMaxBatchSize(batchSize).build();
        }

        public Write<T> withMaxBatchBytesSize(int maxBatchBytesSize) {
            return this.toBuilder().setMaxBatchBytesSize(maxBatchBytesSize).build();
        }

        public Write<T> withTimestampAttribute(String timestampAttribute) {
            return this.toBuilder().setTimestampAttribute(timestampAttribute).build();
        }

        public Write<T> withIdAttribute(String idAttribute) {
            return this.toBuilder().setIdAttribute(idAttribute).build();
        }

        private Write<T> withFormatFn(SimpleFunction<T, PubsubMessage> formatFn) {
            return this.toBuilder().setFormatFn(formatFn).build();
        }

        public PDone expand(PCollection<T> input) {
            if (this.getTopicProvider() == null) {
                throw new IllegalStateException("need to set the topic of a PubsubIO.Write transform");
            }
            switch (input.isBounded()) {
                case BOUNDED: {
                    input.apply((PTransform)ParDo.of((DoFn)new PubsubBoundedWriter((Integer)MoreObjects.firstNonNull((Object)this.getMaxBatchSize(), (Object)100), (Integer)MoreObjects.firstNonNull((Object)this.getMaxBatchBytesSize(), (Object)7500000))));
                    return PDone.in((Pipeline)input.getPipeline());
                }
                case UNBOUNDED: {
                    return (PDone)((PCollection)input.apply((PTransform)MapElements.via(this.getFormatFn()))).apply((PTransform)new PubsubUnboundedSink(Optional.ofNullable(this.getPubsubClientFactory()).orElse(FACTORY), (ValueProvider<PubsubClient.TopicPath>)ValueProvider.NestedValueProvider.of(this.getTopicProvider(), (SerializableFunction)new TopicPathTranslator()), this.getTimestampAttribute(), this.getIdAttribute(), 100, (Integer)MoreObjects.firstNonNull((Object)this.getMaxBatchSize(), (Object)1000), (Integer)MoreObjects.firstNonNull((Object)this.getMaxBatchBytesSize(), (Object)400000)));
                }
            }
            throw new RuntimeException();
        }

        public void populateDisplayData(DisplayData.Builder builder) {
            super.populateDisplayData(builder);
            PubsubIO.populateCommonDisplayData(builder, this.getTimestampAttribute(), this.getIdAttribute(), (ValueProvider<PubsubTopic>)this.getTopicProvider());
        }

        public class PubsubBoundedWriter
        extends DoFn<T, Void> {
            private transient List<PubsubClient.OutgoingMessage> output;
            private transient PubsubClient pubsubClient;
            private transient int currentOutputBytes;
            private int maxPublishBatchByteSize;
            private int maxPublishBatchSize;

            PubsubBoundedWriter(int maxPublishBatchSize, int maxPublishBatchByteSize) {
                this.maxPublishBatchSize = maxPublishBatchSize;
                this.maxPublishBatchByteSize = maxPublishBatchByteSize;
            }

            PubsubBoundedWriter() {
                this(100, 7500000);
            }

            @DoFn.StartBundle
            public void startBundle(DoFn.StartBundleContext c) throws IOException {
                this.output = new ArrayList<PubsubClient.OutgoingMessage>();
                this.currentOutputBytes = 0;
                this.pubsubClient = Optional.ofNullable(Write.this.getPubsubClientFactory()).orElse(FACTORY).newClient(Write.this.getTimestampAttribute(), null, (PubsubOptions)c.getPipelineOptions().as(PubsubOptions.class));
            }

            @DoFn.ProcessElement
            public void processElement(DoFn.ProcessContext c) throws IOException, SizeLimitExceededException {
                PubsubMessage message = (PubsubMessage)Write.this.getFormatFn().apply(c.element());
                byte[] payload = message.getPayload();
                Map<String, String> attributes = message.getAttributeMap();
                if (payload.length > this.maxPublishBatchByteSize) {
                    String msg = String.format("Pub/Sub message size (%d) exceeded maximum batch size (%d)", payload.length, this.maxPublishBatchByteSize);
                    throw new SizeLimitExceededException(msg);
                }
                if (this.currentOutputBytes + payload.length >= this.maxPublishBatchByteSize || this.output.size() >= this.maxPublishBatchSize) {
                    this.publish();
                }
                this.output.add(PubsubClient.OutgoingMessage.of(com.google.pubsub.v1.PubsubMessage.newBuilder().setData(ByteString.copyFrom((byte[])payload)).putAllAttributes(attributes).build(), c.timestamp().getMillis(), null));
                this.currentOutputBytes += payload.length;
            }

            @DoFn.FinishBundle
            public void finishBundle() throws IOException {
                if (!this.output.isEmpty()) {
                    this.publish();
                }
                this.output = null;
                this.currentOutputBytes = 0;
                this.pubsubClient.close();
                this.pubsubClient = null;
            }

            private void publish() throws IOException {
                PubsubTopic topic = (PubsubTopic)Write.this.getTopicProvider().get();
                int n = this.pubsubClient.publish(PubsubClient.topicPathFromName(topic.project, topic.topic), this.output);
                Preconditions.checkState((n == this.output.size() ? 1 : 0) != 0);
                this.output.clear();
                this.currentOutputBytes = 0;
            }

            public void populateDisplayData(DisplayData.Builder builder) {
                super.populateDisplayData(builder);
                builder.delegate((HasDisplayData)Write.this);
            }
        }

        @Experimental
        public static class External
        implements ExternalTransformRegistrar {
            public static final String URN = "beam:external:java:pubsub:write:v1";

            public Map<String, Class<? extends ExternalTransformBuilder>> knownBuilders() {
                return ImmutableMap.of((Object)URN, AutoValue_PubsubIO_Write.Builder.class);
            }

            public static class Configuration {
                private String topic;
                @Nullable
                private String idAttribute;
                @Nullable
                private String timestampAttribute;

                public void setTopic(String topic) {
                    this.topic = topic;
                }

                public void setIdLabel(@Nullable String idAttribute) {
                    this.idAttribute = idAttribute;
                }

                public void setTimestampAttribute(@Nullable String timestampAttribute) {
                    this.timestampAttribute = timestampAttribute;
                }
            }
        }

        @AutoValue.Builder
        static abstract class Builder<T>
        implements ExternalTransformBuilder<External.Configuration, PCollection<T>, PDone> {
            Builder() {
            }

            abstract Builder<T> setTopicProvider(ValueProvider<PubsubTopic> var1);

            abstract Builder<T> setPubsubClientFactory(PubsubClient.PubsubClientFactory var1);

            abstract Builder<T> setMaxBatchSize(Integer var1);

            abstract Builder<T> setMaxBatchBytesSize(Integer var1);

            abstract Builder<T> setTimestampAttribute(String var1);

            abstract Builder<T> setIdAttribute(String var1);

            abstract Builder<T> setFormatFn(SimpleFunction<T, PubsubMessage> var1);

            abstract Write<T> build();

            public PTransform<PCollection<T>, PDone> buildExternal(External.Configuration config) {
                if (config.topic != null) {
                    ValueProvider.StaticValueProvider topic = ValueProvider.StaticValueProvider.of((Object)config.topic);
                    this.setTopicProvider((ValueProvider<PubsubTopic>)ValueProvider.NestedValueProvider.of((ValueProvider)topic, (SerializableFunction)new TopicTranslator()));
                }
                if (config.idAttribute != null) {
                    this.setIdAttribute(config.idAttribute);
                }
                if (config.timestampAttribute != null) {
                    this.setTimestampAttribute(config.timestampAttribute);
                }
                FormatPayloadFromPubsubMessageProto parseFn = new FormatPayloadFromPubsubMessageProto();
                this.setFormatFn(parseFn);
                return this.build();
            }
        }
    }

    @AutoValue
    public static abstract class Read<T>
    extends PTransform<PBegin, PCollection<T>> {
        @Nullable
        abstract ValueProvider<PubsubTopic> getTopicProvider();

        @Nullable
        abstract PubsubClient.PubsubClientFactory getPubsubClientFactory();

        @Nullable
        abstract ValueProvider<PubsubSubscription> getSubscriptionProvider();

        @Nullable
        abstract String getTimestampAttribute();

        @Nullable
        abstract String getIdAttribute();

        @Nullable
        abstract Coder<T> getCoder();

        @Nullable
        abstract SimpleFunction<PubsubMessage, T> getParseFn();

        @Nullable
        abstract Schema getBeamSchema();

        @Nullable
        abstract TypeDescriptor<T> getTypeDescriptor();

        @Nullable
        abstract SerializableFunction<T, Row> getToRowFn();

        @Nullable
        abstract SerializableFunction<Row, T> getFromRowFn();

        @Nullable
        abstract Clock getClock();

        abstract boolean getNeedsAttributes();

        abstract boolean getNeedsMessageId();

        abstract Builder<T> toBuilder();

        public Read<T> fromSubscription(String subscription) {
            return this.fromSubscription((ValueProvider<String>)ValueProvider.StaticValueProvider.of((Object)subscription));
        }

        public Read<T> fromSubscription(ValueProvider<String> subscription) {
            if (subscription.isAccessible()) {
                PubsubSubscription.fromPath((String)subscription.get());
            }
            return this.toBuilder().setSubscriptionProvider((ValueProvider<PubsubSubscription>)ValueProvider.NestedValueProvider.of(subscription, (SerializableFunction)new SubscriptionTranslator())).build();
        }

        public Read<T> fromTopic(String topic) {
            return this.fromTopic((ValueProvider<String>)ValueProvider.StaticValueProvider.of((Object)topic));
        }

        public Read<T> fromTopic(ValueProvider<String> topic) {
            if (topic.isAccessible()) {
                PubsubTopic.fromPath((String)topic.get());
            }
            return this.toBuilder().setTopicProvider((ValueProvider<PubsubTopic>)ValueProvider.NestedValueProvider.of(topic, (SerializableFunction)new TopicTranslator())).build();
        }

        public Read<T> withClientFactory(PubsubClient.PubsubClientFactory factory) {
            return this.toBuilder().setPubsubClientFactory(factory).build();
        }

        public Read<T> withTimestampAttribute(String timestampAttribute) {
            return this.toBuilder().setTimestampAttribute(timestampAttribute).build();
        }

        public Read<T> withIdAttribute(String idAttribute) {
            return this.toBuilder().setIdAttribute(idAttribute).build();
        }

        public Read<T> withCoderAndParseFn(Coder<T> coder, SimpleFunction<PubsubMessage, T> parseFn) {
            return this.toBuilder().setCoder(coder).setParseFn(parseFn).build();
        }

        @VisibleForTesting
        Read<T> withClock(Clock clock) {
            return this.toBuilder().setClock(clock).build();
        }

        public PCollection<T> expand(PBegin input) {
            if (this.getTopicProvider() == null && this.getSubscriptionProvider() == null) {
                throw new IllegalStateException("Need to set either the topic or the subscription for a PubsubIO.Read transform");
            }
            if (this.getTopicProvider() != null && this.getSubscriptionProvider() != null) {
                throw new IllegalStateException("Can't set both the topic and the subscription for a PubsubIO.Read transform");
            }
            ValueProvider.NestedValueProvider topicPath = this.getTopicProvider() == null ? null : ValueProvider.NestedValueProvider.of(this.getTopicProvider(), (SerializableFunction)new TopicPathTranslator());
            ValueProvider.NestedValueProvider subscriptionPath = this.getSubscriptionProvider() == null ? null : ValueProvider.NestedValueProvider.of(this.getSubscriptionProvider(), (SerializableFunction)new SubscriptionPathTranslator());
            PubsubUnboundedSource source = new PubsubUnboundedSource(this.getClock(), Optional.ofNullable(this.getPubsubClientFactory()).orElse(FACTORY), null, (ValueProvider<PubsubClient.TopicPath>)topicPath, (ValueProvider<PubsubClient.SubscriptionPath>)subscriptionPath, this.getTimestampAttribute(), this.getIdAttribute(), this.getNeedsAttributes(), this.getNeedsMessageId());
            PCollection read = (PCollection)((PCollection)input.apply((PTransform)source)).apply((PTransform)MapElements.via(this.getParseFn()));
            return this.getBeamSchema() != null ? read.setSchema(this.getBeamSchema(), this.getTypeDescriptor(), this.getToRowFn(), this.getFromRowFn()) : read.setCoder(this.getCoder());
        }

        public void populateDisplayData(DisplayData.Builder builder) {
            super.populateDisplayData(builder);
            PubsubIO.populateCommonDisplayData(builder, this.getTimestampAttribute(), this.getIdAttribute(), (ValueProvider<PubsubTopic>)this.getTopicProvider());
            builder.addIfNotNull(DisplayData.item((String)"subscription", this.getSubscriptionProvider()).withLabel("Pubsub Subscription"));
        }

        @Experimental
        public static class External
        implements ExternalTransformRegistrar {
            public static final String URN = "beam:external:java:pubsub:read:v1";

            public Map<String, Class<? extends ExternalTransformBuilder>> knownBuilders() {
                return ImmutableMap.of((Object)URN, AutoValue_PubsubIO_Read.Builder.class);
            }

            public static class Configuration {
                @Nullable
                private String topic;
                @Nullable
                private String subscription;
                @Nullable
                private String idAttribute;
                @Nullable
                private String timestampAttribute;
                private boolean needsAttributes;

                public void setTopic(@Nullable String topic) {
                    this.topic = topic;
                }

                public void setSubscription(@Nullable String subscription) {
                    this.subscription = subscription;
                }

                public void setIdLabel(@Nullable String idAttribute) {
                    this.idAttribute = idAttribute;
                }

                public void setTimestampAttribute(@Nullable String timestampAttribute) {
                    this.timestampAttribute = timestampAttribute;
                }

                public void setWithAttributes(Boolean needsAttributes) {
                    this.needsAttributes = needsAttributes;
                }
            }
        }

        @AutoValue.Builder
        static abstract class Builder<T>
        implements ExternalTransformBuilder<External.Configuration, PBegin, PCollection<T>> {
            Builder() {
            }

            abstract Builder<T> setTopicProvider(ValueProvider<PubsubTopic> var1);

            abstract Builder<T> setPubsubClientFactory(PubsubClient.PubsubClientFactory var1);

            abstract Builder<T> setSubscriptionProvider(ValueProvider<PubsubSubscription> var1);

            abstract Builder<T> setTimestampAttribute(String var1);

            abstract Builder<T> setIdAttribute(String var1);

            abstract Builder<T> setCoder(Coder<T> var1);

            abstract Builder<T> setParseFn(SimpleFunction<PubsubMessage, T> var1);

            abstract Builder<T> setBeamSchema(@Nullable Schema var1);

            abstract Builder<T> setTypeDescriptor(@Nullable TypeDescriptor<T> var1);

            abstract Builder<T> setToRowFn(@Nullable SerializableFunction<T, Row> var1);

            abstract Builder<T> setFromRowFn(@Nullable SerializableFunction<Row, T> var1);

            abstract Builder<T> setNeedsAttributes(boolean var1);

            abstract Builder<T> setNeedsMessageId(boolean var1);

            abstract Builder<T> setClock(@Nullable Clock var1);

            abstract Read<T> build();

            public PTransform<PBegin, PCollection<T>> buildExternal(External.Configuration config) {
                if (config.topic != null) {
                    ValueProvider.StaticValueProvider topic = ValueProvider.StaticValueProvider.of((Object)config.topic);
                    this.setTopicProvider((ValueProvider<PubsubTopic>)ValueProvider.NestedValueProvider.of((ValueProvider)topic, (SerializableFunction)new TopicTranslator()));
                }
                if (config.subscription != null) {
                    ValueProvider.StaticValueProvider subscription = ValueProvider.StaticValueProvider.of((Object)config.subscription);
                    this.setSubscriptionProvider((ValueProvider<PubsubSubscription>)ValueProvider.NestedValueProvider.of((ValueProvider)subscription, (SerializableFunction)new SubscriptionTranslator()));
                }
                if (config.idAttribute != null) {
                    this.setIdAttribute(config.idAttribute);
                }
                if (config.timestampAttribute != null) {
                    this.setTimestampAttribute(config.timestampAttribute);
                }
                this.setPubsubClientFactory(FACTORY);
                this.setNeedsAttributes(config.needsAttributes);
                ByteArrayCoder coder = ByteArrayCoder.of();
                if (config.needsAttributes) {
                    ParsePayloadAsPubsubMessageProto parseFn = new ParsePayloadAsPubsubMessageProto();
                    this.setParseFn(parseFn);
                    this.setCoder((Coder<T>)coder);
                } else {
                    this.setParseFn(new ParsePayloadUsingCoder(coder));
                    this.setCoder((Coder<T>)coder);
                }
                this.setNeedsMessageId(false);
                return this.build();
            }
        }
    }

    public static class PubsubTopic
    implements Serializable {
        private final Type type;
        private final String project;
        private final String topic;

        private PubsubTopic(Type type, String project, String topic) {
            this.type = type;
            this.project = project;
            this.topic = topic;
        }

        public static PubsubTopic fromPath(String path) {
            String topicName;
            String projectName;
            if (path.equals(PubsubIO.TOPIC_DEV_NULL_TEST_NAME)) {
                return new PubsubTopic(Type.FAKE, "", path);
            }
            Matcher v1beta1Match = V1BETA1_TOPIC_REGEXP.matcher(path);
            if (v1beta1Match.matches()) {
                LOG.warn("Saw topic in v1beta1 format.  Topics should be in the format projects/<project_id>/topics/<topic_name>");
                projectName = v1beta1Match.group(1);
                topicName = v1beta1Match.group(2);
            } else {
                Matcher match = TOPIC_REGEXP.matcher(path);
                if (!match.matches()) {
                    throw new IllegalArgumentException("Pubsub topic is not in projects/<project_id>/topics/<topic_name> format: " + path);
                }
                projectName = match.group(1);
                topicName = match.group(2);
            }
            PubsubIO.validateProjectName(projectName);
            PubsubIO.validatePubsubName(topicName);
            return new PubsubTopic(Type.NORMAL, projectName, topicName);
        }

        @Deprecated
        public String asV1Beta1Path() {
            if (this.type == Type.NORMAL) {
                return "/topics/" + this.project + "/" + this.topic;
            }
            return this.topic;
        }

        @Deprecated
        public String asV1Beta2Path() {
            if (this.type == Type.NORMAL) {
                return "projects/" + this.project + "/topics/" + this.topic;
            }
            return this.topic;
        }

        public String asPath() {
            if (this.type == Type.NORMAL) {
                return "projects/" + this.project + "/topics/" + this.topic;
            }
            return this.topic;
        }

        public String toString() {
            return this.asPath();
        }

        private static enum Type {
            NORMAL,
            FAKE;

        }
    }

    private static class ProjectPathTranslator
    implements SerializableFunction<PubsubSubscription, PubsubClient.ProjectPath> {
        private ProjectPathTranslator() {
        }

        public PubsubClient.ProjectPath apply(PubsubSubscription from) {
            return PubsubClient.projectPathFromId(from.project);
        }
    }

    private static class TopicPathTranslator
    implements SerializableFunction<PubsubTopic, PubsubClient.TopicPath> {
        private TopicPathTranslator() {
        }

        public PubsubClient.TopicPath apply(PubsubTopic from) {
            return PubsubClient.topicPathFromName(from.project, from.topic);
        }
    }

    private static class TopicTranslator
    implements SerializableFunction<String, PubsubTopic> {
        private TopicTranslator() {
        }

        public PubsubTopic apply(String from) {
            return PubsubTopic.fromPath(from);
        }
    }

    private static class SubscriptionPathTranslator
    implements SerializableFunction<PubsubSubscription, PubsubClient.SubscriptionPath> {
        private SubscriptionPathTranslator() {
        }

        public PubsubClient.SubscriptionPath apply(PubsubSubscription from) {
            return PubsubClient.subscriptionPathFromName(from.project, from.subscription);
        }
    }

    private static class SubscriptionTranslator
    implements SerializableFunction<String, PubsubSubscription> {
        private SubscriptionTranslator() {
        }

        public PubsubSubscription apply(String from) {
            return PubsubSubscription.fromPath(from);
        }
    }

    public static class PubsubSubscription
    implements Serializable {
        private final Type type;
        private final String project;
        private final String subscription;

        private PubsubSubscription(Type type, String project, String subscription) {
            this.type = type;
            this.project = project;
            this.subscription = subscription;
        }

        public static PubsubSubscription fromPath(String path) {
            String subscriptionName;
            String projectName;
            if (path.startsWith(PubsubIO.SUBSCRIPTION_RANDOM_TEST_PREFIX) || path.startsWith(PubsubIO.SUBSCRIPTION_STARTING_SIGNAL)) {
                return new PubsubSubscription(Type.FAKE, "", path);
            }
            Matcher v1beta1Match = V1BETA1_SUBSCRIPTION_REGEXP.matcher(path);
            if (v1beta1Match.matches()) {
                LOG.warn("Saw subscription in v1beta1 format. Subscriptions should be in the format projects/<project_id>/subscriptions/<subscription_name>");
                projectName = v1beta1Match.group(1);
                subscriptionName = v1beta1Match.group(2);
            } else {
                Matcher match = SUBSCRIPTION_REGEXP.matcher(path);
                if (!match.matches()) {
                    throw new IllegalArgumentException("Pubsub subscription is not in projects/<project_id>/subscriptions/<subscription_name> format: " + path);
                }
                projectName = match.group(1);
                subscriptionName = match.group(2);
            }
            PubsubIO.validateProjectName(projectName);
            PubsubIO.validatePubsubName(subscriptionName);
            return new PubsubSubscription(Type.NORMAL, projectName, subscriptionName);
        }

        @Deprecated
        public String asV1Beta1Path() {
            if (this.type == Type.NORMAL) {
                return "/subscriptions/" + this.project + "/" + this.subscription;
            }
            return this.subscription;
        }

        @Deprecated
        public String asV1Beta2Path() {
            if (this.type == Type.NORMAL) {
                return "projects/" + this.project + "/subscriptions/" + this.subscription;
            }
            return this.subscription;
        }

        public String asPath() {
            if (this.type == Type.NORMAL) {
                return "projects/" + this.project + "/subscriptions/" + this.subscription;
            }
            return this.subscription;
        }

        public String toString() {
            return this.asPath();
        }

        private static enum Type {
            NORMAL,
            FAKE;

        }
    }
}

