/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.gcp.pubsub;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.auto.value.AutoValue;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.avro.AvroRuntimeException;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.io.gcp.pubsub.AutoValue_PubsubMessageToRow;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubSchemaIOProvider;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.utils.AvroUtils;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.util.RowJson;
import org.apache.beam.sdk.util.RowJsonUtils;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
import org.joda.time.Instant;

@Internal
@Experimental
@AutoValue
abstract class PubsubMessageToRow
extends PTransform<PCollection<PubsubMessage>, PCollectionTuple>
implements Serializable {
    static final @UnknownKeyFor @NonNull @Initialized String TIMESTAMP_FIELD = "event_timestamp";
    static final @UnknownKeyFor @NonNull @Initialized String ATTRIBUTES_FIELD = "attributes";
    static final @UnknownKeyFor @NonNull @Initialized String PAYLOAD_FIELD = "payload";
    static final @UnknownKeyFor @NonNull @Initialized TupleTag<@UnknownKeyFor @NonNull @Initialized PubsubMessage> DLQ_TAG = new TupleTag<PubsubMessage>(){};
    static final @UnknownKeyFor @NonNull @Initialized TupleTag<@UnknownKeyFor @NonNull @Initialized Row> MAIN_TAG = new TupleTag<Row>(){};

    PubsubMessageToRow() {
    }

    public abstract @UnknownKeyFor @NonNull @Initialized Schema messageSchema();

    public abstract @UnknownKeyFor @NonNull @Initialized boolean useDlq();

    public abstract @UnknownKeyFor @NonNull @Initialized boolean useFlatSchema();

    public abstract @UnknownKeyFor @NonNull @Initialized PubsubSchemaIOProvider.PayloadFormat payloadFormat();

    public static @UnknownKeyFor @NonNull @Initialized Builder builder() {
        return new AutoValue_PubsubMessageToRow.Builder().payloadFormat(PubsubSchemaIOProvider.PayloadFormat.JSON);
    }

    public @UnknownKeyFor @NonNull @Initialized PCollectionTuple expand(@UnknownKeyFor @NonNull @Initialized PCollection<@UnknownKeyFor @NonNull @Initialized PubsubMessage> input) {
        PCollectionTuple rows = (PCollectionTuple)input.apply((PTransform)ParDo.of((DoFn)(this.useFlatSchema() ? new FlatSchemaPubsubMessageToRow(this.messageSchema(), this.useDlq(), this.payloadFormat()) : new NestedSchemaPubsubMessageToRow(this.messageSchema(), this.useDlq(), this.payloadFormat()))).withOutputTags(MAIN_TAG, this.useDlq() ? TupleTagList.of(DLQ_TAG) : TupleTagList.empty()));
        rows.get(MAIN_TAG).setRowSchema(this.messageSchema());
        return rows;
    }

    @VisibleForTesting
    static @UnknownKeyFor @NonNull @Initialized SimpleFunction<@UnknownKeyFor @NonNull @Initialized PubsubMessage, @UnknownKeyFor @NonNull @Initialized Row> getParsePayloadFn(@UnknownKeyFor @NonNull @Initialized PubsubSchemaIOProvider.PayloadFormat format, @UnknownKeyFor @NonNull @Initialized Schema payloadSchema) {
        switch (format) {
            case JSON: {
                return new ParseJsonPayloadFn(payloadSchema);
            }
            case AVRO: {
                return new ParseAvroPayloadFn(payloadSchema);
            }
        }
        throw new IllegalArgumentException("Unsupported payload format given: " + (Object)((Object)format));
    }

    public static class ParseException
    extends RuntimeException {
        ParseException(@UnknownKeyFor @NonNull @Initialized Throwable cause) {
            super("Error parsing message", cause);
        }
    }

    @AutoValue.Builder
    static abstract class Builder {
        Builder() {
        }

        public abstract @UnknownKeyFor @NonNull @Initialized Builder messageSchema(@UnknownKeyFor @NonNull @Initialized Schema var1);

        public abstract @UnknownKeyFor @NonNull @Initialized Builder useDlq(@UnknownKeyFor @NonNull @Initialized boolean var1);

        public abstract @UnknownKeyFor @NonNull @Initialized Builder useFlatSchema(@UnknownKeyFor @NonNull @Initialized boolean var1);

        public abstract @UnknownKeyFor @NonNull @Initialized Builder payloadFormat(@UnknownKeyFor @NonNull @Initialized PubsubSchemaIOProvider.PayloadFormat var1);

        public abstract @UnknownKeyFor @NonNull @Initialized PubsubMessageToRow build();
    }

    @Internal
    private static class NestedSchemaPubsubMessageToRow
    extends DoFn<PubsubMessage, Row> {
        private final @UnknownKeyFor @NonNull @Initialized Schema messageSchema;
        private final @UnknownKeyFor @NonNull @Initialized boolean useDlq;
        private final @UnknownKeyFor @NonNull @Initialized SimpleFunction<@UnknownKeyFor @NonNull @Initialized PubsubMessage, @UnknownKeyFor @NonNull @Initialized Row> parsePayloadFn;

        protected NestedSchemaPubsubMessageToRow(@UnknownKeyFor @NonNull @Initialized Schema messageSchema, @UnknownKeyFor @NonNull @Initialized boolean useDlq, @UnknownKeyFor @NonNull @Initialized PubsubSchemaIOProvider.PayloadFormat payloadFormat) {
            this.messageSchema = messageSchema;
            this.useDlq = useDlq;
            Schema payloadSchema = messageSchema.getField(PubsubMessageToRow.PAYLOAD_FIELD).getType().getRowSchema();
            this.parsePayloadFn = PubsubMessageToRow.getParsePayloadFn(payloadFormat, payloadSchema);
        }

        private @UnknownKeyFor @NonNull @Initialized Object getValueForFieldNestedSchema(// Could not load outer class - annotation placement on inner may be incorrect
        @UnknownKeyFor @NonNull @Initialized Schema.Field field, @UnknownKeyFor @NonNull @Initialized Instant timestamp, @UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized String> attributeMap, @UnknownKeyFor @NonNull @Initialized Row payload) {
            switch (field.getName()) {
                case "event_timestamp": {
                    return timestamp;
                }
                case "attributes": {
                    return attributeMap;
                }
                case "payload": {
                    return payload;
                }
            }
            throw new IllegalArgumentException("Unexpected field '" + field.getName() + "' in top level schema for Pubsub message. Top level schema should only contain 'timestamp', 'attributes', and 'payload' fields");
        }

        @DoFn.ProcessElement
        public void processElement(@DoFn.Element @UnknownKeyFor @NonNull @Initialized PubsubMessage element, @DoFn.Timestamp @UnknownKeyFor @NonNull @Initialized Instant timestamp, // Could not load outer class - annotation placement on inner may be incorrect
        @UnknownKeyFor @NonNull @Initialized DoFn.MultiOutputReceiver o) {
            try {
                Row payload = (Row)this.parsePayloadFn.apply((Object)element);
                List values = this.messageSchema.getFields().stream().map(field -> this.getValueForFieldNestedSchema((Schema.Field)field, timestamp, element.getAttributeMap(), payload)).collect(Collectors.toList());
                o.get(MAIN_TAG).output((Object)Row.withSchema((Schema)this.messageSchema).addValues(values).build());
            }
            catch (ParseException exception) {
                if (this.useDlq) {
                    o.get(DLQ_TAG).output((Object)element);
                }
                throw new RuntimeException(exception);
            }
        }
    }

    @Internal
    private static class FlatSchemaPubsubMessageToRow
    extends DoFn<PubsubMessage, Row> {
        private final @UnknownKeyFor @NonNull @Initialized Schema messageSchema;
        private final @UnknownKeyFor @NonNull @Initialized boolean useDlq;
        private final @UnknownKeyFor @NonNull @Initialized SimpleFunction<@UnknownKeyFor @NonNull @Initialized PubsubMessage, @UnknownKeyFor @NonNull @Initialized Row> parsePayloadFn;

        protected FlatSchemaPubsubMessageToRow(@UnknownKeyFor @NonNull @Initialized Schema messageSchema, @UnknownKeyFor @NonNull @Initialized boolean useDlq, @UnknownKeyFor @NonNull @Initialized PubsubSchemaIOProvider.PayloadFormat payloadFormat) {
            this.messageSchema = messageSchema;
            Schema payloadSchema = new Schema(messageSchema.getFields().stream().filter(f -> !f.getName().equals(PubsubMessageToRow.TIMESTAMP_FIELD)).collect(Collectors.toList()));
            this.useDlq = useDlq;
            this.parsePayloadFn = PubsubMessageToRow.getParsePayloadFn(payloadFormat, payloadSchema);
        }

        private @UnknownKeyFor @NonNull @Initialized Object getValueForFieldFlatSchema(// Could not load outer class - annotation placement on inner may be incorrect
        @UnknownKeyFor @NonNull @Initialized Schema.Field field, @UnknownKeyFor @NonNull @Initialized Instant timestamp, @UnknownKeyFor @NonNull @Initialized Row payload) {
            String fieldName = field.getName();
            if (PubsubMessageToRow.TIMESTAMP_FIELD.equals(fieldName)) {
                return timestamp;
            }
            return payload.getValue(fieldName);
        }

        @DoFn.ProcessElement
        public void processElement(@DoFn.Element @UnknownKeyFor @NonNull @Initialized PubsubMessage element, @DoFn.Timestamp @UnknownKeyFor @NonNull @Initialized Instant timestamp, // Could not load outer class - annotation placement on inner may be incorrect
        @UnknownKeyFor @NonNull @Initialized DoFn.MultiOutputReceiver o) {
            try {
                Row payload = (Row)this.parsePayloadFn.apply((Object)element);
                List values = this.messageSchema.getFields().stream().map(field -> this.getValueForFieldFlatSchema((Schema.Field)field, timestamp, payload)).collect(Collectors.toList());
                o.get(MAIN_TAG).output((Object)Row.withSchema((Schema)this.messageSchema).addValues(values).build());
            }
            catch (ParseException pe) {
                if (this.useDlq) {
                    o.get(DLQ_TAG).output((Object)element);
                }
                throw new RuntimeException(pe);
            }
        }
    }

    private static class ParseAvroPayloadFn
    extends SimpleFunction<PubsubMessage, Row> {
        private final @UnknownKeyFor @NonNull @Initialized SimpleFunction<@UnknownKeyFor @NonNull @Initialized byte @UnknownKeyFor @NonNull @Initialized [], @UnknownKeyFor @NonNull @Initialized Row> avroBytesToRowFn;

        public ParseAvroPayloadFn(@UnknownKeyFor @NonNull @Initialized Schema payloadSchema) {
            this.avroBytesToRowFn = AvroUtils.getAvroBytesToRowFunction((Schema)payloadSchema);
        }

        public @UnknownKeyFor @NonNull @Initialized Row apply(@UnknownKeyFor @NonNull @Initialized PubsubMessage message) {
            try {
                return (Row)this.avroBytesToRowFn.apply((Object)message.getPayload());
            }
            catch (AvroRuntimeException e) {
                throw new ParseException(e);
            }
        }
    }

    private static class ParseJsonPayloadFn
    extends SimpleFunction<PubsubMessage, Row> {
        private final @UnknownKeyFor @NonNull @Initialized ObjectMapper jsonMapper;

        ParseJsonPayloadFn(@UnknownKeyFor @NonNull @Initialized Schema payloadSchema) {
            this.jsonMapper = RowJsonUtils.newObjectMapperWith((RowJson.RowJsonDeserializer)RowJson.RowJsonDeserializer.forSchema((Schema)payloadSchema));
        }

        public @UnknownKeyFor @NonNull @Initialized Row apply(@UnknownKeyFor @NonNull @Initialized PubsubMessage message) {
            String payloadJson = new String(message.getPayload(), StandardCharsets.UTF_8);
            try {
                return RowJsonUtils.jsonToRow((ObjectMapper)this.jsonMapper, (String)payloadJson);
            }
            catch (RowJson.UnsupportedRowJsonException e) {
                throw new ParseException(e);
            }
        }
    }
}

