package org.apache.beam.sdk.io.kafka;

import com.google.auto.service.AutoService;
import java.util.List;
import java.util.Objects;
import org.apache.avro.Schema;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.extensions.avro.schemas.utils.AvroUtils;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.transforms.Convert;
import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
import org.apache.beam.sdk.schemas.utils.JsonUtils;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionRowTuple;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.joda.time.Duration;

@AutoService({SchemaTransformProvider.class})
/* loaded from: input_file:org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.class */
public class KafkaReadSchemaTransformProvider extends TypedSchemaTransformProvider<KafkaReadSchemaTransformConfiguration> {
    final Boolean isTest;
    final Integer testTimeoutSecs;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider$KafkaReadSchemaTransform.class */
    public static class KafkaReadSchemaTransform implements SchemaTransform {
        private final KafkaReadSchemaTransformConfiguration configuration;
        private final Boolean isTest;
        private final Integer testTimeoutSeconds;
        static final /* synthetic */ boolean $assertionsDisabled;

        KafkaReadSchemaTransform(KafkaReadSchemaTransformConfiguration kafkaReadSchemaTransformConfiguration, Boolean bool, Integer num) {
            kafkaReadSchemaTransformConfiguration.validate();
            this.configuration = kafkaReadSchemaTransformConfiguration;
            this.isTest = bool;
            this.testTimeoutSeconds = num;
        }

        public PTransform<PCollectionRowTuple, PCollectionRowTuple> buildTransform() {
            String schema = this.configuration.getSchema();
            final Integer valueOf = Integer.valueOf(this.configuration.hashCode() % Integer.MAX_VALUE);
            final String autoOffsetResetConfig = this.configuration.getAutoOffsetResetConfig() == null ? "latest" : this.configuration.getAutoOffsetResetConfig();
            if (schema == null) {
                return new PTransform<PCollectionRowTuple, PCollectionRowTuple>() { // from class: org.apache.beam.sdk.io.kafka.KafkaReadSchemaTransformProvider.KafkaReadSchemaTransform.2
                    static final /* synthetic */ boolean $assertionsDisabled;

                    public PCollectionRowTuple expand(PCollectionRowTuple pCollectionRowTuple) {
                        String confluentSchemaRegistryUrl = KafkaReadSchemaTransform.this.configuration.getConfluentSchemaRegistryUrl();
                        String confluentSchemaRegistrySubject = KafkaReadSchemaTransform.this.configuration.getConfluentSchemaRegistrySubject();
                        if (confluentSchemaRegistryUrl == null || confluentSchemaRegistrySubject == null) {
                            throw new IllegalArgumentException("To read from Kafka, a schema must be provided directly or though Confluent Schema Registry. Make sure you are providing one of these parameters.");
                        }
                        KafkaIO.Read withValueDeserializer = KafkaIO.read().withTopic(KafkaReadSchemaTransform.this.configuration.getTopic()).withBootstrapServers(KafkaReadSchemaTransform.this.configuration.getBootstrapServers()).withConsumerConfigUpdates(ImmutableMap.of("group.id", "kafka-read-provider-" + valueOf, "enable.auto.commit", true, "auto.commit.interval.ms", 100, "auto.offset.reset", autoOffsetResetConfig)).withKeyDeserializer(ByteArrayDeserializer.class).withValueDeserializer(ConfluentSchemaRegistryDeserializerProvider.of(confluentSchemaRegistryUrl, confluentSchemaRegistrySubject));
                        if (KafkaReadSchemaTransform.this.isTest.booleanValue()) {
                            withValueDeserializer = withValueDeserializer.withMaxReadTime(Duration.standardSeconds(KafkaReadSchemaTransform.this.testTimeoutSeconds.intValue()));
                        }
                        PCollection apply = pCollectionRowTuple.getPipeline().apply(withValueDeserializer.withoutMetadata()).apply(Values.create());
                        if ($assertionsDisabled || apply.getCoder().getClass() == AvroCoder.class) {
                            return PCollectionRowTuple.of("output", apply.setCoder(AvroUtils.schemaCoder(apply.getCoder().getSchema())).apply(Convert.toRows()));
                        }
                        throw new AssertionError();
                    }

                    static {
                        $assertionsDisabled = !KafkaReadSchemaTransformProvider.class.desiredAssertionStatus();
                    }
                };
            }
            if (!$assertionsDisabled && this.configuration.getConfluentSchemaRegistryUrl() != null) {
                throw new AssertionError("To read from Kafka, a schema must be provided directly or though Confluent Schema Registry, but not both.");
            }
            final Schema beamSchemaFromJsonSchema = Objects.equals(this.configuration.getDataFormat(), "JSON") ? JsonUtils.beamSchemaFromJsonSchema(schema) : AvroUtils.toBeamSchema(new Schema.Parser().parse(schema));
            final SimpleFunction jsonBytesToRowFunction = Objects.equals(this.configuration.getDataFormat(), "JSON") ? JsonUtils.getJsonBytesToRowFunction(beamSchemaFromJsonSchema) : AvroUtils.getAvroBytesToRowFunction(beamSchemaFromJsonSchema);
            return new PTransform<PCollectionRowTuple, PCollectionRowTuple>() { // from class: org.apache.beam.sdk.io.kafka.KafkaReadSchemaTransformProvider.KafkaReadSchemaTransform.1
                public PCollectionRowTuple expand(PCollectionRowTuple pCollectionRowTuple) {
                    KafkaIO.Read<byte[], byte[]> withBootstrapServers = KafkaIO.readBytes().withConsumerConfigUpdates(ImmutableMap.of("group.id", "kafka-read-provider-" + valueOf, "enable.auto.commit", true, "auto.commit.interval.ms", 100, "auto.offset.reset", autoOffsetResetConfig)).withTopic(KafkaReadSchemaTransform.this.configuration.getTopic()).withBootstrapServers(KafkaReadSchemaTransform.this.configuration.getBootstrapServers());
                    if (KafkaReadSchemaTransform.this.isTest.booleanValue()) {
                        withBootstrapServers = withBootstrapServers.withMaxReadTime(Duration.standardSeconds(KafkaReadSchemaTransform.this.testTimeoutSeconds.intValue()));
                    }
                    return PCollectionRowTuple.of("output", pCollectionRowTuple.getPipeline().apply(withBootstrapServers.withoutMetadata()).apply(Values.create()).apply(MapElements.into(TypeDescriptors.rows()).via(jsonBytesToRowFunction)).setRowSchema(beamSchemaFromJsonSchema));
                }
            };
        }

        static {
            $assertionsDisabled = !KafkaReadSchemaTransformProvider.class.desiredAssertionStatus();
        }
    }

    public KafkaReadSchemaTransformProvider() {
        this(false, 0);
    }

    @VisibleForTesting
    KafkaReadSchemaTransformProvider(Boolean bool, Integer num) {
        this.isTest = bool;
        this.testTimeoutSecs = num;
    }

    protected Class<KafkaReadSchemaTransformConfiguration> configurationClass() {
        return KafkaReadSchemaTransformConfiguration.class;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public SchemaTransform from(KafkaReadSchemaTransformConfiguration kafkaReadSchemaTransformConfiguration) {
        return new KafkaReadSchemaTransform(kafkaReadSchemaTransformConfiguration, this.isTest, this.testTimeoutSecs);
    }

    public String identifier() {
        return "beam:schematransform:org.apache.beam:kafka_read:v1";
    }

    public List<String> inputCollectionNames() {
        return Lists.newArrayList();
    }

    public List<String> outputCollectionNames() {
        return Lists.newArrayList(new String[]{"output"});
    }
}
