/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.gcp.spanner.changestreams.mapper;

import com.google.cloud.Timestamp;
import com.google.cloud.spanner.Struct;
import com.google.cloud.spanner.Type;
import java.util.HashSet;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.ChangeStreamResultSetMetadata;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ChangeStreamRecord;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ChangeStreamRecordMetadata;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ChildPartition;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ChildPartitionsRecord;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ColumnType;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.DataChangeRecord;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.HeartbeatRecord;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.InitialPartition;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.Mod;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ModType;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.TypeCode;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ValueCaptureType;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;

public class ChangeStreamRecordMapper {
    private static final String DATA_CHANGE_RECORD_COLUMN = "data_change_record";
    private static final String HEARTBEAT_RECORD_COLUMN = "heartbeat_record";
    private static final String CHILD_PARTITIONS_RECORD_COLUMN = "child_partitions_record";
    private static final String COMMIT_TIMESTAMP_COLUMN = "commit_timestamp";
    private static final String SERVER_TRANSACTION_ID_COLUMN = "server_transaction_id";
    private static final String IS_LAST_RECORD_IN_TRANSACTION_IN_PARTITION_COLUMN = "is_last_record_in_transaction_in_partition";
    private static final String RECORD_SEQUENCE_COLUMN = "record_sequence";
    private static final String TABLE_NAME_COLUMN = "table_name";
    private static final String COLUMN_TYPES_COLUMN = "column_types";
    private static final String MODS_COLUMN = "mods";
    private static final String MOD_TYPE_COLUMN = "mod_type";
    private static final String VALUE_CAPTURE_TYPE_COLUMN = "value_capture_type";
    private static final String NUMBER_OF_RECORDS_IN_TRANSACTION_COLUMN = "number_of_records_in_transaction";
    private static final String NUMBER_OF_PARTITIONS_IN_TRANSACTION_COLUMN = "number_of_partitions_in_transaction";
    private static final String NAME_COLUMN = "name";
    private static final String TYPE_COLUMN = "type";
    private static final String IS_PRIMARY_KEY_COLUMN = "is_primary_key";
    private static final String ORDINAL_POSITION_COLUMN = "ordinal_position";
    private static final String KEYS_COLUMN = "keys";
    private static final String OLD_VALUES_COLUMN = "old_values";
    private static final String NEW_VALUES_COLUMN = "new_values";
    private static final String TIMESTAMP_COLUMN = "timestamp";
    private static final String START_TIMESTAMP_COLUMN = "start_timestamp";
    private static final String CHILD_PARTITIONS_COLUMN = "child_partitions";
    private static final String PARENT_PARTITION_TOKENS_COLUMN = "parent_partition_tokens";
    private static final String TOKEN_COLUMN = "token";

    ChangeStreamRecordMapper() {
    }

    public List<ChangeStreamRecord> toChangeStreamRecords(PartitionMetadata partition, Struct row, ChangeStreamResultSetMetadata resultSetMetadata) {
        return row.getStructList(0).stream().flatMap(struct -> this.toChangeStreamRecord(partition, (Struct)struct, resultSetMetadata)).collect(Collectors.toList());
    }

    private Stream<ChangeStreamRecord> toChangeStreamRecord(PartitionMetadata partition, Struct row, ChangeStreamResultSetMetadata resultSetMetadata) {
        Stream<DataChangeRecord> dataChangeRecords = row.getStructList(DATA_CHANGE_RECORD_COLUMN).stream().filter(this::isNonNullDataChangeRecord).map(struct -> this.toDataChangeRecord(partition, (Struct)struct, resultSetMetadata));
        Stream<HeartbeatRecord> heartbeatRecords = row.getStructList(HEARTBEAT_RECORD_COLUMN).stream().filter(this::isNonNullHeartbeatRecord).map(struct -> this.toHeartbeatRecord(partition, (Struct)struct, resultSetMetadata));
        Stream<ChildPartitionsRecord> childPartitionsRecords = row.getStructList(CHILD_PARTITIONS_RECORD_COLUMN).stream().filter(this::isNonNullChildPartitionsRecord).map(struct -> this.toChildPartitionsRecord(partition, (Struct)struct, resultSetMetadata));
        return Stream.concat(Stream.concat(dataChangeRecords, heartbeatRecords), childPartitionsRecords);
    }

    private boolean isNonNullDataChangeRecord(Struct row) {
        return !row.isNull(COMMIT_TIMESTAMP_COLUMN);
    }

    private boolean isNonNullHeartbeatRecord(Struct row) {
        return !row.isNull(TIMESTAMP_COLUMN);
    }

    private boolean isNonNullChildPartitionsRecord(Struct row) {
        return !row.isNull(START_TIMESTAMP_COLUMN);
    }

