/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.gcp.spanner.changestreams.action;

import com.google.cloud.Timestamp;
import com.google.cloud.spanner.ErrorCode;
import com.google.cloud.spanner.SpannerException;
import java.util.List;
import java.util.Optional;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamMetrics;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.action.ChildPartitionsRecordAction;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.action.DataChangeRecordAction;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.action.HeartbeatRecordAction;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.action.PartitionEndRecordAction;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.action.PartitionEventRecordAction;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.action.PartitionStartRecordAction;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.ChangeStreamDao;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.ChangeStreamResultSet;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.PartitionMetadataDao;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.mapper.ChangeStreamRecordMapper;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.mapper.PartitionMetadataMapper;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ChangeStreamRecord;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ChildPartitionsRecord;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.DataChangeRecord;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.HeartbeatRecord;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionEndRecord;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionEventRecord;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionStartRecord;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.RestrictionInterrupter;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.TimestampRange;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.joda.time.ReadableDuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class QueryChangeStreamAction {
    private static final @UnknownKeyFor @NonNull @Initialized Logger LOG = LoggerFactory.getLogger(QueryChangeStreamAction.class);
    private static final @UnknownKeyFor @NonNull @Initialized Duration BUNDLE_FINALIZER_TIMEOUT = Duration.standardMinutes((long)5L);
    private static final @UnknownKeyFor @NonNull @Initialized Duration RESTRICTION_TRACKER_TIMEOUT = Duration.standardSeconds((long)40L);
    private static final @UnknownKeyFor @NonNull @Initialized String OUT_OF_RANGE_ERROR_MESSAGE = "Specified start_timestamp is invalid";
    private final @UnknownKeyFor @NonNull @Initialized ChangeStreamDao changeStreamDao;
    private final @UnknownKeyFor @NonNull @Initialized PartitionMetadataDao partitionMetadataDao;
    private final @UnknownKeyFor @NonNull @Initialized ChangeStreamRecordMapper changeStreamRecordMapper;
    private final @UnknownKeyFor @NonNull @Initialized PartitionMetadataMapper partitionMetadataMapper;
    private final @UnknownKeyFor @NonNull @Initialized DataChangeRecordAction dataChangeRecordAction;
    private final @UnknownKeyFor @NonNull @Initialized HeartbeatRecordAction heartbeatRecordAction;
    private final @UnknownKeyFor @NonNull @Initialized ChildPartitionsRecordAction childPartitionsRecordAction;
    private final @UnknownKeyFor @NonNull @Initialized PartitionStartRecordAction partitionStartRecordAction;
    private final @UnknownKeyFor @NonNull @Initialized PartitionEndRecordAction partitionEndRecordAction;
    private final @UnknownKeyFor @NonNull @Initialized PartitionEventRecordAction partitionEventRecordAction;
    private final @UnknownKeyFor @NonNull @Initialized ChangeStreamMetrics metrics;

    QueryChangeStreamAction(@UnknownKeyFor @NonNull @Initialized ChangeStreamDao changeStreamDao, @UnknownKeyFor @NonNull @Initialized PartitionMetadataDao partitionMetadataDao, @UnknownKeyFor @NonNull @Initialized ChangeStreamRecordMapper changeStreamRecordMapper, @UnknownKeyFor @NonNull @Initialized PartitionMetadataMapper partitionMetadataMapper, @UnknownKeyFor @NonNull @Initialized DataChangeRecordAction dataChangeRecordAction, @UnknownKeyFor @NonNull @Initialized HeartbeatRecordAction heartbeatRecordAction, @UnknownKeyFor @NonNull @Initialized ChildPartitionsRecordAction childPartitionsRecordAction, @UnknownKeyFor @NonNull @Initialized PartitionStartRecordAction partitionStartRecordAction, @UnknownKeyFor @NonNull @Initialized PartitionEndRecordAction partitionEndRecordAction, @UnknownKeyFor @NonNull @Initialized PartitionEventRecordAction partitionEventRecordAction, @UnknownKeyFor @NonNull @Initialized ChangeStreamMetrics metrics) {
        this.changeStreamDao = changeStreamDao;
        this.partitionMetadataDao = partitionMetadataDao;
        this.changeStreamRecordMapper = changeStreamRecordMapper;
        this.partitionMetadataMapper = partitionMetadataMapper;
        this.dataChangeRecordAction = dataChangeRecordAction;
        this.heartbeatRecordAction = heartbeatRecordAction;
        this.childPartitionsRecordAction = childPartitionsRecordAction;
        this.partitionStartRecordAction = partitionStartRecordAction;
        this.partitionEndRecordAction = partitionEndRecordAction;
        this.partitionEventRecordAction = partitionEventRecordAction;
        this.metrics = metrics;
    }

    /*
     * Enabled aggressive exception aggregation
     */
    @VisibleForTesting
    public // Could not load outer class - annotation placement on inner may be incorrect
     @UnknownKeyFor @NonNull @Initialized DoFn.ProcessContinuation run(@UnknownKeyFor @NonNull @Initialized PartitionMetadata partition, @UnknownKeyFor @NonNull @Initialized RestrictionTracker<@UnknownKeyFor @NonNull @Initialized TimestampRange, @UnknownKeyFor @NonNull @Initialized Timestamp> tracker, // Could not load outer class - annotation placement on inner may be incorrect
     @UnknownKeyFor @NonNull @Initialized DoFn.OutputReceiver<@UnknownKeyFor @NonNull @Initialized DataChangeRecord> receiver, @UnknownKeyFor @NonNull @Initialized ManualWatermarkEstimator<@UnknownKeyFor @NonNull @Initialized Instant> watermarkEstimator, // Could not load outer class - annotation placement on inner may be incorrect
     @UnknownKeyFor @NonNull @Initialized DoFn.BundleFinalizer bundleFinalizer) {
        String token = partition.getPartitionToken();
        Timestamp startTimestamp = ((TimestampRange)tracker.currentRestriction()).getFrom();
        Timestamp endTimestamp = partition.getEndTimestamp();
        Timestamp changeStreamQueryEndTimestamp = endTimestamp.equals((Object)ChangeStreamsConstants.MAX_INCLUSIVE_END_AT) ? this.getNextReadChangeStreamEndTimestamp() : endTimestamp;
        PartitionMetadata updatedPartition = Optional.ofNullable(this.partitionMetadataDao.getPartition(token)).map(this.partitionMetadataMapper::from).orElseThrow(() -> new IllegalStateException("Partition " + token + " not found in metadata table"));
        RestrictionInterrupter<Timestamp> interrupter = RestrictionInterrupter.withSoftTimeout(RESTRICTION_TRACKER_TIMEOUT);
        try (ChangeStreamResultSet resultSet = this.changeStreamDao.changeStreamQuery(token, startTimestamp, changeStreamQueryEndTimestamp, partition.getHeartbeatMillis());){
            this.metrics.incQueryCounter();
            while (resultSet.next()) {
                List<ChangeStreamRecord> records = this.changeStreamRecordMapper.toChangeStreamRecords(updatedPartition, resultSet, resultSet.getMetadata());
                for (ChangeStreamRecord record : records) {
                    Optional<DoFn.ProcessContinuation> maybeContinuation;
                    if (record instanceof DataChangeRecord) {
                        maybeContinuation = this.dataChangeRecordAction.run(updatedPartition, (DataChangeRecord)record, tracker, interrupter, receiver, watermarkEstimator);
                    } else if (record instanceof HeartbeatRecord) {
                        maybeContinuation = this.heartbeatRecordAction.run(updatedPartition, (HeartbeatRecord)record, tracker, interrupter, watermarkEstimator);
                    } else if (record instanceof ChildPartitionsRecord) {
                        maybeContinuation = this.childPartitionsRecordAction.run(updatedPartition, (ChildPartitionsRecord)record, tracker, interrupter, watermarkEstimator);
                    } else if (record instanceof PartitionStartRecord) {
                        maybeContinuation = this.partitionStartRecordAction.run(updatedPartition, (PartitionStartRecord)record, tracker, interrupter, watermarkEstimator);
                    } else if (record instanceof PartitionEndRecord) {
                        maybeContinuation = this.partitionEndRecordAction.run(updatedPartition, (PartitionEndRecord)record, tracker, interrupter, watermarkEstimator);
                    } else if (record instanceof PartitionEventRecord) {
                        maybeContinuation = this.partitionEventRecordAction.run(updatedPartition, (PartitionEventRecord)record, tracker, interrupter, watermarkEstimator);
                    } else {
                        LOG.error("[{}] Unknown record type {}", (Object)token, record.getClass());
                        throw new IllegalArgumentException("Unknown record type " + record.getClass());
                    }
                    if (!maybeContinuation.isPresent()) continue;
                    LOG.debug("[{}] Continuation present, returning {}", (Object)token, maybeContinuation);
                    bundleFinalizer.afterBundleCommit(Instant.now().plus((ReadableDuration)BUNDLE_FINALIZER_TIMEOUT), this.updateWatermarkCallback(token, (WatermarkEstimator<Instant>)watermarkEstimator));
                    DoFn.ProcessContinuation processContinuation = maybeContinuation.get();
                    return processContinuation;
                }
            }
            bundleFinalizer.afterBundleCommit(Instant.now().plus((ReadableDuration)BUNDLE_FINALIZER_TIMEOUT), this.updateWatermarkCallback(token, (WatermarkEstimator<Instant>)watermarkEstimator));
        }
        catch (SpannerException e) {
            if (this.isTimestampOutOfRange(e)) {
                LOG.info("[{}] query change stream is out of range for {} to {}, finishing stream.", new Object[]{token, startTimestamp, endTimestamp, e});
            }
            throw e;
        }
        catch (Exception e) {
            LOG.error("[{}] query change stream had exception processing range {} to {}.", new Object[]{token, startTimestamp, endTimestamp, e});
            throw e;
        }
        LOG.debug("[{}] change stream completed successfully", (Object)token);
        if (tracker.tryClaim((Object)endTimestamp)) {
            LOG.debug("[{}] Finishing partition", (Object)token);
            this.partitionMetadataDao.updateToFinished(token);
            this.metrics.decActivePartitionReadCounter();
            LOG.info("[{}] After attempting to finish the partition", (Object)token);
        }
        return DoFn.ProcessContinuation.stop();
    }

    private // Could not load outer class - annotation placement on inner may be incorrect
     @UnknownKeyFor @NonNull @Initialized DoFn.BundleFinalizer.Callback updateWatermarkCallback(@UnknownKeyFor @NonNull @Initialized String token, @UnknownKeyFor @NonNull @Initialized WatermarkEstimator<@UnknownKeyFor @NonNull @Initialized Instant> watermarkEstimator) {
        return () -> {
            Instant watermark = watermarkEstimator.currentWatermark();
            LOG.debug("[{}] Updating current watermark to {}", (Object)token, (Object)watermark);
            try {
                this.partitionMetadataDao.updateWatermark(token, Timestamp.ofTimeMicroseconds((long)(watermark.getMillis() * 1000L)));
            }
            catch (SpannerException e) {
                if (e.getErrorCode() == ErrorCode.NOT_FOUND) {
                    LOG.debug("[{}] Unable to update the current watermark, partition NOT FOUND", (Object)token);
                }
                LOG.error("[{}] Error updating the current watermark: {}", new Object[]{token, e.getMessage(), e});
            }
        };
    }

    private @UnknownKeyFor @NonNull @Initialized boolean isTimestampOutOfRange(@UnknownKeyFor @NonNull @Initialized SpannerException e) {
        return (e.getErrorCode() == ErrorCode.INVALID_ARGUMENT || e.getErrorCode() == ErrorCode.OUT_OF_RANGE) && e.getMessage() != null && e.getMessage().contains(OUT_OF_RANGE_ERROR_MESSAGE);
    }

    private @UnknownKeyFor @NonNull @Initialized Timestamp getNextReadChangeStreamEndTimestamp() {
        Timestamp current = Timestamp.now();
        return Timestamp.ofTimeSecondsAndNanos((long)(current.getSeconds() + 120L), (int)current.getNanos());
    }
}

