/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.kafka;

import com.google.auto.service.AutoService;
import com.google.auto.value.AutoValue;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.Serializable;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.beam.runners.core.construction.PTransformMatchers;
import org.apache.beam.runners.core.construction.ReplacementOutputs;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.NullableCoder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.expansion.ExternalTransformRegistrar;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.io.kafka.AutoValue_KafkaIO_Read;
import org.apache.beam.sdk.io.kafka.AutoValue_KafkaIO_ReadSourceDescriptors;
import org.apache.beam.sdk.io.kafka.AutoValue_KafkaIO_Write;
import org.apache.beam.sdk.io.kafka.AutoValue_KafkaIO_WriteRecords;
import org.apache.beam.sdk.io.kafka.ConsumerSpEL;
import org.apache.beam.sdk.io.kafka.DeserializerProvider;
import org.apache.beam.sdk.io.kafka.KafkaCheckpointMark;
import org.apache.beam.sdk.io.kafka.KafkaCommitOffset;
import org.apache.beam.sdk.io.kafka.KafkaExactlyOnceSink;
import org.apache.beam.sdk.io.kafka.KafkaIOReadImplementationCompatibility;
import org.apache.beam.sdk.io.kafka.KafkaIOUtils;
import org.apache.beam.sdk.io.kafka.KafkaPublishTimestampFunction;
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.io.kafka.KafkaRecordCoder;
import org.apache.beam.sdk.io.kafka.KafkaSourceDescriptor;
import org.apache.beam.sdk.io.kafka.KafkaTimestampType;
import org.apache.beam.sdk.io.kafka.KafkaUnboundedSource;
import org.apache.beam.sdk.io.kafka.KafkaWriter;
import org.apache.beam.sdk.io.kafka.LocalDeserializerProvider;
import org.apache.beam.sdk.io.kafka.ProducerRecordCoder;
import org.apache.beam.sdk.io.kafka.ReadFromKafkaDoFn;
import org.apache.beam.sdk.io.kafka.TimestampPolicyFactory;
import org.apache.beam.sdk.io.kafka.WatchKafkaTopicPartitionDoFn;
import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.runners.PTransformMatcher;
import org.apache.beam.sdk.runners.PTransformOverride;
import org.apache.beam.sdk.runners.PTransformOverrideFactory;
import org.apache.beam.sdk.schemas.JavaFieldSchema;
import org.apache.beam.sdk.schemas.NoSuchSchemaException;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.schemas.annotations.SchemaCreate;
import org.apache.beam.sdk.schemas.transforms.Convert;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ExternalTransformBuilder;
import org.apache.beam.sdk.transforms.Impulse;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Reshuffle;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
import org.apache.beam.sdk.values.PInput;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Joiner;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.AppInfoParser;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Experimental(value=Experimental.Kind.SOURCE_SINK)
public class KafkaIO {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaIO.class);

    public static Read<byte[], byte[]> readBytes() {
        return KafkaIO.read().withKeyDeserializer(ByteArrayDeserializer.class).withValueDeserializer(ByteArrayDeserializer.class);
    }

    public static <K, V> Read<K, V> read() {
        return new AutoValue_KafkaIO_Read.Builder().setTopics(new ArrayList<String>()).setTopicPartitions(new ArrayList<TopicPartition>()).setConsumerFactoryFn(KafkaIOUtils.KAFKA_CONSUMER_FACTORY_FN).setConsumerConfig(KafkaIOUtils.DEFAULT_CONSUMER_PROPERTIES).setMaxNumRecords(Long.MAX_VALUE).setCommitOffsetsInFinalizeEnabled(false).setDynamicRead(false).setTimestampPolicyFactory(TimestampPolicyFactory.withProcessingTime()).build();
    }

    public static <K, V> ReadSourceDescriptors<K, V> readSourceDescriptors() {
        return ReadSourceDescriptors.read();
    }

    public static <K, V> Write<K, V> write() {
        return new AutoValue_KafkaIO_Write.Builder().setWriteRecordsTransform(new AutoValue_KafkaIO_WriteRecords.Builder().setProducerConfig(WriteRecords.DEFAULT_PRODUCER_PROPERTIES).setEOS(false).setNumShards(0).setConsumerFactoryFn(KafkaIOUtils.KAFKA_CONSUMER_FACTORY_FN).build()).build();
    }

    public static <K, V> WriteRecords<K, V> writeRecords() {
        return new AutoValue_KafkaIO_WriteRecords.Builder().setProducerConfig(WriteRecords.DEFAULT_PRODUCER_PROPERTIES).setEOS(false).setNumShards(0).setConsumerFactoryFn(KafkaIOUtils.KAFKA_CONSUMER_FACTORY_FN).build();
    }

    private KafkaIO() {
    }

    private static Class resolveClass(String className) {
        try {
            return Class.forName(className);
        }
        catch (ClassNotFoundException e) {
            throw new RuntimeException("Could not find class: " + className);
        }
    }

    private static class NullOnlyCoder<T>
    extends AtomicCoder<T> {
        private NullOnlyCoder() {
        }

        public void encode(T value, OutputStream outStream) {
            Preconditions.checkArgument((value == null ? 1 : 0) != 0, (Object)"Can only encode nulls");
        }

        public T decode(InputStream inStream) {
            return null;
        }
    }

    private static class KafkaValueWrite<K, V>
    extends PTransform<PCollection<V>, PDone> {
        private final Write<K, V> kvWriteTransform;

        private KafkaValueWrite(Write<K, V> kvWriteTransform) {
            this.kvWriteTransform = kvWriteTransform;
        }

        public PDone expand(PCollection<V> input) {
            return (PDone)((PCollection)input.apply("Kafka values with default key", (PTransform)MapElements.via((SimpleFunction)new SimpleFunction<V, KV<K, V>>(){

                public KV<K, V> apply(V element) {
                    return KV.of(null, element);
                }
            }))).setCoder((Coder)KvCoder.of(new NullOnlyCoder(), (Coder)input.getCoder())).apply(this.kvWriteTransform);
        }

        public void populateDisplayData(DisplayData.Builder builder) {
            super.populateDisplayData(builder);
            this.kvWriteTransform.populateDisplayData(builder);
        }
    }

    @AutoValue
    @AutoValue.CopyAnnotations
    public static abstract class Write<K, V>
    extends PTransform<PCollection<KV<K, V>>, PDone> {
        abstract @Nullable String getTopic();

        abstract WriteRecords<K, V> getWriteRecordsTransform();

        abstract Builder<K, V> toBuilder();

        private Write<K, V> withWriteRecordsTransform(WriteRecords<K, V> transform) {
            return this.toBuilder().setWriteRecordsTransform(transform).build();
        }

        public Write<K, V> withBootstrapServers(String bootstrapServers) {
            return this.withWriteRecordsTransform(this.getWriteRecordsTransform().withBootstrapServers(bootstrapServers));
        }

        public Write<K, V> withTopic(String topic) {
            return this.toBuilder().setTopic(topic).setWriteRecordsTransform(this.getWriteRecordsTransform().withTopic(topic)).build();
        }

        public Write<K, V> withKeySerializer(Class<? extends Serializer<K>> keySerializer) {
            return this.withWriteRecordsTransform(this.getWriteRecordsTransform().withKeySerializer(keySerializer));
        }

        public Write<K, V> withValueSerializer(Class<? extends Serializer<V>> valueSerializer) {
            return this.withWriteRecordsTransform(this.getWriteRecordsTransform().withValueSerializer(valueSerializer));
        }

        public Write<K, V> withProducerFactoryFn(SerializableFunction<Map<String, Object>, Producer<K, V>> producerFactoryFn) {
            return this.withWriteRecordsTransform(this.getWriteRecordsTransform().withProducerFactoryFn(producerFactoryFn));
        }

        public Write<K, V> withInputTimestamp() {
            return this.withWriteRecordsTransform(this.getWriteRecordsTransform().withInputTimestamp());
        }

        @Deprecated
        public Write<K, V> withPublishTimestampFunction(KafkaPublishTimestampFunction<KV<K, V>> timestampFunction) {
            return this.withWriteRecordsTransform(this.getWriteRecordsTransform().withPublishTimestampFunction(new PublishTimestampFunctionKV<K, V>(timestampFunction)));
        }

        public Write<K, V> withEOS(int numShards, String sinkGroupId) {
            return this.withWriteRecordsTransform(this.getWriteRecordsTransform().withEOS(numShards, sinkGroupId));
        }

        public Write<K, V> withConsumerFactoryFn(SerializableFunction<Map<String, Object>, ? extends Consumer<?, ?>> consumerFactoryFn) {
            return this.withWriteRecordsTransform(this.getWriteRecordsTransform().withConsumerFactoryFn(consumerFactoryFn));
        }

        @Deprecated
        public Write<K, V> updateProducerProperties(Map<String, Object> configUpdates) {
            return this.withWriteRecordsTransform(this.getWriteRecordsTransform().updateProducerProperties(configUpdates));
        }

        public Write<K, V> withProducerConfigUpdates(Map<String, Object> configUpdates) {
            return this.withWriteRecordsTransform(this.getWriteRecordsTransform().withProducerConfigUpdates(configUpdates));
        }

        public PDone expand(PCollection<KV<K, V>> input) {
            Preconditions.checkArgument((this.getTopic() != null ? 1 : 0) != 0, (Object)"withTopic() is required");
            KvCoder kvCoder = (KvCoder)input.getCoder();
            return (PDone)((PCollection)input.apply("Kafka ProducerRecord", (PTransform)MapElements.via((SimpleFunction)new SimpleFunction<KV<K, V>, ProducerRecord<K, V>>(){

                public ProducerRecord<K, V> apply(KV<K, V> element) {
                    return new ProducerRecord(this.getTopic(), element.getKey(), element.getValue());
                }
            }))).setCoder(ProducerRecordCoder.of(kvCoder.getKeyCoder(), kvCoder.getValueCoder())).apply(this.getWriteRecordsTransform());
        }

        public void validate(PipelineOptions options) {
            this.getWriteRecordsTransform().validate(options);
        }

        public void populateDisplayData(DisplayData.Builder builder) {
            super.populateDisplayData(builder);
            this.getWriteRecordsTransform().populateDisplayData(builder);
        }

        public PTransform<PCollection<V>, PDone> values() {
            return new KafkaValueWrite(this.withKeySerializer(StringSerializer.class));
        }

        private static class PublishTimestampFunctionKV<K, V>
        implements KafkaPublishTimestampFunction<ProducerRecord<K, V>> {
            private KafkaPublishTimestampFunction<KV<K, V>> fn;

            public PublishTimestampFunctionKV(KafkaPublishTimestampFunction<KV<K, V>> fn) {
                this.fn = fn;
            }

            @Override
            public Instant getTimestamp(ProducerRecord<K, V> e, Instant ts) {
                return this.fn.getTimestamp(KV.of((Object)e.key(), (Object)e.value()), ts);
            }
        }

        @Experimental(value=Experimental.Kind.PORTABILITY)
        @AutoService(value={ExternalTransformRegistrar.class})
        public static class External
        implements ExternalTransformRegistrar {
            public static final String URN = "beam:transform:org.apache.beam:kafka_write:v1";

            public Map<String, Class<? extends ExternalTransformBuilder<?, ?, ?>>> knownBuilders() {
                return ImmutableMap.of((Object)URN, AutoValue_KafkaIO_Write.Builder.class);
            }

            public static class Configuration {
                private Map<String, String> producerConfig;
                private String topic;
                private String keySerializer;
                private String valueSerializer;

                public void setProducerConfig(Map<String, String> producerConfig) {
                    this.producerConfig = producerConfig;
                }

                public void setTopic(String topic) {
                    this.topic = topic;
                }

                public void setKeySerializer(String keySerializer) {
                    this.keySerializer = keySerializer;
                }

                public void setValueSerializer(String valueSerializer) {
                    this.valueSerializer = valueSerializer;
                }
            }
        }

        @Experimental(value=Experimental.Kind.PORTABILITY)
        @AutoValue.Builder
        static abstract class Builder<K, V>
        implements ExternalTransformBuilder<External.Configuration, PCollection<KV<K, V>>, PDone> {
            Builder() {
            }

            abstract Builder<K, V> setTopic(String var1);

            abstract Builder<K, V> setWriteRecordsTransform(WriteRecords<K, V> var1);

            abstract Write<K, V> build();

            public PTransform<PCollection<KV<K, V>>, PDone> buildExternal(External.Configuration configuration) {
                this.setTopic(configuration.topic);
                HashMap<String, Object> producerConfig = new HashMap<String, Object>(configuration.producerConfig);
                Class keySerializer = KafkaIO.resolveClass(configuration.keySerializer);
                Class valSerializer = KafkaIO.resolveClass(configuration.valueSerializer);
                WriteRecords writeRecords = KafkaIO.writeRecords().withProducerConfigUpdates(producerConfig).withKeySerializer(keySerializer).withValueSerializer(valSerializer).withTopic(configuration.topic);
                this.setWriteRecordsTransform(writeRecords);
                return this.build();
            }
        }
    }

    @AutoValue
    @AutoValue.CopyAnnotations
    public static abstract class WriteRecords<K, V>
    extends PTransform<PCollection<ProducerRecord<K, V>>, PDone> {
        private static final Map<String, Object> DEFAULT_PRODUCER_PROPERTIES = ImmutableMap.of((Object)"retries", (Object)3);
        private static final Map<String, String> IGNORED_PRODUCER_PROPERTIES = ImmutableMap.of((Object)"key.serializer", (Object)"Use withKeySerializer instead", (Object)"value.serializer", (Object)"Use withValueSerializer instead");

        abstract @Nullable String getTopic();

        abstract Map<String, Object> getProducerConfig();

        abstract @Nullable SerializableFunction<Map<String, Object>, Producer<K, V>> getProducerFactoryFn();

        abstract @Nullable Class<? extends Serializer<K>> getKeySerializer();

        abstract @Nullable Class<? extends Serializer<V>> getValueSerializer();

        abstract @Nullable KafkaPublishTimestampFunction<ProducerRecord<K, V>> getPublishTimestampFunction();

        abstract boolean isEOS();

        abstract @Nullable String getSinkGroupId();

        abstract int getNumShards();

        abstract @Nullable SerializableFunction<Map<String, Object>, ? extends Consumer<?, ?>> getConsumerFactoryFn();

        abstract Builder<K, V> toBuilder();

        public WriteRecords<K, V> withBootstrapServers(String bootstrapServers) {
            return this.withProducerConfigUpdates((Map<String, Object>)ImmutableMap.of((Object)"bootstrap.servers", (Object)bootstrapServers));
        }

        public WriteRecords<K, V> withTopic(String topic) {
            return this.toBuilder().setTopic(topic).build();
        }

        public WriteRecords<K, V> withKeySerializer(Class<? extends Serializer<K>> keySerializer) {
            return this.toBuilder().setKeySerializer(keySerializer).build();
        }

        public WriteRecords<K, V> withValueSerializer(Class<? extends Serializer<V>> valueSerializer) {
            return this.toBuilder().setValueSerializer(valueSerializer).build();
        }

        @Deprecated
        public WriteRecords<K, V> updateProducerProperties(Map<String, Object> configUpdates) {
            Map<String, Object> config = KafkaIOUtils.updateKafkaProperties(this.getProducerConfig(), configUpdates);
            return this.toBuilder().setProducerConfig(config).build();
        }

        public WriteRecords<K, V> withProducerConfigUpdates(Map<String, Object> configUpdates) {
            Map<String, Object> config = KafkaIOUtils.updateKafkaProperties(this.getProducerConfig(), configUpdates);
            return this.toBuilder().setProducerConfig(config).build();
        }

        public WriteRecords<K, V> withProducerFactoryFn(SerializableFunction<Map<String, Object>, Producer<K, V>> producerFactoryFn) {
            return this.toBuilder().setProducerFactoryFn(producerFactoryFn).build();
        }

        public WriteRecords<K, V> withInputTimestamp() {
            return this.withPublishTimestampFunction(KafkaPublishTimestampFunction.withElementTimestamp());
        }

        @Deprecated
        public WriteRecords<K, V> withPublishTimestampFunction(KafkaPublishTimestampFunction<ProducerRecord<K, V>> timestampFunction) {
            return this.toBuilder().setPublishTimestampFunction(timestampFunction).build();
        }

        public WriteRecords<K, V> withEOS(int numShards, String sinkGroupId) {
            KafkaExactlyOnceSink.ensureEOSSupport();
            Preconditions.checkArgument((numShards >= 1 ? 1 : 0) != 0, (Object)"numShards should be >= 1");
            Preconditions.checkArgument((sinkGroupId != null ? 1 : 0) != 0, (Object)"sinkGroupId is required for exactly-once sink");
            return this.toBuilder().setEOS(true).setNumShards(numShards).setSinkGroupId(sinkGroupId).build();
        }

        public WriteRecords<K, V> withConsumerFactoryFn(SerializableFunction<Map<String, Object>, ? extends Consumer<?, ?>> consumerFactoryFn) {
            return this.toBuilder().setConsumerFactoryFn(consumerFactoryFn).build();
        }

        public PDone expand(PCollection<ProducerRecord<K, V>> input) {
            Preconditions.checkArgument((this.getProducerConfig().get("bootstrap.servers") != null ? 1 : 0) != 0, (Object)"withBootstrapServers() is required");
            Preconditions.checkArgument((this.getKeySerializer() != null ? 1 : 0) != 0, (Object)"withKeySerializer() is required");
            Preconditions.checkArgument((this.getValueSerializer() != null ? 1 : 0) != 0, (Object)"withValueSerializer() is required");
            if (this.isEOS()) {
                Preconditions.checkArgument((this.getTopic() != null ? 1 : 0) != 0, (Object)"withTopic() is required when isEOS() is true");
                KafkaExactlyOnceSink.ensureEOSSupport();
                input.apply(new KafkaExactlyOnceSink(this));
            } else {
                input.apply((PTransform)ParDo.of(new KafkaWriter(this)));
            }
            return PDone.in((Pipeline)input.getPipeline());
        }

        public void validate(PipelineOptions options) {
            if (this.isEOS()) {
                String runner = options.getRunner().getName();
                if ("org.apache.beam.runners.direct.DirectRunner".equals(runner) || runner.startsWith("org.apache.beam.runners.dataflow.") || runner.startsWith("org.apache.beam.runners.spark.") || runner.startsWith("org.apache.beam.runners.flink.")) {
                    return;
                }
                throw new UnsupportedOperationException(runner + " is not a runner known to be compatible with Kafka exactly-once sink. This implementation of exactly-once sink relies on specific checkpoint guarantees. Only the runners with known to have compatible checkpoint semantics are allowed.");
            }
        }

        public void populateDisplayData(DisplayData.Builder builder) {
            super.populateDisplayData(builder);
            builder.addIfNotNull(DisplayData.item((String)"topic", (String)this.getTopic()).withLabel("Topic"));
            Set<String> ignoredProducerPropertiesKeys = IGNORED_PRODUCER_PROPERTIES.keySet();
            for (Map.Entry<String, Object> conf : this.getProducerConfig().entrySet()) {
                String key = conf.getKey();
                if (ignoredProducerPropertiesKeys.contains(key)) continue;
                Object value = DisplayData.inferType((Object)conf.getValue()) != null ? conf.getValue() : String.valueOf(conf.getValue());
                builder.add(DisplayData.item((String)key, (ValueProvider)ValueProvider.StaticValueProvider.of((Object)value)));
            }
        }

        @AutoValue.Builder
        static abstract class Builder<K, V> {
            Builder() {
            }

            abstract Builder<K, V> setTopic(String var1);

            abstract Builder<K, V> setProducerConfig(Map<String, Object> var1);

            abstract Builder<K, V> setProducerFactoryFn(SerializableFunction<Map<String, Object>, Producer<K, V>> var1);

            abstract Builder<K, V> setKeySerializer(Class<? extends Serializer<K>> var1);

            abstract Builder<K, V> setValueSerializer(Class<? extends Serializer<V>> var1);

            abstract Builder<K, V> setPublishTimestampFunction(KafkaPublishTimestampFunction<ProducerRecord<K, V>> var1);

            abstract Builder<K, V> setEOS(boolean var1);

            abstract Builder<K, V> setSinkGroupId(String var1);

            abstract Builder<K, V> setNumShards(int var1);

            abstract Builder<K, V> setConsumerFactoryFn(SerializableFunction<Map<String, Object>, ? extends Consumer<?, ?>> var1);

            abstract WriteRecords<K, V> build();
        }
    }

    @Experimental(value=Experimental.Kind.PORTABILITY)
    @AutoValue
    @AutoValue.CopyAnnotations
    public static abstract class ReadSourceDescriptors<K, V>
    extends PTransform<PCollection<KafkaSourceDescriptor>, PCollection<KafkaRecord<K, V>>> {
        private static final Logger LOG = LoggerFactory.getLogger(ReadSourceDescriptors.class);

        abstract Map<String, Object> getConsumerConfig();

        abstract @Nullable Map<String, Object> getOffsetConsumerConfig();

        abstract @Nullable DeserializerProvider getKeyDeserializerProvider();

        abstract @Nullable DeserializerProvider getValueDeserializerProvider();

        abstract @Nullable Coder<K> getKeyCoder();

        abstract @Nullable Coder<V> getValueCoder();

        abstract SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> getConsumerFactoryFn();

        abstract @Nullable SerializableFunction<TopicPartition, Boolean> getCheckStopReadingFn();

        abstract @Nullable SerializableFunction<KafkaRecord<K, V>, Instant> getExtractOutputTimestampFn();

        abstract @Nullable SerializableFunction<Instant, WatermarkEstimator<Instant>> getCreateWatermarkEstimatorFn();

        abstract boolean isCommitOffsetEnabled();

        abstract @Nullable TimestampPolicyFactory<K, V> getTimestampPolicyFactory();

        abstract Builder<K, V> toBuilder();

        public static <K, V> ReadSourceDescriptors<K, V> read() {
            return new AutoValue_KafkaIO_ReadSourceDescriptors.Builder().setConsumerFactoryFn(KafkaIOUtils.KAFKA_CONSUMER_FACTORY_FN).setConsumerConfig(KafkaIOUtils.DEFAULT_CONSUMER_PROPERTIES).setCommitOffsetEnabled(false).build().withProcessingTime().withMonotonicallyIncreasingWatermarkEstimator();
        }

        public ReadSourceDescriptors<K, V> withBootstrapServers(String bootstrapServers) {
            return this.withConsumerConfigUpdates((Map<String, Object>)ImmutableMap.of((Object)"bootstrap.servers", (Object)bootstrapServers));
        }

        public ReadSourceDescriptors<K, V> withKeyDeserializerProvider(DeserializerProvider<K> deserializerProvider) {
            return this.toBuilder().setKeyDeserializerProvider(deserializerProvider).build();
        }

        public ReadSourceDescriptors<K, V> withValueDeserializerProvider(DeserializerProvider<V> deserializerProvider) {
            return this.toBuilder().setValueDeserializerProvider(deserializerProvider).build();
        }

        public ReadSourceDescriptors<K, V> withKeyDeserializer(Class<? extends Deserializer<K>> keyDeserializer) {
            return this.withKeyDeserializerProvider(LocalDeserializerProvider.of(keyDeserializer));
        }

        public ReadSourceDescriptors<K, V> withValueDeserializer(Class<? extends Deserializer<V>> valueDeserializer) {
            return this.withValueDeserializerProvider(LocalDeserializerProvider.of(valueDeserializer));
        }

        public ReadSourceDescriptors<K, V> withKeyDeserializerAndCoder(Class<? extends Deserializer<K>> keyDeserializer, Coder<K> keyCoder) {
            return this.withKeyDeserializer(keyDeserializer).toBuilder().setKeyCoder(keyCoder).build();
        }

        public ReadSourceDescriptors<K, V> withValueDeserializerAndCoder(Class<? extends Deserializer<V>> valueDeserializer, Coder<V> valueCoder) {
            return this.withValueDeserializer(valueDeserializer).toBuilder().setValueCoder(valueCoder).build();
        }

        public ReadSourceDescriptors<K, V> withConsumerFactoryFn(SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> consumerFactoryFn) {
            return this.toBuilder().setConsumerFactoryFn(consumerFactoryFn).build();
        }

        public ReadSourceDescriptors<K, V> withCheckStopReadingFn(SerializableFunction<TopicPartition, Boolean> checkStopReadingFn) {
            return this.toBuilder().setCheckStopReadingFn(checkStopReadingFn).build();
        }

        public ReadSourceDescriptors<K, V> withConsumerConfigUpdates(Map<String, Object> configUpdates) {
            Map<String, Object> config = KafkaIOUtils.updateKafkaProperties(this.getConsumerConfig(), configUpdates);
            return this.toBuilder().setConsumerConfig(config).build();
        }

        public ReadSourceDescriptors<K, V> withExtractOutputTimestampFn(SerializableFunction<KafkaRecord<K, V>, Instant> fn) {
            return this.toBuilder().setExtractOutputTimestampFn(fn).build();
        }

        public ReadSourceDescriptors<K, V> withCreatWatermarkEstimatorFn(SerializableFunction<Instant, WatermarkEstimator<Instant>> fn) {
            return this.toBuilder().setCreateWatermarkEstimatorFn(fn).build();
        }

        public ReadSourceDescriptors<K, V> withLogAppendTime() {
            return this.withExtractOutputTimestampFn(ExtractOutputTimestampFns.useLogAppendTime());
        }

        public ReadSourceDescriptors<K, V> withProcessingTime() {
            return this.withExtractOutputTimestampFn(ExtractOutputTimestampFns.useProcessingTime());
        }

        public ReadSourceDescriptors<K, V> withCreateTime() {
            return this.withExtractOutputTimestampFn(ExtractOutputTimestampFns.useCreateTime());
        }

        public ReadSourceDescriptors<K, V> withWallTimeWatermarkEstimator() {
            return this.withCreatWatermarkEstimatorFn((SerializableFunction<Instant, WatermarkEstimator<Instant>>)(SerializableFunction & Serializable)state -> new WatermarkEstimators.WallTime(state));
        }

        public ReadSourceDescriptors<K, V> withMonotonicallyIncreasingWatermarkEstimator() {
            return this.withCreatWatermarkEstimatorFn((SerializableFunction<Instant, WatermarkEstimator<Instant>>)(SerializableFunction & Serializable)state -> new WatermarkEstimators.MonotonicallyIncreasing(state));
        }

        public ReadSourceDescriptors<K, V> withManualWatermarkEstimator() {
            return this.withCreatWatermarkEstimatorFn((SerializableFunction<Instant, WatermarkEstimator<Instant>>)(SerializableFunction & Serializable)state -> new WatermarkEstimators.Manual(state));
        }

        public ReadSourceDescriptors<K, V> withReadCommitted() {
            return this.withConsumerConfigUpdates((Map<String, Object>)ImmutableMap.of((Object)"isolation.level", (Object)"read_committed"));
        }

        public ReadSourceDescriptors<K, V> commitOffsets() {
            return this.toBuilder().setCommitOffsetEnabled(true).build();
        }

        public ReadSourceDescriptors<K, V> withOffsetConsumerConfigOverrides(Map<String, Object> offsetConsumerConfig) {
            return this.toBuilder().setOffsetConsumerConfig(offsetConsumerConfig).build();
        }

        public ReadSourceDescriptors<K, V> withConsumerConfigOverrides(Map<String, Object> consumerConfig) {
            return this.toBuilder().setConsumerConfig(consumerConfig).build();
        }

        ReadAllFromRow forExternalBuild() {
            return new ReadAllFromRow(this);
        }

        ReadSourceDescriptors<K, V> withTimestampPolicyFactory(TimestampPolicyFactory<K, V> timestampPolicyFactory) {
            return this.toBuilder().setTimestampPolicyFactory(timestampPolicyFactory).build().withManualWatermarkEstimator();
        }

        public PCollection<KafkaRecord<K, V>> expand(PCollection<KafkaSourceDescriptor> input) {
            Preconditions.checkArgument((this.getKeyDeserializerProvider() != null ? 1 : 0) != 0, (Object)"withKeyDeserializer() is required");
            Preconditions.checkArgument((this.getValueDeserializerProvider() != null ? 1 : 0) != 0, (Object)"withValueDeserializer() is required");
            if (!ConsumerSpEL.hasOffsetsForTimes()) {
                LOG.warn("Kafka client version {} is too old. Versions before 0.10.1.0 are deprecated and may not be supported in next release of Apache Beam. Please upgrade your Kafka client version.", (Object)AppInfoParser.getVersion());
            }
            if (this.isCommitOffsetEnabled() && this.configuredKafkaCommit()) {
                LOG.info("Either read_committed or auto_commit is set together with commitOffsetEnabled but you only need one of them. The commitOffsetEnabled is going to be ignored");
            }
            if (this.getConsumerConfig().get("bootstrap.servers") == null) {
                LOG.warn("The bootstrapServers is not set. It must be populated through the KafkaSourceDescriptor during runtime otherwise the pipeline will fail.");
            }
            CoderRegistry coderRegistry = input.getPipeline().getCoderRegistry();
            Coder<K> keyCoder = this.getKeyCoder(coderRegistry);
            Coder<V> valueCoder = this.getValueCoder(coderRegistry);
            KafkaRecordCoder<K, V> recordCoder = KafkaRecordCoder.of(keyCoder, valueCoder);
            try {
                PCollection outputWithDescriptor = ((PCollection)input.apply((PTransform)ParDo.of(new ReadFromKafkaDoFn(this)))).setCoder((Coder)KvCoder.of((Coder)input.getPipeline().getSchemaRegistry().getSchemaCoder(KafkaSourceDescriptor.class), recordCoder));
                if (this.isCommitOffsetEnabled() && !this.configuredKafkaCommit()) {
                    outputWithDescriptor = ((PCollection)outputWithDescriptor.apply((PTransform)Reshuffle.viaRandomKey())).setCoder((Coder)KvCoder.of((Coder)input.getPipeline().getSchemaRegistry().getSchemaCoder(KafkaSourceDescriptor.class), recordCoder));
                    PCollection unused = (PCollection)outputWithDescriptor.apply(new KafkaCommitOffset(this));
                    unused.setCoder((Coder)VoidCoder.of());
                }
                PCollection output = ((PCollection)outputWithDescriptor.apply((PTransform)MapElements.into((TypeDescriptor)new TypeDescriptor<KafkaRecord<K, V>>(){}).via((SerializableFunction & Serializable)element -> (KafkaRecord)element.getValue()))).setCoder(recordCoder);
                return output;
            }
            catch (NoSuchSchemaException e) {
                throw new RuntimeException(e.getMessage());
            }
        }

        private Coder<K> getKeyCoder(CoderRegistry coderRegistry) {
            return this.getKeyCoder() != null ? this.getKeyCoder() : this.getKeyDeserializerProvider().getCoder(coderRegistry);
        }

        private Coder<V> getValueCoder(CoderRegistry coderRegistry) {
            return this.getValueCoder() != null ? this.getValueCoder() : this.getValueDeserializerProvider().getCoder(coderRegistry);
        }

        private boolean configuredKafkaCommit() {
            return this.getConsumerConfig().get("isolation.level") == "read_committed" || Boolean.TRUE.equals(this.getConsumerConfig().get("enable.auto.commit"));
        }

        static class ExtractOutputTimestampFns<K, V> {
            ExtractOutputTimestampFns() {
            }

            public static <K, V> SerializableFunction<KafkaRecord<K, V>, Instant> useProcessingTime() {
                return (SerializableFunction & Serializable)record -> Instant.now();
            }

            public static <K, V> SerializableFunction<KafkaRecord<K, V>, Instant> useCreateTime() {
                return (SerializableFunction & Serializable)record -> {
                    Preconditions.checkArgument((record.getTimestampType() == KafkaTimestampType.CREATE_TIME ? 1 : 0) != 0, (String)"Kafka record's timestamp is not 'CREATE_TIME' (topic: %s, partition %s, offset %s, timestamp type '%s')", (Object)record.getTopic(), (Object)record.getPartition(), (Object)record.getOffset(), (Object)((Object)record.getTimestampType()));
                    return new Instant(record.getTimestamp());
                };
            }

            public static <K, V> SerializableFunction<KafkaRecord<K, V>, Instant> useLogAppendTime() {
                return (SerializableFunction & Serializable)record -> {
                    Preconditions.checkArgument((record.getTimestampType() == KafkaTimestampType.LOG_APPEND_TIME ? 1 : 0) != 0, (String)"Kafka record's timestamp is not 'LOG_APPEND_TIME' (topic: %s, partition %s, offset %s, timestamp type '%s')", (Object)record.getTopic(), (Object)record.getPartition(), (Object)record.getOffset(), (Object)((Object)record.getTimestampType()));
                    return new Instant(record.getTimestamp());
                };
            }
        }

        private static class ReadAllFromRow<K, V>
        extends PTransform<PCollection<Row>, PCollection<KV<K, V>>> {
            private final ReadSourceDescriptors<K, V> readViaSDF;

            ReadAllFromRow(ReadSourceDescriptors read) {
                this.readViaSDF = read;
            }

            public PCollection<KV<K, V>> expand(PCollection<Row> input) {
                return ((PCollection)((PCollection)((PCollection)input.apply(Convert.fromRows(KafkaSourceDescriptor.class))).apply(this.readViaSDF)).apply((PTransform)ParDo.of((DoFn)new DoFn<KafkaRecord<K, V>, KV<K, V>>(){

                    @DoFn.ProcessElement
                    public void processElement(@DoFn.Element KafkaRecord element, DoFn.OutputReceiver<KV<K, V>> outputReceiver) {
                        outputReceiver.output(element.getKV());
                    }
                }))).setCoder((Coder)KvCoder.of(this.readViaSDF.getKeyCoder(), this.readViaSDF.getValueCoder()));
            }
        }

        @AutoValue.Builder
        static abstract class Builder<K, V> {
            Builder() {
            }

            abstract Builder<K, V> setConsumerConfig(Map<String, Object> var1);

            abstract Builder<K, V> setOffsetConsumerConfig(Map<String, Object> var1);

            abstract Builder<K, V> setConsumerFactoryFn(SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> var1);

            abstract Builder<K, V> setCheckStopReadingFn(SerializableFunction<TopicPartition, Boolean> var1);

            abstract Builder<K, V> setKeyDeserializerProvider(DeserializerProvider var1);

            abstract Builder<K, V> setValueDeserializerProvider(DeserializerProvider var1);

            abstract Builder<K, V> setKeyCoder(Coder<K> var1);

            abstract Builder<K, V> setValueCoder(Coder<V> var1);

            abstract Builder<K, V> setExtractOutputTimestampFn(SerializableFunction<KafkaRecord<K, V>, Instant> var1);

            abstract Builder<K, V> setCreateWatermarkEstimatorFn(SerializableFunction<Instant, WatermarkEstimator<Instant>> var1);

            abstract Builder<K, V> setCommitOffsetEnabled(boolean var1);

            abstract Builder<K, V> setTimestampPolicyFactory(TimestampPolicyFactory<K, V> var1);

            abstract ReadSourceDescriptors<K, V> build();
        }
    }

    static class RowsWithMetadata<K, V>
    extends PTransform<PBegin, PCollection<Row>> {
        private final Read<K, V> read;

        RowsWithMetadata(Read<K, V> read) {
            super("KafkaIO.RowsWithMetadata");
            this.read = read;
        }

        public static <K, V> ByteArrayKafkaRecord toExternalKafkaRecord(KafkaRecord<K, V> kafkaRecord) {
            List<KafkaHeader> headers = kafkaRecord.getHeaders() == null ? null : Arrays.stream(kafkaRecord.getHeaders().toArray()).map(h -> new KafkaHeader(h.key(), h.value())).collect(Collectors.toList());
            ByteArrayKafkaRecord byteArrayKafkaRecord = new ByteArrayKafkaRecord(kafkaRecord.getTopic(), kafkaRecord.getPartition(), kafkaRecord.getOffset(), kafkaRecord.getTimestamp(), (byte[])kafkaRecord.getKV().getKey(), (byte[])kafkaRecord.getKV().getValue(), headers, kafkaRecord.getTimestampType().id, kafkaRecord.getTimestampType().name);
            return byteArrayKafkaRecord;
        }

        public PCollection<Row> expand(PBegin begin) {
            return (PCollection)((PCollection)((PCollection)begin.apply(this.read)).apply("Convert to ExternalKafkaRecord", (PTransform)ParDo.of((DoFn)new DoFn<KafkaRecord<K, V>, ByteArrayKafkaRecord>(){

                @DoFn.ProcessElement
                public void processElement(DoFn.ProcessContext ctx) {
                    KafkaRecord kafkRecord = (KafkaRecord)ctx.element();
                    ctx.output((Object)RowsWithMetadata.toExternalKafkaRecord(kafkRecord));
                }
            }))).apply(Convert.toRows());
        }

        public void populateDisplayData(DisplayData.Builder builder) {
            super.populateDisplayData(builder);
            this.read.populateDisplayData(builder);
        }

        @Experimental(value=Experimental.Kind.PORTABILITY)
        static class Builder<K, V>
        implements ExternalTransformBuilder<Read.External.Configuration, PBegin, PCollection<Row>> {
            Builder() {
            }

            public PTransform<PBegin, PCollection<Row>> buildExternal(Read.External.Configuration config) {
                AutoValue_KafkaIO_Read.Builder readBuilder = new AutoValue_KafkaIO_Read.Builder();
                Read.Builder.setupExternalBuilder(readBuilder, config);
                Class keyDeserializer = KafkaIO.resolveClass(config.keyDeserializer);
                Coder keyCoder = Read.Builder.resolveCoder(keyDeserializer);
                if (!(keyCoder instanceof NullableCoder) || !(keyCoder.getCoderArguments().get(0) instanceof ByteArrayCoder)) {
                    throw new RuntimeException("ExternalWithMetadata transform only supports keys of type nullable(byte[])");
                }
                Class valueDeserializer = KafkaIO.resolveClass(config.valueDeserializer);
                Coder valueCoder = Read.Builder.resolveCoder(valueDeserializer);
                if (!(valueCoder instanceof NullableCoder) || !(valueCoder.getCoderArguments().get(0) instanceof ByteArrayCoder)) {
                    throw new RuntimeException("ExternalWithMetadata transform only supports values of type nullable(byte[])");
                }
                return ((Read.Builder)readBuilder).build().externalWithMetadata();
            }
        }
    }

    @DefaultSchema(value=JavaFieldSchema.class)
    @SuppressFBWarnings(value={"URF_UNREAD_FIELD"})
    static class ByteArrayKafkaRecord {
        String topic;
        int partition;
        long offset;
        long timestamp;
        byte[] key;
        byte[] value;
        List<KafkaHeader> headers;
        int timestampTypeId;
        String timestampTypeName;

        @SchemaCreate
        public ByteArrayKafkaRecord(String topic, int partition, long offset, long timestamp, byte @Nullable [] key, byte @Nullable [] value, @Nullable List<KafkaHeader> headers, int timestampTypeId, String timestampTypeName) {
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
            this.timestamp = timestamp;
            this.key = key;
            this.value = value;
            this.headers = headers;
            this.timestampTypeId = timestampTypeId;
            this.timestampTypeName = timestampTypeName;
        }
    }

    @DefaultSchema(value=JavaFieldSchema.class)
    @SuppressFBWarnings(value={"URF_UNREAD_FIELD"})
    static class KafkaHeader {
        String key;
        byte[] value;

        @SchemaCreate
        public KafkaHeader(String key, byte[] value) {
            this.key = key;
            this.value = value;
        }
    }

    public static class TypedWithoutMetadata<K, V>
    extends PTransform<PBegin, PCollection<KV<K, V>>> {
        private final Read<K, V> read;

        TypedWithoutMetadata(Read<K, V> read) {
            super("KafkaIO.Read");
            this.read = read;
        }

        public PCollection<KV<K, V>> expand(PBegin begin) {
            return (PCollection)((PCollection)begin.apply(this.read)).apply("Remove Kafka Metadata", (PTransform)ParDo.of((DoFn)new DoFn<KafkaRecord<K, V>, KV<K, V>>(){

                @DoFn.ProcessElement
                public void processElement(DoFn.ProcessContext ctx) {
                    ctx.output(((KafkaRecord)ctx.element()).getKV());
                }
            }));
        }

        public void populateDisplayData(DisplayData.Builder builder) {
            super.populateDisplayData(builder);
            this.read.populateDisplayData(builder);
        }

        @Experimental(value=Experimental.Kind.PORTABILITY)
        static class Builder<K, V>
        implements ExternalTransformBuilder<Read.External.Configuration, PBegin, PCollection<KV<K, V>>> {
            Builder() {
            }

            public PTransform<PBegin, PCollection<KV<K, V>>> buildExternal(Read.External.Configuration config) {
                AutoValue_KafkaIO_Read.Builder readBuilder = new AutoValue_KafkaIO_Read.Builder();
                Read.Builder.setupExternalBuilder(readBuilder, config);
                return ((Read.Builder)readBuilder).build().withoutMetadata();
            }
        }
    }

    @AutoValue
    @AutoValue.CopyAnnotations
    public static abstract class Read<K, V>
    extends PTransform<PBegin, PCollection<KafkaRecord<K, V>>> {
        @Internal
        public static final PTransformOverride KAFKA_READ_OVERRIDE = PTransformOverride.of((PTransformMatcher)PTransformMatchers.classEqualTo(ReadFromKafkaViaSDF.class), new KafkaReadOverrideFactory());

        abstract Map<String, Object> getConsumerConfig();

        abstract @Nullable List<String> getTopics();

        abstract @Nullable List<TopicPartition> getTopicPartitions();

        abstract @Nullable Coder<K> getKeyCoder();

        abstract @Nullable Coder<V> getValueCoder();

        abstract SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> getConsumerFactoryFn();

        abstract @Nullable SerializableFunction<KafkaRecord<K, V>, Instant> getWatermarkFn();

        abstract long getMaxNumRecords();

        abstract @Nullable Duration getMaxReadTime();

        abstract @Nullable Instant getStartReadTime();

        abstract @Nullable Instant getStopReadTime();

        abstract boolean isCommitOffsetsInFinalizeEnabled();

        abstract boolean isDynamicRead();

        abstract @Nullable Duration getWatchTopicPartitionDuration();

        abstract TimestampPolicyFactory<K, V> getTimestampPolicyFactory();

        abstract @Nullable Map<String, Object> getOffsetConsumerConfig();

        abstract @Nullable DeserializerProvider getKeyDeserializerProvider();

        abstract @Nullable DeserializerProvider getValueDeserializerProvider();

        abstract @Nullable SerializableFunction<TopicPartition, Boolean> getCheckStopReadingFn();

        abstract Builder<K, V> toBuilder();

        public Read<K, V> withBootstrapServers(String bootstrapServers) {
            return this.withConsumerConfigUpdates((Map<String, Object>)ImmutableMap.of((Object)"bootstrap.servers", (Object)bootstrapServers));
        }

        public Read<K, V> withTopic(String topic) {
            return this.withTopics((List<String>)ImmutableList.of((Object)topic));
        }

        public Read<K, V> withTopics(List<String> topics) {
            Preconditions.checkState((this.getTopicPartitions() == null || this.getTopicPartitions().isEmpty() ? 1 : 0) != 0, (Object)"Only topics or topicPartitions can be set, not both");
            return this.toBuilder().setTopics((List<String>)ImmutableList.copyOf(topics)).build();
        }

        public Read<K, V> withTopicPartitions(List<TopicPartition> topicPartitions) {
            Preconditions.checkState((this.getTopics() == null || this.getTopics().isEmpty() ? 1 : 0) != 0, (Object)"Only topics or topicPartitions can be set, not both");
            return this.toBuilder().setTopicPartitions((List<TopicPartition>)ImmutableList.copyOf(topicPartitions)).build();
        }

        public Read<K, V> withKeyDeserializer(Class<? extends Deserializer<K>> keyDeserializer) {
            return this.withKeyDeserializer(LocalDeserializerProvider.of(keyDeserializer));
        }

        public Read<K, V> withKeyDeserializerAndCoder(Class<? extends Deserializer<K>> keyDeserializer, Coder<K> keyCoder) {
            return this.withKeyDeserializer(keyDeserializer).toBuilder().setKeyCoder(keyCoder).build();
        }

        public Read<K, V> withKeyDeserializer(DeserializerProvider<K> deserializerProvider) {
            return this.toBuilder().setKeyDeserializerProvider(deserializerProvider).build();
        }

        public Read<K, V> withValueDeserializer(Class<? extends Deserializer<V>> valueDeserializer) {
            return this.withValueDeserializer(LocalDeserializerProvider.of(valueDeserializer));
        }

        public Read<K, V> withValueDeserializerAndCoder(Class<? extends Deserializer<V>> valueDeserializer, Coder<V> valueCoder) {
            return this.withValueDeserializer(valueDeserializer).toBuilder().setValueCoder(valueCoder).build();
        }

        public Read<K, V> withValueDeserializer(DeserializerProvider<V> deserializerProvider) {
            return this.toBuilder().setValueDeserializerProvider(deserializerProvider).build();
        }

        public Read<K, V> withConsumerFactoryFn(SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> consumerFactoryFn) {
            return this.toBuilder().setConsumerFactoryFn(consumerFactoryFn).build();
        }

        @Deprecated
        public Read<K, V> updateConsumerProperties(Map<String, Object> configUpdates) {
            Map<String, Object> config = KafkaIOUtils.updateKafkaProperties(this.getConsumerConfig(), configUpdates);
            return this.toBuilder().setConsumerConfig(config).build();
        }

        public Read<K, V> withMaxNumRecords(long maxNumRecords) {
            return this.toBuilder().setMaxNumRecords(maxNumRecords).build();
        }

        public Read<K, V> withStartReadTime(Instant startReadTime) {
            return this.toBuilder().setStartReadTime(startReadTime).build();
        }

        public Read<K, V> withStopReadTime(Instant stopReadTime) {
            return this.toBuilder().setStopReadTime(stopReadTime).build();
        }

        public Read<K, V> withMaxReadTime(Duration maxReadTime) {
            return this.toBuilder().setMaxReadTime(maxReadTime).build();
        }

        public Read<K, V> withLogAppendTime() {
            return this.withTimestampPolicyFactory(TimestampPolicyFactory.withLogAppendTime());
        }

        public Read<K, V> withProcessingTime() {
            return this.withTimestampPolicyFactory(TimestampPolicyFactory.withProcessingTime());
        }

        public Read<K, V> withCreateTime(Duration maxDelay) {
            return this.withTimestampPolicyFactory(TimestampPolicyFactory.withCreateTime(maxDelay));
        }

        public Read<K, V> withTimestampPolicyFactory(TimestampPolicyFactory<K, V> timestampPolicyFactory) {
            return this.toBuilder().setTimestampPolicyFactory(timestampPolicyFactory).build();
        }

        @Deprecated
        public Read<K, V> withTimestampFn2(SerializableFunction<KafkaRecord<K, V>, Instant> timestampFn) {
            Preconditions.checkArgument((timestampFn != null ? 1 : 0) != 0, (Object)"timestampFn can not be null");
            return this.toBuilder().setTimestampPolicyFactory(TimestampPolicyFactory.withTimestampFn(timestampFn)).build();
        }

        @Deprecated
        public Read<K, V> withWatermarkFn2(SerializableFunction<KafkaRecord<K, V>, Instant> watermarkFn) {
            Preconditions.checkArgument((watermarkFn != null ? 1 : 0) != 0, (Object)"watermarkFn can not be null");
            return this.toBuilder().setWatermarkFn(watermarkFn).build();
        }

        @Deprecated
        public Read<K, V> withTimestampFn(SerializableFunction<KV<K, V>, Instant> timestampFn) {
            Preconditions.checkArgument((timestampFn != null ? 1 : 0) != 0, (Object)"timestampFn can not be null");
            return this.withTimestampFn2(Read.unwrapKafkaAndThen(timestampFn));
        }

        @Deprecated
        public Read<K, V> withWatermarkFn(SerializableFunction<KV<K, V>, Instant> watermarkFn) {
            Preconditions.checkArgument((watermarkFn != null ? 1 : 0) != 0, (Object)"watermarkFn can not be null");
            return this.withWatermarkFn2(Read.unwrapKafkaAndThen(watermarkFn));
        }

        public Read<K, V> withReadCommitted() {
            return this.withConsumerConfigUpdates((Map<String, Object>)ImmutableMap.of((Object)"isolation.level", (Object)"read_committed"));
        }

        public Read<K, V> commitOffsetsInFinalize() {
            return this.toBuilder().setCommitOffsetsInFinalizeEnabled(true).build();
        }

        public Read<K, V> withDynamicRead(Duration duration) {
            return this.toBuilder().setDynamicRead(true).setWatchTopicPartitionDuration(duration).build();
        }

        public Read<K, V> withOffsetConsumerConfigOverrides(Map<String, Object> offsetConsumerConfig) {
            return this.toBuilder().setOffsetConsumerConfig(offsetConsumerConfig).build();
        }

        public Read<K, V> withConsumerConfigUpdates(Map<String, Object> configUpdates) {
            Map<String, Object> config = KafkaIOUtils.updateKafkaProperties(this.getConsumerConfig(), configUpdates);
            return this.toBuilder().setConsumerConfig(config).build();
        }

        public Read<K, V> withCheckStopReadingFn(SerializableFunction<TopicPartition, Boolean> checkStopReadingFn) {
            return this.toBuilder().setCheckStopReadingFn(checkStopReadingFn).build();
        }

        public PTransform<PBegin, PCollection<KV<K, V>>> withoutMetadata() {
            return new TypedWithoutMetadata(this);
        }

        PTransform<PBegin, PCollection<Row>> externalWithMetadata() {
            return new RowsWithMetadata(this);
        }

        public PCollection<KafkaRecord<K, V>> expand(PBegin input) {
            Preconditions.checkArgument((this.getConsumerConfig().get("bootstrap.servers") != null ? 1 : 0) != 0, (Object)"withBootstrapServers() is required");
            if (!this.isDynamicRead()) {
                Preconditions.checkArgument((this.getTopics() != null && this.getTopics().size() > 0 || this.getTopicPartitions() != null && this.getTopicPartitions().size() > 0 ? 1 : 0) != 0, (Object)"Either withTopic(), withTopics() or withTopicPartitions() is required");
            } else {
                Preconditions.checkArgument((boolean)ExperimentalOptions.hasExperiment((PipelineOptions)input.getPipeline().getOptions(), (String)"beam_fn_api"), (Object)"Kafka Dynamic Read requires enabling experiment beam_fn_api.");
            }
            Preconditions.checkArgument((this.getKeyDeserializerProvider() != null ? 1 : 0) != 0, (Object)"withKeyDeserializer() is required");
            Preconditions.checkArgument((this.getValueDeserializerProvider() != null ? 1 : 0) != 0, (Object)"withValueDeserializer() is required");
            if (!ConsumerSpEL.hasOffsetsForTimes()) {
                LOG.warn("Kafka client version {} is too old. Versions before 0.10.1.0 are deprecated and may not be supported in next release of Apache Beam. Please upgrade your Kafka client version.", (Object)AppInfoParser.getVersion());
            }
            if (this.getStartReadTime() != null) {
                Preconditions.checkArgument((boolean)ConsumerSpEL.hasOffsetsForTimes(), (Object)("Consumer.offsetsForTimes is only supported by Kafka Client 0.10.1.0 onwards, current version of Kafka Client is " + AppInfoParser.getVersion() + ". If you are building with maven, set \"kafka.clients.version\" maven property to 0.10.1.0 or newer."));
            }
            if (this.getStopReadTime() != null) {
                Preconditions.checkArgument((boolean)ConsumerSpEL.hasOffsetsForTimes(), (Object)("Consumer.offsetsForTimes is only supported by Kafka Client 0.10.1.0 onwards, current version of Kafka Client is " + AppInfoParser.getVersion() + ". If you are building with maven, set \"kafka.clients.version\" maven property to 0.10.1.0 or newer."));
            }
            if (this.isCommitOffsetsInFinalizeEnabled()) {
                Preconditions.checkArgument((this.getConsumerConfig().get("group.id") != null ? 1 : 0) != 0, (Object)"commitOffsetsInFinalize() is enabled, but group.id in Kafka consumer config is not set. Offset management requires group.id.");
                if (Boolean.TRUE.equals(this.getConsumerConfig().get("enable.auto.commit"))) {
                    LOG.warn("'{}' in consumer config is enabled even though commitOffsetsInFinalize() is set. You need only one of them.", (Object)"enable.auto.commit");
                }
            }
            CoderRegistry coderRegistry = input.getPipeline().getCoderRegistry();
            Coder<K> keyCoder = this.getKeyCoder(coderRegistry);
            Coder<V> valueCoder = this.getValueCoder(coderRegistry);
            KafkaIOReadImplementationCompatibility.KafkaIOReadImplementationCompatibilityResult compatibility = KafkaIOReadImplementationCompatibility.getCompatibility(this);
            if (ExperimentalOptions.hasExperiment((PipelineOptions)input.getPipeline().getOptions(), (String)"beam_fn_api_use_deprecated_read") || ExperimentalOptions.hasExperiment((PipelineOptions)input.getPipeline().getOptions(), (String)"use_deprecated_read") || compatibility.supportsOnly(KafkaIOReadImplementationCompatibility.KafkaIOReadImplementation.LEGACY) || compatibility.supports(KafkaIOReadImplementationCompatibility.KafkaIOReadImplementation.LEGACY) && this.runnerPrefersLegacyRead(input.getPipeline().getOptions())) {
                return (PCollection)input.apply(new ReadFromKafkaViaUnbounded<K, V>(this, keyCoder, valueCoder));
            }
            return (PCollection)input.apply(new ReadFromKafkaViaSDF<K, V>(this, keyCoder, valueCoder));
        }

        private boolean runnerPrefersLegacyRead(PipelineOptions options) {
            if (ExperimentalOptions.hasExperiment((PipelineOptions)options, (String)"use_sdf_read")) {
                return false;
            }
            if (options.getRunner().getName().startsWith("org.apache.beam.runners.dataflow.")) {
                return false;
            }
            return !ExperimentalOptions.hasExperiment((PipelineOptions)options, (String)"beam_fn_api");
        }

        private Coder<K> getKeyCoder(CoderRegistry coderRegistry) {
            return this.getKeyCoder() != null ? this.getKeyCoder() : this.getKeyDeserializerProvider().getCoder(coderRegistry);
        }

        private Coder<V> getValueCoder(CoderRegistry coderRegistry) {
            return this.getValueCoder() != null ? this.getValueCoder() : this.getValueDeserializerProvider().getCoder(coderRegistry);
        }

        @VisibleForTesting
        UnboundedSource<KafkaRecord<K, V>, KafkaCheckpointMark> makeSource() {
            return new KafkaUnboundedSource(this, -1);
        }

        private static <KeyT, ValueT, OutT> SerializableFunction<KafkaRecord<KeyT, ValueT>, OutT> unwrapKafkaAndThen(SerializableFunction<KV<KeyT, ValueT>, OutT> fn) {
            return (SerializableFunction & Serializable)record -> fn.apply(record.getKV());
        }

        public void populateDisplayData(DisplayData.Builder builder) {
            super.populateDisplayData(builder);
            List<String> topics = this.getTopics();
            List<TopicPartition> topicPartitions = this.getTopicPartitions();
            if (topics.size() > 0) {
                builder.add(DisplayData.item((String)"topics", (String)Joiner.on((String)",").join(topics)).withLabel("Topic/s"));
            } else if (topicPartitions.size() > 0) {
                builder.add(DisplayData.item((String)"topicPartitions", (String)Joiner.on((String)",").join(topicPartitions)).withLabel("Topic Partition/s"));
            }
            Set<String> disallowedConsumerPropertiesKeys = KafkaIOUtils.DISALLOWED_CONSUMER_PROPERTIES.keySet();
            for (Map.Entry<String, Object> conf : this.getConsumerConfig().entrySet()) {
                String key = conf.getKey();
                if (disallowedConsumerPropertiesKeys.contains(key)) continue;
                Object value = DisplayData.inferType((Object)conf.getValue()) != null ? conf.getValue() : String.valueOf(conf.getValue());
                builder.add(DisplayData.item((String)key, (ValueProvider)ValueProvider.StaticValueProvider.of((Object)value)));
            }
        }

        @VisibleForTesting
        static class GenerateKafkaSourceDescriptor
        extends DoFn<byte[], KafkaSourceDescriptor> {
            private final SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> consumerFactoryFn;
            private final List<TopicPartition> topicPartitions;
            private final Instant startReadTime;
            private final Instant stopReadTime;
            @VisibleForTesting
            final Map<String, Object> consumerConfig;
            @VisibleForTesting
            final List<String> topics;

            GenerateKafkaSourceDescriptor(Read read) {
                this.consumerConfig = read.getConsumerConfig();
                this.consumerFactoryFn = read.getConsumerFactoryFn();
                this.topics = read.getTopics();
                this.topicPartitions = read.getTopicPartitions();
                this.startReadTime = read.getStartReadTime();
                this.stopReadTime = read.getStopReadTime();
            }

            @DoFn.ProcessElement
            public void processElement(DoFn.OutputReceiver<KafkaSourceDescriptor> receiver) {
                ArrayList<TopicPartition> partitions = new ArrayList<TopicPartition>(this.topicPartitions);
                if (partitions.isEmpty()) {
                    try (Consumer consumer = (Consumer)this.consumerFactoryFn.apply(this.consumerConfig);){
                        for (String topic : this.topics) {
                            for (PartitionInfo p : consumer.partitionsFor(topic)) {
                                partitions.add(new TopicPartition(p.topic(), p.partition()));
                            }
                        }
                    }
                }
                for (TopicPartition topicPartition : partitions) {
                    receiver.output((Object)KafkaSourceDescriptor.of(topicPartition, null, this.startReadTime, null, this.stopReadTime, null));
                }
            }
        }

        static class ReadFromKafkaViaSDF<K, V>
        extends AbstractReadFromKafka<K, V> {
            ReadFromKafkaViaSDF(Read<K, V> kafkaRead, Coder<K> keyCoder, Coder<V> valueCoder) {
                super(kafkaRead, keyCoder, valueCoder, KafkaIOReadImplementationCompatibility.KafkaIOReadImplementation.SDF);
            }

            public PCollection<KafkaRecord<K, V>> expand(PBegin input) {
                PCollection output;
                ReadSourceDescriptors readTransform = ReadSourceDescriptors.read().withConsumerConfigOverrides(this.kafkaRead.getConsumerConfig()).withOffsetConsumerConfigOverrides(this.kafkaRead.getOffsetConsumerConfig()).withConsumerFactoryFn(this.kafkaRead.getConsumerFactoryFn()).withKeyDeserializerProvider(this.kafkaRead.getKeyDeserializerProvider()).withValueDeserializerProvider(this.kafkaRead.getValueDeserializerProvider()).withManualWatermarkEstimator().withTimestampPolicyFactory(this.kafkaRead.getTimestampPolicyFactory()).withCheckStopReadingFn(this.kafkaRead.getCheckStopReadingFn());
                if (this.kafkaRead.isCommitOffsetsInFinalizeEnabled()) {
                    readTransform = readTransform.commitOffsets();
                }
                if (this.kafkaRead.isDynamicRead()) {
                    HashSet<String> topics = new HashSet<String>();
                    if (this.kafkaRead.getTopics() != null && this.kafkaRead.getTopics().size() > 0) {
                        topics.addAll(this.kafkaRead.getTopics());
                    }
                    if (this.kafkaRead.getTopicPartitions() != null && this.kafkaRead.getTopicPartitions().size() > 0) {
                        for (TopicPartition topicPartition : this.kafkaRead.getTopicPartitions()) {
                            topics.add(topicPartition.topic());
                        }
                    }
                    output = (PCollection)((PCollection)((PCollection)input.getPipeline().apply((PTransform)Impulse.create())).apply((PTransform)MapElements.into((TypeDescriptor)TypeDescriptors.kvs((TypeDescriptor)new TypeDescriptor<byte[]>(){}, (TypeDescriptor)new TypeDescriptor<byte[]>(){})).via((SerializableFunction & Serializable)element -> KV.of((Object)element, (Object)element)))).apply((PTransform)ParDo.of((DoFn)new WatchKafkaTopicPartitionDoFn(this.kafkaRead.getWatchTopicPartitionDuration(), this.kafkaRead.getConsumerFactoryFn(), this.kafkaRead.getCheckStopReadingFn(), this.kafkaRead.getConsumerConfig(), this.kafkaRead.getStartReadTime(), this.kafkaRead.getStopReadTime(), topics.stream().collect(Collectors.toList()))));
                } else {
                    output = (PCollection)((PCollection)input.getPipeline().apply((PTransform)Impulse.create())).apply((PTransform)ParDo.of((DoFn)new GenerateKafkaSourceDescriptor(this.kafkaRead)));
                }
                return ((PCollection)output.apply(readTransform)).setCoder(KafkaRecordCoder.of(this.keyCoder, this.valueCoder));
            }
        }

        static class ReadFromKafkaViaUnbounded<K, V>
        extends AbstractReadFromKafka<K, V> {
            ReadFromKafkaViaUnbounded(Read<K, V> kafkaRead, Coder<K> keyCoder, Coder<V> valueCoder) {
                super(kafkaRead, keyCoder, valueCoder, KafkaIOReadImplementationCompatibility.KafkaIOReadImplementation.LEGACY);
            }

            public PCollection<KafkaRecord<K, V>> expand(PBegin input) {
                Read.Unbounded unbounded;
                Read.Unbounded transform = unbounded = org.apache.beam.sdk.io.Read.from(this.kafkaRead.toBuilder().setKeyCoder(this.keyCoder).setValueCoder(this.valueCoder).build().makeSource());
                if (this.kafkaRead.getMaxNumRecords() < Long.MAX_VALUE || this.kafkaRead.getMaxReadTime() != null) {
                    transform = unbounded.withMaxReadTime(this.kafkaRead.getMaxReadTime()).withMaxNumRecords(this.kafkaRead.getMaxNumRecords());
                }
                return (PCollection)input.getPipeline().apply((PTransform)transform);
            }
        }

        private static abstract class AbstractReadFromKafka<K, V>
        extends PTransform<PBegin, PCollection<KafkaRecord<K, V>>> {
            Read<K, V> kafkaRead;
            Coder<K> keyCoder;
            Coder<V> valueCoder;

            AbstractReadFromKafka(Read<K, V> kafkaRead, Coder<K> keyCoder, Coder<V> valueCoder, KafkaIOReadImplementationCompatibility.KafkaIOReadImplementation implementation) {
                KafkaIOReadImplementationCompatibility.getCompatibility(kafkaRead).checkSupport(implementation);
                this.kafkaRead = kafkaRead;
                this.keyCoder = keyCoder;
                this.valueCoder = valueCoder;
            }
        }

        private static class KafkaReadOverrideFactory<K, V>
        implements PTransformOverrideFactory<PBegin, PCollection<KafkaRecord<K, V>>, ReadFromKafkaViaSDF<K, V>> {
            private KafkaReadOverrideFactory() {
            }

            public PTransformOverrideFactory.PTransformReplacement<PBegin, PCollection<KafkaRecord<K, V>>> getReplacementTransform(AppliedPTransform<PBegin, PCollection<KafkaRecord<K, V>>, ReadFromKafkaViaSDF<K, V>> transform) {
                return PTransformOverrideFactory.PTransformReplacement.of((PInput)transform.getPipeline().begin(), new ReadFromKafkaViaUnbounded(((ReadFromKafkaViaSDF)transform.getTransform()).kafkaRead, ((ReadFromKafkaViaSDF)transform.getTransform()).keyCoder, ((ReadFromKafkaViaSDF)transform.getTransform()).valueCoder));
            }

            public Map<PCollection<?>, PTransformOverrideFactory.ReplacementOutput> mapOutputs(Map<TupleTag<?>, PCollection<?>> outputs, PCollection<KafkaRecord<K, V>> newOutput) {
                return ReplacementOutputs.singleton(outputs, newOutput);
            }
        }

        @Experimental(value=Experimental.Kind.PORTABILITY)
        @AutoService(value={ExternalTransformRegistrar.class})
        public static class External
        implements ExternalTransformRegistrar {
            public static final String URN_WITH_METADATA = "beam:transform:org.apache.beam:kafka_read_with_metadata:v1";
            public static final String URN_WITHOUT_METADATA = "beam:transform:org.apache.beam:kafka_read_without_metadata:v1";

            public Map<String, Class<? extends ExternalTransformBuilder<?, ?, ?>>> knownBuilders() {
                return ImmutableMap.of((Object)URN_WITH_METADATA, RowsWithMetadata.Builder.class, (Object)URN_WITHOUT_METADATA, TypedWithoutMetadata.Builder.class);
            }

            public static class Configuration {
                private Map<String, String> consumerConfig;
                private List<String> topics;
                private String keyDeserializer;
                private String valueDeserializer;
                private Long startReadTime;
                private Long stopReadTime;
                private Long maxNumRecords;
                private Long maxReadTime;
                private Boolean commitOffsetInFinalize;
                private String timestampPolicy;

                public void setConsumerConfig(Map<String, String> consumerConfig) {
                    this.consumerConfig = consumerConfig;
                }

                public void setTopics(List<String> topics) {
                    this.topics = topics;
                }

                public void setKeyDeserializer(String keyDeserializer) {
                    this.keyDeserializer = keyDeserializer;
                }

                public void setValueDeserializer(String valueDeserializer) {
                    this.valueDeserializer = valueDeserializer;
                }

                public void setStartReadTime(Long startReadTime) {
                    this.startReadTime = startReadTime;
                }

                public void setStopReadTime(Long stopReadTime) {
                    this.stopReadTime = stopReadTime;
                }

                public void setMaxNumRecords(Long maxNumRecords) {
                    this.maxNumRecords = maxNumRecords;
                }

                public void setMaxReadTime(Long maxReadTime) {
                    this.maxReadTime = maxReadTime;
                }

                public void setCommitOffsetInFinalize(Boolean commitOffsetInFinalize) {
                    this.commitOffsetInFinalize = commitOffsetInFinalize;
                }

                public void setTimestampPolicy(String timestampPolicy) {
                    this.timestampPolicy = timestampPolicy;
                }
            }
        }

        @Experimental(value=Experimental.Kind.PORTABILITY)
        @AutoValue.Builder
        static abstract class Builder<K, V> {
            Builder() {
            }

            abstract Builder<K, V> setConsumerConfig(Map<String, Object> var1);

            abstract Builder<K, V> setTopics(List<String> var1);

            abstract Builder<K, V> setTopicPartitions(List<TopicPartition> var1);

            abstract Builder<K, V> setKeyCoder(Coder<K> var1);

            abstract Builder<K, V> setValueCoder(Coder<V> var1);

            abstract Builder<K, V> setConsumerFactoryFn(SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> var1);

            abstract Builder<K, V> setWatermarkFn(SerializableFunction<KafkaRecord<K, V>, Instant> var1);

            abstract Builder<K, V> setMaxNumRecords(long var1);

            abstract Builder<K, V> setMaxReadTime(Duration var1);

            abstract Builder<K, V> setStartReadTime(Instant var1);

            abstract Builder<K, V> setStopReadTime(Instant var1);

            abstract Builder<K, V> setCommitOffsetsInFinalizeEnabled(boolean var1);

            abstract Builder<K, V> setDynamicRead(boolean var1);

            abstract Builder<K, V> setWatchTopicPartitionDuration(Duration var1);

            abstract Builder<K, V> setTimestampPolicyFactory(TimestampPolicyFactory<K, V> var1);

            abstract Builder<K, V> setOffsetConsumerConfig(Map<String, Object> var1);

            abstract Builder<K, V> setKeyDeserializerProvider(DeserializerProvider var1);

            abstract Builder<K, V> setValueDeserializerProvider(DeserializerProvider var1);

            abstract Builder<K, V> setCheckStopReadingFn(SerializableFunction<TopicPartition, Boolean> var1);

            abstract Read<K, V> build();

            static void setupExternalBuilder(Builder builder, External.Configuration config) {
                ImmutableList.Builder listBuilder = ImmutableList.builder();
                for (String topic : config.topics) {
                    listBuilder.add((Object)topic);
                }
                builder.setTopics((List<String>)listBuilder.build());
                Class keyDeserializer = KafkaIO.resolveClass(config.keyDeserializer);
                builder.setKeyDeserializerProvider(LocalDeserializerProvider.of(keyDeserializer));
                builder.setKeyCoder(Builder.resolveCoder(keyDeserializer));
                Class valueDeserializer = KafkaIO.resolveClass(config.valueDeserializer);
                builder.setValueDeserializerProvider(LocalDeserializerProvider.of(valueDeserializer));
                builder.setValueCoder(Builder.resolveCoder(valueDeserializer));
                HashMap<String, Object> consumerConfig = new HashMap<String, Object>(config.consumerConfig);
                consumerConfig.put("key.deserializer", keyDeserializer.getName());
                consumerConfig.put("value.deserializer", valueDeserializer.getName());
                builder.setConsumerConfig(consumerConfig);
                builder.setTopicPartitions(Collections.emptyList());
                builder.setConsumerFactoryFn(KafkaIOUtils.KAFKA_CONSUMER_FACTORY_FN);
                if (config.maxReadTime != null) {
                    builder.setMaxReadTime(Duration.standardSeconds((long)config.maxReadTime));
                }
                builder.setMaxNumRecords(config.maxNumRecords == null ? Long.MAX_VALUE : config.maxNumRecords);
                builder.setCommitOffsetsInFinalizeEnabled(config.commitOffsetInFinalize);
                String timestampPolicy = config.timestampPolicy;
                if (timestampPolicy.equals("ProcessingTime")) {
                    builder.setTimestampPolicyFactory(TimestampPolicyFactory.withProcessingTime());
                } else if (timestampPolicy.equals("CreateTime")) {
                    builder.setTimestampPolicyFactory(TimestampPolicyFactory.withCreateTime(Duration.ZERO));
                } else if (timestampPolicy.equals("LogAppendTime")) {
                    builder.setTimestampPolicyFactory(TimestampPolicyFactory.withLogAppendTime());
                } else {
                    throw new IllegalArgumentException("timestampPolicy should be one of (ProcessingTime, CreateTime, LogAppendTime)");
                }
                if (config.startReadTime != null) {
                    builder.setStartReadTime(Instant.ofEpochMilli((long)config.startReadTime));
                }
                if (config.stopReadTime != null) {
                    builder.setStopReadTime(Instant.ofEpochMilli((long)config.stopReadTime));
                }
                builder.setDynamicRead(false);
            }

            private static Coder resolveCoder(Class deserializer) {
                for (Method method : deserializer.getDeclaredMethods()) {
                    Class<?> returnType;
                    if (!method.getName().equals("deserialize") || (returnType = method.getReturnType()).equals(Object.class)) continue;
                    if (returnType.equals(byte[].class)) {
                        return NullableCoder.of((Coder)ByteArrayCoder.of());
                    }
                    if (returnType.equals(Integer.class)) {
                        return NullableCoder.of((Coder)VarIntCoder.of());
                    }
                    if (returnType.equals(Long.class)) {
                        return NullableCoder.of((Coder)VarLongCoder.of());
                    }
                    throw new RuntimeException("Couldn't infer Coder from " + deserializer);
                }
                throw new RuntimeException("Couldn't resolve coder for Deserializer: " + deserializer);
            }
        }
    }
}