    private DataChangeRecord toDataChangeRecord(PartitionMetadata partition, Struct row, ChangeStreamResultSetMetadata resultSetMetadata) {
        Timestamp commitTimestamp = row.getTimestamp(COMMIT_TIMESTAMP_COLUMN);
        return new DataChangeRecord(partition.getPartitionToken(), commitTimestamp, row.getString(SERVER_TRANSACTION_ID_COLUMN), row.getBoolean(IS_LAST_RECORD_IN_TRANSACTION_IN_PARTITION_COLUMN), row.getString(RECORD_SEQUENCE_COLUMN), row.getString(TABLE_NAME_COLUMN), row.getStructList(COLUMN_TYPES_COLUMN).stream().map(this::columnTypeFrom).collect(Collectors.toList()), row.getStructList(MODS_COLUMN).stream().map(this::modFrom).collect(Collectors.toList()), this.modTypeFrom(row.getString(MOD_TYPE_COLUMN)), this.valueCaptureTypeFrom(row.getString(VALUE_CAPTURE_TYPE_COLUMN)), row.getLong(NUMBER_OF_RECORDS_IN_TRANSACTION_COLUMN), row.getLong(NUMBER_OF_PARTITIONS_IN_TRANSACTION_COLUMN), this.changeStreamRecordMetadataFrom(partition, commitTimestamp, resultSetMetadata));
    }

    private HeartbeatRecord toHeartbeatRecord(PartitionMetadata partition, Struct row, ChangeStreamResultSetMetadata resultSetMetadata) {
        Timestamp timestamp = row.getTimestamp(TIMESTAMP_COLUMN);
        return new HeartbeatRecord(timestamp, this.changeStreamRecordMetadataFrom(partition, timestamp, resultSetMetadata));
    }

    private ChildPartitionsRecord toChildPartitionsRecord(PartitionMetadata partition, Struct row, ChangeStreamResultSetMetadata resultSetMetadata) {
        Timestamp startTimestamp = row.getTimestamp(START_TIMESTAMP_COLUMN);
        return new ChildPartitionsRecord(startTimestamp, row.getString(RECORD_SEQUENCE_COLUMN), row.getStructList(CHILD_PARTITIONS_COLUMN).stream().map(struct -> this.childPartitionFrom(partition.getPartitionToken(), (Struct)struct)).collect(Collectors.toList()), this.changeStreamRecordMetadataFrom(partition, startTimestamp, resultSetMetadata));
    }

    private ColumnType columnTypeFrom(Struct struct) {
        String type = this.getJsonString(struct, TYPE_COLUMN);
        return new ColumnType(struct.getString(NAME_COLUMN), new TypeCode(type), struct.getBoolean(IS_PRIMARY_KEY_COLUMN), struct.getLong(ORDINAL_POSITION_COLUMN));
    }

    private Mod modFrom(Struct struct) {
        String keys = this.getJsonString(struct, KEYS_COLUMN);
        String oldValues = struct.isNull(OLD_VALUES_COLUMN) ? null : this.getJsonString(struct, OLD_VALUES_COLUMN);
        String newValues = struct.isNull(NEW_VALUES_COLUMN) ? null : this.getJsonString(struct, NEW_VALUES_COLUMN);
        return new Mod(keys, oldValues, newValues);
    }

    private ModType modTypeFrom(String name) {
        try {
            return ModType.valueOf(name);
        }
        catch (IllegalArgumentException e) {
            return ModType.UNKNOWN;
        }
    }

    private ValueCaptureType valueCaptureTypeFrom(String name) {
        try {
            return ValueCaptureType.valueOf(name);
        }
        catch (IllegalArgumentException e) {
            return ValueCaptureType.UNKNOWN;
        }
    }

    private ChildPartition childPartitionFrom(String partitionToken, Struct struct) {
        HashSet parentTokens = Sets.newHashSet((Iterable)struct.getStringList(PARENT_PARTITION_TOKENS_COLUMN));
        if (InitialPartition.isInitialPartition(partitionToken)) {
            parentTokens.add(partitionToken);
        }
        return new ChildPartition(struct.getString(TOKEN_COLUMN), parentTokens);
    }

    private ChangeStreamRecordMetadata changeStreamRecordMetadataFrom(PartitionMetadata partition, Timestamp recordTimestamp, ChangeStreamResultSetMetadata resultSetMetadata) {
        return ChangeStreamRecordMetadata.newBuilder().withRecordTimestamp(recordTimestamp).withPartitionToken(partition.getPartitionToken()).withPartitionStartTimestamp(partition.getStartTimestamp()).withPartitionEndTimestamp(partition.getEndTimestamp()).withPartitionCreatedAt(partition.getCreatedAt()).withPartitionScheduledAt(partition.getScheduledAt()).withPartitionRunningAt(partition.getRunningAt()).withQueryStartedAt(resultSetMetadata.getQueryStartedAt()).withRecordStreamStartedAt(resultSetMetadata.getRecordStreamStartedAt()).withRecordStreamEndedAt(resultSetMetadata.getRecordStreamEndedAt()).withRecordReadAt(resultSetMetadata.getRecordReadAt()).withTotalStreamTimeMillis(resultSetMetadata.getTotalStreamDuration().getMillis()).withNumberOfRecordsRead(resultSetMetadata.getNumberOfRecordsRead()).build();
    }

    private String getJsonString(Struct struct, String columnName) {
        if (struct.getColumnType(columnName).equals((Object)Type.json())) {
            return struct.getJson(columnName);
        }
        if (struct.getColumnType(columnName).equals((Object)Type.string())) {
            return struct.getString(columnName);
        }
        throw new IllegalArgumentException("Can not extract string from value " + columnName);
    }
}

