/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.gcp.pubsub;

import com.google.api.client.util.Clock;
import com.google.auto.service.AutoService;
import java.io.Serializable;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import org.apache.avro.Schema;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubReadSchemaTransformConfiguration;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
import org.apache.beam.sdk.schemas.utils.AvroUtils;
import org.apache.beam.sdk.schemas.utils.JsonUtils;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionRowTuple;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;

@AutoService(value={SchemaTransformProvider.class})
public class PubsubReadSchemaTransformProvider
extends TypedSchemaTransformProvider<PubsubReadSchemaTransformConfiguration> {
    /** Comma-separated list of the payload formats accepted by this provider. */
    public static final @UnknownKeyFor @NonNull @Initialized String VALID_FORMATS_STR = "AVRO,JSON";

    /**
     * The accepted payload formats as a set, derived from {@link #VALID_FORMATS_STR}.
     *
     * <p>Exposed as a read-only view so that the shared validation state cannot be altered by
     * callers. The backing set is a {@code HashSet}, so {@code contains(null)} returns
     * {@code false} instead of throwing.
     */
    public static final @UnknownKeyFor @NonNull @Initialized Set<@UnknownKeyFor @NonNull @Initialized String> VALID_DATA_FORMATS =
        Collections.unmodifiableSet(Sets.newHashSet(VALID_FORMATS_STR.split(",")));

    /** Tag of the main output: rows successfully decoded from message payloads. */
    public static final @UnknownKeyFor @NonNull @Initialized TupleTag<@UnknownKeyFor @NonNull @Initialized Row> OUTPUT_TAG = new TupleTag<Row>(){};

    /** Tag of the error output: one row per payload that could not be decoded. */
    public static final @UnknownKeyFor @NonNull @Initialized TupleTag<@UnknownKeyFor @NonNull @Initialized Row> ERROR_TAG = new TupleTag<Row>(){};

    /** Schema of error rows: the failure description ("error") and the raw payload bytes ("row"). */
    public static final @UnknownKeyFor @NonNull @Initialized Schema ERROR_SCHEMA = Schema.builder().addStringField("error").addNullableByteArrayField("row").build();

    /**
     * Reports the configuration type this provider works with.
     *
     * @return the {@link PubsubReadSchemaTransformConfiguration} class object
     */
    public @UnknownKeyFor @NonNull @Initialized Class<@UnknownKeyFor @NonNull @Initialized PubsubReadSchemaTransformConfiguration> configurationClass() {
        final Class<PubsubReadSchemaTransformConfiguration> configType = PubsubReadSchemaTransformConfiguration.class;
        return configType;
    }

    /**
     * Validates {@code configuration} and builds the Pub/Sub read transform it describes.
     *
     * <p>Validation, in order:
     * <ol>
     *   <li>exactly one of subscription and topic must be set;</li>
     *   <li>schema and format must be either both present or both blank;</li>
     *   <li>the format must be one of {@link #VALID_DATA_FORMATS}.</li>
     * </ol>
     *
     * <p>For the {@code "JSON"} format the schema text is interpreted as a JSON schema; for any
     * other accepted format it is parsed as an Avro schema.
     *
     * @param configuration the user-supplied read configuration
     * @return a transform reading from the configured topic or subscription
     * @throws IllegalArgumentException if any of the validation steps above fails
     */
    public @UnknownKeyFor @NonNull @Initialized SchemaTransform from(@UnknownKeyFor @NonNull @Initialized PubsubReadSchemaTransformConfiguration configuration) {
        String topic = configuration.getTopic();
        String subscription = configuration.getSubscription();
        String schemaText = configuration.getSchema();
        String format = configuration.getFormat();

        if (subscription == null && topic == null) {
            throw new IllegalArgumentException("To read from Pubsub, a subscription name or a topic name must be provided");
        }
        if (subscription != null && topic != null) {
            throw new IllegalArgumentException("To read from Pubsub, a subscription name or a topic name must be provided. Not both.");
        }

        // Schema and format only make sense together: reject the case where exactly one is blank.
        if (Strings.isNullOrEmpty(schemaText) != Strings.isNullOrEmpty(format)) {
            throw new IllegalArgumentException(
                "A schema was provided without a data format (or viceversa). Please provide "
                    + "both of these parameters to read from Pubsub, or if you would like to use the Pubsub schema service,"
                    + " please leave both of these blank.");
        }

        // NOTE(review): a blank format is not a member of VALID_DATA_FORMATS, so a configuration
        // that leaves both schema and format blank is rejected here, even though the message
        // above suggests leaving both blank is an option.
        if (!VALID_DATA_FORMATS.contains(format)) {
            throw new IllegalArgumentException(String.format("Format %s not supported. Only supported formats are %s", format, VALID_FORMATS_STR));
        }

        boolean isJson = Objects.equals(format, "JSON");

        // The Avro parser is referenced by its fully-qualified name because the simple name
        // Schema denotes the Beam schema type throughout this class.
        Schema beamSchema = isJson
            ? JsonUtils.beamSchemaFromJsonSchema(schemaText)
            : AvroUtils.toBeamSchema(new org.apache.avro.Schema.Parser().parse(schemaText));
        SerializableFunction<byte[], Row> valueMapper = isJson
            ? JsonUtils.getJsonBytesToRowFunction(beamSchema)
            : AvroUtils.getAvroBytesToRowFunction(beamSchema);

        PubsubReadSchemaTransform transform = new PubsubReadSchemaTransform(topic, subscription, beamSchema, valueMapper);

        // Optional overrides; the transform itself checks that they are supplied as a pair.
        if (configuration.getClientFactory() != null) {
            transform.setClientFactory(configuration.getClientFactory());
        }
        if (configuration.getClock() != null) {
            transform.setClock(configuration.getClock());
        }
        return transform;
    }

    /**
     * Supplies the fixed identifier string of this provider.
     *
     * @return {@code "beam:schematransform:org.apache.beam:pubsub_read:v1"}
     */
    public @UnknownKeyFor @NonNull @Initialized String identifier() {
        final String id = "beam:schematransform:org.apache.beam:pubsub_read:v1";
        return id;
    }

    /**
     * Lists the names of the input collections; there are none.
     *
     * @return an empty list
     */
    public @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized String> inputCollectionNames() {
        final List<String> noInputs = Collections.emptyList();
        return noInputs;
    }

    /**
     * Lists the names of the output collections.
     *
     * @return a list holding {@code "output"} followed by {@code "errors"}
     */
    public @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized String> outputCollectionNames() {
        final String[] names = {"output", "errors"};
        return Arrays.asList(names);
    }

    /**
     * Transform that reads messages from a Pub/Sub topic or subscription and converts each
     * payload to a {@link Row} with the supplied value mapper.
     *
     * <p>Rows that convert successfully are emitted under {@code "output"}; payloads whose
     * conversion throws are emitted under {@code "errors"} using {@link #ERROR_SCHEMA}.
     */
    private static class PubsubReadSchemaTransform
    extends SchemaTransform
    implements Serializable {
        /** Schema attached to the successfully converted rows. */
        final @UnknownKeyFor @NonNull @Initialized Schema beamSchema;
        /** Converts a raw message payload to a row. */
        final @UnknownKeyFor @NonNull @Initialized SerializableFunction<@UnknownKeyFor @NonNull @Initialized byte @UnknownKeyFor @NonNull @Initialized [], @UnknownKeyFor @NonNull @Initialized Row> valueMapper;
        /** Topic to read from; used when non-empty, otherwise the subscription is used. */
        final @Nullable @UnknownKeyFor @Initialized String topic;
        /** Subscription to read from when no topic is given. */
        final @Nullable @UnknownKeyFor @Initialized String subscription;
        /** Optional test client factory; must be supplied together with {@link #clock}. */
        PubsubTestClient.@Nullable @UnknownKeyFor @Initialized PubsubTestClientFactory clientFactory;
        /** Optional clock; must be supplied together with {@link #clientFactory}. */
        @Nullable @UnknownKeyFor @Initialized Clock clock;

        /**
         * Creates the transform.
         *
         * @param topic topic to read from, or {@code null}/empty to read from the subscription
         * @param subscription subscription to read from when no topic is given
         * @param beamSchema schema of the rows produced by {@code valueMapper}
         * @param valueMapper converts a message payload to a row
         */
        PubsubReadSchemaTransform(@Nullable @UnknownKeyFor @Initialized String topic, @Nullable @UnknownKeyFor @Initialized String subscription, @UnknownKeyFor @NonNull @Initialized Schema beamSchema, @UnknownKeyFor @NonNull @Initialized SerializableFunction<@UnknownKeyFor @NonNull @Initialized byte @UnknownKeyFor @NonNull @Initialized [], @UnknownKeyFor @NonNull @Initialized Row> valueMapper) {
            this.topic = topic;
            this.subscription = subscription;
            this.beamSchema = beamSchema;
            this.valueMapper = valueMapper;
        }

        /** Sets the test client factory used when building the read. */
        void setClientFactory(PubsubTestClient.@Nullable @UnknownKeyFor @Initialized PubsubTestClientFactory factory) {
            this.clientFactory = factory;
        }

        /** Sets the clock used when building the read. */
        void setClock(@Nullable @UnknownKeyFor @Initialized Clock clock) {
            this.clock = clock;
        }

        /**
         * Builds the underlying {@code PubsubIO} read.
         *
         * @return a read from the topic when one is set, otherwise from the subscription, with
         *     the client factory and clock applied when both are present
         * @throws IllegalArgumentException if exactly one of client factory and clock is set
         */
        PubsubIO.@UnknownKeyFor @NonNull @Initialized Read<@UnknownKeyFor @NonNull @Initialized PubsubMessage> buildPubsubRead() {
            PubsubIO.Read<PubsubMessage> pubsubRead = PubsubIO.readMessages();
            pubsubRead = !Strings.isNullOrEmpty(this.topic) ? pubsubRead.fromTopic(this.topic) : pubsubRead.fromSubscription(this.subscription);

            // The factory and the clock are only meaningful as a pair.
            boolean hasFactory = this.clientFactory != null;
            boolean hasClock = this.clock != null;
            if (hasFactory != hasClock) {
                throw new IllegalArgumentException("Both PubsubTestClientFactory and Clock need to be specified for testing, but only one is provided");
            }
            if (hasFactory) {
                pubsubRead = pubsubRead.withClientFactory(this.clientFactory);
                pubsubRead = this.clientFactory.setClock(pubsubRead, this.clock);
            }
            return pubsubRead;
        }

        /**
         * Applies the Pub/Sub read to the pipeline of {@code input} and splits the result into
         * converted rows and conversion errors.
         *
         * @param input tuple whose pipeline the read is applied to
         * @return a tuple with the collections {@code "output"} and {@code "errors"}
         */
        public @UnknownKeyFor @NonNull @Initialized PCollectionRowTuple expand(@UnknownKeyFor @NonNull @Initialized PCollectionRowTuple input) {
            PubsubIO.Read<PubsubMessage> pubsubRead = this.buildPubsubRead();
            PCollectionTuple outputTuple = input.getPipeline()
                .apply(pubsubRead)
                .apply(ParDo.of(new ErrorCounterFn("PubSub-read-error-counter", this.valueMapper))
                    .withOutputTags(OUTPUT_TAG, TupleTagList.of(ERROR_TAG)));
            return PCollectionRowTuple.of(
                "output", outputTuple.get(OUTPUT_TAG).setRowSchema(this.beamSchema),
                "errors", outputTuple.get(ERROR_TAG).setRowSchema(ERROR_SCHEMA));
        }

        /**
         * Converts message payloads to rows, diverting failures to the error output and
         * reporting the number of failures through a metrics counter once per bundle.
         */
        private static class ErrorCounterFn
        extends DoFn<PubsubMessage, Row> {
            /** Counter that receives the per-bundle failure count. */
            private final @UnknownKeyFor @NonNull @Initialized Counter pubsubErrorCounter;
            /** Converts a raw message payload to a row. */
            private final @UnknownKeyFor @NonNull @Initialized SerializableFunction<@UnknownKeyFor @NonNull @Initialized byte @UnknownKeyFor @NonNull @Initialized [], @UnknownKeyFor @NonNull @Initialized Row> valueMapper;
            /** Failures seen since the counter was last updated; primitive to avoid re-boxing. */
            private long errorsInBundle = 0L;

            /**
             * @param name name of the metrics counter
             * @param valueMapper converts a message payload to a row
             */
            ErrorCounterFn(@UnknownKeyFor @NonNull @Initialized String name, @UnknownKeyFor @NonNull @Initialized SerializableFunction<@UnknownKeyFor @NonNull @Initialized byte @UnknownKeyFor @NonNull @Initialized [], @UnknownKeyFor @NonNull @Initialized Row> valueMapper) {
                this.pubsubErrorCounter = Metrics.counter(PubsubReadSchemaTransformProvider.class, name);
                this.valueMapper = valueMapper;
            }

            /**
             * Emits the converted row on {@code OUTPUT_TAG}; if conversion or emission throws,
             * emits an error row (exception text plus raw payload) on {@code ERROR_TAG} instead.
             */
            @DoFn.ProcessElement
            public void process(
                    @DoFn.Element @UnknownKeyFor @NonNull @Initialized PubsubMessage message,
                    @UnknownKeyFor @NonNull @Initialized MultiOutputReceiver receiver) {
                try {
                    receiver.get(OUTPUT_TAG).output(this.valueMapper.apply(message.getPayload()));
                }
                catch (Exception e) {
                    // Deliberately broad: any failure for this element is routed to the error
                    // output rather than failing the bundle.
                    this.errorsInBundle++;
                    receiver.get(ERROR_TAG).output(
                        Row.withSchema(ERROR_SCHEMA).addValues(e.toString(), message.getPayload()).build());
                }
            }

            /** Adds the accumulated failure count to the counter and resets it. */
            @DoFn.FinishBundle
            public void finish(@UnknownKeyFor @NonNull @Initialized FinishBundleContext c) {
                this.pubsubErrorCounter.inc(this.errorsInBundle);
                this.errorsInBundle = 0L;
            }
        }
    }
}

