/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.pubsublite.internal.wire;

import com.google.api.gax.rpc.ResponseObserver;
import com.google.api.gax.rpc.StatusCode;
import com.google.cloud.pubsublite.Offset;
import com.google.cloud.pubsublite.SequencedMessage;
import com.google.cloud.pubsublite.internal.CheckedApiException;
import com.google.cloud.pubsublite.internal.CheckedApiPreconditions;
import com.google.cloud.pubsublite.internal.CloseableMonitor;
import com.google.cloud.pubsublite.internal.wire.ConnectedSubscriber;
import com.google.cloud.pubsublite.internal.wire.ConnectedSubscriberFactory;
import com.google.cloud.pubsublite.internal.wire.Predicates;
import com.google.cloud.pubsublite.internal.wire.SingleConnection;
import com.google.cloud.pubsublite.internal.wire.StreamFactory;
import com.google.cloud.pubsublite.proto.FlowControlRequest;
import com.google.cloud.pubsublite.proto.MessageResponse;
import com.google.cloud.pubsublite.proto.SeekRequest;
import com.google.cloud.pubsublite.proto.SeekResponse;
import com.google.cloud.pubsublite.proto.SubscribeRequest;
import com.google.cloud.pubsublite.proto.SubscribeResponse;
import com.google.common.base.Preconditions;
import com.google.common.flogger.GoogleLogger;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import java.util.List;
import java.util.stream.Collectors;

class ConnectedSubscriberImpl
extends SingleConnection<SubscribeRequest, SubscribeResponse, ConnectedSubscriber.Response>
implements ConnectedSubscriber {
    private static final GoogleLogger log = GoogleLogger.forEnclosingClass();
    private final SubscribeRequest initialRequest;
    private final CloseableMonitor monitor = new CloseableMonitor();
    @GuardedBy(value="monitor.monitor")
    private boolean seekInFlight = false;

    private ConnectedSubscriberImpl(StreamFactory<SubscribeRequest, SubscribeResponse> streamFactory, ResponseObserver<ConnectedSubscriber.Response> clientStream, SubscribeRequest initialRequest) {
        super(streamFactory, clientStream);
        this.initialRequest = initialRequest;
        this.initialize(initialRequest);
    }

    @Override
    public void seek(SeekRequest request) {
        try (CloseableMonitor.Hold h = this.monitor.enter();){
            CheckedApiPreconditions.checkArgument(Predicates.isValidSeekRequest(request));
            this.seekLockHeld(request);
        }
        catch (CheckedApiException e) {
            this.setError(e);
        }
    }

    @Override
    public void allowFlow(FlowControlRequest request) {
        Preconditions.checkArgument((request.getAllowedBytes() >= 0L ? 1 : 0) != 0);
        Preconditions.checkArgument((request.getAllowedMessages() >= 0L ? 1 : 0) != 0);
        try (CloseableMonitor.Hold h = this.monitor.enter();){
            this.sendToStream(SubscribeRequest.newBuilder().setFlowControl(request).build());
        }
    }

    @GuardedBy(value="monitor.monitor")
    private void seekLockHeld(SeekRequest request) throws CheckedApiException {
        CheckedApiPreconditions.checkState(!this.seekInFlight, String.format("Sent second seek while seek in flight on stream with initial request %s.", this.initialRequest));
        this.seekInFlight = true;
        this.sendToStream(SubscribeRequest.newBuilder().setSeek(request).build());
    }

    @Override
    protected void handleInitialResponse(SubscribeResponse response) throws CheckedApiException {
        CheckedApiPreconditions.checkState(response.hasInitial(), String.format("Received non-initial first response %s on stream with initial request %s.", response, this.initialRequest));
    }

    @Override
    protected void handleStreamResponse(SubscribeResponse response) throws CheckedApiException {
        switch (response.getResponseCase()) {
            case INITIAL: {
                throw new CheckedApiException(String.format("Received duplicate initial response on stream with initial request %s.", this.initialRequest), StatusCode.Code.FAILED_PRECONDITION);
            }
            case MESSAGES: {
                this.onMessages(response.getMessages());
                return;
            }
            case SEEK: {
                this.onSeekResponse(response.getSeek());
                return;
            }
        }
        throw new CheckedApiException("Received a message on the stream with no case set.", StatusCode.Code.FAILED_PRECONDITION);
    }

    private void onMessages(MessageResponse response) throws CheckedApiException {
        try (CloseableMonitor.Hold h = this.monitor.enter();){
            if (this.seekInFlight) {
                ((GoogleLogger.Api)log.atInfo()).log("Dropping %s messages due to outstanding seek on stream with initial request %s.", response.getMessagesCount(), (Object)this.initialRequest);
                return;
            }
        }
        CheckedApiPreconditions.checkState(response.getMessagesCount() > 0, String.format("Received an empty PullResponse on stream with initial request %s.", this.initialRequest));
        List<SequencedMessage> messages = response.getMessagesList().stream().map(SequencedMessage::fromProto).collect(Collectors.toList());
        CheckedApiPreconditions.checkState(Predicates.isOrdered(messages), String.format("Received out of order messages on the stream with initial request %s.", this.initialRequest));
        this.sendToClient(ConnectedSubscriber.Response.ofMessages(messages));
    }

    private void onSeekResponse(SeekResponse response) throws CheckedApiException {
        try (CloseableMonitor.Hold h = this.monitor.enter();){
            CheckedApiPreconditions.checkState(this.seekInFlight, String.format("Received a SeekResponse when no seeks were in flight on stream with initial request %s.", this.initialRequest));
            this.seekInFlight = false;
        }
        this.sendToClient(ConnectedSubscriber.Response.ofSeekOffset(Offset.of(response.getCursor().getOffset())));
    }

    static class Factory
    implements ConnectedSubscriberFactory {
        Factory() {
        }

        @Override
        public ConnectedSubscriberImpl New(StreamFactory<SubscribeRequest, SubscribeResponse> streamFactory, ResponseObserver<ConnectedSubscriber.Response> clientStream, SubscribeRequest initialRequest) {
            return new ConnectedSubscriberImpl(streamFactory, clientStream, initialRequest);
        }
    }
}

