/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.gcp.bigtable.changestreams.dofn;

import com.google.cloud.bigtable.data.v2.models.ChangeStreamContinuationToken;
import com.google.cloud.bigtable.data.v2.models.ChangeStreamRecord;
import com.google.protobuf.ByteString;
import java.io.IOException;
import java.math.BigDecimal;
import java.math.RoundingMode;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.ByteStringRangeHelper;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.ChangeStreamMetrics;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.action.ActionFactory;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.action.ChangeStreamAction;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.action.ReadChangeStreamPartitionAction;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.ChangeStreamDao;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.DaoFactory;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.MetadataTableDao;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.estimator.CoderSizeEstimator;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.estimator.NullSizeEstimator;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.estimator.SizeEstimator;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.model.PartitionRecord;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.restriction.ReadChangeStreamPartitionProgressTracker;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.restriction.StreamProgress;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@DoFn.UnboundedPerElement
@Internal
public class ReadChangeStreamPartitionDoFn
extends DoFn<PartitionRecord, KV<ByteString, ChangeStreamRecord>> {
    private static final @UnknownKeyFor @NonNull @Initialized long serialVersionUID = 4418739381635104479L;
    private static final @UnknownKeyFor @NonNull @Initialized BigDecimal MAX_DOUBLE = BigDecimal.valueOf(Double.MAX_VALUE);
    private static final @UnknownKeyFor @NonNull @Initialized Logger LOG = LoggerFactory.getLogger(ReadChangeStreamPartitionDoFn.class);
    private static final @UnknownKeyFor @NonNull @Initialized Duration HEARTBEAT_DURATION = Duration.standardSeconds((long)1L);
    private final @UnknownKeyFor @NonNull @Initialized DaoFactory daoFactory;
    private final @UnknownKeyFor @NonNull @Initialized ChangeStreamMetrics metrics;
    private final @UnknownKeyFor @NonNull @Initialized ActionFactory actionFactory;
    private @UnknownKeyFor @NonNull @Initialized SizeEstimator<@UnknownKeyFor @NonNull @Initialized KV<@UnknownKeyFor @NonNull @Initialized ByteString, @UnknownKeyFor @NonNull @Initialized ChangeStreamRecord>> sizeEstimator;
    private @UnknownKeyFor @NonNull @Initialized ReadChangeStreamPartitionAction readChangeStreamPartitionAction;

    public ReadChangeStreamPartitionDoFn(@UnknownKeyFor @NonNull @Initialized DaoFactory daoFactory, @UnknownKeyFor @NonNull @Initialized ActionFactory actionFactory, @UnknownKeyFor @NonNull @Initialized ChangeStreamMetrics metrics) {
        this.daoFactory = daoFactory;
        this.metrics = metrics;
        this.actionFactory = actionFactory;
        this.sizeEstimator = new NullSizeEstimator<KV<ByteString, ChangeStreamRecord>>();
    }

    @DoFn.GetInitialWatermarkEstimatorState
    public @UnknownKeyFor @NonNull @Initialized Instant getInitialWatermarkEstimatorState(@DoFn.Element @UnknownKeyFor @NonNull @Initialized PartitionRecord partitionRecord) {
        return partitionRecord.getParentLowWatermark();
    }

    @DoFn.NewWatermarkEstimator
    public @UnknownKeyFor @NonNull @Initialized ManualWatermarkEstimator<@UnknownKeyFor @NonNull @Initialized Instant> newWatermarkEstimator(@DoFn.WatermarkEstimatorState @UnknownKeyFor @NonNull @Initialized Instant watermarkEstimatorState) {
        return new WatermarkEstimators.Manual(watermarkEstimatorState);
    }

    @DoFn.GetInitialRestriction
    public @UnknownKeyFor @NonNull @Initialized StreamProgress initialRestriction() {
        this.metrics.incPartitionStreamCount();
        return new StreamProgress();
    }

    @DoFn.NewTracker
    public @UnknownKeyFor @NonNull @Initialized ReadChangeStreamPartitionProgressTracker restrictionTracker(@DoFn.Restriction @UnknownKeyFor @NonNull @Initialized StreamProgress restriction) {
        return new ReadChangeStreamPartitionProgressTracker(restriction);
    }

    @DoFn.GetSize
    public @UnknownKeyFor @NonNull @Initialized double getSize(@DoFn.Restriction @UnknownKeyFor @NonNull @Initialized StreamProgress streamProgress) {
        if (streamProgress == null) {
            return 0.0;
        }
        Instant lowWatermark = streamProgress.getEstimatedLowWatermark();
        BigDecimal estimatedThroughput = streamProgress.getThroughputEstimate();
        Instant lastRunTimestamp = streamProgress.getLastRunTimestamp();
        if (lowWatermark == null || estimatedThroughput == null || lastRunTimestamp == null) {
            return 0.0;
        }
        String partition = "";
        if (streamProgress.getCurrentToken() != null) {
            partition = ByteStringRangeHelper.formatByteStringRange(((ChangeStreamContinuationToken)Preconditions.checkNotNull((Object)streamProgress.getCurrentToken())).getPartition());
        }
        Duration processingTimeLag = Duration.millis((long)(Instant.now().getMillis() - streamProgress.getLastRunTimestamp().getMillis()));
        Duration watermarkLag = Duration.millis((long)(Instant.now().getMillis() - lowWatermark.getMillis()));
        long lagInMillis = (streamProgress.isHeartbeat() ? processingTimeLag : watermarkLag).getMillis();
        double estimatedSize = estimatedThroughput.multiply(BigDecimal.valueOf(lagInMillis)).divide(BigDecimal.valueOf(1000L), 3, RoundingMode.DOWN).min(MAX_DOUBLE).max(BigDecimal.ZERO).doubleValue();
        LOG.debug("Estimated size (per second): partition: {}, isHeartbeat: {}, throughputBytes: {} x watermarkLagMillis {} = {}, lastRun = {}", new Object[]{partition, streamProgress.isHeartbeat(), estimatedThroughput, lagInMillis, estimatedSize, streamProgress.getLastRunTimestamp()});
        return estimatedSize;
    }

    @DoFn.Setup
    public void setup() throws @UnknownKeyFor @NonNull @Initialized IOException {
        MetadataTableDao metadataTableDao = this.daoFactory.getMetadataTableDao();
        ChangeStreamDao changeStreamDao = this.daoFactory.getChangeStreamDao();
        ChangeStreamAction changeStreamAction = this.actionFactory.changeStreamAction(this.metrics);
        this.readChangeStreamPartitionAction = this.actionFactory.readChangeStreamPartitionAction(metadataTableDao, changeStreamDao, this.metrics, changeStreamAction, HEARTBEAT_DURATION, this.sizeEstimator);
    }

    @DoFn.ProcessElement
    public // Could not load outer class - annotation placement on inner may be incorrect
    @UnknownKeyFor @NonNull @Initialized DoFn.ProcessContinuation processElement(@DoFn.Element @UnknownKeyFor @NonNull @Initialized PartitionRecord partitionRecord, @UnknownKeyFor @NonNull @Initialized RestrictionTracker<@UnknownKeyFor @NonNull @Initialized StreamProgress, @UnknownKeyFor @NonNull @Initialized StreamProgress> tracker, // Could not load outer class - annotation placement on inner may be incorrect
    @UnknownKeyFor @NonNull @Initialized DoFn.OutputReceiver<@UnknownKeyFor @NonNull @Initialized KV<@UnknownKeyFor @NonNull @Initialized ByteString, @UnknownKeyFor @NonNull @Initialized ChangeStreamRecord>> receiver, @UnknownKeyFor @NonNull @Initialized ManualWatermarkEstimator<@UnknownKeyFor @NonNull @Initialized Instant> watermarkEstimator) throws @UnknownKeyFor @NonNull @Initialized InterruptedException, @UnknownKeyFor @NonNull @Initialized IOException {
        return this.readChangeStreamPartitionAction.run(partitionRecord, tracker, receiver, watermarkEstimator);
    }

    public void setSizeEstimator(@UnknownKeyFor @NonNull @Initialized CoderSizeEstimator<@UnknownKeyFor @NonNull @Initialized KV<@UnknownKeyFor @NonNull @Initialized ByteString, @UnknownKeyFor @NonNull @Initialized ChangeStreamRecord>> sizeEstimator) {
        this.sizeEstimator = sizeEstimator;
    }
}

