/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.gcp.pubsublite;

import com.google.cloud.pubsublite.Offset;
import com.google.cloud.pubsublite.internal.wire.Committer;
import com.google.cloud.pubsublite.proto.SequencedMessage;
import org.apache.beam.sdk.io.gcp.pubsublite.InitialOffsetReader;
import org.apache.beam.sdk.io.gcp.pubsublite.OffsetByteProgress;
import org.apache.beam.sdk.io.gcp.pubsublite.SubscriptionPartition;
import org.apache.beam.sdk.io.gcp.pubsublite.SubscriptionPartitionProcessor;
import org.apache.beam.sdk.io.gcp.pubsublite.SubscriptionPartitionProcessorFactory;
import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.SerializableBiFunction;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
import org.joda.time.Duration;
import org.joda.time.Instant;

/**
 * A {@link DoFn} that turns each {@link SubscriptionPartition} element into a stream of
 * {@link SequencedMessage}s.
 *
 * <p>The class uses Beam's splittable-DoFn hooks: the restriction is an {@link OffsetRange}
 * tracked through a {@link RestrictionTracker} whose position type is {@link OffsetByteProgress},
 * and the watermark is estimated with {@link WatermarkEstimators.MonotonicallyIncreasing}.
 *
 * <p>All collaborators (processor, initial-offset reader, tracker and committer) are obtained
 * from factories supplied at construction time.
 */
