/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.gcp.pubsublite.internal;

import com.google.api.core.ApiService;
import com.google.cloud.pubsublite.Offset;
import com.google.cloud.pubsublite.internal.CheckedApiException;
import com.google.cloud.pubsublite.internal.ExtractStatus;
import com.google.cloud.pubsublite.proto.SequencedMessage;
import com.google.protobuf.Timestamp;
import com.google.protobuf.util.Timestamps;
import java.io.IOException;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.io.gcp.pubsublite.internal.ApiServices;
import org.apache.beam.sdk.io.gcp.pubsublite.internal.BlockingCommitter;
import org.apache.beam.sdk.io.gcp.pubsublite.internal.CheckpointMarkImpl;
import org.apache.beam.sdk.io.gcp.pubsublite.internal.MemoryBufferedSubscriber;
import org.apache.beam.sdk.io.gcp.pubsublite.internal.TopicBacklogReader;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
import org.joda.time.Instant;

public class UnboundedReaderImpl
extends UnboundedSource.UnboundedReader<SequencedMessage> {
    private final @UnknownKeyFor @NonNull @Initialized UnboundedSource<@UnknownKeyFor @NonNull @Initialized SequencedMessage, @UnknownKeyFor @NonNull @Initialized CheckpointMarkImpl> source;
    private final @UnknownKeyFor @NonNull @Initialized MemoryBufferedSubscriber subscriber;
    private final @UnknownKeyFor @NonNull @Initialized TopicBacklogReader backlogReader;
    private final @UnknownKeyFor @NonNull @Initialized Supplier<@UnknownKeyFor @NonNull @Initialized BlockingCommitter> committer;
    private @UnknownKeyFor @NonNull @Initialized Offset fetchOffset;
    private @UnknownKeyFor @NonNull @Initialized Optional<@UnknownKeyFor @NonNull @Initialized Instant> lastMessageTimestamp = Optional.empty();
    private @UnknownKeyFor @NonNull @Initialized boolean advanced = false;

    UnboundedReaderImpl(@UnknownKeyFor @NonNull @Initialized UnboundedSource<@UnknownKeyFor @NonNull @Initialized SequencedMessage, @UnknownKeyFor @NonNull @Initialized CheckpointMarkImpl> source, @UnknownKeyFor @NonNull @Initialized MemoryBufferedSubscriber subscriber, @UnknownKeyFor @NonNull @Initialized TopicBacklogReader backlogReader, @UnknownKeyFor @NonNull @Initialized Supplier<@UnknownKeyFor @NonNull @Initialized BlockingCommitter> committer, @UnknownKeyFor @NonNull @Initialized Offset initialOffset) {
        Preconditions.checkArgument((boolean)initialOffset.equals(subscriber.fetchOffset()));
        this.source = source;
        this.subscriber = subscriber;
        this.backlogReader = backlogReader;
        this.committer = committer;
        this.fetchOffset = initialOffset;
    }

    public @UnknownKeyFor @NonNull @Initialized SequencedMessage getCurrent() throws @UnknownKeyFor @NonNull @Initialized NoSuchElementException {
        if (!this.advanced) {
            throw new NoSuchElementException();
        }
        return this.subscriber.peek().get();
    }

    public @UnknownKeyFor @NonNull @Initialized Instant getCurrentTimestamp() throws @UnknownKeyFor @NonNull @Initialized NoSuchElementException {
        return UnboundedReaderImpl.getTimestamp(this.getCurrent());
    }

    private static @UnknownKeyFor @NonNull @Initialized Instant getTimestamp(@UnknownKeyFor @NonNull @Initialized SequencedMessage message) {
        return Instant.ofEpochMilli((long)Timestamps.toMillis((Timestamp)message.getPublishTime()));
    }

    public void close() throws @UnknownKeyFor @NonNull @Initialized IOException {
        try (TopicBacklogReader c1 = this.backlogReader;){
            AutoCloseable c3 = ApiServices.asCloseable(this.subscriber);
            if (c3 != null) {
                c3.close();
            }
        }
        catch (Exception e) {
            throw new IOException("Failed when closing reader.", e);
        }
    }

    public @UnknownKeyFor @NonNull @Initialized boolean start() throws @UnknownKeyFor @NonNull @Initialized IOException {
        try {
            this.subscriber.startAsync().awaitRunning(1L, TimeUnit.MINUTES);
        }
        catch (Exception e) {
            throw new IOException(e);
        }
        return this.advance();
    }

    public @UnknownKeyFor @NonNull @Initialized boolean advance() throws @UnknownKeyFor @NonNull @Initialized IOException {
        if (!this.subscriber.state().equals((Object)ApiService.State.RUNNING)) {
            Throwable t = this.subscriber.failureCause();
            if ("DUPLICATE_SUBSCRIBER_CONNECTIONS".equals(ExtractStatus.getErrorInfoReason((CheckedApiException)ExtractStatus.toCanonical((Throwable)t)))) {
                throw new IOException("Partition reassigned to a different worker- this is expected and can be ignored.", t);
            }
            throw new IOException("Subscriber failed when trying to advance.", t);
        }
        if (this.advanced) {
            this.subscriber.pop();
        }
        Optional<SequencedMessage> next = this.subscriber.peek();
        this.advanced = next.isPresent();
        if (!this.advanced) {
            return false;
        }
        Offset nextOffset = Offset.of((long)(next.get().getCursor().getOffset() + 1L));
        Preconditions.checkState((nextOffset.value() > this.fetchOffset.value() ? 1 : 0) != 0);
        this.fetchOffset = nextOffset;
        this.lastMessageTimestamp = Optional.of(UnboundedReaderImpl.getTimestamp(next.get()));
        return true;
    }

    public @UnknownKeyFor @NonNull @Initialized Instant getWatermark() {
        return this.lastMessageTimestamp.orElse(BoundedWindow.TIMESTAMP_MIN_VALUE);
    }

    public @UnknownKeyFor @NonNull @Initialized CheckpointMarkImpl getCheckpointMark() {
        this.subscriber.rebuffer();
        return new CheckpointMarkImpl(this.fetchOffset, this.committer);
    }

    public @UnknownKeyFor @NonNull @Initialized UnboundedSource<@UnknownKeyFor @NonNull @Initialized SequencedMessage, @UnknownKeyFor @NonNull @Initialized CheckpointMarkImpl> getCurrentSource() {
        return this.source;
    }

    public @UnknownKeyFor @NonNull @Initialized long getSplitBacklogBytes() {
        return this.backlogReader.computeMessageStats(this.fetchOffset).getMessageBytes();
    }
}

