/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.pubsublite.internal.wire;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.api.core.ApiService;
import com.google.api.gax.batching.BatchingSettings;
import com.google.api.gax.rpc.ApiException;
import com.google.api.gax.rpc.StatusCode;
import com.google.cloud.pubsublite.Offset;
import com.google.cloud.pubsublite.internal.AlarmFactory;
import com.google.cloud.pubsublite.internal.CheckedApiException;
import com.google.cloud.pubsublite.internal.CheckedApiPreconditions;
import com.google.cloud.pubsublite.internal.CloseableMonitor;
import com.google.cloud.pubsublite.internal.ProxyService;
import com.google.cloud.pubsublite.internal.PublishSequenceNumber;
import com.google.cloud.pubsublite.internal.SequencedPublisher;
import com.google.cloud.pubsublite.internal.wire.BatchPublisher;
import com.google.cloud.pubsublite.internal.wire.BatchPublisherFactory;
import com.google.cloud.pubsublite.internal.wire.BatchPublisherImpl;
import com.google.cloud.pubsublite.internal.wire.RetryingConnection;
import com.google.cloud.pubsublite.internal.wire.RetryingConnectionImpl;
import com.google.cloud.pubsublite.internal.wire.RetryingConnectionObserver;
import com.google.cloud.pubsublite.internal.wire.SerialBatcher;
import com.google.cloud.pubsublite.internal.wire.StreamFactories;
import com.google.cloud.pubsublite.proto.InitialPublishRequest;
import com.google.cloud.pubsublite.proto.MessagePublishResponse;
import com.google.cloud.pubsublite.proto.PubSubMessage;
import com.google.cloud.pubsublite.proto.PublishRequest;
import com.google.cloud.pubsublite.proto.PublishResponse;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.flogger.GoogleLogger;
import com.google.common.util.concurrent.Monitor;
import java.time.Duration;
import java.util.ArrayDeque;
import java.util.Comparator;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import javax.annotation.concurrent.GuardedBy;