class PerSubscriptionPartitionSdf
extends DoFn<SubscriptionPartition, SequencedMessage> {
    /** Duration handed to {@link SubscriptionPartitionProcessor#waitForCompletion} on every call. */
    private final @UnknownKeyFor @NonNull @Initialized Duration maxSleepTime;
    /** Creates the processor that drives one {@code processElement} invocation. */
    private final @UnknownKeyFor @NonNull @Initialized SubscriptionPartitionProcessorFactory processorFactory;
    /** Creates the reader used to determine the first offset of the initial restriction. */
    private final @UnknownKeyFor @NonNull @Initialized SerializableFunction<@UnknownKeyFor @NonNull @Initialized SubscriptionPartition, @UnknownKeyFor @NonNull @Initialized InitialOffsetReader> offsetReaderFactory;
    /** Creates the restriction tracker for a partition and offset range. */
    private final @UnknownKeyFor @NonNull @Initialized SerializableBiFunction<@UnknownKeyFor @NonNull @Initialized SubscriptionPartition, @UnknownKeyFor @NonNull @Initialized OffsetRange, @UnknownKeyFor @NonNull @Initialized RestrictionTracker<@UnknownKeyFor @NonNull @Initialized OffsetRange, @UnknownKeyFor @NonNull @Initialized OffsetByteProgress>> trackerFactory;
    /** Creates the committer used inside the bundle-finalization callback. */
    private final @UnknownKeyFor @NonNull @Initialized SerializableFunction<@UnknownKeyFor @NonNull @Initialized SubscriptionPartition, @UnknownKeyFor @NonNull @Initialized Committer> committerFactory;

    /**
     * Stores the supplied collaborators; no other work is done here.
     *
     * @param maxSleepTime duration passed to the processor's {@code waitForCompletion}
     * @param offsetReaderFactory factory for the per-partition initial-offset reader
     * @param trackerFactory factory for the per-partition restriction tracker
     * @param processorFactory factory for the per-invocation partition processor
     * @param committerFactory factory for the per-partition offset committer
     */
    PerSubscriptionPartitionSdf(
            @UnknownKeyFor @NonNull @Initialized Duration maxSleepTime,
            @UnknownKeyFor @NonNull @Initialized SerializableFunction<@UnknownKeyFor @NonNull @Initialized SubscriptionPartition, @UnknownKeyFor @NonNull @Initialized InitialOffsetReader> offsetReaderFactory,
            @UnknownKeyFor @NonNull @Initialized SerializableBiFunction<@UnknownKeyFor @NonNull @Initialized SubscriptionPartition, @UnknownKeyFor @NonNull @Initialized OffsetRange, @UnknownKeyFor @NonNull @Initialized RestrictionTracker<@UnknownKeyFor @NonNull @Initialized OffsetRange, @UnknownKeyFor @NonNull @Initialized OffsetByteProgress>> trackerFactory,
            @UnknownKeyFor @NonNull @Initialized SubscriptionPartitionProcessorFactory processorFactory,
            @UnknownKeyFor @NonNull @Initialized SerializableFunction<@UnknownKeyFor @NonNull @Initialized SubscriptionPartition, @UnknownKeyFor @NonNull @Initialized Committer> committerFactory) {
        this.maxSleepTime = maxSleepTime;
        this.processorFactory = processorFactory;
        this.offsetReaderFactory = offsetReaderFactory;
        this.trackerFactory = trackerFactory;
        this.committerFactory = committerFactory;
    }

    /**
     * Supplies the initial watermark-estimator state.
     *
     * @return {@link Instant#EPOCH}
     */
    @DoFn.GetInitialWatermarkEstimatorState
    public @UnknownKeyFor @NonNull @Initialized Instant getInitialWatermarkState() {
        return Instant.EPOCH;
    }

    /**
     * Builds the watermark estimator from previously stored state.
     *
     * @param state the stored watermark-estimator state
     * @return a new {@link WatermarkEstimators.MonotonicallyIncreasing} seeded with {@code state}
     */
    @DoFn.NewWatermarkEstimator
    public WatermarkEstimators.@UnknownKeyFor @NonNull @Initialized MonotonicallyIncreasing newWatermarkEstimator(
            @DoFn.WatermarkEstimatorState @UnknownKeyFor @NonNull @Initialized Instant state) {
        return new WatermarkEstimators.MonotonicallyIncreasing(state);
    }

    /**
     * Runs a processor for the given partition and schedules an offset commit for after the
     * bundle has been committed.
     *
     * <p>The processor is created from {@code processorFactory}, started, and then asked to
     * {@code waitForCompletion(maxSleepTime)}. If the processor reports a last-claimed offset,
     * a finalization callback is registered that commits {@code lastClaimed + 1}. The processor
     * is closed before this method returns.
     *
     * @param tracker restriction tracker for the current offset range
     * @param subscriptionPartition the element being processed
     * @param receiver output receiver handed to the processor
     * @param finalizer used to register the post-commit callback
     * @return the continuation returned by the processor's {@code waitForCompletion}
     * @throws Exception if the processor fails or cannot be closed
     */
    @DoFn.ProcessElement
    public DoFn.@UnknownKeyFor @NonNull @Initialized ProcessContinuation processElement(
            @UnknownKeyFor @NonNull @Initialized RestrictionTracker<@UnknownKeyFor @NonNull @Initialized OffsetRange, @UnknownKeyFor @NonNull @Initialized OffsetByteProgress> tracker,
            @DoFn.Element @UnknownKeyFor @NonNull @Initialized SubscriptionPartition subscriptionPartition,
            DoFn.@UnknownKeyFor @NonNull @Initialized OutputReceiver<@UnknownKeyFor @NonNull @Initialized SequencedMessage> receiver,
            DoFn.@UnknownKeyFor @NonNull @Initialized BundleFinalizer finalizer) throws @UnknownKeyFor @NonNull @Initialized Exception {
        try (SubscriptionPartitionProcessor processor = this.processorFactory.newProcessor(subscriptionPartition, tracker, receiver)) {
            processor.start();
            DoFn.ProcessContinuation result = processor.waitForCompletion(this.maxSleepTime);
            // Long.MAX_VALUE millis is passed as the first argument of afterBundleCommit
            // (presumably the callback expiry, i.e. "never expire" -- confirm against the
            // BundleFinalizer documentation).
            processor.lastClaimed().ifPresent(lastClaimedOffset ->
                    finalizer.afterBundleCommit(
                            Instant.ofEpochMilli(Long.MAX_VALUE),
                            () -> this.commitNextOffset(subscriptionPartition, lastClaimedOffset)));
            return result;
        }
    }

    /**
     * Commits the offset following {@code lastClaimedOffset} using a freshly created committer.
     *
     * <p>Once the committer has been started it is always stopped again, even when the commit
     * itself fails. A failure while stopping is attached to the commit failure as a suppressed
     * exception so the original cause is not lost; when the commit succeeded, a stop failure is
     * propagated as-is.
     *
     * @param subscriptionPartition partition whose committer should be created
     * @param lastClaimedOffset last offset claimed by the processor
     * @throws Exception if starting, committing, or stopping the committer fails
     */
    private void commitNextOffset(SubscriptionPartition subscriptionPartition, Offset lastClaimedOffset) throws Exception {
        Committer committer = this.committerFactory.apply(subscriptionPartition);
        committer.startAsync().awaitRunning();
        Throwable commitFailure = null;
        try {
            // Commit the next offset after the last one claimed.
            committer.commitOffset(Offset.of(lastClaimedOffset.value() + 1L)).get();
        } catch (Throwable t) {
            commitFailure = t;
            throw t;
        } finally {
            try {
                committer.stopAsync().awaitTerminated();
            } catch (RuntimeException shutdownFailure) {
                if (commitFailure == null) {
                    throw shutdownFailure;
                }
                // Keep the commit failure as the primary error.
                commitFailure.addSuppressed(shutdownFailure);
            }
        }
    }

    /**
     * Computes the initial restriction for a partition.
     *
     * <p>A reader is obtained from {@code offsetReaderFactory}, queried once, and closed.
     *
     * @param subscriptionPartition the element to compute the restriction for
     * @return an {@link OffsetRange} from the offset that was read up to {@link Long#MAX_VALUE}
     */
    @DoFn.GetInitialRestriction
    public @UnknownKeyFor @NonNull @Initialized OffsetRange getInitialRestriction(
            @DoFn.Element @UnknownKeyFor @NonNull @Initialized SubscriptionPartition subscriptionPartition) {
        try (InitialOffsetReader reader = this.offsetReaderFactory.apply(subscriptionPartition)) {
            Offset offset = reader.read();
            return new OffsetRange(offset.value(), Long.MAX_VALUE);
        }
    }

    /**
     * Creates the restriction tracker for a partition and range by delegating to
     * {@code trackerFactory}.
     *
     * @param subscriptionPartition the element being processed
     * @param range the restriction to track
     * @return the tracker produced by {@code trackerFactory}
     */
    @DoFn.NewTracker
    public @UnknownKeyFor @NonNull @Initialized RestrictionTracker<@UnknownKeyFor @NonNull @Initialized OffsetRange, @UnknownKeyFor @NonNull @Initialized OffsetByteProgress> newTracker(
            @DoFn.Element @UnknownKeyFor @NonNull @Initialized SubscriptionPartition subscriptionPartition,
            @DoFn.Restriction @UnknownKeyFor @NonNull @Initialized OffsetRange range) {
        return this.trackerFactory.apply(subscriptionPartition, range);
    }
}

