/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.pubsublite.internal;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.api.core.ApiService;
import com.google.api.core.SettableApiFuture;
import com.google.api.gax.rpc.StatusCode;
import com.google.cloud.pubsublite.cloudpubsub.FlowControlSettings;
import com.google.cloud.pubsublite.internal.BlockingPullSubscriber;
import com.google.cloud.pubsublite.internal.CheckedApiException;
import com.google.cloud.pubsublite.internal.ExtractStatus;
import com.google.cloud.pubsublite.internal.wire.ApiServiceUtils;
import com.google.cloud.pubsublite.internal.wire.Subscriber;
import com.google.cloud.pubsublite.internal.wire.SubscriberFactory;
import com.google.cloud.pubsublite.internal.wire.SystemExecutors;
import com.google.cloud.pubsublite.proto.FlowControlRequest;
import com.google.cloud.pubsublite.proto.SequencedMessage;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.Deque;
import java.util.Optional;

public class BlockingPullSubscriberImpl
implements BlockingPullSubscriber {
    private final Subscriber underlying;
    @GuardedBy(value="this")
    private Optional<CheckedApiException> error = Optional.empty();
    @GuardedBy(value="this")
    private Deque<SequencedMessage> messages = new ArrayDeque<SequencedMessage>();
    @GuardedBy(value="this")
    private Optional<SettableApiFuture<Void>> notification = Optional.empty();

    public BlockingPullSubscriberImpl(SubscriberFactory factory, FlowControlSettings settings) throws CheckedApiException {
        this.underlying = factory.newSubscriber(this::addMessages);
        this.underlying.addListener(new ApiService.Listener(){

            public void failed(ApiService.State state, Throwable throwable) {
                BlockingPullSubscriberImpl.this.fail(ExtractStatus.toCanonical(throwable));
            }
        }, SystemExecutors.getFuturesExecutor());
        this.underlying.startAsync().awaitRunning();
        this.underlying.allowFlow(FlowControlRequest.newBuilder().setAllowedMessages(settings.messagesOutstanding()).setAllowedBytes(settings.bytesOutstanding()).build());
    }

    private synchronized void fail(CheckedApiException e) {
        this.error = Optional.of(e);
        if (this.notification.isPresent()) {
            this.notification.get().setException((Throwable)e);
            this.notification = Optional.empty();
        }
    }

    private synchronized void addMessages(Collection<SequencedMessage> new_messages) {
        this.messages.addAll(new_messages);
        if (this.notification.isPresent()) {
            this.notification.get().set(null);
            this.notification = Optional.empty();
        }
    }

    @Override
    public synchronized ApiFuture<Void> onData() {
        if (this.error.isPresent()) {
            return ApiFutures.immediateFailedFuture((Throwable)this.error.get());
        }
        if (!this.messages.isEmpty()) {
            return ApiFutures.immediateFuture(null);
        }
        if (!this.notification.isPresent()) {
            this.notification = Optional.of(SettableApiFuture.create());
        }
        return (ApiFuture)this.notification.get();
    }

    @Override
    public synchronized Optional<SequencedMessage> messageIfAvailable() throws CheckedApiException {
        if (this.error.isPresent()) {
            throw this.error.get();
        }
        if (this.messages.isEmpty()) {
            return Optional.empty();
        }
        SequencedMessage msg = this.messages.remove();
        this.underlying.allowFlow(FlowControlRequest.newBuilder().setAllowedMessages(1L).setAllowedBytes(msg.getSizeBytes()).build());
        return Optional.of(msg);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void close() {
        BlockingPullSubscriberImpl blockingPullSubscriberImpl = this;
        synchronized (blockingPullSubscriberImpl) {
            if (!this.error.isPresent()) {
                this.error = Optional.of(new CheckedApiException("Subscriber client shut down", StatusCode.Code.UNAVAILABLE));
            }
        }
        ApiServiceUtils.blockingShutdown(this.underlying);
    }
}

