/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.gcp.pubsub;

import com.google.api.client.util.Clock;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;
import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.ListCoder;
import org.apache.beam.sdk.coders.NullableCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessagePayloadOnlyCoder;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageWithAttributesCoder;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubOptions;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.SourceMetrics;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.BucketingFunction;
import org.apache.beam.sdk.util.MovingFunction;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PubsubUnboundedSource
extends PTransform<PBegin, PCollection<PubsubMessage>> {
    private static final Logger LOG = LoggerFactory.getLogger(PubsubUnboundedSource.class);

    // NOTE(review): the code visible in this file uses literal values rather than the names
    // below, so the pairings mentioned in these comments are inferred from matching values.

    /** Same value as the last argument passed to the client when a random subscription is created. */
    private static final int DEAULT_ACK_TIMEOUT_SEC = 60;
    /** Coder handed out by the source for its checkpoint marks. */
    private static final PubsubCheckpointCoder<?> CHECKPOINT_CODER = PubsubCheckpointCoder.of();
    /** Same value as the batch size the reader requests on each pull. */
    private static final int PULL_BATCH_SIZE = 1000;
    /** Same value as the cap on ids per deadline-extension batch and per ACK batch. */
    private static final int ACK_BATCH_SIZE = 2000;
    /** Same value as the in-flight count at which the reader stops pulling. */
    private static final int MAX_IN_FLIGHT = 20000;
    /** How long after its request time a message may stay in flight before the reader stops extending it. */
    private static final Duration PROCESSING_TIMEOUT = Duration.standardSeconds(120L);
    /** Same value as the percentage of the ACK timeout used when extending a deadline. */
    private static final int ACK_EXTENSION_PCT = 50;
    /** Same value as the percentage of the ACK timeout used as the "deadline is pressing" margin. */
    private static final int ACK_SAFETY_PCT = 20;
    /** Messages whose tracked deadline is closer than this are assumed to have already expired. */
    private static final Duration ACK_TOO_LATE = Duration.standardSeconds(2L);
    /** Period covered by the reader's moving statistics. */
    private static final Duration SAMPLE_PERIOD = Duration.standardMinutes(1L);
    /** Update interval of the reader's moving statistics. */
    private static final Duration SAMPLE_UPDATE = Duration.standardSeconds(5L);
    /** Minimum time between two statistics log lines. */
    private static final Duration LOG_PERIOD = Duration.standardSeconds(30L);
    /** Same value as the fourth argument used when the reader builds its sampling functions. */
    private static final int MIN_WATERMARK_MESSAGES = 10;
    /** Same value as the third argument used when the reader builds its sampling functions. */
    private static final int MIN_WATERMARK_SPREAD = 2;
    /** Same value as the factor by which the requested number of splits is multiplied. */
    private static final int SCALE_OUT = 4;

    /** Minimum of two longs; identity is {@code Long.MAX_VALUE}. */
    private static final Combine.BinaryCombineLongFn MIN = new Combine.BinaryCombineLongFn(){

        public long apply(long left, long right) {
            return Math.min(left, right);
        }

        public long identity() {
            return Long.MAX_VALUE;
        }
    };

    /** Maximum of two longs; identity is {@code Long.MIN_VALUE}. */
    private static final Combine.BinaryCombineLongFn MAX = new Combine.BinaryCombineLongFn(){

        public long apply(long left, long right) {
            return Math.max(left, right);
        }

        public long identity() {
            return Long.MIN_VALUE;
        }
    };

    /** Sum of longs, as provided by {@code Sum.ofLongs()}. */
    private static final Combine.BinaryCombineLongFn SUM = Sum.ofLongs();
    // Clock readers use for "now"; when null they fall back to System.currentTimeMillis().
    @Nullable
    private Clock clock;
    // Factory for the Pubsub clients used here and by the readers.
    private final PubsubClient.PubsubClientFactory pubsubFactory;
    // Project used when creating a random subscription; given exactly when topic is given.
    @Nullable
    private final ValueProvider<PubsubClient.ProjectPath> project;
    // Topic to subscribe to; exactly one of topic and subscription is non-null.
    @Nullable
    private final ValueProvider<PubsubClient.TopicPath> topic;
    // Existing subscription to read from; exactly one of topic and subscription is non-null.
    @Nullable
    private ValueProvider<PubsubClient.SubscriptionPath> subscription;
    // Passed to the client factory whenever a client is created.
    @Nullable
    private final String timestampAttribute;
    // Passed to the client factory whenever a client is created.
    @Nullable
    private final String idAttribute;
    // Selects the output coder: with-attributes when true, payload-only otherwise.
    private final boolean needsAttributes;

    @VisibleForTesting
    PubsubUnboundedSource(Clock clock, PubsubClient.PubsubClientFactory pubsubFactory, @Nullable ValueProvider<PubsubClient.ProjectPath> project, @Nullable ValueProvider<PubsubClient.TopicPath> topic, @Nullable ValueProvider<PubsubClient.SubscriptionPath> subscription, @Nullable String timestampAttribute, @Nullable String idAttribute, boolean needsAttributes) {
        Preconditions.checkArgument((topic == null != (subscription == null) ? 1 : 0) != 0, (Object)"Exactly one of topic and subscription must be given");
        Preconditions.checkArgument((topic == null == (project == null) ? 1 : 0) != 0, (Object)"Project must be given if topic is given");
        this.clock = clock;
        this.pubsubFactory = (PubsubClient.PubsubClientFactory)Preconditions.checkNotNull((Object)pubsubFactory);
        this.project = project;
        this.topic = topic;
        this.subscription = subscription;
        this.timestampAttribute = timestampAttribute;
        this.idAttribute = idAttribute;
        this.needsAttributes = needsAttributes;
    }

    /**
     * Creates the transform without an explicit clock (a null clock is passed on, so readers
     * use the system time). All other arguments are forwarded unchanged to the
     * clock-accepting constructor, which validates them.
     */
    public PubsubUnboundedSource(PubsubClient.PubsubClientFactory pubsubFactory, @Nullable ValueProvider<PubsubClient.ProjectPath> project, @Nullable ValueProvider<PubsubClient.TopicPath> topic, @Nullable ValueProvider<PubsubClient.SubscriptionPath> subscription, @Nullable String timestampAttribute, @Nullable String idAttribute, boolean needsAttributes) {
        this(null, pubsubFactory, project, topic, subscription, timestampAttribute, idAttribute, needsAttributes);
    }

    /** Returns the value of the project provider, or null if no project was given. */
    @Nullable
    public PubsubClient.ProjectPath getProject() {
        if (project == null) {
            return null;
        }
        return project.get();
    }

    /** Returns the value of the topic provider, or null if no topic was given. */
    @Nullable
    public PubsubClient.TopicPath getTopic() {
        if (topic == null) {
            return null;
        }
        return topic.get();
    }

    /** Returns the topic provider itself (not its value), or null if no topic was given. */
    @Nullable
    public ValueProvider<PubsubClient.TopicPath> getTopicProvider() {
        return topic;
    }

    /** Returns the value of the subscription provider, or null if no subscription was given. */
    @Nullable
    public PubsubClient.SubscriptionPath getSubscription() {
        if (subscription == null) {
            return null;
        }
        return subscription.get();
    }

    /** Returns the subscription provider itself (not its value), or null if none was given. */
    @Nullable
    public ValueProvider<PubsubClient.SubscriptionPath> getSubscriptionProvider() {
        return subscription;
    }

    /** Returns the timestamp attribute name given at construction, possibly null. */
    @Nullable
    public String getTimestampAttribute() {
        return timestampAttribute;
    }

    /** Returns the id attribute name given at construction, possibly null. */
    @Nullable
    public String getIdAttribute() {
        return idAttribute;
    }

    /** Returns whether output messages should carry attributes (selects the output coder). */
    public boolean getNeedsAttributes() {
        return needsAttributes;
    }

    /**
     * Expands to a read from a {@code PubsubSource} wrapping this transform, followed by a
     * pass-through {@code StatsFn} step named "PubsubUnboundedSource.Stats".
     *
     * @param input pipeline start; only its pipeline is used
     * @return the messages emitted by the stats step
     */
    public PCollection<PubsubMessage> expand(PBegin input) {
        PCollection<PubsubMessage> messages = input.getPipeline().begin().apply(Read.from(new PubsubSource(this)));
        StatsFn statsFn = new StatsFn(pubsubFactory, subscription, topic, timestampAttribute, idAttribute);
        return messages.apply("PubsubUnboundedSource.Stats", ParDo.of(statsFn));
    }

    /**
     * Creates a new subscription to the configured topic using a short-lived client, which is
     * closed before returning. The subscription is not deleted by this class.
     *
     * @param options pipeline options, viewed as {@code PubsubOptions} for the client factory
     * @return path of the subscription reported by the client
     * @throws RuntimeException wrapping any exception raised while creating the client or
     *     the subscription, or while closing the client
     */
    private PubsubClient.SubscriptionPath createRandomSubscription(PipelineOptions options) {
        try (PubsubClient client = pubsubFactory.newClient(timestampAttribute, idAttribute, options.as(PubsubOptions.class))) {
            // NOTE(review): 60 matches DEAULT_ACK_TIMEOUT_SEC.
            PubsubClient.SubscriptionPath created = client.createRandomSubscription(project.get(), topic.get(), 60);
            LOG.warn("Created subscription {} to topic {}. Note this subscription WILL NOT be deleted when the pipeline terminates", created, topic);
            return created;
        }
        catch (Exception e) {
            throw new RuntimeException("Failed to create subscription: ", e);
        }
    }

    private static class StatsFn
    extends DoFn<PubsubMessage, PubsubMessage> {
        private final Counter elementCounter = SourceMetrics.elementsRead();
        private final PubsubClient.PubsubClientFactory pubsubFactory;
        @Nullable
        private final ValueProvider<PubsubClient.SubscriptionPath> subscription;
        @Nullable
        private final ValueProvider<PubsubClient.TopicPath> topic;
        @Nullable
        private final String timestampAttribute;
        @Nullable
        private final String idAttribute;

        public StatsFn(PubsubClient.PubsubClientFactory pubsubFactory, @Nullable ValueProvider<PubsubClient.SubscriptionPath> subscription, @Nullable ValueProvider<PubsubClient.TopicPath> topic, @Nullable String timestampAttribute, @Nullable String idAttribute) {
            Preconditions.checkArgument((pubsubFactory != null ? 1 : 0) != 0, (Object)"pubsubFactory should not be null");
            this.pubsubFactory = pubsubFactory;
            this.subscription = subscription;
            this.topic = topic;
            this.timestampAttribute = timestampAttribute;
            this.idAttribute = idAttribute;
        }

        @DoFn.ProcessElement
        public void processElement(DoFn.ProcessContext c) throws Exception {
            this.elementCounter.inc();
            c.output(c.element());
        }

        public void populateDisplayData(DisplayData.Builder builder) {
            super.populateDisplayData(builder);
            builder.addIfNotNull(DisplayData.item((String)"subscription", this.subscription)).addIfNotNull(DisplayData.item((String)"topic", this.topic)).add(DisplayData.item((String)"transport", (String)this.pubsubFactory.getKind())).addIfNotNull(DisplayData.item((String)"timestampAttribute", (String)this.timestampAttribute)).addIfNotNull(DisplayData.item((String)"idAttribute", (String)this.idAttribute));
        }
    }

    /**
     * Unbounded source pairing the enclosing {@link PubsubUnboundedSource} transform with the
     * subscription to read from, when one is already known.
     */
    @VisibleForTesting
    static class PubsubSource
    extends UnboundedSource<PubsubMessage, PubsubCheckpoint> {
        /** Transform this source belongs to; supplies client factory, attributes and clock. */
        public final PubsubUnboundedSource outer;
        /** Subscription to read from, or null when the transform was given none. */
        @VisibleForTesting
        final ValueProvider<PubsubClient.SubscriptionPath> subscriptionPath;

        /** Creates a source reading from the transform's own subscription provider (may be null). */
        public PubsubSource(PubsubUnboundedSource outer) {
            this(outer, outer.getSubscriptionProvider());
        }

        private PubsubSource(PubsubUnboundedSource outer, ValueProvider<PubsubClient.SubscriptionPath> subscriptionPath) {
            this.outer = outer;
            this.subscriptionPath = subscriptionPath;
        }

        /**
         * Returns {@code desiredNumSplits * 4} references to one and the same source. If this
         * source has no subscription provider, a random subscription is created first and the
         * replicated source is bound to it.
         *
         * @param desiredNumSplits number of splits requested by the runner
         * @param options pipeline options, used only if a subscription must be created
         */
        public List<PubsubSource> split(int desiredNumSplits, PipelineOptions options) throws Exception {
            List<PubsubSource> result = new ArrayList<PubsubSource>(desiredNumSplits);
            PubsubSource splitSource = this;
            if (this.subscriptionPath == null) {
                // Let inference pick the provider's type argument; forcing the value to Object
                // yields a provider type that cannot be cast to ValueProvider<SubscriptionPath>.
                PubsubClient.SubscriptionPath created = this.outer.createRandomSubscription(options);
                splitSource = new PubsubSource(this.outer, ValueProvider.StaticValueProvider.of(created));
            }
            // NOTE(review): 4 matches SCALE_OUT.
            for (int i = 0; i < desiredNumSplits * 4; ++i) {
                result.add(splitSource);
            }
            return result;
        }

        /**
         * Creates a reader and, when resuming from a checkpoint, asks the checkpoint to NACK
         * its messages through the new reader.
         *
         * <p>The subscription is resolved in this order: the source's own provider value if
         * present; otherwise the checkpoint's subscription when a checkpoint is given (this
         * can be null if the checkpoint carries no path); otherwise a newly created random
         * subscription.
         *
         * @param options pipeline options, viewed as {@code PubsubOptions}
         * @param checkpoint checkpoint to resume from, or null
         * @throws RuntimeException if the reader cannot be constructed
         */
        public PubsubReader createReader(PipelineOptions options, @Nullable PubsubCheckpoint checkpoint) {
            PubsubClient.SubscriptionPath subscription;
            if (this.subscriptionPath != null && this.subscriptionPath.get() != null) {
                subscription = this.subscriptionPath.get();
            } else if (checkpoint == null) {
                subscription = this.outer.createRandomSubscription(options);
            } else {
                subscription = checkpoint.getSubscription();
            }

            PubsubReader reader;
            try {
                reader = new PubsubReader(options.as(PubsubOptions.class), this, subscription);
            }
            catch (IOException | GeneralSecurityException e) {
                throw new RuntimeException("Unable to subscribe to " + this.subscriptionPath + ": ", e);
            }
            if (checkpoint != null) {
                try {
                    checkpoint.nackAll(reader);
                }
                catch (IOException e) {
                    // Best effort: a failed NACK is logged and otherwise ignored.
                    LOG.error("Pubsub {} cannot have {} lost messages NACKed, ignoring: {}", new Object[]{this.subscriptionPath, checkpoint.notYetReadIds.size(), e});
                }
            }
            return reader;
        }

        /** Returns the shared checkpoint coder. */
        @Nullable
        public Coder<PubsubCheckpoint> getCheckpointMarkCoder() {
            return CHECKPOINT_CODER;
        }

        /** Returns the with-attributes coder if the transform needs attributes, else payload-only. */
        public Coder<PubsubMessage> getOutputCoder() {
            if (this.outer.getNeedsAttributes()) {
                return PubsubMessageWithAttributesCoder.of();
            }
            return PubsubMessagePayloadOnlyCoder.of();
        }

        /** Nothing is validated here. */
        public void validate() {
        }

        /** Always true. */
        public boolean requiresDeduping() {
            return true;
        }
    }

    /**
     * Reader over a single Pubsub subscription.
     *
     * <p>Each {@link #advance} call logs statistics when due, retires ids that have been
     * ACKed, extends ACK deadlines that are about to lapse, pulls a new batch when nothing is
     * queued locally, and then makes at most one message current. The reader also keeps the
     * samples from which {@link #getWatermark} derives its watermark.
     */
    @VisibleForTesting
    static class PubsubReader
    extends UnboundedSource.UnboundedReader<PubsubMessage> {
        /** Source that created this reader. */
        private final PubsubSource outer;
        /** Subscription all pull, ACK and deadline requests are issued against. */
        @VisibleForTesting
        final PubsubClient.SubscriptionPath subscription;
        /** Client for all Pubsub calls; swapped to null when {@link #maybeCloseClient} closes it. */
        private AtomicReference<PubsubClient> pubsubClient;
        /** True until {@link #close} is called. */
        private AtomicBoolean active = new AtomicBoolean(true);
        /** ACK timeout in ms; -1 until {@link #start} obtains it from the client. */
        private int ackTimeoutMs;
        /** Ack ids of messages already made current, kept until {@link #retire} sees them ACKed. */
        private Set<String> safeToAckIds;
        /** Pulled messages that have not been made current yet. */
        private final Queue<PubsubClient.IncomingMessage> notYetRead;
        /** Request time and locally tracked ACK deadline per ack id, in insertion order. */
        private final LinkedHashMap<String, InFlightState> inFlight;
        /** Batches recorded by {@link #ackBatch} and drained by {@link #retire}. */
        private final Queue<List<String>> ackedIds;
        /** Sum of the payload sizes of the messages in {@link #notYetRead}. */
        private long notYetReadBytes;
        /** Timestamps of pulled messages, added on pull and removed once a message is consumed. */
        private BucketingFunction minUnreadTimestampMsSinceEpoch;
        /** Timestamps of messages made current, sampled at read time. */
        private MovingFunction minReadTimestampMsSinceEpoch;
        /** Request time of the last non-empty pull, or -1 if there has been none. */
        private long lastReceivedMsSinceEpoch;
        /** Watermark most recently computed by {@link #getWatermark}. */
        private long lastWatermarkMsSinceEpoch;
        /** Message currently exposed through the getCurrent* methods, or null. */
        @Nullable
        private PubsubClient.IncomingMessage current;
        /** Time statistics were last logged (or first considered), or -1. */
        private long lastLogTimestampMsSinceEpoch;
        /** Total number of messages pulled by this reader. */
        private long numReceived;
        // The functions below only feed the statistics line written by stats().
        private MovingFunction numReceivedRecently;
        private MovingFunction numExtendedDeadlines;
        private MovingFunction numLateDeadlines;
        private MovingFunction numAcked;
        private MovingFunction numExpired;
        private MovingFunction numNacked;
        private MovingFunction numReadBytes;
        private MovingFunction minReceivedTimestampMsSinceEpoch;
        private MovingFunction maxReceivedTimestampMsSinceEpoch;
        private MovingFunction minWatermarkMsSinceEpoch;
        private MovingFunction maxWatermarkMsSinceEpoch;
        private MovingFunction numLateMessages;
        /** Incremented by {@link #getCheckpointMark}; the client is only closed while it is zero. */
        private AtomicInteger numInFlightCheckpoints;
        /** Largest value {@link #numInFlightCheckpoints} has reached when taking a checkpoint. */
        private int maxInFlightCheckpoints;

        /** Creates a moving function over SAMPLE_PERIOD, updated every SAMPLE_UPDATE, combining with {@code function}. */
        private static MovingFunction newFun(Combine.BinaryCombineLongFn function) {
            // NOTE(review): 2 and 10 match MIN_WATERMARK_SPREAD and MIN_WATERMARK_MESSAGES.
            return new MovingFunction(SAMPLE_PERIOD.getMillis(), SAMPLE_UPDATE.getMillis(), 2, 10, function);
        }

        /**
         * Creates the reader and its Pubsub client. No request is sent to the subscription
         * here; the ACK timeout stays at -1 until {@link #start}.
         *
         * @param options options handed to the client factory
         * @param outer source creating this reader
         * @param subscription subscription to read from
         * @throws IOException declared for client creation
         * @throws GeneralSecurityException declared for client creation
         */
        public PubsubReader(PubsubOptions options, PubsubSource outer, PubsubClient.SubscriptionPath subscription) throws IOException, GeneralSecurityException {
            this.outer = outer;
            this.subscription = subscription;
            this.pubsubClient = new AtomicReference<PubsubClient>(outer.outer.pubsubFactory.newClient(outer.outer.timestampAttribute, outer.outer.idAttribute, options));
            this.ackTimeoutMs = -1;
            this.safeToAckIds = new HashSet<String>();
            this.notYetRead = new ArrayDeque<PubsubClient.IncomingMessage>();
            this.inFlight = new LinkedHashMap<String, InFlightState>();
            this.ackedIds = new ConcurrentLinkedQueue<List<String>>();
            this.notYetReadBytes = 0L;
            this.minUnreadTimestampMsSinceEpoch = new BucketingFunction(SAMPLE_UPDATE.getMillis(), 2, 10, MIN);
            this.minReadTimestampMsSinceEpoch = PubsubReader.newFun(MIN);
            this.lastReceivedMsSinceEpoch = -1L;
            this.lastWatermarkMsSinceEpoch = BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis();
            this.current = null;
            this.lastLogTimestampMsSinceEpoch = -1L;
            this.numReceived = 0L;
            this.numReceivedRecently = PubsubReader.newFun(SUM);
            this.numExtendedDeadlines = PubsubReader.newFun(SUM);
            this.numLateDeadlines = PubsubReader.newFun(SUM);
            this.numAcked = PubsubReader.newFun(SUM);
            this.numExpired = PubsubReader.newFun(SUM);
            this.numNacked = PubsubReader.newFun(SUM);
            this.numReadBytes = PubsubReader.newFun(SUM);
            this.minReceivedTimestampMsSinceEpoch = PubsubReader.newFun(MIN);
            this.maxReceivedTimestampMsSinceEpoch = PubsubReader.newFun(MAX);
            this.minWatermarkMsSinceEpoch = PubsubReader.newFun(MIN);
            this.maxWatermarkMsSinceEpoch = PubsubReader.newFun(MAX);
            this.numLateMessages = PubsubReader.newFun(SUM);
            this.numInFlightCheckpoints = new AtomicInteger();
            this.maxInFlightCheckpoints = 0;
        }

        /** Returns the current client, or null once it has been closed. */
        @VisibleForTesting
        PubsubClient getPubsubClient() {
            return this.pubsubClient.get();
        }

        /**
         * Acknowledges {@code ackIds} on the subscription and queues the batch so that a later
         * {@link #retire} drops the ids from the reader's bookkeeping.
         */
        void ackBatch(List<String> ackIds) throws IOException {
            this.pubsubClient.get().acknowledge(this.subscription, ackIds);
            this.ackedIds.add(ackIds);
        }

        /** Sets the ACK deadline of {@code ackIds} to 0 and counts them as NACKed. */
        public void nackBatch(long nowMsSinceEpoch, List<String> ackIds) throws IOException {
            this.pubsubClient.get().modifyAckDeadline(this.subscription, ackIds, 0);
            this.numNacked.add(nowMsSinceEpoch, ackIds.size());
        }

        /** Extends the ACK deadline of {@code ackIds} by half the ACK timeout, in whole seconds. */
        private void extendBatch(long nowMsSinceEpoch, List<String> ackIds) throws IOException {
            // ackTimeoutMs * 50 % / 1000 ms-per-second. NOTE(review): 50 matches ACK_EXTENSION_PCT.
            int extensionSec = this.ackTimeoutMs * 50 / 100000;
            this.pubsubClient.get().modifyAckDeadline(this.subscription, ackIds, extensionSec);
            this.numExtendedDeadlines.add(nowMsSinceEpoch, ackIds.size());
        }

        /** Current time in ms from the transform's clock, or from the system clock if it has none. */
        private long now() {
            if (this.outer.outer.clock == null) {
                return System.currentTimeMillis();
            }
            return this.outer.outer.clock.currentTimeMillis();
        }

        /**
         * Drains {@link #ackedIds}: every id of every queued batch is removed from the
         * in-flight map and from the safe-to-ACK set, and counted as ACKed.
         */
        private void retire() throws IOException {
            long nowMsSinceEpoch = this.now();
            List<String> ackIds;
            while ((ackIds = this.ackedIds.poll()) != null) {
                this.numAcked.add(nowMsSinceEpoch, ackIds.size());
                for (String ackId : ackIds) {
                    this.inFlight.remove(ackId);
                    this.safeToAckIds.remove(ackId);
                }
            }
        }

        /**
         * Repeatedly scans the in-flight map from its oldest entry and, until a scan finds
         * nothing to do, drops messages whose deadline is assumed to be missed, drops messages
         * in flight longer than PROCESSING_TIMEOUT, and extends the deadline of the rest in
         * batches of at most 2000 ids.
         */
        private void extend() throws IOException {
            while (true) {
                long nowMsSinceEpoch = this.now();
                List<String> assumeExpired = new ArrayList<String>();
                List<String> toBeExtended = new ArrayList<String>();
                List<String> toBeExpired = new ArrayList<String>();
                for (Map.Entry<String, InFlightState> entry : this.inFlight.entrySet()) {
                    InFlightState state = entry.getValue();
                    // Stop at the first entry whose deadline is still further away than 20% of
                    // the ACK timeout; this relies on entries being iterated oldest deadline
                    // first (extended entries are re-inserted at the tail below).
                    // NOTE(review): 20 matches ACK_SAFETY_PCT.
                    if (state.ackDeadlineMsSinceEpoch - (this.ackTimeoutMs * 20 / 100) > nowMsSinceEpoch) {
                        break;
                    }
                    if (state.ackDeadlineMsSinceEpoch - ACK_TOO_LATE.getMillis() < nowMsSinceEpoch) {
                        // Too close to (or past) the deadline to extend reliably.
                        assumeExpired.add(entry.getKey());
                        continue;
                    }
                    if (state.requestTimeMsSinceEpoch + PROCESSING_TIMEOUT.getMillis() < nowMsSinceEpoch) {
                        // In flight for too long: stop extending it.
                        toBeExpired.add(entry.getKey());
                        continue;
                    }
                    toBeExtended.add(entry.getKey());
                    // NOTE(review): 2000 matches ACK_BATCH_SIZE.
                    if (toBeExtended.size() >= 2000) {
                        break;
                    }
                }
                if (assumeExpired.isEmpty() && toBeExtended.isEmpty() && toBeExpired.isEmpty()) {
                    return;
                }
                if (!assumeExpired.isEmpty()) {
                    this.numLateDeadlines.add(nowMsSinceEpoch, assumeExpired.size());
                    for (String ackId : assumeExpired) {
                        this.inFlight.remove(ackId);
                    }
                }
                if (!toBeExpired.isEmpty()) {
                    this.numExpired.add(nowMsSinceEpoch, toBeExpired.size());
                    for (String ackId : toBeExpired) {
                        this.inFlight.remove(ackId);
                    }
                }
                if (!toBeExtended.isEmpty()) {
                    // Local estimate of the new deadline: now plus 50% of the ACK timeout.
                    long newDeadlineMsSinceEpoch = nowMsSinceEpoch + (this.ackTimeoutMs * 50 / 100);
                    for (String ackId : toBeExtended) {
                        // Remove and re-insert so the entry moves to the tail of the map.
                        InFlightState state = this.inFlight.remove(ackId);
                        this.inFlight.put(ackId, new InFlightState(state.requestTimeMsSinceEpoch, newDeadlineMsSinceEpoch));
                    }
                    this.extendBatch(nowMsSinceEpoch, toBeExtended);
                }
            }
        }

        /**
         * Pulls one batch of up to 1000 messages, unless 20000 or more messages are already
         * in flight, and records each received message as unread and in flight.
         */
        private void pull() throws IOException {
            // NOTE(review): 20000 matches MAX_IN_FLIGHT, 1000 matches PULL_BATCH_SIZE.
            if (this.inFlight.size() >= 20000) {
                return;
            }
            long requestTimeMsSinceEpoch = this.now();
            long deadlineMsSinceEpoch = requestTimeMsSinceEpoch + this.ackTimeoutMs;
            List<PubsubClient.IncomingMessage> receivedMessages = this.pubsubClient.get().pull(requestTimeMsSinceEpoch, this.subscription, 1000, true);
            if (receivedMessages.isEmpty()) {
                return;
            }
            this.lastReceivedMsSinceEpoch = requestTimeMsSinceEpoch;
            for (PubsubClient.IncomingMessage incomingMessage : receivedMessages) {
                this.notYetRead.add(incomingMessage);
                this.notYetReadBytes += incomingMessage.elementBytes.length;
                this.inFlight.put(incomingMessage.ackId, new InFlightState(requestTimeMsSinceEpoch, deadlineMsSinceEpoch));
                ++this.numReceived;
                this.numReceivedRecently.add(requestTimeMsSinceEpoch, 1L);
                this.minReceivedTimestampMsSinceEpoch.add(requestTimeMsSinceEpoch, incomingMessage.timestampMsSinceEpoch);
                this.maxReceivedTimestampMsSinceEpoch.add(requestTimeMsSinceEpoch, incomingMessage.timestampMsSinceEpoch);
                this.minUnreadTimestampMsSinceEpoch.add(requestTimeMsSinceEpoch, incomingMessage.timestampMsSinceEpoch);
            }
        }

        /**
         * Writes one debug-level statistics line, at most once per LOG_PERIOD. The first call
         * only records the time and logs nothing.
         */
        private void stats() {
            long nowMsSinceEpoch = this.now();
            if (this.lastLogTimestampMsSinceEpoch < 0L) {
                this.lastLogTimestampMsSinceEpoch = nowMsSinceEpoch;
                return;
            }
            long deltaMs = nowMsSinceEpoch - this.lastLogTimestampMsSinceEpoch;
            if (deltaMs < LOG_PERIOD.getMillis()) {
                return;
            }
            // A skew is only reported when both extrema differ from the MIN/MAX identities.
            String messageSkew = "unknown";
            long minTimestamp = this.minReceivedTimestampMsSinceEpoch.get(nowMsSinceEpoch);
            long maxTimestamp = this.maxReceivedTimestampMsSinceEpoch.get(nowMsSinceEpoch);
            if (minTimestamp < Long.MAX_VALUE && maxTimestamp > Long.MIN_VALUE) {
                messageSkew = (maxTimestamp - minTimestamp) + "ms";
            }
            String watermarkSkew = "unknown";
            long minWatermark = this.minWatermarkMsSinceEpoch.get(nowMsSinceEpoch);
            long maxWatermark = this.maxWatermarkMsSinceEpoch.get(nowMsSinceEpoch);
            if (minWatermark < Long.MAX_VALUE && maxWatermark > Long.MIN_VALUE) {
                watermarkSkew = (maxWatermark - minWatermark) + "ms";
            }
            // Age of the first entry in the in-flight map, if there is one.
            String oldestInFlight = "no";
            String oldestAckId = Iterables.getFirst(this.inFlight.keySet(), null);
            if (oldestAckId != null) {
                oldestInFlight = (nowMsSinceEpoch - this.inFlight.get(oldestAckId).requestTimeMsSinceEpoch) + "ms";
            }
            LOG.debug("Pubsub {} has {} received messages, {} current unread messages, {} current unread bytes, {} current in-flight msgs, {} oldest in-flight, {} current in-flight checkpoints, {} max in-flight checkpoints, {}B/s recent read, {} recent received, {} recent extended, {} recent late extended, {} recent ACKed, {} recent NACKed, {} recent expired, {} recent message timestamp skew, {} recent watermark skew, {} recent late messages, {} last reported watermark", new Object[]{
                this.subscription,
                this.numReceived,
                this.notYetRead.size(),
                this.notYetReadBytes,
                this.inFlight.size(),
                oldestInFlight,
                this.numInFlightCheckpoints.get(),
                this.maxInFlightCheckpoints,
                this.numReadBytes.get(nowMsSinceEpoch) / (SAMPLE_PERIOD.getMillis() / 1000L),
                this.numReceivedRecently.get(nowMsSinceEpoch),
                this.numExtendedDeadlines.get(nowMsSinceEpoch),
                this.numLateDeadlines.get(nowMsSinceEpoch),
                this.numAcked.get(nowMsSinceEpoch),
                this.numNacked.get(nowMsSinceEpoch),
                this.numExpired.get(nowMsSinceEpoch),
                messageSkew,
                watermarkSkew,
                this.numLateMessages.get(nowMsSinceEpoch),
                new Instant(this.lastWatermarkMsSinceEpoch)});
            this.lastLogTimestampMsSinceEpoch = nowMsSinceEpoch;
        }

        /** Obtains the subscription's ACK deadline from the client, then behaves like {@link #advance}. */
        public boolean start() throws IOException {
            this.ackTimeoutMs = this.pubsubClient.get().ackDeadlineSeconds(this.subscription) * 1000;
            return this.advance();
        }

        /**
         * Moves to the next message.
         *
         * @return true if a message is now current, false if none is available locally after
         *     at most one pull
         */
        public boolean advance() throws IOException {
            this.stats();
            if (this.current != null) {
                // The previous message is consumed; take it out of the unread minimum.
                this.minUnreadTimestampMsSinceEpoch.remove(this.current.requestTimeMsSinceEpoch);
                this.current = null;
            }
            this.retire();
            this.extend();
            if (this.notYetRead.isEmpty()) {
                this.pull();
            }
            this.current = this.notYetRead.poll();
            if (this.current == null) {
                return false;
            }
            this.notYetReadBytes -= this.current.elementBytes.length;
            Preconditions.checkState(this.notYetReadBytes >= 0L);
            long nowMsSinceEpoch = this.now();
            this.numReadBytes.add(nowMsSinceEpoch, this.current.elementBytes.length);
            this.minReadTimestampMsSinceEpoch.add(nowMsSinceEpoch, this.current.timestampMsSinceEpoch);
            if (this.current.timestampMsSinceEpoch < this.lastWatermarkMsSinceEpoch) {
                this.numLateMessages.add(nowMsSinceEpoch, 1L);
            }
            // From here on the id is included in the next checkpoint's safe-to-ACK snapshot.
            this.safeToAckIds.add(this.current.ackId);
            return true;
        }

        /**
         * Returns the current message's payload and attributes.
         *
         * @throws NoSuchElementException if there is no current message
         */
        public PubsubMessage getCurrent() throws NoSuchElementException {
            if (this.current == null) {
                throw new NoSuchElementException();
            }
            return new PubsubMessage(this.current.elementBytes, this.current.attributes);
        }

        /**
         * Returns the current message's timestamp.
         *
         * @throws NoSuchElementException if there is no current message
         */
        public Instant getCurrentTimestamp() throws NoSuchElementException {
            if (this.current == null) {
                throw new NoSuchElementException();
            }
            return new Instant(this.current.timestampMsSinceEpoch);
        }

        /**
         * Returns the current message's record id encoded as UTF-8.
         *
         * @throws NoSuchElementException if there is no current message
         */
        public byte[] getCurrentRecordId() throws NoSuchElementException {
            if (this.current == null) {
                throw new NoSuchElementException();
            }
            return this.current.recordId.getBytes(StandardCharsets.UTF_8);
        }

        /** Marks the reader inactive and closes the client if no checkpoint is in flight. */
        public void close() throws IOException {
            this.active.set(false);
            this.maybeCloseClient();
        }

        /**
         * Closes the client when the reader is inactive and no checkpoints are in flight.
         * The reference is swapped to null first, so the client is closed at most once.
         */
        private void maybeCloseClient() throws IOException {
            if (this.active.get() || this.numInFlightCheckpoints.get() != 0) {
                return;
            }
            PubsubClient client = this.pubsubClient.getAndSet(null);
            if (client != null) {
                client.close();
            }
        }

        /** Returns the source that created this reader. */
        public PubsubSource getCurrentSource() {
            return this.outer;
        }

        /**
         * Computes, records and returns the reader's watermark.
         *
         * <p>If the client reports EOF and nothing is unread, the maximum timestamp is
         * returned. Otherwise: when there are no read or unread samples and the last non-empty
         * pull is more than SAMPLE_PERIOD old, the watermark becomes the current time; else,
         * when either sample set is significant, it becomes the minimum of the two minima;
         * else the previous watermark is kept.
         */
        public Instant getWatermark() {
            if (this.pubsubClient.get().isEOF() && this.notYetRead.isEmpty()) {
                return BoundedWindow.TIMESTAMP_MAX_VALUE;
            }
            long nowMsSinceEpoch = this.now();
            long readMin = this.minReadTimestampMsSinceEpoch.get(nowMsSinceEpoch);
            long unreadMin = this.minUnreadTimestampMsSinceEpoch.get();
            boolean noSamples = readMin == Long.MAX_VALUE && unreadMin == Long.MAX_VALUE;
            if (noSamples && this.lastReceivedMsSinceEpoch >= 0L && nowMsSinceEpoch > this.lastReceivedMsSinceEpoch + SAMPLE_PERIOD.getMillis()) {
                this.lastWatermarkMsSinceEpoch = nowMsSinceEpoch;
            } else if (this.minReadTimestampMsSinceEpoch.isSignificant() || this.minUnreadTimestampMsSinceEpoch.isSignificant()) {
                this.lastWatermarkMsSinceEpoch = Math.min(readMin, unreadMin);
            }
            this.minWatermarkMsSinceEpoch.add(nowMsSinceEpoch, this.lastWatermarkMsSinceEpoch);
            this.maxWatermarkMsSinceEpoch.add(nowMsSinceEpoch, this.lastWatermarkMsSinceEpoch);
            return new Instant(this.lastWatermarkMsSinceEpoch);
        }

        /**
         * Takes a checkpoint holding copies of the safe-to-ACK ids and of the ack ids of all
         * unread messages, and counts it as in flight. The subscription path is stored in the
         * checkpoint only when the source has no subscription provider of its own.
         */
        public PubsubCheckpoint getCheckpointMark() {
            int cur = this.numInFlightCheckpoints.incrementAndGet();
            this.maxInFlightCheckpoints = Math.max(this.maxInFlightCheckpoints, cur);
            List<String> snapshotSafeToAckIds = Lists.newArrayList(this.safeToAckIds);
            List<String> snapshotNotYetReadIds = new ArrayList<String>(this.notYetRead.size());
            for (PubsubClient.IncomingMessage incomingMessage : this.notYetRead) {
                snapshotNotYetReadIds.add(incomingMessage.ackId);
            }
            if (this.outer.subscriptionPath == null) {
                return new PubsubCheckpoint(this.subscription.getPath(), this, snapshotSafeToAckIds, snapshotNotYetReadIds);
            }
            return new PubsubCheckpoint(null, this, snapshotSafeToAckIds, snapshotNotYetReadIds);
        }

        /** Returns the payload bytes of messages pulled but not yet read. */
        public long getSplitBacklogBytes() {
            return this.notYetReadBytes;
        }

        /** Bookkeeping for one in-flight message. */
        private static class InFlightState {
            /** Time of the pull request that returned the message. */
            long requestTimeMsSinceEpoch;
            /** Locally tracked ACK deadline of the message. */
            long ackDeadlineMsSinceEpoch;

            public InFlightState(long requestTimeMsSinceEpoch, long ackDeadlineMsSinceEpoch) {
                this.requestTimeMsSinceEpoch = requestTimeMsSinceEpoch;
                this.ackDeadlineMsSinceEpoch = ackDeadlineMsSinceEpoch;
            }
        }
    }

    /**
     * Coder for {@code PubsubCheckpoint}. Only the (nullable) subscription path and the list
     * of not-yet-read ack ids are written; a decoded checkpoint therefore has neither a
     * reader nor safe-to-ACK ids.
     */
    private static class PubsubCheckpointCoder<T>
    extends AtomicCoder<PubsubCheckpoint> {
        private static final Coder<String> SUBSCRIPTION_PATH_CODER = NullableCoder.of(StringUtf8Coder.of());
        private static final Coder<List<String>> LIST_CODER = ListCoder.of(StringUtf8Coder.of());

        public static <T> PubsubCheckpointCoder<T> of() {
            return new PubsubCheckpointCoder<T>();
        }

        private PubsubCheckpointCoder() {
        }

        /** Writes the subscription path followed by the not-yet-read ack ids. */
        public void encode(PubsubCheckpoint value, OutputStream outStream) throws IOException {
            SUBSCRIPTION_PATH_CODER.encode(value.subscriptionPath, outStream);
            LIST_CODER.encode(value.notYetReadIds, outStream);
        }

        /** Reads the two fields in the order written; reader and safe-to-ACK ids are left null. */
        public PubsubCheckpoint decode(InputStream inStream) throws IOException {
            String path = SUBSCRIPTION_PATH_CODER.decode(inStream);
            List<String> notYetReadIds = LIST_CODER.decode(inStream);
            return new PubsubCheckpoint(path, null, null, notYetReadIds);
        }
    }

    /**
     * Checkpoint mark produced by the Pubsub reader.
     *
     * <p>A checkpoint exists in one of two states:
     * <ul>
     *   <li><b>Live</b>: {@code reader} and {@code safeToAckIds} are both non-null. Such a
     *       checkpoint can be finalized, which ACKs {@code safeToAckIds} through the reader.
     *   <li><b>Restored</b>: {@code reader} and {@code safeToAckIds} are null (this is what
     *       {@code PubsubCheckpointCoder.decode} constructs). Such a checkpoint can only be
     *       used to NACK {@code notYetReadIds} via {@link #nackAll}.
     * </ul>
     */
    @VisibleForTesting
    static class PubsubCheckpoint
    implements UnboundedSource.CheckpointMark {
        /** Largest number of ACK ids passed to the reader in a single ack/nack call. */
        private static final int MAX_IDS_PER_BATCH = 2000;

        /** Subscription path as a string, or null when none was recorded. */
        @Nullable
        @VisibleForTesting
        String subscriptionPath;

        /** Reader this checkpoint was taken from; null when restored and after finalization. */
        @Nullable
        private PubsubReader reader;

        /** Ids to ACK on finalization; null when restored and after finalization. */
        @Nullable
        private List<String> safeToAckIds;

        /** Ids that had not yet been read when the checkpoint was taken. */
        @VisibleForTesting
        final List<String> notYetReadIds;

        /**
         * Creates a checkpoint that stores the given references as-is (no defensive copies).
         *
         * @param subscriptionPath subscription path string, may be null
         * @param reader reader to ACK through on finalization, or null for a restored checkpoint
         * @param safeToAckIds ids to ACK on finalization, or null for a restored checkpoint
         * @param notYetReadIds ids to NACK if this checkpoint is restored
         */
        public PubsubCheckpoint(@Nullable String subscriptionPath, @Nullable PubsubReader reader, @Nullable List<String> safeToAckIds, List<String> notYetReadIds) {
            this.subscriptionPath = subscriptionPath;
            this.reader = reader;
            this.safeToAckIds = safeToAckIds;
            this.notYetReadIds = notYetReadIds;
        }

        /**
         * Converts the stored path string into a {@code SubscriptionPath}.
         *
         * @return the parsed subscription path, or null when no path string is stored
         */
        @Nullable
        private PubsubClient.SubscriptionPath getSubscription() {
            return this.subscriptionPath == null ? null : PubsubClient.subscriptionPathFromPath(this.subscriptionPath);
        }

        /**
         * ACKs every id in {@code safeToAckIds} through the reader, in batches of at most
         * {@link #MAX_IDS_PER_BATCH}, then releases this checkpoint's hold on the reader.
         *
         * <p>Whether or not ACKing succeeds, the reader's in-flight checkpoint count is
         * decremented exactly once, the reader is given the chance to close its client, and the
         * reader and id list are dropped so this checkpoint cannot be finalized twice.
         *
         * @throws IOException if ACKing a batch or closing the client fails
         * @throws IllegalStateException if this checkpoint has no reader or no safe-to-ACK ids
         *     (restored or already finalized), or if the in-flight count would become negative
         */
        public void finalizeCheckpoint() throws IOException {
            Preconditions.checkState(this.reader != null && this.safeToAckIds != null, "Cannot finalize a restored checkpoint");
            try {
                int remaining = this.safeToAckIds.size();
                List<String> batch = new ArrayList<String>(Math.min(remaining, MAX_IDS_PER_BATCH));
                for (String ackId : this.safeToAckIds) {
                    batch.add(ackId);
                    if (batch.size() >= MAX_IDS_PER_BATCH) {
                        this.reader.ackBatch(batch);
                        remaining -= batch.size();
                        // A fresh list is allocated instead of clearing the old one.
                        // NOTE(review): presumably ackBatch may keep a reference to the list it
                        // was handed - confirm before changing this to clear().
                        batch = new ArrayList<String>(Math.min(remaining, MAX_IDS_PER_BATCH));
                    }
                }
                if (!batch.isEmpty()) {
                    this.reader.ackBatch(batch);
                }
            } finally {
                // Single cleanup path: the counter must drop exactly once per checkpoint, on
                // both success and failure.
                int remainingInFlight = this.reader.numInFlightCheckpoints.decrementAndGet();
                Preconditions.checkState(remainingInFlight >= 0, "Miscounted in-flight checkpoints");
                this.reader.maybeCloseClient();
                this.reader = null;
                this.safeToAckIds = null;
            }
        }

        /**
         * Returns the current time in milliseconds, using the clock configured on the reader's
         * enclosing source when one is set and the system clock otherwise.
         *
         * @param reader reader whose enclosing source supplies the optional clock
         * @return current time in milliseconds
         */
        private static long now(PubsubReader reader) {
            if (reader.outer.outer.clock == null) {
                return System.currentTimeMillis();
            }
            return reader.outer.outer.clock.currentTimeMillis();
        }

        /**
         * NACKs every id in {@code notYetReadIds} through the given reader, in batches of at
         * most {@link #MAX_IDS_PER_BATCH}. Only valid on a checkpoint with no reader attached.
         *
         * @param reader reader used to send the NACKs
         * @throws IOException if sending a batch fails
         * @throws IllegalStateException if this checkpoint still has a reader attached
         */
        public void nackAll(PubsubReader reader) throws IOException {
            Preconditions.checkState(this.reader == null, "Cannot nackAll on persisting checkpoint");
            List<String> batch = new ArrayList<String>(Math.min(this.notYetReadIds.size(), MAX_IDS_PER_BATCH));
            for (String ackId : this.notYetReadIds) {
                batch.add(ackId);
                if (batch.size() >= MAX_IDS_PER_BATCH) {
                    // The timestamp is taken per batch, immediately before sending.
                    reader.nackBatch(PubsubCheckpoint.now(reader), batch);
                    batch.clear();
                }
            }
            if (!batch.isEmpty()) {
                reader.nackBatch(PubsubCheckpoint.now(reader), batch);
            }
        }
    }
}

