/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.pubsublite.cloudpubsub.internal;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsublite.MessageTransformer;
import com.google.cloud.pubsublite.SequencedMessage;
import com.google.cloud.pubsublite.cloudpubsub.FlowControlSettings;
import com.google.cloud.pubsublite.cloudpubsub.NackHandler;
import com.google.cloud.pubsublite.cloudpubsub.Subscriber;
import com.google.cloud.pubsublite.cloudpubsub.internal.AckSetTracker;
import com.google.cloud.pubsublite.internal.ExtractStatus;
import com.google.cloud.pubsublite.internal.ProxyService;
import com.google.cloud.pubsublite.internal.wire.SubscriberFactory;
import com.google.cloud.pubsublite.proto.FlowControlRequest;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.pubsub.v1.PubsubMessage;
import io.grpc.StatusException;
import java.util.concurrent.Executor;

public class SinglePartitionSubscriber
extends ProxyService
implements Subscriber {
    private final MessageReceiver receiver;
    private final MessageTransformer<SequencedMessage, PubsubMessage> transformer;
    private final AckSetTracker ackSetTracker;
    private final NackHandler nackHandler;
    private final FlowControlSettings flowControlSettings;
    private final com.google.cloud.pubsublite.internal.wire.Subscriber wireSubscriber;

    public SinglePartitionSubscriber(MessageReceiver receiver, MessageTransformer<SequencedMessage, PubsubMessage> transformer, AckSetTracker ackSetTracker, NackHandler nackHandler, SubscriberFactory wireSubscriberFactory, FlowControlSettings flowControlSettings) throws StatusException {
        this.receiver = receiver;
        this.transformer = transformer;
        this.ackSetTracker = ackSetTracker;
        this.nackHandler = nackHandler;
        this.flowControlSettings = flowControlSettings;
        this.wireSubscriber = wireSubscriberFactory.New(this::onMessages);
        this.addServices(ackSetTracker, this.wireSubscriber);
    }

    @Override
    protected void handlePermanentError(StatusException error) {
    }

    @Override
    protected void start() {
        this.wireSubscriber.allowFlow(FlowControlRequest.newBuilder().setAllowedMessages(this.flowControlSettings.messagesOutstanding()).setAllowedBytes(this.flowControlSettings.bytesOutstanding()).build());
    }

    @Override
    protected void stop() {
    }

    @VisibleForTesting
    void onMessages(ImmutableList<SequencedMessage> sequencedMessages) {
        try {
            for (SequencedMessage message : sequencedMessages) {
                final PubsubMessage userMessage = this.transformer.transform(message);
                final long bytes = message.byteSize();
                final Runnable trackerConsumer = this.ackSetTracker.track(message);
                AckReplyConsumer clientConsumer = new AckReplyConsumer(){

                    public void ack() {
                        trackerConsumer.run();
                        SinglePartitionSubscriber.this.wireSubscriber.allowFlow(FlowControlRequest.newBuilder().setAllowedMessages(1L).setAllowedBytes(bytes).build());
                    }

                    public void nack() {
                        ApiFuture<Void> nackDone = SinglePartitionSubscriber.this.nackHandler.nack(userMessage);
                        ApiFutures.addCallback(nackDone, (ApiFutureCallback)new ApiFutureCallback<Void>(){

                            public void onFailure(Throwable t) {
                                SinglePartitionSubscriber.this.onPermanentError(ExtractStatus.toCanonical(t));
                            }

                            public void onSuccess(Void result) {
                                this.ack();
                            }
                        }, (Executor)MoreExecutors.directExecutor());
                    }
                };
                this.receiver.receiveMessage(userMessage, clientConsumer);
            }
        }
        catch (Throwable t) {
            this.onPermanentError(ExtractStatus.toCanonical(t));
        }
    }
}

