/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.extensions.sql.meta.provider.pubsublite;

import com.alibaba.fastjson.JSONObject;
import com.google.auto.service.AutoService;
import com.google.auto.value.AutoOneOf;
import com.google.cloud.pubsublite.SubscriptionPath;
import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.proto.PubSubMessage;
import java.util.Map;
import java.util.Optional;
import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable;
import org.apache.beam.sdk.extensions.sql.meta.Table;
import org.apache.beam.sdk.extensions.sql.meta.provider.InMemoryMetaTableProvider;
import org.apache.beam.sdk.extensions.sql.meta.provider.TableProvider;
import org.apache.beam.sdk.extensions.sql.meta.provider.pubsublite.AutoOneOf_PubsubLiteTableProvider_Location;
import org.apache.beam.sdk.extensions.sql.meta.provider.pubsublite.PubsubLiteSubscriptionTable;
import org.apache.beam.sdk.extensions.sql.meta.provider.pubsublite.PubsubLiteTopicTable;
import org.apache.beam.sdk.extensions.sql.meta.provider.pubsublite.RowHandler;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.io.DeadLetteredTransform;
import org.apache.beam.sdk.schemas.io.payloads.PayloadSerializer;
import org.apache.beam.sdk.schemas.io.payloads.PayloadSerializers;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;

@AutoService(value={TableProvider.class})
public class PubsubLiteTableProvider
extends InMemoryMetaTableProvider {
    @Override
    public String getTableType() {
        return "pubsublite";
    }

    private static Optional<PayloadSerializer> getSerializer(Schema schema, JSONObject properties) {
        if (schema.getField("payload").getType().equals((Object)Schema.FieldType.BYTES)) {
            Preconditions.checkArgument((!properties.containsKey((Object)"format") ? 1 : 0) != 0, (Object)"Must not set the 'format' property if not unpacking payload.");
            return Optional.empty();
        }
        String format = properties.containsKey((Object)"format") ? properties.getString("format") : "json";
        return Optional.of(PayloadSerializers.getSerializer((String)format, (Schema)schema, (Map)properties.getInnerMap()));
    }

    private static void checkFieldHasType(Schema.Field field, Schema.FieldType type) {
        Preconditions.checkArgument((boolean)type.equivalent(field.getType(), Schema.EquivalenceNullablePolicy.WEAKEN), (Object)String.format("'%s' field must have schema matching '%s'.", field.getName(), type));
    }

    private static void validateSchema(Schema schema) {
        Preconditions.checkArgument((boolean)schema.hasField("payload"), (Object)"Must provide a 'payload' field for Pub/Sub Lite.");
        block13: for (Schema.Field field : schema.getFields()) {
            switch (field.getName()) {
                case "attributes": {
                    PubsubLiteTableProvider.checkFieldHasType(field, RowHandler.ATTRIBUTES_FIELD_TYPE);
                    continue block13;
                }
                case "event_timestamp": 
                case "publish_timestamp": {
                    PubsubLiteTableProvider.checkFieldHasType(field, Schema.FieldType.DATETIME);
                    continue block13;
                }
                case "message_key": {
                    PubsubLiteTableProvider.checkFieldHasType(field, Schema.FieldType.BYTES);
                    continue block13;
                }
                case "payload": {
                    Preconditions.checkArgument((Schema.FieldType.BYTES.equivalent(field.getType(), Schema.EquivalenceNullablePolicy.WEAKEN) || field.getType().getTypeName().equals((Object)Schema.TypeName.ROW) ? 1 : 0) != 0, (Object)String.format("'%s' field must either have a 'BYTES NOT NULL' or 'ROW' schema.", field.getName()));
                    continue block13;
                }
            }
            throw new IllegalArgumentException(String.format("'%s' field is invalid at the top level for Pub/Sub Lite.", field.getName()));
        }
    }

    private static RowHandler getRowHandler(Schema schema, Optional<PayloadSerializer> optionalSerializer) {
        if (optionalSerializer.isPresent()) {
            return new RowHandler(schema, optionalSerializer.get());
        }
        return new RowHandler(schema);
    }

    private static <InputT, OutputT> PTransform<PCollection<? extends InputT>, PCollection<OutputT>> addDlqIfPresent(SimpleFunction<InputT, OutputT> transform, JSONObject properties) {
        if (properties.containsKey((Object)"deadLetterQueue")) {
            return new DeadLetteredTransform(transform, properties.getString("deadLetterQueue"));
        }
        return MapElements.via(transform);
    }

    @Override
    public BeamSqlTable buildBeamSqlTable(Table table) {
        Preconditions.checkArgument((boolean)table.getType().equals(this.getTableType()));
        PubsubLiteTableProvider.validateSchema(table.getSchema());
        Optional<PayloadSerializer> serializer = PubsubLiteTableProvider.getSerializer(table.getSchema(), table.getProperties());
        Location location = Location.parse((String)org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull((Object)table.getLocation()));
        RowHandler rowHandler = PubsubLiteTableProvider.getRowHandler(table.getSchema(), serializer);
        switch (location.getKind()) {
            case TOPIC: {
                Preconditions.checkArgument((!table.getSchema().hasField("publish_timestamp") ? 1 : 0) != 0, (Object)"May not write to publish timestamp, this field is read-only.");
                return new PubsubLiteTopicTable(table.getSchema(), location.topic(), PubsubLiteTableProvider.addDlqIfPresent(SimpleFunction.fromSerializableFunctionWithOutputType(rowHandler::rowToMessage, (TypeDescriptor)TypeDescriptor.of(PubSubMessage.class)), table.getProperties()));
            }
            case SUBSCRIPTION: {
                return new PubsubLiteSubscriptionTable(table.getSchema(), location.subscription(), PubsubLiteTableProvider.addDlqIfPresent(SimpleFunction.fromSerializableFunctionWithOutputType(rowHandler::messageToRow, (TypeDescriptor)TypeDescriptor.of(Row.class)), table.getProperties()));
            }
        }
        throw new IllegalArgumentException("Invalid kind for location: " + (Object)((Object)location.getKind()));
    }

    @AutoOneOf(value=Kind.class)
    static abstract class Location {
        Location() {
        }

        abstract Kind getKind();

        abstract TopicPath topic();

        abstract SubscriptionPath subscription();

        static Location parse(String location) {
            if (location.contains("/topics/")) {
                return AutoOneOf_PubsubLiteTableProvider_Location.topic(TopicPath.parse((String)location));
            }
            if (location.contains("/subscriptions/")) {
                return AutoOneOf_PubsubLiteTableProvider_Location.subscription(SubscriptionPath.parse((String)location));
            }
            throw new IllegalArgumentException(String.format("Location '%s' does not correspond to either a Pub/Sub Lite topic or subscription.", location));
        }

        static enum Kind {
            TOPIC,
            SUBSCRIPTION;

        }
    }
}

