/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.pubsublite.internal.wire;

import com.google.cloud.pubsublite.Offset;
import com.google.cloud.pubsublite.SequencedMessage;
import com.google.cloud.pubsublite.internal.CloseableMonitor;
import com.google.cloud.pubsublite.internal.wire.ConnectedSubscriber;
import com.google.cloud.pubsublite.internal.wire.ConnectedSubscriberFactory;
import com.google.cloud.pubsublite.internal.wire.Predicates;
import com.google.cloud.pubsublite.internal.wire.SingleConnection;
import com.google.cloud.pubsublite.internal.wire.StreamFactory;
import com.google.cloud.pubsublite.proto.FlowControlRequest;
import com.google.cloud.pubsublite.proto.MessageResponse;
import com.google.cloud.pubsublite.proto.SeekRequest;
import com.google.cloud.pubsublite.proto.SeekResponse;
import com.google.cloud.pubsublite.proto.SubscribeRequest;
import com.google.cloud.pubsublite.proto.SubscribeResponse;
import com.google.common.base.Preconditions;
import com.google.common.flogger.GoogleLogger;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import io.grpc.Status;
import io.grpc.stub.StreamObserver;
import java.util.List;
import java.util.stream.Collectors;

class ConnectedSubscriberImpl
extends SingleConnection<SubscribeRequest, SubscribeResponse, ConnectedSubscriber.Response>
implements ConnectedSubscriber {
    private static final GoogleLogger log = GoogleLogger.forEnclosingClass();
    private final SubscribeRequest initialRequest;
    private final CloseableMonitor monitor = new CloseableMonitor();
    @GuardedBy(value="monitor.monitor")
    private boolean seekInFlight = false;

    private ConnectedSubscriberImpl(StreamFactory<SubscribeRequest, SubscribeResponse> streamFactory, StreamObserver<ConnectedSubscriber.Response> clientStream, SubscribeRequest initialRequest) {
        super(streamFactory, clientStream);
        this.initialRequest = initialRequest;
        this.initialize(initialRequest);
    }

    @Override
    public void seek(SeekRequest request) {
        Status seekStatus;
        Preconditions.checkArgument((boolean)Predicates.isValidSeekRequest(request));
        try (CloseableMonitor.Hold h = this.monitor.enter();){
            seekStatus = this.seekLockHeld(request);
        }
        if (!seekStatus.isOk()) {
            this.setError(seekStatus);
        }
    }

    @Override
    public void allowFlow(FlowControlRequest request) {
        Preconditions.checkArgument((request.getAllowedBytes() >= 0L ? 1 : 0) != 0);
        Preconditions.checkArgument((request.getAllowedMessages() >= 0L ? 1 : 0) != 0);
        try (CloseableMonitor.Hold h = this.monitor.enter();){
            this.sendToStream(SubscribeRequest.newBuilder().setFlowControl(request).build());
        }
    }

    @GuardedBy(value="monitor.monitor")
    private Status seekLockHeld(SeekRequest request) {
        if (this.seekInFlight) {
            return Status.FAILED_PRECONDITION.withDescription(String.format("Sent second seek while seek in flight on stream with initial request %s.", this.initialRequest));
        }
        this.seekInFlight = true;
        this.sendToStream(SubscribeRequest.newBuilder().setSeek(request).build());
        return Status.OK;
    }

    @Override
    protected Status handleInitialResponse(SubscribeResponse response) {
        if (!response.hasInitial()) {
            return Status.FAILED_PRECONDITION.withDescription(String.format("Received non-initial first response %s on stream with initial request %s.", response, this.initialRequest));
        }
        return Status.OK;
    }

    @Override
    protected Status handleStreamResponse(SubscribeResponse response) {
        switch (response.getResponseCase()) {
            case INITIAL: {
                return Status.FAILED_PRECONDITION.withDescription(String.format("Received duplicate initial response on stream with initial request %s.", this.initialRequest));
            }
            case MESSAGES: {
                return this.onMessages(response.getMessages());
            }
            case SEEK: {
                return this.onSeekResponse(response.getSeek());
            }
        }
        return Status.FAILED_PRECONDITION.withDescription("Received a message on the stream with no case set.");
    }

    private Status onMessages(MessageResponse response) {
        List<SequencedMessage> messages;
        try (CloseableMonitor.Hold h = this.monitor.enter();){
            if (this.seekInFlight) {
                ((GoogleLogger.Api)log.atInfo()).log("Dropping %s messages due to outstanding seek on stream with initial request %s.", response.getMessagesCount(), (Object)this.initialRequest);
                Status status = Status.OK;
                return status;
            }
            if (response.getMessagesCount() == 0) {
                Status status = Status.FAILED_PRECONDITION.withDescription(String.format("Received an empty PullResponse on stream with initial request %s.", this.initialRequest));
                return status;
            }
            messages = response.getMessagesList().stream().map(SequencedMessage::fromProto).collect(Collectors.toList());
            if (!Predicates.isOrdered(messages)) {
                Status status = Status.FAILED_PRECONDITION.withDescription(String.format("Received out of order messages on the stream with initial request %s.", this.initialRequest));
                return status;
            }
        }
        this.sendToClient(ConnectedSubscriber.Response.ofMessages(messages));
        return Status.OK;
    }

    private Status onSeekResponse(SeekResponse response) {
        try (CloseableMonitor.Hold h = this.monitor.enter();){
            if (!this.seekInFlight) {
                Status status = Status.FAILED_PRECONDITION.withDescription(String.format("Received a SeekResponse when no seeks were in flight on stream with initial request %s.", this.initialRequest));
                return status;
            }
            this.seekInFlight = false;
        }
        this.sendToClient(ConnectedSubscriber.Response.ofSeekOffset(Offset.of(response.getCursor().getOffset())));
        return Status.OK;
    }

    static class Factory
    implements ConnectedSubscriberFactory {
        Factory() {
        }

        @Override
        public ConnectedSubscriberImpl New(StreamFactory<SubscribeRequest, SubscribeResponse> streamFactory, StreamObserver<ConnectedSubscriber.Response> clientStream, SubscribeRequest initialRequest) {
            return new ConnectedSubscriberImpl(streamFactory, clientStream, initialRequest);
        }
    }
}

