/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.gcp.spanner.changestreams.dofn;

import com.google.cloud.Timestamp;
import com.google.cloud.spanner.ResultSet;
import io.opencensus.common.Scope;
import io.opencensus.trace.AttributeValue;
import io.opencensus.trace.Tracer;
import io.opencensus.trace.Tracing;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamMetrics;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.DaoFactory;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.PartitionMetadataDao;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.mapper.MapperFactory;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.mapper.PartitionMetadataMapper;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata;
import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator;
import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Splittable {@link DoFn} that repeatedly queries the partition metadata store for partitions in
 * the {@code CREATED} state, marks each of them as scheduled and emits it downstream.
 *
 * <p>Each invocation of {@link #processElement} first reads the minimum watermark of the
 * unfinished partitions. While such a watermark exists the function emits the newly created
 * partitions and asks to be resumed after {@code resumeDuration}; once it no longer exists the
 * function claims the end of its restriction and stops.
 */
@DoFn.UnboundedPerElement
public class DetectNewPartitionsDoFn extends DoFn<byte[], PartitionMetadata> {
    private static final long serialVersionUID = 1523712495885011374L;
    /** Delay requested between two polling rounds when no explicit duration is supplied. */
    private static final Duration DEFAULT_RESUME_DURATION = Duration.millis(100L);
    private static final Logger LOG = LoggerFactory.getLogger(DetectNewPartitionsDoFn.class);
    private static final Tracer TRACER = Tracing.getTracer();

    private final Duration resumeDuration;
    private final DaoFactory daoFactory;
    private final MapperFactory mapperFactory;
    private final ChangeStreamMetrics metrics;
    // Transient: re-created from the factories in setup() rather than serialized with the DoFn.
    private transient PartitionMetadataDao partitionMetadataDao;
    private transient PartitionMetadataMapper partitionMetadataMapper;

    /**
     * Creates the function using the default resume delay.
     *
     * @param daoFactory factory used in {@link #setup()} to obtain the partition metadata DAO
     * @param mapperFactory factory used in {@link #setup()} to obtain the partition metadata mapper
     * @param metrics sink for the partition metrics recorded by this function
     */
    public DetectNewPartitionsDoFn(DaoFactory daoFactory, MapperFactory mapperFactory, ChangeStreamMetrics metrics) {
        this(daoFactory, mapperFactory, metrics, DEFAULT_RESUME_DURATION);
    }

    /**
     * Creates the function with an explicit resume delay.
     *
     * @param daoFactory factory used in {@link #setup()} to obtain the partition metadata DAO
     * @param mapperFactory factory used in {@link #setup()} to obtain the partition metadata mapper
     * @param metrics sink for the partition metrics recorded by this function
     * @param resumeDuration delay requested before the next polling round
     */
    public DetectNewPartitionsDoFn(DaoFactory daoFactory, MapperFactory mapperFactory, ChangeStreamMetrics metrics, Duration resumeDuration) {
        this.daoFactory = daoFactory;
        this.mapperFactory = mapperFactory;
        this.metrics = metrics;
        this.resumeDuration = resumeDuration;
    }

    /**
     * Uses the timestamp of the input element as the initial watermark estimator state.
     *
     * @param currentElementTimestamp timestamp of the element being processed
     * @return the same timestamp
     */
    @DoFn.GetInitialWatermarkEstimatorState
    public Instant getInitialWatermarkEstimatorState(@DoFn.Timestamp Instant currentElementTimestamp) {
        return currentElementTimestamp;
    }

    /**
     * Builds a manual watermark estimator seeded with the given state.
     *
     * @param watermarkEstimatorState watermark to start from
     * @return a manual estimator initialised with {@code watermarkEstimatorState}
     */
    @DoFn.NewWatermarkEstimator
    public ManualWatermarkEstimator<Instant> newWatermarkEstimator(@DoFn.WatermarkEstimatorState Instant watermarkEstimatorState) {
        return new WatermarkEstimators.Manual(watermarkEstimatorState);
    }

    /**
     * Returns the initial restriction: the offset range starting at 0 and ending at
     * {@link Long#MAX_VALUE}.
     *
     * @return the initial offset range
     */
    @DoFn.GetInitialRestriction
    public OffsetRange initialRestriction() {
        // The previous literal (9223372036854765807L) was Long.MAX_VALUE - 10000, which looks like
        // a mistyped Long.MAX_VALUE; use the named constant so the bound is unambiguous.
        return new OffsetRange(0L, Long.MAX_VALUE);
    }

    /**
     * Creates the tracker for the given restriction.
     *
     * @param restriction offset range to track
     * @return a new {@link OffsetRangeTracker} over {@code restriction}
     */
    @DoFn.NewTracker
    public OffsetRangeTracker restrictionTracker(@DoFn.Restriction OffsetRange restriction) {
        return new OffsetRangeTracker(restriction);
    }

    /** Obtains the DAO and the mapper from their factories. */
    @DoFn.Setup
    public void setup() {
        this.partitionMetadataDao = this.daoFactory.getPartitionMetadataDao();
        this.partitionMetadataMapper = this.mapperFactory.partitionMetadataMapper();
    }

    /**
     * Runs one polling round inside a tracing span.
     *
     * <p>If the DAO reports a minimum watermark for unfinished partitions, the created partitions
     * are scheduled and emitted and the function asks to be resumed; otherwise the end of the
     * restriction is claimed and processing stops.
     *
     * @param tracker tracker of the offset range restriction
     * @param receiver receiver for the scheduled partitions
     * @param watermarkEstimator estimator updated with the minimum unfinished watermark
     * @return {@code resume} with the configured delay while unfinished partitions exist,
     *     {@code stop} when a claim fails or when no unfinished partition remains
     */
    @DoFn.ProcessElement
    public DoFn.ProcessContinuation processElement(RestrictionTracker<OffsetRange, Long> tracker, DoFn.OutputReceiver<PartitionMetadata> receiver, ManualWatermarkEstimator<Instant> watermarkEstimator) {
        try (Scope scope = TRACER.spanBuilder("DetectNewPartitionsDoFn.processElement").setRecordEvents(true).startScopedSpan()) {
            Timestamp minWatermark = getUnfinishedMinWatermark();
            if (minWatermark == null) {
                return terminate(tracker);
            }
            return processPartitions(tracker, receiver, watermarkEstimator, minWatermark);
        }
    }

    /** Schedules and emits every partition currently in the CREATED state, then asks to resume. */
    private DoFn.ProcessContinuation processPartitions(RestrictionTracker<OffsetRange, Long> tracker, DoFn.OutputReceiver<PartitionMetadata> receiver, ManualWatermarkEstimator<Instant> watermarkEstimator, Timestamp minWatermark) {
        Instant minWatermarkInstant = new Instant(minWatermark.toSqlTimestamp().getTime());
        watermarkEstimator.setWatermark(minWatermarkInstant);

        try (ResultSet resultSet = this.partitionMetadataDao.getPartitionsInState(PartitionMetadata.State.CREATED)) {
            // One offset is claimed per emitted partition, starting at the beginning of the range.
            long currentIndex = tracker.currentRestriction().getFrom();
            while (resultSet.next()) {
                if (!tracker.tryClaim(currentIndex)) {
                    LOG.debug("Could not claim {}, stopping...", currentIndex);
                    return DoFn.ProcessContinuation.stop();
                }
                PartitionMetadata partition = this.partitionMetadataMapper.from(resultSet.getCurrentRowAsStruct());
                Timestamp scheduledAt = schedulePartition(partition);
                PartitionMetadata updatedPartition = partition.toBuilder().setScheduledAt(scheduledAt).build();
                receiver.outputWithTimestamp(updatedPartition, minWatermarkInstant);
                this.metrics.incPartitionRecordCount();
                currentIndex++;
            }
        }
        return DoFn.ProcessContinuation.resume().withResumeDelay(this.resumeDuration);
    }

    /** Claims the last offset of the restriction and stops processing. */
    private DoFn.ProcessContinuation terminate(RestrictionTracker<OffsetRange, Long> tracker) {
        OffsetRange restriction = tracker.currentRestriction();
        // An OffsetRange is half-open ([from, to)), so the offset getTo() itself can never be
        // claimed; claim the last offset inside the range instead. The max() keeps the claimed
        // offset at or after getFrom() should the range ever be empty.
        long lastOffset = Math.max(restriction.getFrom(), restriction.getTo() - 1L);
        if (!tracker.tryClaim(lastOffset)) {
            LOG.warn("Failed to claim the end of range in DetectNewPartitionsDoFn.");
        }
        LOG.info("All partitions have been processed, stopping");
        return DoFn.ProcessContinuation.stop();
    }

    /**
     * Reads the minimum watermark of the unfinished partitions from the DAO inside a tracing span.
     *
     * @return the value returned by the DAO; {@code processElement} treats {@code null} as
     *     "nothing left to process"
     */
    private Timestamp getUnfinishedMinWatermark() {
        try (Scope scope = TRACER.spanBuilder("DetectNewPartitionsDoFn.getUnfinishedMinWatermark").setRecordEvents(true).startScopedSpan()) {
            return this.partitionMetadataDao.getUnfinishedMinWatermark();
        }
    }

    /**
     * Marks the partition as scheduled through the DAO, inside a tracing span tagged with the
     * partition token, and records the created-to-scheduled latency metric.
     *
     * @param partition partition to schedule
     * @return the timestamp returned by the DAO for the scheduling update
     */
    private Timestamp schedulePartition(PartitionMetadata partition) {
        String token = partition.getPartitionToken();
        // The span is named after this method (it previously reused the name of
        // getUnfinishedMinWatermark, which mislabelled scheduling work in traces).
        try (Scope scope = TRACER.spanBuilder("DetectNewPartitionsDoFn.schedulePartition").setRecordEvents(true).startScopedSpan()) {
            TRACER.getCurrentSpan().putAttribute("PartitionID", AttributeValue.stringAttributeValue(token));
            Timestamp createdAt = partition.getCreatedAt();
            LOG.debug("[{}] Scheduling partition", token);
            Timestamp scheduledAt = this.partitionMetadataDao.updateToScheduled(token);
            LOG.info("[{}] Scheduled partition at {} with start time {} and end time {}", token, scheduledAt, partition.getStartTimestamp(), partition.getEndTimestamp());
            this.metrics.updatePartitionCreatedToScheduled(new Duration(createdAt.toDate().getTime(), scheduledAt.toSqlTimestamp().getTime()));
            return scheduledAt;
        }
    }
}

