/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.pubsub.v1;

import com.google.api.core.AbstractApiService;
import com.google.api.core.ApiClock;
import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.api.core.ApiService;
import com.google.api.core.SettableApiFuture;
import com.google.api.gax.batching.FlowControlSettings;
import com.google.api.gax.batching.FlowController;
import com.google.api.gax.core.Distribution;
import com.google.api.gax.grpc.GrpcCallContext;
import com.google.api.gax.grpc.GrpcStatusCode;
import com.google.api.gax.rpc.ApiCallContext;
import com.google.api.gax.rpc.ApiException;
import com.google.api.gax.rpc.ApiExceptionFactory;
import com.google.api.gax.rpc.ClientStream;
import com.google.api.gax.rpc.ResponseObserver;
import com.google.api.gax.rpc.StatusCode;
import com.google.api.gax.rpc.StreamController;
import com.google.cloud.pubsub.v1.AckRequestData;
import com.google.cloud.pubsub.v1.AckResponse;
import com.google.cloud.pubsub.v1.MessageDispatcher;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.MessageReceiverWithAckResponse;
import com.google.cloud.pubsub.v1.ModackRequestData;
import com.google.cloud.pubsub.v1.OpenTelemetryPubsubTracer;
import com.google.cloud.pubsub.v1.PubsubMessageWrapper;
import com.google.cloud.pubsub.v1.StatusUtil;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.cloud.pubsub.v1.SubscriberShutdownSettings;
import com.google.cloud.pubsub.v1.Waiter;
import com.google.cloud.pubsub.v1.stub.SubscriberStub;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.Any;
import com.google.protobuf.Empty;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.pubsub.v1.AcknowledgeRequest;
import com.google.pubsub.v1.ModifyAckDeadlineRequest;
import com.google.pubsub.v1.StreamingPullRequest;
import com.google.pubsub.v1.StreamingPullResponse;
import com.google.pubsub.v1.SubscriptionName;
import com.google.rpc.ErrorInfo;
import io.grpc.Status;
import io.grpc.protobuf.StatusProto;
import io.opentelemetry.api.trace.Span;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nullable;

