/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.kafka;

import com.google.auto.service.AutoService;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.FileAttribute;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.avro.Schema;
import org.apache.beam.model.pipeline.v1.ExternalTransforms;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.extensions.avro.schemas.utils.AvroUtils;
import org.apache.beam.sdk.extensions.protobuf.ProtoByteUtils;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.fs.MatchResult;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.io.kafka.ConfluentSchemaRegistryDeserializerProvider;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.io.kafka.KafkaIOUtils;
import org.apache.beam.sdk.io.kafka.KafkaReadSchemaTransformConfiguration;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.schemas.NoSuchSchemaException;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.SchemaRegistry;
import org.apache.beam.sdk.schemas.transforms.Convert;
import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
import org.apache.beam.sdk.schemas.transforms.providers.ErrorHandling;
import org.apache.beam.sdk.schemas.utils.JsonUtils;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.util.Preconditions;
import org.apache.beam.sdk.util.construction.BeamUrns;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionRowTuple;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.ProtocolMessageEnum;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@AutoService(value={SchemaTransformProvider.class})
public class KafkaReadSchemaTransformProvider
extends TypedSchemaTransformProvider<KafkaReadSchemaTransformConfiguration> {
    private static final @UnknownKeyFor @NonNull @Initialized Logger LOG = LoggerFactory.getLogger(KafkaReadSchemaTransformProvider.class);
    public static final @UnknownKeyFor @NonNull @Initialized TupleTag<@UnknownKeyFor @NonNull @Initialized Row> OUTPUT_TAG = new TupleTag<Row>(){};
    public static final @UnknownKeyFor @NonNull @Initialized TupleTag<@UnknownKeyFor @NonNull @Initialized Row> ERROR_TAG = new TupleTag<Row>(){};

    protected @UnknownKeyFor @NonNull @Initialized Class<@UnknownKeyFor @NonNull @Initialized KafkaReadSchemaTransformConfiguration> configurationClass() {
        return KafkaReadSchemaTransformConfiguration.class;
    }

    protected @UnknownKeyFor @NonNull @Initialized SchemaTransform from(@UnknownKeyFor @NonNull @Initialized KafkaReadSchemaTransformConfiguration configuration) {
        return new KafkaReadSchemaTransform(configuration);
    }

    public static @UnknownKeyFor @NonNull @Initialized SerializableFunction<@UnknownKeyFor @NonNull @Initialized byte @UnknownKeyFor @NonNull @Initialized [], @UnknownKeyFor @NonNull @Initialized Row> getRawBytesToRowFunction(final @UnknownKeyFor @NonNull @Initialized Schema rawSchema) {
        return new SimpleFunction<byte[], Row>(){

            public @UnknownKeyFor @NonNull @Initialized Row apply(@UnknownKeyFor @NonNull @Initialized byte @UnknownKeyFor @NonNull @Initialized [] input) {
                return Row.withSchema((Schema)rawSchema).addValue((Object)input).build();
            }
        };
    }

    public static @UnknownKeyFor @NonNull @Initialized SerializableFunction<@UnknownKeyFor @NonNull @Initialized byte @UnknownKeyFor @NonNull @Initialized [], @UnknownKeyFor @NonNull @Initialized Row> getRawStringToRowFunction(final @UnknownKeyFor @NonNull @Initialized Schema stringSchema) {
        return new SimpleFunction<byte[], Row>(){

            public @UnknownKeyFor @NonNull @Initialized Row apply(@UnknownKeyFor @NonNull @Initialized byte @UnknownKeyFor @NonNull @Initialized [] input) {
                return Row.withSchema((Schema)stringSchema).addValue((Object)new String(input, StandardCharsets.UTF_8)).build();
            }
        };
    }

    public @UnknownKeyFor @NonNull @Initialized String identifier() {
        return BeamUrns.getUrn((ProtocolMessageEnum)ExternalTransforms.ManagedTransforms.Urns.KAFKA_READ);
    }

    public @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized String> inputCollectionNames() {
        return Lists.newArrayList();
    }

    public @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized String> outputCollectionNames() {
        return Arrays.asList("output", "errors");
    }

    private static class ConsumerFactoryWithGcsTrustStores
    implements SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> {
        private ConsumerFactoryWithGcsTrustStores() {
        }

        public @UnknownKeyFor @NonNull @Initialized Consumer<@UnknownKeyFor @NonNull @Initialized byte @UnknownKeyFor @NonNull @Initialized [], @UnknownKeyFor @NonNull @Initialized byte @UnknownKeyFor @NonNull @Initialized []> apply(@UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized Object> input) {
            return (Consumer)KafkaIOUtils.KAFKA_CONSUMER_FACTORY_FN.apply(input.entrySet().stream().map(entry -> Maps.immutableEntry((Object)((String)entry.getKey()), (Object)ConsumerFactoryWithGcsTrustStores.identityOrGcsToLocalFile(entry.getValue()))).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));
        }

        private static @UnknownKeyFor @NonNull @Initialized Object identityOrGcsToLocalFile(@UnknownKeyFor @NonNull @Initialized Object configValue) {
            if (configValue instanceof String) {
                String configStr = (String)configValue;
                if (configStr.startsWith("gs://")) {
                    try {
                        Path localFile = Files.createTempFile("", "", new FileAttribute[0]);
                        LOG.info("Downloading {} into local filesystem ({})", (Object)configStr, (Object)localFile.toAbsolutePath());
                        ReadableByteChannel channel = FileSystems.open((ResourceId)((MatchResult.Metadata)FileSystems.match((String)configStr).metadata().get(0)).resourceId());
                        FileOutputStream outputStream = new FileOutputStream(localFile.toFile());
                        WritableByteChannel outputChannel = Channels.newChannel(outputStream);
                        ByteBuffer buffer = ByteBuffer.allocate(1024);
                        while (channel.read(buffer) != -1) {
                            buffer.flip();
                            outputChannel.write(buffer);
                            buffer.compact();
                        }
                        channel.close();
                        outputChannel.close();
                        outputStream.close();
                        return localFile.toAbsolutePath().toString();
                    }
                    catch (IOException e) {
                        throw new IllegalArgumentException(String.format("Unable to fetch file %s to be used locally to create a Kafka Consumer.", configStr));
                    }
                }
                return configValue;
            }
            return configValue;
        }
    }

    public static class ErrorFn
    extends DoFn<byte[], Row> {
        private final @UnknownKeyFor @NonNull @Initialized SerializableFunction<@UnknownKeyFor @NonNull @Initialized byte @UnknownKeyFor @NonNull @Initialized [], @UnknownKeyFor @NonNull @Initialized Row> valueMapper;
        private final @UnknownKeyFor @NonNull @Initialized Counter errorCounter;
        private @UnknownKeyFor @NonNull @Initialized Long errorsInBundle = 0L;
        private final @UnknownKeyFor @NonNull @Initialized boolean handleErrors;
        private final @UnknownKeyFor @NonNull @Initialized Schema errorSchema;

        public ErrorFn(@UnknownKeyFor @NonNull @Initialized String name, @UnknownKeyFor @NonNull @Initialized SerializableFunction<@UnknownKeyFor @NonNull @Initialized byte @UnknownKeyFor @NonNull @Initialized [], @UnknownKeyFor @NonNull @Initialized Row> valueMapper, @UnknownKeyFor @NonNull @Initialized Schema errorSchema, @UnknownKeyFor @NonNull @Initialized boolean handleErrors) {
            this.errorCounter = Metrics.counter(KafkaReadSchemaTransformProvider.class, (String)name);
            this.valueMapper = valueMapper;
            this.handleErrors = handleErrors;
            this.errorSchema = errorSchema;
        }

        @DoFn.ProcessElement
        public void process(@DoFn.Element @UnknownKeyFor @NonNull @Initialized byte @UnknownKeyFor @NonNull @Initialized [] msg, // Could not load outer class - annotation placement on inner may be incorrect
        @UnknownKeyFor @NonNull @Initialized DoFn.MultiOutputReceiver receiver) {
            Row mappedRow = null;
            try {
                mappedRow = (Row)this.valueMapper.apply((Object)msg);
            }
            catch (Exception e) {
                if (!this.handleErrors) {
                    throw new RuntimeException(e);
                }
                this.errorsInBundle = this.errorsInBundle + 1L;
                LOG.warn("Error while parsing the element", (Throwable)e);
                receiver.get(ERROR_TAG).output((Object)ErrorHandling.errorRecord((Schema)this.errorSchema, (byte[])msg, (Throwable)e));
            }
            if (mappedRow != null) {
                receiver.get(OUTPUT_TAG).output((Object)mappedRow);
            }
        }

        @DoFn.FinishBundle
        public void finish(/*
         * Issues handling annotations - annotations may be inaccurate
         */
        // Could not load outer class - annotation placement on inner may be incorrect
        @UnknownKeyFor @UnknownKeyFor @UnknownKeyFor @UnknownKeyFor @NonNull @Initialized @NonNull @Initialized @NonNull @Initialized @NonNull @Initialized DoFn. @UnknownKeyFor @NonNull @Initialized FinishBundleContext c) {
            this.errorCounter.inc(this.errorsInBundle.longValue());
            this.errorsInBundle = 0L;
        }
    }

    static class KafkaReadSchemaTransform
    extends SchemaTransform {
        private final @UnknownKeyFor @NonNull @Initialized KafkaReadSchemaTransformConfiguration configuration;

        KafkaReadSchemaTransform(@UnknownKeyFor @NonNull @Initialized KafkaReadSchemaTransformConfiguration configuration) {
            this.configuration = configuration;
        }

        @UnknownKeyFor @NonNull @Initialized Row getConfigurationRow() {
            try {
                return ((Row)SchemaRegistry.createDefault().getToRowFunction(KafkaReadSchemaTransformConfiguration.class).apply((Object)this.configuration)).sorted().toSnakeCase();
            }
            catch (NoSuchSchemaException e) {
                throw new RuntimeException(e);
            }
        }

        public @UnknownKeyFor @NonNull @Initialized PCollectionRowTuple expand(@UnknownKeyFor @NonNull @Initialized PCollectionRowTuple input) {
            SimpleFunction valueMapper;
            Schema beamSchema;
            this.configuration.validate();
            String inputSchema = this.configuration.getSchema();
            int groupId = this.configuration.hashCode() % Integer.MAX_VALUE;
            String autoOffsetReset = (String)MoreObjects.firstNonNull((Object)this.configuration.getAutoOffsetResetConfig(), (Object)"latest");
            HashMap<String, Object> consumerConfigs = new HashMap<String, Object>((Map)MoreObjects.firstNonNull(this.configuration.getConsumerConfigUpdates(), new HashMap()));
            consumerConfigs.putIfAbsent("group.id", "kafka-read-provider-" + groupId);
            consumerConfigs.putIfAbsent("enable.auto.commit", true);
            consumerConfigs.putIfAbsent("auto.commit.interval.ms", 100);
            consumerConfigs.putIfAbsent("auto.offset.reset", autoOffsetReset);
            String format = this.configuration.getFormat();
            boolean handleErrors = ErrorHandling.hasOutput((ErrorHandling)this.configuration.getErrorHandling());
            String confluentSchemaRegUrl = this.configuration.getConfluentSchemaRegistryUrl();
            if (confluentSchemaRegUrl != null) {
                String confluentSchemaRegSubject = (String)Preconditions.checkArgumentNotNull((Object)this.configuration.getConfluentSchemaRegistrySubject());
                KafkaIO.Read kafkaRead = KafkaIO.read().withTopic(this.configuration.getTopic()).withConsumerFactoryFn(new ConsumerFactoryWithGcsTrustStores()).withBootstrapServers(this.configuration.getBootstrapServers()).withConsumerConfigUpdates(consumerConfigs).withKeyDeserializer(ByteArrayDeserializer.class).withValueDeserializer(ConfluentSchemaRegistryDeserializerProvider.of(confluentSchemaRegUrl, confluentSchemaRegSubject));
                Integer maxReadTimeSeconds = this.configuration.getMaxReadTimeSeconds();
                if (maxReadTimeSeconds != null) {
                    kafkaRead = kafkaRead.withMaxReadTime(Duration.standardSeconds((long)maxReadTimeSeconds.intValue()));
                }
                PCollection kafkaValues = (PCollection)((PCollection)input.getPipeline().apply(kafkaRead.withoutMetadata())).apply((PTransform)Values.create());
                assert (kafkaValues.getCoder() instanceof AvroCoder);
                AvroCoder coder = (AvroCoder)kafkaValues.getCoder();
                kafkaValues = kafkaValues.setCoder((Coder)AvroUtils.schemaCoder((org.apache.avro.Schema)coder.getSchema()));
                return PCollectionRowTuple.of((String)"output", (PCollection)((PCollection)kafkaValues.apply(Convert.toRows())));
            }
            if ("RAW".equals(format)) {
                beamSchema = Schema.builder().addField("payload", Schema.FieldType.BYTES).build();
                valueMapper = KafkaReadSchemaTransformProvider.getRawBytesToRowFunction(beamSchema);
            } else if ("STRING".equals(format)) {
                beamSchema = Schema.builder().addField("payload", Schema.FieldType.STRING).build();
                valueMapper = KafkaReadSchemaTransformProvider.getRawStringToRowFunction(beamSchema);
            } else if ("PROTO".equals(format)) {
                String fileDescriptorPath = this.configuration.getFileDescriptorPath();
                String messageName = (String)Preconditions.checkArgumentNotNull((Object)this.configuration.getMessageName());
                if (fileDescriptorPath != null) {
                    beamSchema = ProtoByteUtils.getBeamSchemaFromProto((String)fileDescriptorPath, (String)messageName);
                    valueMapper = ProtoByteUtils.getProtoBytesToRowFunction((String)fileDescriptorPath, (String)messageName);
                } else {
                    beamSchema = ProtoByteUtils.getBeamSchemaFromProtoSchema((String)((String)Preconditions.checkArgumentNotNull((Object)inputSchema)), (String)messageName);
                    valueMapper = ProtoByteUtils.getProtoBytesToRowFromSchemaFunction((String)((String)Preconditions.checkArgumentNotNull((Object)inputSchema)), (String)messageName);
                }
            } else if ("JSON".equals(format)) {
                beamSchema = JsonUtils.beamSchemaFromJsonSchema((String)((String)Preconditions.checkArgumentNotNull((Object)inputSchema)));
                valueMapper = JsonUtils.getJsonBytesToRowFunction((Schema)beamSchema);
            } else {
                beamSchema = AvroUtils.toBeamSchema((org.apache.avro.Schema)new Schema.Parser().parse((String)Preconditions.checkArgumentNotNull((Object)inputSchema)));
                valueMapper = AvroUtils.getAvroBytesToRowFunction((Schema)beamSchema);
            }
            KafkaIO.Read<byte[], byte[]> kafkaRead = KafkaIO.readBytes().withConsumerConfigUpdates(consumerConfigs).withConsumerFactoryFn(new ConsumerFactoryWithGcsTrustStores()).withTopic(this.configuration.getTopic()).withBootstrapServers(this.configuration.getBootstrapServers());
            Integer maxReadTimeSeconds = this.configuration.getMaxReadTimeSeconds();
            if (maxReadTimeSeconds != null) {
                kafkaRead = kafkaRead.withMaxReadTime(Duration.standardSeconds((long)maxReadTimeSeconds.intValue()));
            }
            PCollection kafkaValues = (PCollection)((PCollection)input.getPipeline().apply(kafkaRead.withoutMetadata())).apply((PTransform)Values.create());
            Schema errorSchema = ErrorHandling.errorSchemaBytes();
            PCollectionTuple outputTuple = (PCollectionTuple)kafkaValues.apply((PTransform)ParDo.of((DoFn)new ErrorFn("Kafka-read-error-counter", (SerializableFunction<byte[], Row>)valueMapper, errorSchema, handleErrors)).withOutputTags(OUTPUT_TAG, TupleTagList.of(ERROR_TAG)));
            PCollectionRowTuple outputRows = PCollectionRowTuple.of((String)"output", (PCollection)outputTuple.get(OUTPUT_TAG).setRowSchema(beamSchema));
            PCollection errorOutput = outputTuple.get(ERROR_TAG).setRowSchema(errorSchema);
            if (handleErrors) {
                outputRows = outputRows.and(((ErrorHandling)Preconditions.checkArgumentNotNull((Object)this.configuration.getErrorHandling())).getOutput(), errorOutput);
            }
            return outputRows;
        }
    }
}

