/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.gcp.pubsublite.internal;

import com.google.cloud.pubsublite.Offset;
import com.google.cloud.pubsublite.proto.SequencedMessage;
import com.google.protobuf.Timestamp;
import com.google.protobuf.util.Timestamps;
import java.util.Optional;
import java.util.function.Supplier;
import org.apache.beam.sdk.io.gcp.pubsublite.internal.MemoryBufferedSubscriber;
import org.apache.beam.sdk.io.gcp.pubsublite.internal.OffsetByteProgress;
import org.apache.beam.sdk.io.gcp.pubsublite.internal.OffsetByteRange;
import org.apache.beam.sdk.io.gcp.pubsublite.internal.SubscriptionPartition;
import org.apache.beam.sdk.io.gcp.pubsublite.internal.SubscriptionPartitionProcessor;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Processes messages for a single subscription partition.
 *
 * <p>Messages are pulled from a {@link MemoryBufferedSubscriber}; each message's offset and size
 * are claimed against the restriction tracker, and every successfully claimed message is emitted
 * to the output receiver stamped with its publish time.
 */
class SubscriptionPartitionProcessorImpl
implements SubscriptionPartitionProcessor {
    private static final Logger LOG = LoggerFactory.getLogger(SubscriptionPartitionProcessorImpl.class);
    private final SubscriptionPartition subscriptionPartition;
    private final RestrictionTracker<OffsetByteRange, OffsetByteProgress> tracker;
    private final DoFn.OutputReceiver<SequencedMessage> receiver;
    private final MemoryBufferedSubscriber subscriber;
    /** Offset of the most recently claimed message; empty until the first successful claim. */
    private Optional<Offset> lastClaimedOffset = Optional.empty();

    /**
     * Creates a processor and obtains a subscriber positioned at the start of the restriction.
     *
     * @param subscriptionPartition the partition being processed (used for log context)
     * @param tracker tracker whose current restriction provides the start offset and against
     *     which each message is claimed
     * @param receiver destination for claimed messages
     * @param subscriberFactory supplies candidate subscribers; it is invoked repeatedly until one
     *     is returned whose fetch offset equals the restriction's start offset
     */
    SubscriptionPartitionProcessorImpl(SubscriptionPartition subscriptionPartition, RestrictionTracker<OffsetByteRange, OffsetByteProgress> tracker, DoFn.OutputReceiver<SequencedMessage> receiver, Supplier<MemoryBufferedSubscriber> subscriberFactory) {
        this.subscriptionPartition = subscriptionPartition;
        this.tracker = tracker;
        this.receiver = receiver;
        // Must come last: getReadySubscriber reads this.tracker and this.subscriptionPartition.
        this.subscriber = this.getReadySubscriber(subscriberFactory);
    }

    /**
     * Drains the messages currently available from the subscriber.
     *
     * @return {@code stop()} as soon as the tracker rejects a claim; {@code resume()} once the
     *     subscriber has no further message to peek
     */
    @Override
    public DoFn.ProcessContinuation run() {
        Optional<SequencedMessage> next = this.subscriber.peek();
        while (next.isPresent()) {
            SequencedMessage message = next.get();
            Offset messageOffset = Offset.of(message.getCursor().getOffset());
            // Claim before popping so that a rejected message stays at the head of the subscriber.
            if (!this.tracker.tryClaim(OffsetByteProgress.of(messageOffset, message.getSizeBytes()))) {
                return DoFn.ProcessContinuation.stop();
            }
            this.subscriber.pop();
            this.lastClaimedOffset = Optional.of(messageOffset);
            this.receiver.outputWithTimestamp(message, new Instant(Timestamps.toMillis(message.getPublishTime())));
            next = this.subscriber.peek();
        }
        return DoFn.ProcessContinuation.resume();
    }

    /**
     * Returns the offset of the last message successfully claimed by {@link #run()}.
     *
     * @return the last claimed offset, or empty if nothing has been claimed yet
     */
    @Override
    public Optional<Offset> lastClaimed() {
        return this.lastClaimedOffset;
    }

    /**
     * Obtains a subscriber whose fetch offset matches the start of the current restriction.
     *
     * <p>Subscribers whose fetch offset differs are stopped and discarded, and the supplier is
     * asked again. This loops until a matching subscriber is produced.
     *
     * @param getOrCreate supplier of candidate subscribers
     * @return a matching subscriber, after {@code rebuffer()} has been called on it
     */
    private MemoryBufferedSubscriber getReadySubscriber(Supplier<MemoryBufferedSubscriber> getOrCreate) {
        Offset startOffset = Offset.of(this.tracker.currentRestriction().getRange().getFrom());
        while (true) {
            MemoryBufferedSubscriber candidate = getOrCreate.get();
            Offset fetchOffset = candidate.fetchOffset();
            if (startOffset.equals(fetchOffset)) {
                candidate.rebuffer();
                return candidate;
            }
            LOG.info("Discarding subscriber due to mismatch, this should be rare. {}, start: {} fetch: {}", this.subscriptionPartition, startOffset, fetchOffset);
            try {
                candidate.stopAsync().awaitTerminated();
            }
            catch (Exception e) {
                // Best-effort shutdown: the subscriber is being discarded anyway, so a failure
                // here must not prevent retrying. Log it rather than swallowing it silently.
                LOG.warn("Failed to cleanly stop discarded subscriber for {}; retrying with a new subscriber.", this.subscriptionPartition, e);
            }
        }
    }
}

