/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.gcp.spanner.changestreams;

import com.google.auto.value.AutoValue;
import com.google.cloud.Timestamp;
import com.google.cloud.spanner.Dialect;
import com.google.cloud.spanner.Type;
import java.io.Serializable;
import java.math.BigDecimal;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.gcp.spanner.ReadSpannerSchema;
import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig;
import org.apache.beam.sdk.io.gcp.spanner.SpannerIO;
import org.apache.beam.sdk.io.gcp.spanner.SpannerSchema;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.AutoValue_SpannerChangestreamsReadSchemaTransformProvider_SpannerChangestreamsReadConfiguration;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.DataChangeRecord;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.Mod;
import org.apache.beam.sdk.schemas.AutoValueSchema;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionRowTuple;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.grpc.v1p48p1.com.google.gson.Gson;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
import org.joda.time.DateTime;
import org.joda.time.Instant;

public class SpannerChangestreamsReadSchemaTransformProvider
extends TypedSchemaTransformProvider<SpannerChangestreamsReadConfiguration> {
    private static final @UnknownKeyFor @NonNull @Initialized HashMap<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized SpannerSchema> TABLE_SCHEMAS = new HashMap();

    protected @UnknownKeyFor @NonNull @Initialized Class<@UnknownKeyFor @NonNull @Initialized SpannerChangestreamsReadConfiguration> configurationClass() {
        return SpannerChangestreamsReadConfiguration.class;
    }

    public @UnknownKeyFor @NonNull @Initialized SchemaTransform from(final @UnknownKeyFor @NonNull @Initialized SpannerChangestreamsReadConfiguration configuration) {
        return new SchemaTransform(){

            public @UnknownKeyFor @NonNull @Initialized PTransform<@UnknownKeyFor @NonNull @Initialized PCollectionRowTuple, @UnknownKeyFor @NonNull @Initialized PCollectionRowTuple> buildTransform() {
                return new PTransform<PCollectionRowTuple, PCollectionRowTuple>(){

                    public @UnknownKeyFor @NonNull @Initialized PCollectionRowTuple expand(@UnknownKeyFor @NonNull @Initialized PCollectionRowTuple input) {
                        Pipeline p = input.getPipeline();
                        Schema tableChangesSchema = SpannerChangestreamsReadSchemaTransformProvider.getTableSchema(configuration);
                        SpannerIO.ReadChangeStream readChangeStream = SpannerIO.readChangeStream().withSpannerConfig(SpannerConfig.create().withProjectId(configuration.getProjectId()).withInstanceId(configuration.getInstanceId()).withDatabaseId(configuration.getDatabaseId())).withChangeStreamName(configuration.getChangeStreamName()).withInclusiveStartAt(Timestamp.parseTimestamp((String)configuration.getStartAtTimestamp())).withDatabaseId(configuration.getDatabaseId()).withProjectId(configuration.getProjectId()).withInstanceId(configuration.getInstanceId());
                        if (configuration.getEndAtTimestamp() != null) {
                            String endTs = Objects.requireNonNull(Objects.requireNonNull(configuration.getEndAtTimestamp()));
                            readChangeStream = readChangeStream.withInclusiveEndAt(Timestamp.parseTimestamp((String)endTs));
                        }
                        return PCollectionRowTuple.of((String)"output", (PCollection)((PCollection)((PCollection)p.apply((PTransform)readChangeStream)).apply((PTransform)ParDo.of((DoFn)new DataChangeRecordToRow(configuration.getTable(), tableChangesSchema)))).setRowSchema(tableChangesSchema));
                    }
                };
            }
        };
    }

    public @UnknownKeyFor @NonNull @Initialized String identifier() {
        return "beam:schematransform:org.apache.beam:spanner_cdc_read:v1";
    }

    public @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized String> inputCollectionNames() {
        return Collections.emptyList();
    }

    public @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized String> outputCollectionNames() {
        return Collections.singletonList("output");
    }

    private static @UnknownKeyFor @NonNull @Initialized Schema getTableSchema(final @UnknownKeyFor @NonNull @Initialized SpannerChangestreamsReadConfiguration config) {
        Pipeline miniPipeline = Pipeline.create();
        PCollectionView sqlDialectView = (PCollectionView)((PCollection)miniPipeline.apply("Create Dialect", (PTransform)Create.of((Object)Dialect.GOOGLE_STANDARD_SQL, (Object[])new Dialect[0]))).apply("Dialect to View", (PTransform)View.asSingleton());
        ((PCollection)((PCollection)((PCollection)miniPipeline.apply((PTransform)Create.of((Object)null, (Object[])new Void[0]))).apply((PTransform)ParDo.of((DoFn)new ReadSpannerSchema(SpannerConfig.create().withDatabaseId(config.getDatabaseId()).withInstanceId(config.getInstanceId()).withProjectId(config.getProjectId()), (PCollectionView<Dialect>)sqlDialectView, Sets.newHashSet((Object[])new String[]{config.getTable()}))).withSideInput("dialect", sqlDialectView))).apply((PTransform)ParDo.of((DoFn)new DoFn<SpannerSchema, String>(){

            @DoFn.ProcessElement
            public void process(@DoFn.Element @UnknownKeyFor @NonNull @Initialized SpannerSchema schema) {
                TABLE_SCHEMAS.put(config.getTable(), schema);
            }
        }))).setCoder((Coder)StringUtf8Coder.of());
        miniPipeline.run().waitUntilFinish();
        SpannerSchema finalSchemaObj = TABLE_SCHEMAS.remove(config.getTable());
        if (finalSchemaObj == null) {
            throw new RuntimeException(String.format("Could not get schema for configuration %s", config));
        }
        return SpannerChangestreamsReadSchemaTransformProvider.spannerSchemaToBeamSchema(finalSchemaObj, config.getTable());
    }

    private static @UnknownKeyFor @NonNull @Initialized Schema spannerSchemaToBeamSchema(@UnknownKeyFor @NonNull @Initialized SpannerSchema spannerSchema, @UnknownKeyFor @NonNull @Initialized String tableName) {
        OptionalInt optionalIdx = IntStream.range(0, spannerSchema.getTables().size()).filter(idx -> spannerSchema.getTables().get(idx).equalsIgnoreCase(tableName)).findAny();
        if (!optionalIdx.isPresent()) {
            throw new IllegalArgumentException(String.format("Unable to retrieve schema for table %s. Found only tables: [%s]", tableName, String.join((CharSequence)", ", spannerSchema.getTables())));
        }
        Schema.Builder schemaBuilder = Schema.builder();
        String spannerTableName = spannerSchema.getTables().get(optionalIdx.getAsInt());
        for (SpannerSchema.Column col : spannerSchema.getColumns(spannerTableName)) {
            schemaBuilder = schemaBuilder.addNullableField(col.getName(), SpannerChangestreamsReadSchemaTransformProvider.spannerTypeToBeamType(col.getType()));
        }
        schemaBuilder = schemaBuilder.setOptions(Schema.Options.builder().setOption("primaryKeyColumns", Schema.FieldType.array((Schema.FieldType)Schema.FieldType.STRING), spannerSchema.getKeyParts(spannerTableName).stream().map(SpannerSchema.KeyPart::getField).collect(Collectors.toList())));
        return Schema.builder().addStringField("operation").addStringField("commitTimestamp").addInt64Field("recordSequence").addRowField("rowValues", schemaBuilder.build()).build();
    }

    private static @UnknownKeyFor @NonNull @Initialized Object stringToParsedValue(// Could not load outer class - annotation placement on inner may be incorrect
    @UnknownKeyFor @NonNull @Initialized Schema.FieldType fieldType, @UnknownKeyFor @NonNull @Initialized String fieldValue) {
        switch (fieldType.getTypeName()) {
            case STRING: {
                return fieldValue;
            }
            case INT64: {
                return Long.valueOf(fieldValue);
            }
            case INT16: 
            case INT32: {
                return Integer.valueOf(fieldValue);
            }
            case FLOAT: {
                return Float.valueOf(Float.parseFloat(fieldValue));
            }
            case DOUBLE: {
                return Double.parseDouble(fieldValue);
            }
            case BOOLEAN: {
                return Boolean.parseBoolean(fieldValue);
            }
            case BYTES: {
                return fieldValue.getBytes(StandardCharsets.UTF_8);
            }
            case DATETIME: {
                return new DateTime((Object)fieldValue);
            }
            case DECIMAL: {
                return new BigDecimal(fieldValue);
            }
        }
        throw new IllegalArgumentException(String.format("Unable to parse field with type %s", fieldType));
    }

    private static // Could not load outer class - annotation placement on inner may be incorrect
    @UnknownKeyFor @NonNull @Initialized Schema.FieldType spannerTypeToBeamType(@UnknownKeyFor @NonNull @Initialized Type spannerType) {
        switch (spannerType.getCode()) {
            case BOOL: {
                return Schema.FieldType.BOOLEAN;
            }
            case BYTES: {
                return Schema.FieldType.BYTES;
            }
            case STRING: {
                return Schema.FieldType.STRING;
            }
            case INT64: {
                return Schema.FieldType.INT64;
            }
            case NUMERIC: {
                return Schema.FieldType.DECIMAL;
            }
            case FLOAT64: {
                return Schema.FieldType.DOUBLE;
            }
            case TIMESTAMP: 
            case DATE: {
                return Schema.FieldType.DATETIME;
            }
            case ARRAY: {
                return Schema.FieldType.array((Schema.FieldType)SpannerChangestreamsReadSchemaTransformProvider.spannerTypeToBeamType(spannerType.getArrayElementType()));
            }
        }
        throw new IllegalArgumentException(String.format("Unsupported spanner type: %s", spannerType));
    }

    private static final class DataChangeRecordToRow
    extends DoFn<DataChangeRecord, Row> {
        private final @UnknownKeyFor @NonNull @Initialized Schema tableChangeRecordSchema;
        private final @UnknownKeyFor @NonNull @Initialized String tableName;
        private transient @UnknownKeyFor @NonNull @Initialized Gson gson;

        DataChangeRecordToRow(@UnknownKeyFor @NonNull @Initialized String tableName, @UnknownKeyFor @NonNull @Initialized Schema tableChangeRecordSchema) {
            this.tableName = tableName;
            this.tableChangeRecordSchema = tableChangeRecordSchema;
            this.gson = new Gson();
        }

        public @UnknownKeyFor @NonNull @Initialized Gson getGson() {
            if (this.gson == null) {
                this.gson = new Gson();
            }
            return this.gson;
        }

        @DoFn.ProcessElement
        public void process(@DoFn.Element @UnknownKeyFor @NonNull @Initialized DataChangeRecord record, // Could not load outer class - annotation placement on inner may be incorrect
        @UnknownKeyFor @NonNull @Initialized DoFn.OutputReceiver<@UnknownKeyFor @NonNull @Initialized Row> receiver) {
            if (!record.getTableName().equalsIgnoreCase(this.tableName)) {
                return;
            }
            Instant timestamp = new Instant((Object)record.getRecordTimestamp().toSqlTimestamp());
            for (Mod mod : record.getMods()) {
                Schema internalRowSchema = this.tableChangeRecordSchema.getField("rowValues").getType().getRowSchema();
                if (internalRowSchema == null) {
                    throw new RuntimeException("Row schema for internal row is null and cannot be utilized.");
                }
                Row.FieldValueBuilder rowBuilder = Row.fromRow((Row)Row.nullRow((Schema)internalRowSchema));
                Map newValues = Optional.ofNullable(mod.getNewValuesJson()).map(nonNullValues -> (Map)this.getGson().fromJson(nonNullValues, Map.class)).orElseGet(Collections::emptyMap);
                Map keyValues = Optional.ofNullable(mod.getKeysJson()).map(nonNullValues -> (Map)this.getGson().fromJson(nonNullValues, Map.class)).orElseGet(Collections::emptyMap);
                for (Map.Entry valueEntry : newValues.entrySet()) {
                    if (valueEntry.getValue() == null) continue;
                    rowBuilder = rowBuilder.withFieldValue(((String)valueEntry.getKey()).toLowerCase(), SpannerChangestreamsReadSchemaTransformProvider.stringToParsedValue(internalRowSchema.getField(((String)valueEntry.getKey()).toLowerCase()).getType(), (String)valueEntry.getValue()));
                }
                for (Map.Entry pkEntry : keyValues.entrySet()) {
                    if (pkEntry.getValue() == null) continue;
                    rowBuilder = rowBuilder.withFieldValue(((String)pkEntry.getKey()).toLowerCase(), SpannerChangestreamsReadSchemaTransformProvider.stringToParsedValue(internalRowSchema.getField(((String)pkEntry.getKey()).toLowerCase()).getType(), (String)pkEntry.getValue()));
                }
                receiver.outputWithTimestamp((Object)Row.withSchema((Schema)this.tableChangeRecordSchema).addValue((Object)record.getModType().toString()).addValue((Object)record.getCommitTimestamp().toString()).addValue((Object)Long.parseLong(record.getRecordSequence())).addValue((Object)rowBuilder.build()).build(), timestamp);
            }
        }
    }

    @DefaultSchema(value=AutoValueSchema.class)
    @AutoValue
    public static abstract class SpannerChangestreamsReadConfiguration
    implements Serializable {
        public abstract @UnknownKeyFor @NonNull @Initialized String getDatabaseId();

        public abstract @UnknownKeyFor @NonNull @Initialized String getProjectId();

        public abstract @UnknownKeyFor @NonNull @Initialized String getInstanceId();

        public abstract @UnknownKeyFor @NonNull @Initialized String getTable();

        public abstract @UnknownKeyFor @NonNull @Initialized String getStartAtTimestamp();

        public abstract @Nullable @UnknownKeyFor @Initialized String getEndAtTimestamp();

        public abstract @UnknownKeyFor @NonNull @Initialized String getChangeStreamName();

        public static @UnknownKeyFor @NonNull @Initialized Builder builder() {
            return new AutoValue_SpannerChangestreamsReadSchemaTransformProvider_SpannerChangestreamsReadConfiguration.Builder();
        }

        @AutoValue.Builder
        public static abstract class Builder {
            public abstract @UnknownKeyFor @NonNull @Initialized Builder setDatabaseId(@UnknownKeyFor @NonNull @Initialized String var1);

            public abstract @UnknownKeyFor @NonNull @Initialized Builder setProjectId(@UnknownKeyFor @NonNull @Initialized String var1);

            public abstract @UnknownKeyFor @NonNull @Initialized Builder setInstanceId(@UnknownKeyFor @NonNull @Initialized String var1);

            public abstract @UnknownKeyFor @NonNull @Initialized Builder setTable(@UnknownKeyFor @NonNull @Initialized String var1);

            public abstract @UnknownKeyFor @NonNull @Initialized Builder setStartAtTimestamp(@UnknownKeyFor @NonNull @Initialized String var1);

            public abstract @UnknownKeyFor @NonNull @Initialized Builder setEndAtTimestamp(@UnknownKeyFor @NonNull @Initialized String var1);

            public abstract @UnknownKeyFor @NonNull @Initialized Builder setChangeStreamName(@UnknownKeyFor @NonNull @Initialized String var1);

            public abstract @UnknownKeyFor @NonNull @Initialized SpannerChangestreamsReadConfiguration build();
        }
    }
}