final class StreamingSubscriberConnection
extends AbstractApiService
implements MessageDispatcher.AckProcessor {
    private static final Logger logger = Logger.getLogger(StreamingSubscriberConnection.class.getName());
    // Starting value and upper bound of the delay used before re-opening the streaming-pull stream.
    private static final Duration INITIAL_CHANNEL_RECONNECT_BACKOFF = Duration.ofMillis(100L);
    private static final Duration MAX_CHANNEL_RECONNECT_BACKOFF = Duration.ofSeconds(10L);
    // Starting value and upper bound of the delay used before re-sending failed ack/modack RPCs.
    private static final long INITIAL_ACK_OPERATIONS_RECONNECT_BACKOFF_MILLIS = 100L;
    private static final long MAX_ACK_OPERATIONS_RECONNECT_BACKOFF_MILLIS = Duration.ofSeconds(10L).toMillis();
    // NOTE(review): presumably the per-RPC ack-id limit; the senders partition with a literal 1000 (likely inlined).
    private static final int MAX_PER_REQUEST_CHANGES = 1000;
    // Per-ack-id error strings; the same literals are compared against in the ack/modack failure callback.
    // NOTE(review): these are instance fields although they are constants — candidates for `static`.
    private final String PERMANENT_FAILURE_INVALID_ACK_ID_METADATA = "PERMANENT_FAILURE_INVALID_ACK_ID";
    private final String TRANSIENT_FAILURE_METADATA_PREFIX = "TRANSIENT_";
    // Stream ack deadline sent in the initial StreamingPullRequest (name carries a historical typo: "initital").
    private Duration inititalStreamAckDeadline;
    // Extra headers attached to the streaming-pull call ("x-goog-request-params").
    private final Map<String, List<String>> streamMetadata;
    private final SubscriberStub subscriberStub;
    private final int channelAffinity;
    private final long protocolVersion;
    private final String subscription;
    private final SubscriptionName subscriptionNameObject;
    // Executor used for reconnect/retry scheduling and for the keepalive tasks.
    private final ScheduledExecutorService systemExecutor;
    private final ApiClock clock;
    private final MessageDispatcher messageDispatcher;
    private final FlowControlSettings flowControlSettings;
    private final boolean useLegacyFlowControl;
    // Ack/modack requests that carry a message future and have not yet been given a response.
    private final Set<AckRequestData> pendingRequests = ConcurrentHashMap.newKeySet();
    // Delay to apply before the next stream reconnect; doubled on each retryable failure.
    private final AtomicLong channelReconnectBackoffMillis = new AtomicLong(INITIAL_CHANNEL_RECONNECT_BACKOFF.toMillis());
    // Counts ack/modack RPCs in flight so that shutdown can wait for them.
    private final Waiter ackOperationsWaiter = new Waiter();
    // Held around every visible read or write of clientStream made under stream open/close/ping.
    private final Lock lock = new ReentrantLock();
    private ClientStream<StreamingPullRequest> clientStream;
    private AtomicBoolean exactlyOnceDeliveryEnabled = new AtomicBoolean(false);
    private final String clientId = UUID.randomUUID().toString();
    private final boolean enableOpenTelemetryTracing;
    // Default tracer; replaced in the constructor when the builder supplies one.
    private OpenTelemetryPubsubTracer tracer = new OpenTelemetryPubsubTracer(null, false);
    private final SubscriberShutdownSettings subscriberShutdownSettings;
    // True when protocolVersion is at least 1 (see constructor).
    private final boolean enableKeepalive;
    // NOTE(review): presumably the threshold behind enableKeepalive; the constructor compares against a literal 1L.
    private static final long KEEP_ALIVE_SUPPORT_VERSION = 1L;
    // Period of the client keepalive ping task.
    private static final Duration CLIENT_PING_INTERVAL = Duration.ofSeconds(30L);
    private ScheduledFuture<?> pingSchedulerHandle;
    // Period of the server monitor task, and how long an unanswered ping may be outstanding before the stream is closed.
    private static final Duration SERVER_MONITOR_INTERVAL = Duration.ofSeconds(10L);
    private static final Duration SERVER_PING_TIMEOUT_DURATION = Duration.ofSeconds(15L);
    // clock.nanoTime() readings; lastClientPingTime is -1 until a ping has been sent on the current stream.
    // NOTE(review): lastServerResponseTime is presumably refreshed by the response observer, which is not visible here.
    private final AtomicLong lastServerResponseTime;
    private final AtomicLong lastClientPingTime;
    private ScheduledFuture<?> serverMonitorHandle;

    /**
     * Creates a connection from the values collected by {@code builder}.
     *
     * <p>The initial stream ack deadline is the subscriber default when the builder reports that
     * the default max-duration-per-extension was used; otherwise the configured value is clamped
     * to the range [{@code MIN_STREAM_ACK_DEADLINE}, {@code MAX_STREAM_ACK_DEADLINE}].
     *
     * @param builder source of all configuration for this connection
     */
    private StreamingSubscriberConnection(Builder builder) {
        this.subscription = builder.subscription;
        this.subscriptionNameObject = SubscriptionName.parse(builder.subscription);
        this.systemExecutor = builder.systemExecutor;
        this.clock = builder.clock;

        final Duration requestedDeadline = builder.maxDurationPerAckExtension;
        if (builder.maxDurationPerAckExtensionDefaultUsed) {
            this.inititalStreamAckDeadline = Subscriber.STREAM_ACK_DEADLINE_DEFAULT;
        } else if (requestedDeadline.compareTo(Subscriber.MIN_STREAM_ACK_DEADLINE) < 0) {
            this.inititalStreamAckDeadline = Subscriber.MIN_STREAM_ACK_DEADLINE;
        } else if (requestedDeadline.compareTo(Subscriber.MAX_STREAM_ACK_DEADLINE) > 0) {
            this.inititalStreamAckDeadline = Subscriber.MAX_STREAM_ACK_DEADLINE;
        } else {
            this.inititalStreamAckDeadline = requestedDeadline;
        }

        this.streamMetadata = ImmutableMap.of(
                "x-goog-request-params", ImmutableList.of("subscription=" + this.subscription));
        this.subscriberStub = builder.subscriberStub;
        this.channelAffinity = builder.channelAffinity;
        this.protocolVersion = builder.protocolVersion;

        // Exactly one of the two receiver kinds is expected to be set on the builder.
        final MessageDispatcher.Builder dispatcherBuilder;
        if (builder.receiver != null) {
            dispatcherBuilder = MessageDispatcher.newBuilder(builder.receiver);
        } else {
            dispatcherBuilder = MessageDispatcher.newBuilder(builder.receiverWithAckResponse);
        }

        this.enableOpenTelemetryTracing = builder.enableOpenTelemetryTracing;
        if (builder.tracer != null) {
            this.tracer = builder.tracer;
        }
        this.subscriberShutdownSettings = builder.subscriberShutdownSettings;

        // Tracing and shutdown settings must be assigned above: the dispatcher is configured from them.
        this.messageDispatcher = dispatcherBuilder
                .setAckProcessor(this)
                .setAckExpirationPadding(builder.ackExpirationPadding)
                .setMaxAckExtensionPeriod(builder.maxAckExtensionPeriod)
                .setMinDurationPerAckExtension(builder.minDurationPerAckExtension)
                .setMinDurationPerAckExtensionDefaultUsed(builder.minDurationPerAckExtensionDefaultUsed)
                .setMaxDurationPerAckExtension(builder.maxDurationPerAckExtension)
                .setMaxDurationPerAckExtensionDefaultUsed(builder.maxDurationPerAckExtensionDefaultUsed)
                .setAckLatencyDistribution(builder.ackLatencyDistribution)
                .setFlowController(builder.flowController)
                .setExecutor(builder.executor)
                .setSystemExecutor(builder.systemExecutor)
                .setApiClock(builder.clock)
                .setSubscriptionName(this.subscription)
                .setEnableOpenTelemetryTracing(this.enableOpenTelemetryTracing)
                .setTracer(this.tracer)
                .setSubscriberShutdownSettings(this.subscriberShutdownSettings)
                .build();

        this.flowControlSettings = builder.flowControlSettings;
        this.useLegacyFlowControl = builder.useLegacyFlowControl;
        this.enableKeepalive = this.protocolVersion >= KEEP_ALIVE_SUPPORT_VERSION;
        this.lastServerResponseTime = new AtomicLong(this.clock.nanoTime());
        this.lastClientPingTime = new AtomicLong(-1L);
    }

    /**
     * Stores the exactly-once-delivery flag for this connection.
     *
     * @param isExactlyOnceDeliveryEnabled the new value of the flag
     * @return this connection, so calls can be chained
     */
    public StreamingSubscriberConnection setExactlyOnceDeliveryEnabled(boolean isExactlyOnceDeliveryEnabled) {
        exactlyOnceDeliveryEnabled.set(isExactlyOnceDeliveryEnabled);
        return this;
    }

    /**
     * Reads the exactly-once-delivery flag.
     *
     * @return the value most recently stored by {@code setExactlyOnceDeliveryEnabled}, initially {@code false}
     */
    public boolean getExactlyOnceDeliveryEnabled() {
        return exactlyOnceDeliveryEnabled.get();
    }

    /**
     * Service start hook: starts the message dispatcher, opens the streaming-pull stream and then
     * reports the service as started.
     */
    @Override
    protected void doStart() {
        logger.config("Starting subscriber.");
        messageDispatcher.start();
        initialize();
        notifyStarted();
    }

    /**
     * Service stop hook: closes the current stream with {@code CANCELLED}, cancels the keepalive
     * tasks when keepalive is enabled, runs the shutdown sequence and reports the service as stopped.
     */
    @Override
    protected void doStop() {
        // clientStream is only touched while holding the lock.
        lock.lock();
        try {
            clientStream.closeSendWithError(Status.CANCELLED.asException());
        } finally {
            lock.unlock();
        }

        if (enableKeepalive) {
            stopClientPinger();
            stopServerMonitor();
        }

        runShutdown();
        notifyStopped();
    }

    /**
     * Stops the message dispatcher and waits for in-flight ack/modack RPCs according to the
     * configured shutdown timeout.
     *
     * <ul>
     *   <li>zero timeout: returns immediately, without stopping the dispatcher or waiting;</li>
     *   <li>negative timeout: waits without a time limit;</li>
     *   <li>positive timeout: waits at most that long and logs a warning if the wait did not complete.</li>
     * </ul>
     */
    private void runShutdown() {
        final Duration shutdownTimeout = subscriberShutdownSettings.getTimeout();
        if (shutdownTimeout.isZero()) {
            return;
        }

        messageDispatcher.stop();

        if (shutdownTimeout.isNegative()) {
            ackOperationsWaiter.waitComplete();
            return;
        }

        if (!ackOperationsWaiter.tryWait(shutdownTimeout.toMillis(), clock)) {
            logger.log(Level.WARNING, "Timeout exceeded while waiting for ACK/NACK operations to complete.");
        }
    }

    /**
     * Opens a new streaming-pull stream and arranges for it to be re-opened when it ends.
     *
     * <p>Steps, in order: start the bidirectional call, send the initial request (subscription,
     * stream ack deadline, client id, flow-control limits, protocol version), publish the new
     * stream under the lock, restart the keepalive tasks when keepalive is enabled, and finally
     * register a callback on the stream's termination future.
     *
     * <p>The callback re-opens the stream immediately on normal completion, re-opens it after a
     * doubling backoff on a retryable error, and fails the service on a non-retryable error.
     * Nothing is re-opened once the service is no longer starting or running.
     */
    private void initialize() {
        final SettableApiFuture<Void> streamTermination = SettableApiFuture.create();
        final StreamingPullResponseObserver observer = new StreamingPullResponseObserver(streamTermination);
        final ClientStream<StreamingPullRequest> newStream = subscriberStub
                .streamingPullCallable()
                .splitCall(
                        observer,
                        GrpcCallContext.createDefault()
                                .withChannelAffinity(channelAffinity)
                                .withExtraHeaders(streamMetadata));

        logger.log(Level.FINER, "Initializing stream to subscription {0}", subscription);

        // With legacy flow control the server-side limits are sent as zero.
        final long maxOutstandingMessages = useLegacyFlowControl
                ? 0L
                : valueOrZero(flowControlSettings.getMaxOutstandingElementCount());
        final long maxOutstandingBytes = useLegacyFlowControl
                ? 0L
                : valueOrZero(flowControlSettings.getMaxOutstandingRequestBytes());
        newStream.send(
                StreamingPullRequest.newBuilder()
                        .setSubscription(subscription)
                        .setStreamAckDeadlineSeconds(Math.toIntExact(inititalStreamAckDeadline.getSeconds()))
                        .setClientId(clientId)
                        .setMaxOutstandingMessages(maxOutstandingMessages)
                        .setMaxOutstandingBytes(maxOutstandingBytes)
                        .setProtocolVersion(protocolVersion)
                        .build());

        lock.lock();
        try {
            clientStream = newStream;
        } finally {
            lock.unlock();
        }

        if (enableKeepalive) {
            // Reset the keepalive bookkeeping for the fresh stream before restarting the tasks.
            lastServerResponseTime.set(clock.nanoTime());
            lastClientPingTime.set(-1L);
            startClientPinger();
            startServerMonitor();
        }

        ApiFutures.addCallback(
                streamTermination,
                new ApiFutureCallback<Void>() {

                    @Override
                    public void onSuccess(@Nullable Void result) {
                        if (isAlive()) {
                            // Clean end of stream: reset the backoff and reconnect right away.
                            channelReconnectBackoffMillis.set(INITIAL_CHANNEL_RECONNECT_BACKOFF.toMillis());
                            initialize();
                        }
                    }

                    @Override
                    public void onFailure(Throwable cause) {
                        if (enableKeepalive) {
                            stopClientPinger();
                            stopServerMonitor();
                        }
                        if (!isAlive()) {
                            logger.log(Level.FINE, "pull failure after service no longer running", cause);
                            return;
                        }
                        if (StatusUtil.isRetryable(cause)) {
                            logger.log(Level.FINE, "stream closed with retryable exception; will reconnect", cause);
                            // Wait the current backoff, and double it (capped) for the next failure.
                            final long delayMillis = channelReconnectBackoffMillis.get();
                            final long nextDelayMillis = Math.min(delayMillis * 2L, MAX_CHANNEL_RECONNECT_BACKOFF.toMillis());
                            channelReconnectBackoffMillis.set(nextDelayMillis);
                            final Runnable reconnect = StreamingSubscriberConnection.this::initialize;
                            systemExecutor.schedule(reconnect, delayMillis, TimeUnit.MILLISECONDS);
                            return;
                        }
                        final ApiException gaxException = ApiExceptionFactory.createException(
                                cause, GrpcStatusCode.of(Status.fromThrowable(cause).getCode()), false);
                        logger.log(Level.SEVERE, "terminated streaming with exception", gaxException);
                        runShutdown();
                        setFailureFutureOutstandingMessages(cause);
                        notifyFailed(gaxException);
                    }
                },
                MoreExecutors.directExecutor());
    }

    /**
     * Substitutes zero for a missing value.
     *
     * @param value a possibly-{@code null} number
     * @return {@code value}, or {@code 0} when {@code value} is {@code null}
     */
    private Long valueOrZero(Long value) {
        if (value == null) {
            return 0L;
        }
        return value;
    }

    /**
     * Tells whether the service is still in use.
     *
     * @return {@code true} when the service state is {@code RUNNING} or {@code STARTING}
     */
    private boolean isAlive() {
        final ApiService.State current = state();
        if (current == ApiService.State.RUNNING) {
            return true;
        }
        return current == ApiService.State.STARTING;
    }

    /**
     * (Re)starts the periodic client keepalive ping.
     *
     * <p>Any previously scheduled ping task is cancelled first. The new task runs immediately and
     * then every {@code CLIENT_PING_INTERVAL}; each run sends an empty request on the current
     * stream (if there is one and the service is alive) and records the send time. Failures are
     * logged and do not stop the schedule.
     */
    private void startClientPinger() {
        if (pingSchedulerHandle != null) {
            pingSchedulerHandle.cancel(false);
        }

        final Runnable pingTask = () -> {
            try {
                lock.lock();
                try {
                    if (clientStream == null || !isAlive()) {
                        return;
                    }
                    clientStream.send(StreamingPullRequest.newBuilder().build());
                    lastClientPingTime.set(clock.nanoTime());
                    logger.log(Level.FINEST, "Sent client keepalive ping");
                } finally {
                    lock.unlock();
                }
            } catch (Exception e) {
                // An exception escaping a fixed-rate task would suppress all later runs.
                logger.log(Level.FINE, "Error sending client keepalive ping", e);
            }
        };

        pingSchedulerHandle = systemExecutor.scheduleAtFixedRate(
                pingTask, 0L, CLIENT_PING_INTERVAL.getSeconds(), TimeUnit.SECONDS);
    }

    /** Cancels the client keepalive ping task, if one is scheduled, and forgets its handle. */
    private void stopClientPinger() {
        if (pingSchedulerHandle == null) {
            return;
        }
        pingSchedulerHandle.cancel(false);
        pingSchedulerHandle = null;
    }

    /**
     * (Re)starts the periodic check that the server is still answering keepalive pings.
     *
     * <p>Any previously scheduled monitor task is cancelled first. Every
     * {@code SERVER_MONITOR_INTERVAL} the task looks at the last ping and last response times; if
     * the latest ping is newer than the latest response and has been outstanding for at least
     * {@code SERVER_PING_TIMEOUT_DURATION}, the current stream is closed with {@code UNAVAILABLE}
     * and the monitor cancels itself. Failures are logged and do not stop the schedule.
     */
    private void startServerMonitor() {
        if (serverMonitorHandle != null) {
            serverMonitorHandle.cancel(false);
        }

        final Runnable monitorTask = () -> {
            try {
                if (!isAlive()) {
                    return;
                }
                final long nowNanos = clock.nanoTime();
                final long lastResponseNanos = lastServerResponseTime.get();
                final long lastPingNanos = lastClientPingTime.get();
                final boolean pingUnanswered = lastPingNanos > lastResponseNanos;
                if (!pingUnanswered) {
                    // Either no ping has been sent yet, or the server has responded since.
                    return;
                }
                final Duration sincePing = Duration.ofNanos(nowNanos - lastPingNanos);
                if (sincePing.compareTo(SERVER_PING_TIMEOUT_DURATION) < 0) {
                    return;
                }
                logger.log(Level.WARNING, "No response from server for {0} seconds since last ping. Closing stream.", sincePing.getSeconds());
                lock.lock();
                try {
                    if (clientStream != null) {
                        clientStream.closeSendWithError(
                                Status.UNAVAILABLE.withDescription("Keepalive timeout with server").asException());
                    }
                } finally {
                    lock.unlock();
                }
                stopServerMonitor();
            } catch (Exception e) {
                // An exception escaping a fixed-rate task would suppress all later runs.
                logger.log(Level.FINE, "Error in server keepalive monitor", e);
            }
        };

        serverMonitorHandle = systemExecutor.scheduleAtFixedRate(
                monitorTask, SERVER_MONITOR_INTERVAL.getSeconds(), SERVER_MONITOR_INTERVAL.getSeconds(), TimeUnit.SECONDS);
    }

    /** Cancels the server keepalive monitor task, if one is scheduled, and forgets its handle. */
    private void stopServerMonitor() {
        if (serverMonitorHandle == null) {
            return;
        }
        serverMonitorHandle.cancel(false);
        serverMonitorHandle = null;
    }

    /**
     * Applies one response to every request currently tracked as pending, then empties the
     * pending set.
     *
     * @param ackResponse the response to set on each pending request; must not be {@code null}
     */
    public void setResponseOutstandingMessages(AckResponse ackResponse) {
        final String responseText = ackResponse.toString();
        logger.log(Level.WARNING, "Setting response: {0} on outstanding messages", responseText);
        pendingRequests.forEach(pending -> pending.setResponse(ackResponse, false));
        pendingRequests.clear();
    }

    /**
     * Resolves every pending request after the stream has failed.
     *
     * <p>With exactly-once delivery enabled the response is derived from {@code t}; otherwise
     * pending requests are reported as {@code SUCCESSFUL}.
     *
     * @param t the failure that terminated the stream
     */
    private void setFailureFutureOutstandingMessages(Throwable t) {
        final AckResponse response;
        if (getExactlyOnceDeliveryEnabled()) {
            response = StatusUtil.getFailedAckResponse(t);
        } else {
            response = AckResponse.SUCCESSFUL;
        }
        setResponseOutstandingMessages(response);
    }

    /**
     * Sends acknowledgements for the given requests, starting from the initial retry backoff.
     *
     * @param ackRequestDataList the requests to acknowledge
     */
    @Override
    public void sendAckOperations(List<AckRequestData> ackRequestDataList) {
        sendAckOperations(ackRequestDataList, INITIAL_ACK_OPERATIONS_RECONNECT_BACKOFF_MILLIS);
    }

    /**
     * Sends ack-deadline modifications for the given requests, starting from the initial retry
     * backoff.
     *
     * @param modackRequestDataList the deadline modifications to send
     */
    @Override
    public void sendModackOperations(List<ModackRequestData> modackRequestDataList) {
        sendModackOperations(modackRequestDataList, INITIAL_ACK_OPERATIONS_RECONNECT_BACKOFF_MILLIS);
    }

    /**
     * Issues Acknowledge RPCs for the given requests, at most {@code MAX_PER_REQUEST_CHANGES} ack
     * ids per RPC.
     *
     * <p>Requests that carry a message future are added to the pending set before their RPC is
     * sent. After all RPCs have been issued, the in-flight counter is raised by the number of RPCs.
     *
     * @param ackRequestDataList the requests to acknowledge
     * @param currentBackoffMillis delay the completion callback uses if it has to retry
     */
    private void sendAckOperations(List<AckRequestData> ackRequestDataList, long currentBackoffMillis) {
        int rpcsIssued = 0;
        for (List<AckRequestData> batch : Lists.partition(ackRequestDataList, MAX_PER_REQUEST_CHANGES)) {
            final List<String> ackIds = new ArrayList<>();
            final List<PubsubMessageWrapper> wrappers = new ArrayList<>();
            for (AckRequestData item : batch) {
                ackIds.add(item.getAckId());
                wrappers.add(item.getMessageWrapper());
                if (item.hasMessageFuture()) {
                    pendingRequests.add(item);
                }
            }

            final Span rpcSpan = tracer.startSubscribeRpcSpan(subscriptionNameObject, "ack", wrappers, 0, false);
            final ApiFutureCallback<Empty> callback = getCallback(batch, 0, false, currentBackoffMillis, rpcSpan);
            final ApiFuture<Empty> ackFuture = subscriberStub
                    .acknowledgeCallable()
                    .futureCall(
                            AcknowledgeRequest.newBuilder()
                                    .setSubscription(subscription)
                                    .addAllAckIds(ackIds)
                                    .build());
            ApiFutures.addCallback(ackFuture, callback, getCallbackExecutor());
            rpcsIssued++;
        }
        ackOperationsWaiter.incrementPendingCount(rpcsIssued);
    }

    /**
     * Issues ModifyAckDeadline RPCs for the given modifications, at most
     * {@code MAX_PER_REQUEST_CHANGES} ack ids per RPC.
     *
     * <p>A deadline extension of zero is traced as a "nack", any other value as a "modack".
     * Requests that carry a message future are added to the pending set before their RPC is sent.
     * After all RPCs have been issued, the in-flight counter is raised by the number of RPCs.
     *
     * @param modackRequestDataList the deadline modifications to send
     * @param currentBackoffMillis delay the completion callback uses if it has to retry
     */
    private void sendModackOperations(List<ModackRequestData> modackRequestDataList, long currentBackoffMillis) {
        int rpcsIssued = 0;
        for (ModackRequestData modack : modackRequestDataList) {
            for (List<AckRequestData> batch : Lists.partition(modack.getAckRequestData(), MAX_PER_REQUEST_CHANGES)) {
                final List<String> ackIds = new ArrayList<>();
                final List<PubsubMessageWrapper> wrappers = new ArrayList<>();
                for (AckRequestData item : batch) {
                    ackIds.add(item.getAckId());
                    wrappers.add(item.getMessageWrapper());
                    if (item.hasMessageFuture()) {
                        pendingRequests.add(item);
                    }
                }

                final int extensionSeconds = modack.getDeadlineExtensionSeconds();
                final String rpcOperation;
                if (extensionSeconds == 0) {
                    rpcOperation = "nack";
                } else {
                    rpcOperation = "modack";
                }

                final Span rpcSpan = tracer.startSubscribeRpcSpan(
                        subscriptionNameObject, rpcOperation, wrappers, extensionSeconds, modack.getIsReceiptModack());
                final ApiFutureCallback<Empty> callback =
                        getCallback(batch, extensionSeconds, true, currentBackoffMillis, rpcSpan);
                final ApiFuture<Empty> modackFuture = subscriberStub
                        .modifyAckDeadlineCallable()
                        .futureCall(
                                ModifyAckDeadlineRequest.newBuilder()
                                        .setSubscription(subscription)
                                        .addAllAckIds(ackIds)
                                        .setAckDeadlineSeconds(modack.getDeadlineExtensionSeconds())
                                        .build());
                ApiFutures.addCallback(modackFuture, callback, getCallbackExecutor());
                rpcsIssued++;
            }
        }
        ackOperationsWaiter.incrementPendingCount(rpcsIssued);
    }

    /**
     * Extracts the {@code ErrorInfo} metadata attached to an RPC failure.
     *
     * @param t the failure to inspect
     * @return the metadata map of the last {@code ErrorInfo} detail found in the failure's status,
     *     or an empty map when there is no status or no {@code ErrorInfo} detail
     * @throws InvalidProtocolBufferException if an {@code ErrorInfo} detail cannot be unpacked
     */
    private Map<String, String> getMetadataMapFromThrowable(Throwable t) throws InvalidProtocolBufferException {
        final com.google.rpc.Status rpcStatus = StatusProto.fromThrowable(t);
        Map<String, String> metadata = new HashMap<>();
        if (rpcStatus == null) {
            return metadata;
        }
        for (Any detail : rpcStatus.getDetailsList()) {
            if (detail.is(ErrorInfo.class)) {
                // A later ErrorInfo replaces an earlier one rather than being merged with it.
                metadata = detail.unpack(ErrorInfo.class).getMetadataMap();
            }
        }
        return metadata;
    }

    /**
     * Builds the completion callback for a single Acknowledge or ModifyAckDeadline RPC.
     *
     * <p>On success every request in the batch is resolved (as {@code SUCCESSFUL}, or as
     * {@code OTHER} for acks/nacks under exactly-once delivery while the dispatcher reports a
     * nack-immediately shutdown in progress) and removed from the pending set.
     *
     * <p>On failure without exactly-once delivery only tracing is finished. With exactly-once
     * delivery the failure's {@code ErrorInfo} metadata decides the outcome per ack id:
     * <ul>
     *   <li>no metadata at all: the whole batch is retried if the error is retryable, otherwise
     *       the whole batch is failed;</li>
     *   <li>ack id mapped to a {@code TRANSIENT_*} value: that request is retried;</li>
     *   <li>ack id mapped to {@code PERMANENT_FAILURE_INVALID_ACK_ID}: resolved as {@code INVALID};</li>
     *   <li>ack id mapped to anything else: resolved as {@code OTHER};</li>
     *   <li>ack id absent from the metadata: resolved as {@code SUCCESSFUL}.</li>
     * </ul>
     * Retries are scheduled on the system executor after {@code currentBackoffMillis}, and the
     * retry itself is sent with twice that backoff (capped).
     *
     * @param ackRequestDataList the requests carried by the RPC
     * @param deadlineExtensionSeconds deadline sent by a modack ({@code 0} for a nack); ignored for acks
     * @param isModack {@code true} for ModifyAckDeadline, {@code false} for Acknowledge
     * @param currentBackoffMillis delay to wait before a retry of this RPC
     * @param rpcSpan tracing span of the RPC; may be {@code null}
     * @return the callback to attach to the RPC's future
     */
    private ApiFutureCallback<Empty> getCallback(final List<AckRequestData> ackRequestDataList, final int deadlineExtensionSeconds, final boolean isModack, final long currentBackoffMillis, final Span rpcSpan) {
        // True for acks and nacks, false for deadline extensions.
        final boolean setResponseOnSuccess = !isModack || deadlineExtensionSeconds == 0;
        final boolean rpcSpanSampled = rpcSpan == null ? false : rpcSpan.getSpanContext().isSampled();
        return new ApiFutureCallback<Empty>(){

            @Override
            public void onSuccess(Empty empty) {
                StreamingSubscriberConnection.this.ackOperationsWaiter.incrementPendingCount(-1);
                StreamingSubscriberConnection.this.tracer.endSubscribeRpcSpan(rpcSpan);
                for (AckRequestData ackRequestData : ackRequestDataList) {
                    if (setResponseOnSuccess && StreamingSubscriberConnection.this.getExactlyOnceDeliveryEnabled() && StreamingSubscriberConnection.this.messageDispatcher.getNackImmediatelyShutdownInProgress()) {
                        ackRequestData.setResponse(AckResponse.OTHER, setResponseOnSuccess);
                        StreamingSubscriberConnection.this.messageDispatcher.notifyAckFailed(ackRequestData);
                    } else {
                        ackRequestData.setResponse(AckResponse.SUCCESSFUL, setResponseOnSuccess);
                        StreamingSubscriberConnection.this.messageDispatcher.notifyAckSuccess(ackRequestData);
                    }
                    StreamingSubscriberConnection.this.pendingRequests.remove(ackRequestData);
                    StreamingSubscriberConnection.this.tracer.addEndRpcEvent(ackRequestData.getMessageWrapper(), rpcSpanSampled, isModack, deadlineExtensionSeconds);
                    // The subscriber span stays open across deadline extensions; only acks and nacks end it.
                    if (isModack && deadlineExtensionSeconds != 0) continue;
                    StreamingSubscriberConnection.this.tracer.endSubscriberSpan(ackRequestData.getMessageWrapper());
                }
            }

            @Override
            public void onFailure(Throwable t) {
                StreamingSubscriberConnection.this.ackOperationsWaiter.incrementPendingCount(-1);
                Level level = StreamingSubscriberConnection.this.isAlive() ? Level.WARNING : Level.FINER;
                logger.log(level, "failed to send operations", t);
                StreamingSubscriberConnection.this.tracer.setSubscribeRpcSpanException(rpcSpan, isModack, deadlineExtensionSeconds, t);
                if (!StreamingSubscriberConnection.this.getExactlyOnceDeliveryEnabled()) {
                    // Without exactly-once delivery failed operations are not retried; just finish tracing.
                    if (StreamingSubscriberConnection.this.enableOpenTelemetryTracing) {
                        for (AckRequestData failedRequest : ackRequestDataList) {
                            StreamingSubscriberConnection.this.tracer.addEndRpcEvent(failedRequest.getMessageWrapper(), rpcSpanSampled, isModack, deadlineExtensionSeconds);
                            if (isModack && deadlineExtensionSeconds != 0) continue;
                            StreamingSubscriberConnection.this.tracer.endSubscriberSpan(failedRequest.getMessageWrapper());
                        }
                    }
                    return;
                }
                final List<AckRequestData> ackRequestDataArrayRetryList = new ArrayList<>();
                try {
                    final Map<String, String> metadataMap = StreamingSubscriberConnection.this.getMetadataMapFromThrowable(t);
                    if (metadataMap.isEmpty()) {
                        // No per-ack-id detail: the whole batch shares the fate of the RPC status.
                        final String operation = isModack ? "ModifyAckDeadline" : "Acknowledge";
                        if (!StatusUtil.isRetryable(t)) {
                            logger.log(Level.WARNING, "Un-retryable error on " + operation, t);
                            ackRequestDataList.forEach(ackRequestData -> {
                                AckResponse failedAckResponse = StatusUtil.getFailedAckResponse(t);
                                ackRequestData.setResponse(failedAckResponse, setResponseOnSuccess);
                                StreamingSubscriberConnection.this.messageDispatcher.notifyAckFailed(ackRequestData);
                                StreamingSubscriberConnection.this.tracer.addEndRpcEvent(ackRequestData.getMessageWrapper(), rpcSpanSampled, isModack, deadlineExtensionSeconds);
                                StreamingSubscriberConnection.this.tracer.setSubscriberSpanException(ackRequestData.getMessageWrapper(), t, "Error with no metadata map");
                                ackRequestData.getMessageWrapper().setSubscriberSpanException(t, "Error with no metadata map");
                                StreamingSubscriberConnection.this.pendingRequests.remove(ackRequestData);
                            });
                        } else {
                            logger.log(Level.INFO, "Retryable error on " + operation + ", will resend", t);
                            ackRequestDataArrayRetryList.addAll(ackRequestDataList);
                            // The resend re-registers the requests that still carry a message future.
                            ackRequestDataList.forEach(ackRequestData -> StreamingSubscriberConnection.this.pendingRequests.remove(ackRequestData));
                        }
                    } else {
                        ackRequestDataList.forEach(ackRequestData -> {
                            String ackId = ackRequestData.getAckId();
                            if (metadataMap.containsKey(ackId)) {
                                String errorMessage = metadataMap.get(ackId);
                                // The {0} placeholder is required: java.util.logging drops a parameter
                                // that the message does not reference.
                                if (errorMessage.startsWith("TRANSIENT_")) {
                                    logger.log(Level.INFO, "Transient error message, will resend: {0}", errorMessage);
                                    ackRequestDataArrayRetryList.add(ackRequestData);
                                } else if (errorMessage.equals("PERMANENT_FAILURE_INVALID_ACK_ID")) {
                                    logger.log(Level.INFO, "Permanent error invalid ack id message, will not resend: {0}", errorMessage);
                                    ackRequestData.setResponse(AckResponse.INVALID, setResponseOnSuccess);
                                    StreamingSubscriberConnection.this.messageDispatcher.notifyAckFailed(ackRequestData);
                                    StreamingSubscriberConnection.this.tracer.addEndRpcEvent(ackRequestData.getMessageWrapper(), rpcSpanSampled, isModack, deadlineExtensionSeconds);
                                    StreamingSubscriberConnection.this.tracer.setSubscriberSpanException(ackRequestData.getMessageWrapper(), t, "Invalid ack ID");
                                } else {
                                    logger.log(Level.INFO, "Unknown error message, will not resend: {0}", errorMessage);
                                    ackRequestData.setResponse(AckResponse.OTHER, setResponseOnSuccess);
                                    StreamingSubscriberConnection.this.messageDispatcher.notifyAckFailed(ackRequestData);
                                    StreamingSubscriberConnection.this.tracer.addEndRpcEvent(ackRequestData.getMessageWrapper(), rpcSpanSampled, isModack, deadlineExtensionSeconds);
                                    StreamingSubscriberConnection.this.tracer.setSubscriberSpanException(ackRequestData.getMessageWrapper(), t, "Unknown error message");
                                    ackRequestData.getMessageWrapper().setSubscriberSpanException(t, "Unknown error message");
                                }
                            } else {
                                // Ack ids not listed in the metadata are resolved as successful.
                                ackRequestData.setResponse(AckResponse.SUCCESSFUL, setResponseOnSuccess);
                                StreamingSubscriberConnection.this.messageDispatcher.notifyAckSuccess(ackRequestData);
                                StreamingSubscriberConnection.this.tracer.endSubscriberSpan(ackRequestData.getMessageWrapper());
                                StreamingSubscriberConnection.this.tracer.addEndRpcEvent(ackRequestData.getMessageWrapper(), rpcSpanSampled, isModack, deadlineExtensionSeconds);
                            }
                            StreamingSubscriberConnection.this.pendingRequests.remove(ackRequestData);
                        });
                    }
                }
                catch (InvalidProtocolBufferException e) {
                    // The error detail could not be parsed, so nothing is known per ack id: retry everything.
                    // The RPC failure itself was already logged above; attach the parse failure here.
                    logger.log(Level.WARNING, "Exception occurred when parsing throwable " + t + " for errorInfo", e);
                    ackRequestDataArrayRetryList.addAll(ackRequestDataList);
                }
                if (!ackRequestDataArrayRetryList.isEmpty()) {
                    final long newBackoffMillis = Math.min(currentBackoffMillis * 2L, MAX_ACK_OPERATIONS_RECONNECT_BACKOFF_MILLIS);
                    final Runnable resend = () -> {
                        if (isModack) {
                            ModackRequestData modackRequestData = new ModackRequestData(deadlineExtensionSeconds, ackRequestDataArrayRetryList);
                            StreamingSubscriberConnection.this.sendModackOperations(Collections.singletonList(modackRequestData), newBackoffMillis);
                        } else {
                            StreamingSubscriberConnection.this.sendAckOperations(ackRequestDataArrayRetryList, newBackoffMillis);
                        }
                    };
                    StreamingSubscriberConnection.this.systemExecutor.schedule(resend, currentBackoffMillis, TimeUnit.MILLISECONDS);
                }
            }
        };
    }

    /**
     * Picks an executor based on whether exactly-once delivery is currently enabled.
     *
     * @return {@code systemExecutor} when exactly-once delivery is enabled; otherwise the
     *     executor returned by {@link MoreExecutors#directExecutor()}
     */
    private Executor getCallbackExecutor() {
        boolean exactlyOnceEnabled = this.getExactlyOnceDeliveryEnabled();
        if (exactlyOnceEnabled) {
            return this.systemExecutor;
        }
        return MoreExecutors.directExecutor();
    }

    /**
     * Creates a {@link Builder} seeded with a plain {@link MessageReceiver}.
     *
     * @param receiver receiver passed to the builder's constructor
     * @return a new builder instance
     */
    public static Builder newBuilder(MessageReceiver receiver) {
        Builder builder = new Builder(receiver);
        return builder;
    }

    /**
     * Creates a {@link Builder} seeded with a {@link MessageReceiverWithAckResponse}.
     *
     * @param receiverWithAckResponse receiver passed to the builder's constructor
     * @return a new builder instance
     */
    public static Builder newBuilder(MessageReceiverWithAckResponse receiverWithAckResponse) {
        Builder builder = new Builder(receiverWithAckResponse);
        return builder;
    }

    /**
     * Fluent builder for {@link StreamingSubscriberConnection}.
     *
     * <p>Every setter stores its argument in the field of the same name and returns this builder
     * so calls can be chained. Fields that are never set keep their Java default values
     * ({@code null}, {@code 0} or {@code false}). {@link #build()} hands the builder itself to the
     * connection's constructor.
     */
    public static final class Builder {
        // Receivers: each constructor assigns exactly one of these; the other stays null.
        private MessageReceiver receiver;
        private MessageReceiverWithAckResponse receiverWithAckResponse;

        // Subscription and stub/channel wiring.
        private String subscription;
        private SubscriberStub subscriberStub;
        private int channelAffinity;
        private long protocolVersion;

        // Ack deadline handling.
        private Duration ackExpirationPadding;
        private Duration maxAckExtensionPeriod;
        private Duration minDurationPerAckExtension;
        private boolean minDurationPerAckExtensionDefaultUsed;
        private Duration maxDurationPerAckExtension;
        private boolean maxDurationPerAckExtensionDefaultUsed;
        private Distribution ackLatencyDistribution;

        // Flow control.
        private FlowController flowController;
        private FlowControlSettings flowControlSettings;
        private boolean useLegacyFlowControl;

        // Execution and time source.
        private ScheduledExecutorService executor;
        private ScheduledExecutorService systemExecutor;
        private ApiClock clock;

        // Tracing.
        private boolean enableOpenTelemetryTracing;
        private OpenTelemetryPubsubTracer tracer;

        // Shutdown behaviour.
        private SubscriberShutdownSettings subscriberShutdownSettings;

        /**
         * Creates a builder that carries a plain receiver.
         *
         * @param receiver value stored in the {@code receiver} field
         */
        protected Builder(MessageReceiver receiver) {
            this.receiver = receiver;
        }

        /**
         * Creates a builder that carries a receiver with ack responses.
         *
         * @param receiverWithAckResponse value stored in the {@code receiverWithAckResponse} field
         */
        protected Builder(MessageReceiverWithAckResponse receiverWithAckResponse) {
            this.receiverWithAckResponse = receiverWithAckResponse;
        }

        /**
         * Stores the subscription string.
         *
         * @param subscription value to store
         * @return this builder
         */
        public Builder setSubscription(String subscription) {
            this.subscription = subscription;
            return this;
        }

        /**
         * Stores the ack expiration padding.
         *
         * @param ackExpirationPadding value to store
         * @return this builder
         */
        public Builder setAckExpirationPadding(Duration ackExpirationPadding) {
            this.ackExpirationPadding = ackExpirationPadding;
            return this;
        }

        /**
         * Stores the maximum ack extension period.
         *
         * @param maxAckExtensionPeriod value to store
         * @return this builder
         */
        public Builder setMaxAckExtensionPeriod(Duration maxAckExtensionPeriod) {
            this.maxAckExtensionPeriod = maxAckExtensionPeriod;
            return this;
        }

        /**
         * Stores the minimum duration per ack extension.
         *
         * @param minDurationPerAckExtension value to store
         * @return this builder
         */
        public Builder setMinDurationPerAckExtension(Duration minDurationPerAckExtension) {
            this.minDurationPerAckExtension = minDurationPerAckExtension;
            return this;
        }

        /**
         * Stores the flag that accompanies the minimum duration per ack extension.
         *
         * @param minDurationPerAckExtensionDefaultUsed value to store
         * @return this builder
         */
        public Builder setMinDurationPerAckExtensionDefaultUsed(boolean minDurationPerAckExtensionDefaultUsed) {
            this.minDurationPerAckExtensionDefaultUsed = minDurationPerAckExtensionDefaultUsed;
            return this;
        }

        /**
         * Stores the maximum duration per ack extension.
         *
         * @param maxDurationPerAckExtension value to store
         * @return this builder
         */
        public Builder setMaxDurationPerAckExtension(Duration maxDurationPerAckExtension) {
            this.maxDurationPerAckExtension = maxDurationPerAckExtension;
            return this;
        }

        /**
         * Stores the flag that accompanies the maximum duration per ack extension.
         *
         * @param maxDurationPerAckExtensionDefaultUsed value to store
         * @return this builder
         */
        public Builder setMaxDurationPerAckExtensionDefaultUsed(boolean maxDurationPerAckExtensionDefaultUsed) {
            this.maxDurationPerAckExtensionDefaultUsed = maxDurationPerAckExtensionDefaultUsed;
            return this;
        }

        /**
         * Stores the ack latency distribution.
         *
         * @param ackLatencyDistribution value to store
         * @return this builder
         */
        public Builder setAckLatencyDistribution(Distribution ackLatencyDistribution) {
            this.ackLatencyDistribution = ackLatencyDistribution;
            return this;
        }

        /**
         * Stores the subscriber stub.
         *
         * @param subscriberStub value to store
         * @return this builder
         */
        public Builder setSubscriberStub(SubscriberStub subscriberStub) {
            this.subscriberStub = subscriberStub;
            return this;
        }

        /**
         * Stores the channel affinity value.
         *
         * @param channelAffinity value to store
         * @return this builder
         */
        public Builder setChannelAffinity(int channelAffinity) {
            this.channelAffinity = channelAffinity;
            return this;
        }

        /**
         * Stores the protocol version.
         *
         * @param protocolVersion value to store
         * @return this builder
         */
        public Builder setProtocolVersion(long protocolVersion) {
            this.protocolVersion = protocolVersion;
            return this;
        }

        /**
         * Stores the flow controller.
         *
         * @param flowController value to store
         * @return this builder
         */
        public Builder setFlowController(FlowController flowController) {
            this.flowController = flowController;
            return this;
        }

        /**
         * Stores the flow control settings.
         *
         * @param flowControlSettings value to store
         * @return this builder
         */
        public Builder setFlowControlSettings(FlowControlSettings flowControlSettings) {
            this.flowControlSettings = flowControlSettings;
            return this;
        }

        /**
         * Stores the legacy flow control flag.
         *
         * @param useLegacyFlowControl value to store
         * @return this builder
         */
        public Builder setUseLegacyFlowControl(boolean useLegacyFlowControl) {
            this.useLegacyFlowControl = useLegacyFlowControl;
            return this;
        }

        /**
         * Stores the scheduled executor kept in the {@code executor} field.
         *
         * @param executor value to store
         * @return this builder
         */
        public Builder setExecutor(ScheduledExecutorService executor) {
            this.executor = executor;
            return this;
        }

        /**
         * Stores the scheduled executor kept in the {@code systemExecutor} field.
         *
         * @param systemExecutor value to store
         * @return this builder
         */
        public Builder setSystemExecutor(ScheduledExecutorService systemExecutor) {
            this.systemExecutor = systemExecutor;
            return this;
        }

        /**
         * Stores the clock.
         *
         * @param clock value to store
         * @return this builder
         */
        public Builder setClock(ApiClock clock) {
            this.clock = clock;
            return this;
        }

        /**
         * Stores the OpenTelemetry tracing flag.
         *
         * @param enableOpenTelemetryTracing value to store
         * @return this builder
         */
        public Builder setEnableOpenTelemetryTracing(boolean enableOpenTelemetryTracing) {
            this.enableOpenTelemetryTracing = enableOpenTelemetryTracing;
            return this;
        }

        /**
         * Stores the tracer.
         *
         * @param tracer value to store
         * @return this builder
         */
        public Builder setTracer(OpenTelemetryPubsubTracer tracer) {
            this.tracer = tracer;
            return this;
        }

        /**
         * Stores the subscriber shutdown settings.
         *
         * @param subscriberShutdownSettings value to store
         * @return this builder
         */
        public Builder setSubscriberShutdownSettings(SubscriberShutdownSettings subscriberShutdownSettings) {
            this.subscriberShutdownSettings = subscriberShutdownSettings;
            return this;
        }

        /**
         * Constructs the connection by passing this builder to its constructor.
         *
         * @return a new {@link StreamingSubscriberConnection}
         */
        public StreamingSubscriberConnection build() {
            return new StreamingSubscriberConnection(this);
        }
    }

    /**
     * Observer for the streaming-pull response stream of the enclosing connection.
     *
     * <p>Responses are pulled one at a time: automatic inbound flow control is disabled in
     * {@link #onStart} and a further response is requested only after the current one has been
     * handed to the message dispatcher. Stream termination, successful or not, is reported
     * through {@code errorFuture}.
     */
    private class StreamingPullResponseObserver
    implements ResponseObserver<StreamingPullResponse> {
        final SettableApiFuture<Void> errorFuture;
        StreamController thisController;

        /**
         * @param errorFuture future completed when the stream ends: with {@code null} on normal
         *     completion, or exceptionally on error
         */
        StreamingPullResponseObserver(SettableApiFuture<Void> errorFuture) {
            this.errorFuture = errorFuture;
        }

        /**
         * Remembers the controller, turns off automatic inbound flow control and requests the
         * first response.
         *
         * @param controller controller of the stream that just started
         */
        public void onStart(StreamController controller) {
            // Publish the controller before issuing any call on it, as later callbacks read the field.
            this.thisController = controller;
            controller.disableAutoInboundFlowControl();
            controller.request(1);
        }

        /**
         * Handles one streaming-pull response: refreshes keepalive bookkeeping, resets the
         * reconnect backoff, propagates the subscription properties, dispatches the received
         * messages and then, if the connection is still alive and the stream has not ended,
         * requests the next response.
         *
         * @param response response received from the stream
         */
        public void onResponse(StreamingPullResponse response) {
            final StreamingSubscriberConnection connection = StreamingSubscriberConnection.this;

            if (connection.enableKeepalive) {
                long nowNanos = connection.clock.nanoTime();
                connection.lastServerResponseTime.set(nowNanos);
            }
            // A response arrived, so the reconnect backoff goes back to its initial value.
            connection.channelReconnectBackoffMillis.set(INITIAL_CHANNEL_RECONNECT_BACKOFF.toMillis());

            boolean exactlyOnceEnabled = response.getSubscriptionProperties().getExactlyOnceDeliveryEnabled();
            boolean orderingEnabled = response.getSubscriptionProperties().getMessageOrderingEnabled();
            connection.setExactlyOnceDeliveryEnabled(exactlyOnceEnabled);
            connection.messageDispatcher.setExactlyOnceDeliveryEnabled(exactlyOnceEnabled);
            connection.messageDispatcher.setMessageOrderingEnabled(orderingEnabled);
            connection.messageDispatcher.processReceivedMessages(response.getReceivedMessagesList());

            // Nothing more to request once the connection is not alive or the stream has ended.
            if (!connection.isAlive() || this.errorFuture.isDone()) {
                return;
            }
            connection.lock.lock();
            try {
                this.thisController.request(1);
            }
            catch (Exception e) {
                logger.log(Level.WARNING, "cannot request more messages", e);
            }
            finally {
                connection.lock.unlock();
            }
        }

        /**
         * Stops the keepalive tasks (when keepalive is enabled) and fails {@code errorFuture}
         * with the given throwable.
         *
         * @param t error reported by the stream
         */
        public void onError(Throwable t) {
            this.stopKeepaliveTasksIfEnabled();
            this.errorFuture.setException(t);
        }

        /**
         * Stops the keepalive tasks (when keepalive is enabled), logs the termination and
         * completes {@code errorFuture} with {@code null}.
         */
        public void onComplete() {
            this.stopKeepaliveTasksIfEnabled();
            logger.fine("Streaming pull terminated successfully!");
            this.errorFuture.set(null);
        }

        /** Stops the client pinger and the server monitor when keepalive is enabled. */
        private void stopKeepaliveTasksIfEnabled() {
            if (!StreamingSubscriberConnection.this.enableKeepalive) {
                return;
            }
            StreamingSubscriberConnection.this.stopClientPinger();
            StreamingSubscriberConnection.this.stopServerMonitor();
        }
    }
}

