/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.kafka;

import com.google.auto.service.AutoService;
import com.google.auto.value.AutoValue;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import java.io.Serializable;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.model.pipeline.v1.ExternalTransforms;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.extensions.avro.schemas.utils.AvroUtils;
import org.apache.beam.sdk.extensions.protobuf.ProtoByteUtils;
import org.apache.beam.sdk.io.kafka.AutoValue_KafkaWriteSchemaTransformProvider_KafkaWriteSchemaTransformConfiguration;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.schemas.AutoValueSchema;
import org.apache.beam.sdk.schemas.NoSuchSchemaException;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.SchemaRegistry;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription;
import org.apache.beam.sdk.schemas.annotations.SchemaFieldNumber;
import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
import org.apache.beam.sdk.schemas.transforms.providers.ErrorHandling;
import org.apache.beam.sdk.schemas.utils.JsonUtils;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.util.construction.BeamUrns;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionRowTuple;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.ProtocolMessageEnum;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@AutoService(value={SchemaTransformProvider.class})
public class KafkaWriteSchemaTransformProvider
extends TypedSchemaTransformProvider<KafkaWriteSchemaTransformConfiguration> {
    public static final @UnknownKeyFor @NonNull @Initialized String SUPPORTED_FORMATS_STR = "RAW,JSON,AVRO,PROTO";
    public static final @UnknownKeyFor @NonNull @Initialized Set<@UnknownKeyFor @NonNull @Initialized String> SUPPORTED_FORMATS = Sets.newHashSet((Object[])"RAW,JSON,AVRO,PROTO".split(","));
    public static final @UnknownKeyFor @NonNull @Initialized TupleTag<@UnknownKeyFor @NonNull @Initialized Row> ERROR_TAG = new TupleTag<Row>(){};
    public static final @UnknownKeyFor @NonNull @Initialized TupleTag<@UnknownKeyFor @NonNull @Initialized KV<@UnknownKeyFor @NonNull @Initialized byte @UnknownKeyFor @NonNull @Initialized [], @UnknownKeyFor @NonNull @Initialized byte @UnknownKeyFor @NonNull @Initialized []>> OUTPUT_TAG = new TupleTag<KV<byte[], byte[]>>(){};
    public static final @UnknownKeyFor @NonNull @Initialized TupleTag<@UnknownKeyFor @NonNull @Initialized KV<@UnknownKeyFor @NonNull @Initialized byte @UnknownKeyFor @NonNull @Initialized [], @UnknownKeyFor @NonNull @Initialized GenericRecord>> RECORD_OUTPUT_TAG = new TupleTag<KV<byte[], GenericRecord>>(){};
    private static final @UnknownKeyFor @NonNull @Initialized Logger LOG = LoggerFactory.getLogger(KafkaWriteSchemaTransformProvider.class);

    protected @UnknownKeyFor @NonNull @Initialized Class<@UnknownKeyFor @NonNull @Initialized KafkaWriteSchemaTransformConfiguration> configurationClass() {
        return KafkaWriteSchemaTransformConfiguration.class;
    }

    protected @UnknownKeyFor @NonNull @Initialized SchemaTransform from(@UnknownKeyFor @NonNull @Initialized KafkaWriteSchemaTransformConfiguration configuration) {
        if (!SUPPORTED_FORMATS.contains(configuration.getFormat())) {
            throw new IllegalArgumentException("Format " + configuration.getFormat() + " is not supported. Supported formats are: " + String.join((CharSequence)", ", SUPPORTED_FORMATS));
        }
        return new KafkaWriteSchemaTransform(configuration);
    }

    public static @UnknownKeyFor @NonNull @Initialized SerializableFunction<@UnknownKeyFor @NonNull @Initialized Row, @UnknownKeyFor @NonNull @Initialized byte @UnknownKeyFor @NonNull @Initialized []> getRowToRawBytesFunction(final @UnknownKeyFor @NonNull @Initialized String rowFieldName) {
        return new SimpleFunction<Row, byte[]>(){

            public @UnknownKeyFor @NonNull @Initialized byte @UnknownKeyFor @NonNull @Initialized [] apply(@UnknownKeyFor @NonNull @Initialized Row input) {
                byte[] rawBytes = input.getBytes(rowFieldName);
                if (rawBytes == null) {
                    throw new NullPointerException();
                }
                return rawBytes;
            }
        };
    }

    public @UnknownKeyFor @NonNull @Initialized String identifier() {
        return BeamUrns.getUrn((ProtocolMessageEnum)ExternalTransforms.ManagedTransforms.Urns.KAFKA_WRITE);
    }

    public @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized String> inputCollectionNames() {
        return Collections.singletonList("input");
    }

    public @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized String> outputCollectionNames() {
        return Collections.emptyList();
    }

    @DefaultSchema(value=AutoValueSchema.class)
    @AutoValue
    public static abstract class KafkaWriteSchemaTransformConfiguration
    implements Serializable {
        @SchemaFieldDescription(value="The encoding format for the data stored in Kafka. Valid options are: RAW,JSON,AVRO,PROTO")
        @SchemaFieldNumber(value="0")
        public abstract @UnknownKeyFor @NonNull @Initialized String getFormat();

        @SchemaFieldNumber(value="1")
        public abstract @UnknownKeyFor @NonNull @Initialized String getTopic();

        @SchemaFieldDescription(value="A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. The client will make use of all servers irrespective of which servers are specified here for bootstrapping\u2014this list only impacts the initial hosts used to discover the full set of servers. | Format: host1:port1,host2:port2,...")
        @SchemaFieldNumber(value="2")
        public abstract @UnknownKeyFor @NonNull @Initialized String getBootstrapServers();

        @SchemaFieldDescription(value="A list of key-value pairs that act as configuration parameters for Kafka producers. Most of these configurations will not be needed, but if you need to customize your Kafka producer, you may use this. See a detailed list: https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html")
        @SchemaFieldNumber(value="3")
        @javax.annotation.Nullable
        public abstract @UnknownKeyFor @Nullable @Initialized Map<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized String> getProducerConfigUpdates();

        @SchemaFieldDescription(value="This option specifies whether and where to output unwritable rows.")
        @SchemaFieldNumber(value="4")
        @javax.annotation.Nullable
        public abstract @UnknownKeyFor @Nullable @Initialized ErrorHandling getErrorHandling();

        @SchemaFieldDescription(value="The path to the Protocol Buffer File Descriptor Set file. This file is used for schema definition and message serialization.")
        @SchemaFieldNumber(value="5")
        @javax.annotation.Nullable
        public abstract @UnknownKeyFor @Nullable @Initialized String getFileDescriptorPath();

        @SchemaFieldDescription(value="The name of the Protocol Buffer message to be used for schema extraction and data conversion.")
        @SchemaFieldNumber(value="6")
        @javax.annotation.Nullable
        public abstract @UnknownKeyFor @Nullable @Initialized String getMessageName();

        @SchemaFieldNumber(value="7")
        @javax.annotation.Nullable
        public abstract @UnknownKeyFor @Nullable @Initialized String getSchema();

        public static @UnknownKeyFor @NonNull @Initialized Builder builder() {
            return new AutoValue_KafkaWriteSchemaTransformProvider_KafkaWriteSchemaTransformConfiguration.Builder();
        }

        @AutoValue.Builder
        public static abstract class Builder {
            public abstract @UnknownKeyFor @NonNull @Initialized Builder setFormat(@UnknownKeyFor @NonNull @Initialized String var1);

            public abstract @UnknownKeyFor @NonNull @Initialized Builder setTopic(@UnknownKeyFor @NonNull @Initialized String var1);

            public abstract @UnknownKeyFor @NonNull @Initialized Builder setBootstrapServers(@UnknownKeyFor @NonNull @Initialized String var1);

            public abstract @UnknownKeyFor @NonNull @Initialized Builder setProducerConfigUpdates(@UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized String> var1);

            public abstract @UnknownKeyFor @NonNull @Initialized Builder setErrorHandling(@UnknownKeyFor @NonNull @Initialized ErrorHandling var1);

            public abstract @UnknownKeyFor @NonNull @Initialized Builder setFileDescriptorPath(@UnknownKeyFor @NonNull @Initialized String var1);

            public abstract @UnknownKeyFor @NonNull @Initialized Builder setMessageName(@UnknownKeyFor @NonNull @Initialized String var1);

            public abstract @UnknownKeyFor @NonNull @Initialized Builder setSchema(@UnknownKeyFor @NonNull @Initialized String var1);

            public abstract @UnknownKeyFor @NonNull @Initialized KafkaWriteSchemaTransformConfiguration build();
        }
    }

    static final class KafkaWriteSchemaTransform
    extends SchemaTransform
    implements Serializable {
        final @UnknownKeyFor @NonNull @Initialized KafkaWriteSchemaTransformConfiguration configuration;

        KafkaWriteSchemaTransform(@UnknownKeyFor @NonNull @Initialized KafkaWriteSchemaTransformConfiguration configuration) {
            this.configuration = configuration;
        }

        @UnknownKeyFor @NonNull @Initialized Row getConfigurationRow() {
            try {
                return ((Row)SchemaRegistry.createDefault().getToRowFunction(KafkaWriteSchemaTransformConfiguration.class).apply((Object)this.configuration)).sorted().toSnakeCase();
            }
            catch (NoSuchSchemaException e) {
                throw new RuntimeException(e);
            }
        }

        /*
         * Enabled force condition propagation
         * Lifted jumps to return sites
         */
        public @UnknownKeyFor @NonNull @Initialized PCollectionRowTuple expand(@UnknownKeyFor @NonNull @Initialized PCollectionRowTuple input) {
            PCollectionTuple outputTuple;
            SerializableFunction toBytesFn;
            Schema inputSchema = input.get("input").getSchema();
            org.apache.avro.Schema avroSchema = AvroUtils.toAvroSchema((Schema)inputSchema);
            SerializableFunction toGenericRecordsFn = null;
            if (this.configuration.getFormat().equals("RAW")) {
                int numFields = inputSchema.getFields().size();
                if (numFields != 1) {
                    throw new IllegalArgumentException("Expecting exactly one field, found " + numFields);
                }
                if (!inputSchema.getField(0).getType().equals((Object)Schema.FieldType.BYTES)) {
                    throw new IllegalArgumentException("The input schema must have exactly one field of type byte.");
                }
                toBytesFn = KafkaWriteSchemaTransformProvider.getRowToRawBytesFunction(inputSchema.getField(0).getName());
            } else if (this.configuration.getFormat().equals("JSON")) {
                toBytesFn = JsonUtils.getRowToJsonBytesFunction((Schema)inputSchema);
            } else if (this.configuration.getFormat().equals("PROTO")) {
                String descriptorPath = this.configuration.getFileDescriptorPath();
                String schema = this.configuration.getSchema();
                String messageName = this.configuration.getMessageName();
                if (messageName == null) {
                    throw new IllegalArgumentException("Expecting messageName to be non-null.");
                }
                if (descriptorPath != null && schema != null) {
                    throw new IllegalArgumentException("You must include a descriptorPath or a proto Schema but not both.");
                }
                if (descriptorPath != null) {
                    toBytesFn = ProtoByteUtils.getRowToProtoBytes((String)descriptorPath, (String)messageName);
                } else {
                    if (schema == null) throw new IllegalArgumentException("At least a descriptorPath or a proto Schema is required.");
                    toBytesFn = ProtoByteUtils.getRowToProtoBytesFromSchema((String)schema, (String)messageName);
                }
            } else if (this.configuration.getProducerConfigUpdates() != null && this.configuration.getProducerConfigUpdates().containsKey("schema.registry.url")) {
                toGenericRecordsFn = AvroUtils.getRowToGenericRecordFunction((org.apache.avro.Schema)avroSchema);
                toBytesFn = null;
            } else {
                toBytesFn = AvroUtils.getRowToAvroBytesFunction((Schema)inputSchema);
            }
            boolean handleErrors = ErrorHandling.hasOutput((ErrorHandling)this.configuration.getErrorHandling());
            Map<String, String> configOverrides = this.configuration.getProducerConfigUpdates();
            Schema errorSchema = ErrorHandling.errorSchema((Schema)inputSchema);
            if (toGenericRecordsFn != null) {
                LOG.info("Convert to GenericRecord with schema {}", (Object)avroSchema);
                outputTuple = (PCollectionTuple)input.get("input").apply("Map rows to Kafka messages", (PTransform)ParDo.of((DoFn)new GenericRecordErrorCounterFn("Kafka-write-error-counter", (SerializableFunction<Row, GenericRecord>)toGenericRecordsFn, errorSchema, handleErrors)).withOutputTags(RECORD_OUTPUT_TAG, TupleTagList.of(ERROR_TAG)));
                HashMap<String, Object> producerConfig = new HashMap<String, Object>(configOverrides);
                outputTuple.get(RECORD_OUTPUT_TAG).setCoder((Coder)KvCoder.of((Coder)ByteArrayCoder.of(), (Coder)AvroCoder.of((org.apache.avro.Schema)avroSchema))).apply("Map Rows to GenericRecords", KafkaIO.write().withTopic(this.configuration.getTopic()).withBootstrapServers(this.configuration.getBootstrapServers()).withProducerConfigUpdates(producerConfig).withKeySerializer(ByteArraySerializer.class).withValueSerializer(KafkaAvroSerializer.class));
            } else {
                outputTuple = (PCollectionTuple)input.get("input").apply("Map rows to Kafka messages", (PTransform)ParDo.of((DoFn)new ErrorCounterFn("Kafka-write-error-counter", (SerializableFunction<Row, byte[]>)toBytesFn, errorSchema, handleErrors)).withOutputTags(OUTPUT_TAG, TupleTagList.of(ERROR_TAG)));
                outputTuple.get(OUTPUT_TAG).apply(KafkaIO.write().withTopic(this.configuration.getTopic()).withBootstrapServers(this.configuration.getBootstrapServers()).withProducerConfigUpdates(configOverrides == null ? new HashMap<String, Object>() : new HashMap<String, String>(configOverrides)).withKeySerializer(ByteArraySerializer.class).withValueSerializer(ByteArraySerializer.class));
            }
            PCollection errorOutput = outputTuple.get(ERROR_TAG).setRowSchema(ErrorHandling.errorSchema((Schema)errorSchema));
            return PCollectionRowTuple.of((String)(handleErrors ? this.configuration.getErrorHandling().getOutput() : "errors"), (PCollection)errorOutput);
        }

        public static class GenericRecordErrorCounterFn
        extends BaseKafkaWriterFn<GenericRecord> {
            public GenericRecordErrorCounterFn(@UnknownKeyFor @NonNull @Initialized String name, @UnknownKeyFor @NonNull @Initialized SerializableFunction<@UnknownKeyFor @NonNull @Initialized Row, @UnknownKeyFor @NonNull @Initialized GenericRecord> toGenericRecordsFn, @UnknownKeyFor @NonNull @Initialized Schema errorSchema, @UnknownKeyFor @NonNull @Initialized boolean handleErrors) {
                super(name, toGenericRecordsFn, errorSchema, handleErrors, RECORD_OUTPUT_TAG);
            }
        }

        public static class ErrorCounterFn
        extends BaseKafkaWriterFn<byte[]> {
            public ErrorCounterFn(@UnknownKeyFor @NonNull @Initialized String name, @UnknownKeyFor @NonNull @Initialized SerializableFunction<@UnknownKeyFor @NonNull @Initialized Row, @UnknownKeyFor @NonNull @Initialized byte @UnknownKeyFor @NonNull @Initialized []> toBytesFn, @UnknownKeyFor @NonNull @Initialized Schema errorSchema, @UnknownKeyFor @NonNull @Initialized boolean handleErrors) {
                super(name, toBytesFn, errorSchema, handleErrors, OUTPUT_TAG);
            }
        }

        public static abstract class BaseKafkaWriterFn<@UnknownKeyFor T>
        extends DoFn<Row, KV<byte[], T>> {
            private final @UnknownKeyFor @NonNull @Initialized SerializableFunction<@UnknownKeyFor @NonNull @Initialized Row, T> conversionFn;
            private final @UnknownKeyFor @NonNull @Initialized Counter errorCounter;
            private @UnknownKeyFor @NonNull @Initialized Long errorsInBundle = 0L;
            private final @UnknownKeyFor @NonNull @Initialized boolean handleErrors;
            private final @UnknownKeyFor @NonNull @Initialized Schema errorSchema;
            private final @UnknownKeyFor @NonNull @Initialized TupleTag<@UnknownKeyFor @NonNull @Initialized KV<@UnknownKeyFor @NonNull @Initialized byte @UnknownKeyFor @NonNull @Initialized [], T>> successTag;

            public BaseKafkaWriterFn(@UnknownKeyFor @NonNull @Initialized String name, @UnknownKeyFor @NonNull @Initialized SerializableFunction<@UnknownKeyFor @NonNull @Initialized Row, T> conversionFn, @UnknownKeyFor @NonNull @Initialized Schema errorSchema, @UnknownKeyFor @NonNull @Initialized boolean handleErrors, @UnknownKeyFor @NonNull @Initialized TupleTag<@UnknownKeyFor @NonNull @Initialized KV<@UnknownKeyFor @NonNull @Initialized byte @UnknownKeyFor @NonNull @Initialized [], T>> successTag) {
                this.conversionFn = conversionFn;
                this.errorCounter = Metrics.counter(KafkaWriteSchemaTransformProvider.class, (String)name);
                this.handleErrors = handleErrors;
                this.errorSchema = errorSchema;
                this.successTag = successTag;
            }

            @DoFn.ProcessElement
            public void process(@DoFn.Element @UnknownKeyFor @NonNull @Initialized Row row, // Could not load outer class - annotation placement on inner may be incorrect
            @UnknownKeyFor @NonNull @Initialized DoFn.MultiOutputReceiver receiver) {
                KV output = null;
                try {
                    output = KV.of((Object)new byte[1], (Object)this.conversionFn.apply((Object)row));
                }
                catch (Exception e) {
                    if (!this.handleErrors) {
                        throw new RuntimeException(e);
                    }
                    this.errorsInBundle = this.errorsInBundle + 1L;
                    LOG.warn("Error while processing the element", (Throwable)e);
                    receiver.get(ERROR_TAG).output((Object)ErrorHandling.errorRecord((Schema)this.errorSchema, (Row)row, (Throwable)e));
                }
                if (output != null) {
                    receiver.get(this.successTag).output((Object)output);
                }
            }

            @DoFn.FinishBundle
            public void finish() {
                this.errorCounter.inc(this.errorsInBundle.longValue());
                this.errorsInBundle = 0L;
            }
        }
    }
}