public final class PublisherImpl
extends ProxyService
implements SequencedPublisher<Offset>,
RetryingConnectionObserver<MessagePublishResponse> {
    private static final GoogleLogger logger = GoogleLogger.forEnclosingClass();
    private final AlarmFactory alarmFactory;
    private final PublishRequest initialRequest;
    private final CloseableMonitor monitor = new CloseableMonitor();
    private final Monitor.Guard noneInFlight;
    @GuardedBy(value="monitor.monitor")
    private Optional<Future<?>> alarmFuture;
    @GuardedBy(value="monitor.monitor")
    private final RetryingConnection<PublishRequest, BatchPublisher> connection;
    @GuardedBy(value="monitor.monitor")
    private boolean shutdown;
    @GuardedBy(value="monitor.monitor")
    private Offset lastSentOffset;
    private final CloseableMonitor batcherMonitor;
    @GuardedBy(value="batcherMonitor.monitor")
    private final SerialBatcher batcher;
    @GuardedBy(value="monitor.monitor")
    private final Queue<InFlightBatch> batchesInFlight;
    private final CloseableMonitor reconnectingMonitor;
    @GuardedBy(value="reconnectingMonitor.monitor")
    private boolean reconnecting;

    /**
     * Test-visible constructor that wires the publisher to explicitly supplied collaborators.
     *
     * <p>Initialises all mutable state (no alarm, not shut down, last sent offset of -1, empty
     * in-flight queue, not reconnecting), builds the initial {@link PublishRequest} from
     * {@code initialRequest}, creates the retrying connection with this object as its observer,
     * creates the batcher from the byte and element-count thresholds, and hands the connection to
     * {@code addServices}.
     *
     * @param streamFactory factory passed through to the retrying connection
     * @param publisherFactory factory passed through to the retrying connection
     * @param alarmFactory used later by {@code start()} to schedule background flushes
     * @param initialRequest wrapped into the {@link PublishRequest} used for (re)initialisation
     * @param batchingSettings must have non-null request byte and element count thresholds
     * @throws ApiException declared by the signature; nothing in this body throws it directly
     */
    @VisibleForTesting
    PublisherImpl(StreamFactories.PublishStreamFactory streamFactory, BatchPublisherFactory publisherFactory, AlarmFactory alarmFactory, InitialPublishRequest initialRequest, BatchingSettings batchingSettings) throws ApiException {
        super(new ApiService[0]);
        // Guard used by flush(): satisfied once nothing is in flight, or once the publisher is shut down.
        // The fields it reads are assigned further down; the guard is only evaluated lazily.
        this.noneInFlight = new Monitor.Guard(this.monitor.monitor){

            public boolean isSatisfied() {
                return PublisherImpl.this.batchesInFlight.isEmpty() || PublisherImpl.this.shutdown;
            }
        };
        this.alarmFuture = Optional.empty();
        this.shutdown = false;
        // -1 is below any offset accepted by onClientResponse's strictly-increasing check.
        this.lastSentOffset = Offset.of(-1L);
        this.batcherMonitor = new CloseableMonitor();
        this.batchesInFlight = new ArrayDeque<InFlightBatch>();
        this.reconnectingMonitor = new CloseableMonitor();
        this.reconnecting = false;
        this.alarmFactory = alarmFactory;
        // Fail fast if either batching threshold is missing; both are needed to build the batcher below.
        Preconditions.checkNotNull((Object)batchingSettings.getRequestByteThreshold());
        Preconditions.checkNotNull((Object)batchingSettings.getElementCountThreshold());
        this.initialRequest = PublishRequest.newBuilder().setInitialRequest(initialRequest).build();
        // NOTE(review): `this` is handed out as the connection observer before `batcher` is assigned;
        // presumably RetryingConnectionImpl does not call back from its constructor — confirm.
        this.connection = new RetryingConnectionImpl<PublishRequest, PublishResponse, MessagePublishResponse, BatchPublisher>(streamFactory, publisherFactory, this, this.initialRequest);
        this.batcher = new SerialBatcher(batchingSettings.getRequestByteThreshold(), batchingSettings.getElementCountThreshold());
        this.addServices(this.connection);
    }

    /**
     * Creates a publisher using a {@link BatchPublisherImpl.Factory} and an {@link AlarmFactory}
     * whose period is the batching delay threshold (converted through its nanosecond value).
     *
     * @param streamFactory factory passed through to the retrying connection
     * @param initialRequest the initial publish request for the stream
     * @param batchingSettings batching configuration; its delay threshold must be non-null
     * @throws NullPointerException if {@code batchingSettings.getDelayThreshold()} is null
     * @throws ApiException declared by the delegated constructor
     */
    public PublisherImpl(StreamFactories.PublishStreamFactory streamFactory, InitialPublishRequest initialRequest, BatchingSettings batchingSettings) throws ApiException {
        this(streamFactory, new BatchPublisherImpl.Factory(), AlarmFactory.create(Duration.ofNanos(Objects.requireNonNull(batchingSettings.getDelayThreshold()).toNanos())), initialRequest, batchingSettings);
    }

    /**
     * Regroups every in-flight message into fresh batches ahead of a re-publish.
     *
     * <p>All messages of all in-flight batches are flattened, in order, into one queue;
     * {@code batchesInFlight} is cleared; the messages are then packed greedily into new
     * batches. A batch is closed as soon as adding the next message would push it past
     * 0x380000 bytes of serialized size or past 1000 messages. A message is always added to
     * an empty batch, so a single message over the byte limit ends up alone in its own batch.
     *
     * <p>Must be called with {@code monitor} held.
     */
    @GuardedBy(value="monitor.monitor")
    private void rebatchForRestart() {
        // Per-batch limits applied while repacking (0x380000 bytes = 3.5 MiB).
        final long maxBatchBytes = 0x380000L;
        final long maxBatchCount = 1000L;

        Queue<SerialBatcher.UnbatchedMessage> messages = this.batchesInFlight.stream()
                .flatMap(b -> b.messages.stream())
                .collect(Collectors.toCollection(ArrayDeque::new));
        ((GoogleLogger.Api)logger.atFiner()).log("Re-publishing %s messages after reconnection for partition %s", messages.size(), this.initialRequest.getInitialRequest().getPartition());

        long size = 0L;
        int count = 0;
        ArrayDeque<SerialBatcher.UnbatchedMessage> currentBatch = new ArrayDeque<SerialBatcher.UnbatchedMessage>();
        // Safe to clear now: the messages were copied into `messages` above.
        this.batchesInFlight.clear();
        for (SerialBatcher.UnbatchedMessage message : messages) {
            long messageSize = message.message().getSerializedSize();
            boolean wouldOverflow = size + messageSize > maxBatchBytes || (long)(count + 1) > maxBatchCount;
            // Never emit an empty batch: an oversized message still goes into an empty batch.
            if (wouldOverflow && !currentBatch.isEmpty()) {
                this.batchesInFlight.add(new InFlightBatch(ImmutableList.copyOf(currentBatch)));
                currentBatch.clear();
                count = 0;
                size = 0L;
            }
            currentBatch.add(message);
            size += messageSize;
            ++count;
        }
        if (!currentBatch.isEmpty()) {
            this.batchesInFlight.add(new InFlightBatch(ImmutableList.copyOf(currentBatch)));
        }
    }

    /**
     * Re-establishes the stream and re-sends every message that is still in flight.
     *
     * <p>While holding {@code monitor}: sets the {@code reconnecting} flag, reinitialises the
     * connection with the initial request, rebatches the in-flight messages, publishes each
     * rebuilt batch on the new connection (if one is present), and finally clears the flag.
     * The flag is cleared in a {@code finally} block so that a failure during reinitialisation
     * cannot leave it set, which would otherwise make {@code backgroundFlushToStream} skip
     * every subsequent flush.
     *
     * <p>A {@link CheckedApiException} raised along the way is reported via
     * {@code onPermanentError}.
     *
     * @param streamError the error that triggered reinitialisation; not used by this method
     */
    @Override
    public void triggerReinitialize(CheckedApiException streamError) {
        try (CloseableMonitor.Hold h = this.monitor.enter()) {
            try (CloseableMonitor.Hold rh = this.reconnectingMonitor.enter()) {
                this.reconnecting = true;
            }
            try {
                this.connection.reinitialize(this.initialRequest);
                this.rebatchForRestart();
                // Local alias so the lambda below captures a local rather than touching the
                // guarded field directly.
                Queue<InFlightBatch> batches = this.batchesInFlight;
                this.connection.modifyConnection(connectionOr -> {
                    if (!connectionOr.isPresent()) {
                        return;
                    }
                    batches.forEach(batch -> ((BatchPublisher)connectionOr.get()).publish(batch.messagesToSend(), batch.firstSequenceNumber()));
                });
            }
            finally {
                // Always clear the flag, even when reinitialisation failed.
                try (CloseableMonitor.Hold rh = this.reconnectingMonitor.enter()) {
                    this.reconnecting = false;
                }
            }
        }
        catch (CheckedApiException e) {
            this.onPermanentError(e);
        }
    }

    /**
     * Shuts the publisher down after an unrecoverable error.
     *
     * <p>Under {@code monitor}: marks the publisher as shut down, cancels the alarm future
     * (without interruption) if one is set, clears it, and fails all outstanding publishes
     * with {@code error}.
     *
     * @param error the failure propagated to every outstanding publish future
     */
    @Override
    protected void handlePermanentError(CheckedApiException error) {
        try (CloseableMonitor.Hold hold = this.monitor.enter()) {
            this.shutdown = true;
            if (this.alarmFuture.isPresent()) {
                this.alarmFuture.get().cancel(false);
            }
            this.alarmFuture = Optional.empty();
            this.terminateOutstandingPublishes(error);
        }
    }

    /**
     * Starts the publisher by registering an alarm that invokes
     * {@code backgroundFlushToStream}, and stores the resulting future so it can be cancelled
     * later. Runs under {@code monitor}.
     */
    @Override
    protected void start() {
        try (CloseableMonitor.Hold hold = this.monitor.enter()) {
            Future<?> flushAlarm = this.alarmFactory.newAlarm(this::backgroundFlushToStream);
            this.alarmFuture = Optional.of(flushAlarm);
        }
    }

    /**
     * Stops the publisher.
     *
     * <p>Flushes once, then under {@code monitor} marks the publisher as shut down and cancels
     * and clears the alarm future, then calls {@code flush()} a second time.
     */
    @Override
    protected void stop() {
        // First pass: flush before the shutdown flag is set.
        this.flush();
        try (CloseableMonitor.Hold hold = this.monitor.enter()) {
            this.shutdown = true;
            if (this.alarmFuture.isPresent()) {
                this.alarmFuture.get().cancel(false);
            }
            this.alarmFuture = Optional.empty();
        }
        // Second pass, now with the shutdown flag set.
        this.flush();
    }

    /**
     * Fails every publish that has not completed yet with {@code e}.
     *
     * <p>First fails the futures of all messages in {@code batchesInFlight}; then, while
     * holding {@code batcherMonitor}, flushes the batcher and fails the futures of everything
     * it returns; finally clears {@code batchesInFlight}. Must be called with {@code monitor}
     * held.
     *
     * @param e the exception set on each outstanding future
     */
    @GuardedBy(value="monitor.monitor")
    private void terminateOutstandingPublishes(CheckedApiException e) {
        for (InFlightBatch inFlight : this.batchesInFlight) {
            for (SerialBatcher.UnbatchedMessage sent : inFlight.messages) {
                sent.future().setException(e);
            }
        }
        try (CloseableMonitor.Hold hold = this.batcherMonitor.enter()) {
            for (List<SerialBatcher.UnbatchedMessage> unsentBatch : this.batcher.flush()) {
                for (SerialBatcher.UnbatchedMessage unsent : unsentBatch) {
                    unsent.future().setException(e);
                }
            }
        }
        this.batchesInFlight.clear();
    }

    /**
     * Queues a message for publishing and returns a future for its offset.
     *
     * <p>While holding {@code batcherMonitor}, the service state is inspected: in
     * {@code STARTING} or {@code RUNNING} the message is added to the batcher and the batcher's
     * future is returned. In any other state a {@link CheckedApiException} with
     * {@code FAILED_PRECONDITION} is raised (carrying the failure cause when the state is
     * {@code FAILED}); that exception is reported via {@code onPermanentError} and returned as
     * an immediately failed future.
     *
     * @param message the message to publish
     * @param sequenceNumber the publish sequence number associated with the message
     * @return a future for the message's offset, or an already-failed future
     */
    @Override
    public ApiFuture<Offset> publish(PubSubMessage message, PublishSequenceNumber sequenceNumber) {
        try (CloseableMonitor.Hold hold = this.batcherMonitor.enter()) {
            ApiService.State observed = this.state();
            if (observed == ApiService.State.FAILED) {
                throw new CheckedApiException("Cannot publish when publisher has failed.", this.failureCause(), StatusCode.Code.FAILED_PRECONDITION);
            }
            if (observed == ApiService.State.STARTING || observed == ApiService.State.RUNNING) {
                return this.batcher.add(message, sequenceNumber);
            }
            throw new CheckedApiException("Cannot publish when Publisher state is " + observed.name(), StatusCode.Code.FAILED_PRECONDITION);
        }
        catch (CheckedApiException failure) {
            this.onPermanentError(failure);
            return ApiFutures.immediateFailedFuture(failure);
        }
    }

    /**
     * Fails all outstanding publishes with a {@code CANCELLED} {@link CheckedApiException}
     * ("Cancelled by client."), while holding {@code monitor}.
     */
    @Override
    public void cancelOutstandingPublishes() {
        try (CloseableMonitor.Hold hold = this.monitor.enter()) {
            CheckedApiException cancelled = new CheckedApiException("Cancelled by client.", StatusCode.Code.CANCELLED);
            this.terminateOutstandingPublishes(cancelled);
        }
    }

    /**
     * Alarm callback: flushes batched messages to the stream unless a reconnect is in progress.
     *
     * <p>The {@code reconnecting} flag is read under {@code reconnectingMonitor}; that monitor
     * is released before {@code flushToStream()} is called.
     */
    private void backgroundFlushToStream() {
        boolean reconnectInProgress;
        try (CloseableMonitor.Hold hold = this.reconnectingMonitor.enter()) {
            reconnectInProgress = this.reconnecting;
        }
        if (!reconnectInProgress) {
            this.flushToStream();
        }
    }

    /**
     * Drains the batcher and sends each resulting batch on the connection.
     *
     * <p>Runs under {@code monitor} and does nothing once the publisher is shut down. The
     * batcher is drained under {@code batcherMonitor}, which is released before the batches are
     * processed. A {@link CheckedApiException} from processing is reported via
     * {@code onPermanentError}.
     */
    private void flushToStream() {
        try (CloseableMonitor.Hold outer = this.monitor.enter()) {
            if (!this.shutdown) {
                List<List<SerialBatcher.UnbatchedMessage>> drained;
                try (CloseableMonitor.Hold inner = this.batcherMonitor.enter()) {
                    drained = this.batcher.flush();
                }
                for (List<SerialBatcher.UnbatchedMessage> pending : drained) {
                    this.processBatch(pending);
                }
            }
        }
        catch (CheckedApiException failure) {
            this.onPermanentError(failure);
        }
    }

    /**
     * Records a batch as in flight and publishes it on the current connection.
     *
     * <p>Empty batches are ignored. Must be called with {@code monitor} held.
     *
     * @param batch the messages to send as one batch
     * @throws CheckedApiException if no connection is present ("Published after the stream shut
     *     down.") or if modifying the connection fails
     */
    @GuardedBy(value="monitor.monitor")
    private void processBatch(List<SerialBatcher.UnbatchedMessage> batch) throws CheckedApiException {
        if (!batch.isEmpty()) {
            InFlightBatch pending = new InFlightBatch(batch);
            // Enqueue before sending; onClientResponse pops from the head of this queue.
            this.batchesInFlight.add(pending);
            this.connection.modifyConnection(maybeStream -> {
                CheckedApiPreconditions.checkState(maybeStream.isPresent(), "Published after the stream shut down.");
                BatchPublisher stream = (BatchPublisher)maybeStream.get();
                stream.publish(pending.messagesToSend(), pending.firstSequenceNumber());
            });
        }
    }

    /**
     * Flushes batched messages to the stream and then blocks, uninterruptibly, until the
     * {@code noneInFlight} guard is satisfied (no batches in flight, or the publisher is shut
     * down). The monitor is released again immediately after it has been entered.
     */
    @Override
    public void flush() {
        this.flushToStream();
        // Entering is the wait; there is nothing to do while holding the monitor.
        try (CloseableMonitor.Hold released = this.monitor.enterWhenUninterruptibly(this.noneInFlight)) {
        }
    }

    /**
     * Completes the futures of the oldest in-flight batch from a publish response.
     *
     * <p>The response's cursor ranges are sorted by start index. The head of
     * {@code batchesInFlight} is removed and its messages are walked in order. A message whose
     * index falls inside a cursor range is completed with
     * {@code startCursor.offset + (messageIndex - range.startIndex)}; a message outside every
     * range is completed with {@code Offset.of(-1)}.
     *
     * <p>When validation fails at some message, that message and all later messages in the
     * batch are failed with the exception, which is then rethrown.
     *
     * @param publishResponse the server response for the oldest in-flight batch
     * @throws CheckedApiException with {@code FAILED_PRECONDITION} if no batch is in flight, if
     *     consecutive cursor ranges overlap, or if a computed offset is not strictly greater
     *     than the last offset handed out
     */
    @Override
    public void onClientResponse(MessagePublishResponse publishResponse) throws CheckedApiException {
        ImmutableList<MessagePublishResponse.CursorRange> ranges = ImmutableList.sortedCopyOf(Comparator.comparing(MessagePublishResponse.CursorRange::getStartIndex), publishResponse.getCursorRangesList());
        try (CloseableMonitor.Hold h = this.monitor.enter()) {
            CheckedApiPreconditions.checkState(!this.batchesInFlight.isEmpty(), "Received publish response with no batches in flight.");
            InFlightBatch batch = this.batchesInFlight.remove();
            int rangeIndex = 0;
            for (int messageIndex = 0; messageIndex < batch.messages.size(); ++messageIndex) {
                SerialBatcher.UnbatchedMessage message = batch.messages.get(messageIndex);
                try {
                    // Step to the next range once the current one ends at or before this message.
                    if (rangeIndex < ranges.size() && ranges.get(rangeIndex).getEndIndex() <= messageIndex) {
                        ++rangeIndex;
                        // Ranges are sorted by start index, so an overlap shows up as a start
                        // index that precedes the previous range's end index.
                        if (rangeIndex < ranges.size() && ranges.get(rangeIndex).getStartIndex() < ranges.get(rangeIndex - 1).getEndIndex()) {
                            throw new CheckedApiException(String.format("Server sent invalid cursor ranges in message publish response: %s", publishResponse), StatusCode.Code.FAILED_PRECONDITION);
                        }
                    }
                    boolean inRange = rangeIndex < ranges.size()
                            && messageIndex >= ranges.get(rangeIndex).getStartIndex()
                            && messageIndex < ranges.get(rangeIndex).getEndIndex();
                    if (!inRange) {
                        // No cursor range covers this message.
                        message.future().set(Offset.of(-1L));
                        continue;
                    }
                    MessagePublishResponse.CursorRange range = ranges.get(rangeIndex);
                    Offset offset = Offset.of(range.getStartCursor().getOffset() + (long)messageIndex - (long)range.getStartIndex());
                    // Offsets handed out must be strictly increasing.
                    if (this.lastSentOffset.value() >= offset.value()) {
                        throw new CheckedApiException(String.format("Received publish response with offset %s that is inconsistent with previous offset %s", offset, this.lastSentOffset), StatusCode.Code.FAILED_PRECONDITION);
                    }
                    message.future().set(offset);
                    this.lastSentOffset = offset;
                }
                catch (CheckedApiException e) {
                    // Fail this message and everything after it in the batch, then propagate.
                    batch.failBatch(messageIndex, e);
                    throw e;
                }
            }
        }
    }

    /**
     * A batch of messages that has been handed to the stream and awaits a response.
     *
     * <p>Holds the list it was constructed with by reference; no copy is made.
     */
    private static class InFlightBatch {
        /** The messages of this batch, in send order. */
        final List<SerialBatcher.UnbatchedMessage> messages;

        /**
         * @param toBatch the messages making up this batch; callers in this file only pass
         *     non-empty lists
         */
        InFlightBatch(List<SerialBatcher.UnbatchedMessage> toBatch) {
            this.messages = toBatch;
        }

        /** Returns the wire messages of this batch, in order, as an immutable list. */
        List<PubSubMessage> messagesToSend() {
            return this.messages.stream().map(SerialBatcher.UnbatchedMessage::message).collect(ImmutableList.toImmutableList());
        }

        /**
         * Returns the sequence number of the first message in the batch.
         *
         * @throws IndexOutOfBoundsException if the batch is empty
         */
        PublishSequenceNumber firstSequenceNumber() {
            return this.messages.get(0).sequenceNumber();
        }

        /**
         * Fails the futures of all messages from {@code startIdx} (inclusive) to the end of the
         * batch with {@code e}.
         *
         * @param startIdx index of the first message to fail
         * @param e the exception set on each affected future
         */
        void failBatch(int startIdx, CheckedApiException e) {
            for (SerialBatcher.UnbatchedMessage failed : this.messages.subList(startIdx, this.messages.size())) {
                failed.future().setException(e);
            }
        }
    }
}

