/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.pubsublite.internal.wire;

import com.google.cloud.pubsublite.Offset;
import com.google.cloud.pubsublite.internal.wire.ConnectedCommitter;
import com.google.cloud.pubsublite.internal.wire.ConnectedCommitterFactory;
import com.google.cloud.pubsublite.internal.wire.SingleConnection;
import com.google.cloud.pubsublite.internal.wire.StreamFactory;
import com.google.cloud.pubsublite.proto.Cursor;
import com.google.cloud.pubsublite.proto.SequencedCommitCursorRequest;
import com.google.cloud.pubsublite.proto.SequencedCommitCursorResponse;
import com.google.cloud.pubsublite.proto.StreamingCommitCursorRequest;
import com.google.cloud.pubsublite.proto.StreamingCommitCursorResponse;
import io.grpc.Status;
import io.grpc.stub.StreamObserver;

/**
 * {@link ConnectedCommitter} built on a {@link SingleConnection} that exchanges
 * {@link StreamingCommitCursorRequest} / {@link StreamingCommitCursorResponse} messages and
 * forwards {@link SequencedCommitCursorResponse} payloads to the client stream.
 *
 * <p>Instances are created through {@link Factory}; the constructor is private.
 */
public class ConnectedCommitterImpl
extends SingleConnection<StreamingCommitCursorRequest, StreamingCommitCursorResponse, SequencedCommitCursorResponse>
implements ConnectedCommitter {
    /** The request this connection was initialized with; retained only for error descriptions. */
    private final StreamingCommitCursorRequest initialRequest;

    /**
     * Stores the initial request and passes it to {@code initialize}.
     *
     * @param streamFactory  forwarded to the {@link SingleConnection} constructor
     * @param clientStream   forwarded to the {@link SingleConnection} constructor
     * @param initialRequest request handed to {@code initialize} and echoed in error descriptions
     */
    private ConnectedCommitterImpl(StreamFactory<StreamingCommitCursorRequest, StreamingCommitCursorResponse> streamFactory, StreamObserver<SequencedCommitCursorResponse> clientStream, StreamingCommitCursorRequest initialRequest) {
        super(streamFactory, clientStream);
        this.initialRequest = initialRequest;
        // NOTE(review): presumably this opens the stream and sends the initial request;
        // the behavior lives in SingleConnection, which is not visible here.
        this.initialize(initialRequest);
    }

    /**
     * Builds a {@code FAILED_PRECONDITION} status whose description is {@code template}
     * formatted with the offending response followed by the initial request.
     *
     * @param template format string taking exactly two {@code %s} arguments
     * @param response the response that failed validation
     * @return the described failure status
     */
    private Status failedPreconditionFor(String template, StreamingCommitCursorResponse response) {
        String description = String.format(template, response, this.initialRequest);
        return Status.FAILED_PRECONDITION.withDescription(description);
    }

    /**
     * Validates the first response on the stream.
     *
     * @param response the response to check
     * @return {@code Status.OK} when the response carries an initial payload, otherwise a
     *         {@code FAILED_PRECONDITION} status describing the unexpected response
     */
    @Override
    protected Status handleInitialResponse(StreamingCommitCursorResponse response) {
        if (response.hasInitial()) {
            return Status.OK;
        }
        return this.failedPreconditionFor("Received non-initial first response %s on stream with initial request %s.", response);
    }

    /**
     * Validates a subsequent response and, when valid, forwards its commit payload to the client.
     *
     * @param response the response to check
     * @return {@code Status.OK} after forwarding a commit payload with a positive acknowledged
     *         count; otherwise a {@code FAILED_PRECONDITION} status (nothing is forwarded)
     */
    @Override
    protected Status handleStreamResponse(StreamingCommitCursorResponse response) {
        if (!response.hasCommit()) {
            return this.failedPreconditionFor("Received non-commit subsequent response %s on stream with initial request %s.", response);
        }
        SequencedCommitCursorResponse commitResponse = response.getCommit();
        if (commitResponse.getAcknowledgedCommits() > 0L) {
            this.sendToClient(commitResponse);
            return Status.OK;
        }
        // A commit response must acknowledge at least one commit.
        return this.failedPreconditionFor("Received non-positive commit count response %s on stream with initial request %s.", response);
    }

    /**
     * Sends a commit request for the given offset on the stream.
     *
     * @param offset the offset whose value is placed in the request's cursor
     */
    @Override
    public void commit(Offset offset) {
        Cursor.Builder cursor = Cursor.newBuilder().setOffset(offset.value());
        SequencedCommitCursorRequest.Builder sequencedCommit = SequencedCommitCursorRequest.newBuilder().setCursor(cursor);
        StreamingCommitCursorRequest request = StreamingCommitCursorRequest.newBuilder().setCommit(sequencedCommit).build();
        this.sendToStream(request);
    }

    /** {@link ConnectedCommitterFactory} that produces {@link ConnectedCommitterImpl} instances. */
    static class Factory
    implements ConnectedCommitterFactory {
        Factory() {
        }

        /**
         * Creates a new committer from the given arguments.
         *
         * @param streamFactory  passed through to the committer's constructor
         * @param clientStream   passed through to the committer's constructor
         * @param initialRequest passed through to the committer's constructor
         * @return a newly constructed {@link ConnectedCommitterImpl}
         */
        @Override
        public ConnectedCommitter New(StreamFactory<StreamingCommitCursorRequest, StreamingCommitCursorResponse> streamFactory, StreamObserver<SequencedCommitCursorResponse> clientStream, StreamingCommitCursorRequest initialRequest) {
            ConnectedCommitterImpl committer = new ConnectedCommitterImpl(streamFactory, clientStream, initialRequest);
            return committer;
        }
    }
}

