/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.gcp.pubsublite;

import com.google.api.core.ApiService;
import com.google.cloud.pubsublite.Offset;
import com.google.cloud.pubsublite.cloudpubsub.FlowControlSettings;
import com.google.cloud.pubsublite.internal.ExtractStatus;
import com.google.cloud.pubsublite.internal.wire.Subscriber;
import com.google.cloud.pubsublite.internal.wire.SubscriberFactory;
import com.google.cloud.pubsublite.proto.Cursor;
import com.google.cloud.pubsublite.proto.FlowControlRequest;
import com.google.cloud.pubsublite.proto.SeekRequest;
import com.google.cloud.pubsublite.proto.SequencedMessage;
import io.grpc.StatusException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.apache.beam.sdk.io.gcp.pubsublite.PullSubscriber;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.MoreExecutors;
import org.checkerframework.checker.nullness.qual.Nullable;

class BufferingPullSubscriber
implements PullSubscriber {
    private final Subscriber underlying;
    private final AtomicReference<StatusException> error = new AtomicReference();
    private final LinkedBlockingQueue<SequencedMessage> messages = new LinkedBlockingQueue();

    BufferingPullSubscriber(SubscriberFactory factory, FlowControlSettings settings) throws StatusException {
        this.underlying = factory.New(newMessages -> this.messages.addAll(newMessages.stream().map(message -> message.toProto()).collect(Collectors.toList())));
        this.underlying.addListener(new ApiService.Listener(){

            public void failed(ApiService.State state, Throwable throwable) {
                BufferingPullSubscriber.this.error.set(ExtractStatus.toCanonical((Throwable)throwable));
            }
        }, MoreExecutors.directExecutor());
        this.underlying.startAsync().awaitRunning();
        this.underlying.allowFlow(FlowControlRequest.newBuilder().setAllowedMessages(settings.messagesOutstanding()).setAllowedBytes(settings.bytesOutstanding()).build());
    }

    BufferingPullSubscriber(SubscriberFactory factory, FlowControlSettings settings, Offset initialLocation) throws StatusException {
        this(factory, settings);
        try {
            this.underlying.seek(SeekRequest.newBuilder().setCursor(Cursor.newBuilder().setOffset(initialLocation.value())).build()).get();
        }
        catch (InterruptedException e) {
            throw ExtractStatus.toCanonical((Throwable)e);
        }
        catch (ExecutionException e) {
            throw ExtractStatus.toCanonical((Throwable)e.getCause());
        }
    }

    @Override
    public List<SequencedMessage> pull() throws StatusException {
        @Nullable StatusException maybeError = this.error.get();
        if (maybeError != null) {
            throw maybeError;
        }
        ArrayList collection = new ArrayList();
        this.messages.drainTo(collection);
        long bytes = collection.stream().mapToLong(SequencedMessage::getSizeBytes).sum();
        this.underlying.allowFlow(FlowControlRequest.newBuilder().setAllowedBytes(bytes).setAllowedMessages((long)collection.size()).build());
        return ImmutableList.copyOf(collection);
    }

    @Override
    public void close() {
        this.underlying.stopAsync().awaitTerminated();
    }
}

