/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.reactive.client.internal.api;

import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Function;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.reactive.client.api.MessageGroupingFunction;
import org.apache.pulsar.reactive.client.api.MessageResult;
import org.apache.pulsar.reactive.client.api.ReactiveMessageConsumer;
import org.apache.pulsar.reactive.client.api.ReactiveMessagePipeline;
import org.apache.pulsar.reactive.client.internal.api.DefaultReactiveMessagePipelineBuilder;
import org.apache.pulsar.reactive.client.internal.api.GroupOrderedMessageProcessors;
import org.apache.pulsar.reactive.client.internal.api.InflightLimiter;
import org.apache.pulsar.reactive.client.internal.api.InternalConsumerListener;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.util.context.Context;
import reactor.util.context.ContextView;
import reactor.util.retry.Retry;

class DefaultReactiveMessagePipeline<T>
implements ReactiveMessagePipeline {
    private static final Logger LOG = LoggerFactory.getLogger(DefaultReactiveMessagePipeline.class);
    private static final String INFLIGHT_LIMITER_CONTEXT_KEY = DefaultReactiveMessagePipelineBuilder.class.getName() + ".INFLIGHT_LIMITER_CONTEXT_KEY";
    private final AtomicReference<Disposable> killSwitch = new AtomicReference();
    private final Mono<Void> pipeline;
    private final Function<Message<T>, Publisher<Void>> messageHandler;
    private final BiConsumer<Message<T>, Throwable> errorLogger;
    private final Retry pipelineRetrySpec;
    private final Duration handlingTimeout;
    private final Function<Flux<Message<T>>, Publisher<MessageResult<Void>>> streamingMessageHandler;
    private final int concurrency;
    private final int maxInflight;
    private final MessageGroupingFunction groupingFunction;
    private final AtomicReference<InternalConsumerListenerImpl> consumerListener = new AtomicReference();
    private final AtomicReference<CompletableFuture<Void>> pipelineStoppedFuture = new AtomicReference();

    DefaultReactiveMessagePipeline(ReactiveMessageConsumer<T> messageConsumer, Function<Message<T>, Publisher<Void>> messageHandler, BiConsumer<Message<T>, Throwable> errorLogger, Retry pipelineRetrySpec, Duration handlingTimeout, Function<Mono<Void>, Publisher<Void>> transformer, Function<Flux<Message<T>>, Publisher<MessageResult<Void>>> streamingMessageHandler, MessageGroupingFunction groupingFunction, int concurrency, int maxInflight) {
        this.messageHandler = messageHandler;
        this.errorLogger = errorLogger;
        this.pipelineRetrySpec = pipelineRetrySpec;
        this.handlingTimeout = handlingTimeout;
        this.streamingMessageHandler = streamingMessageHandler;
        this.groupingFunction = groupingFunction;
        this.concurrency = concurrency;
        this.maxInflight = maxInflight;
        this.pipeline = messageConsumer.consumeMany(this::createMessageConsumer).then().transform(transformer).transform(this::decoratePipeline).doFinally(signalType -> {
            CompletableFuture<Void> f = this.pipelineStoppedFuture.get();
            if (f != null) {
                f.complete(null);
            }
        }).doFirst(() -> this.pipelineStoppedFuture.set(new CompletableFuture()));
    }

    private Mono<Void> decorateMessageHandler(Mono<Void> messageHandler) {
        if (this.handlingTimeout != null) {
            messageHandler = messageHandler.timeout(this.handlingTimeout);
        }
        if (this.maxInflight > 0) {
            messageHandler = messageHandler.transformDeferredContextual((original, context) -> {
                InflightLimiter inflightLimiter = (InflightLimiter)context.get((Object)INFLIGHT_LIMITER_CONTEXT_KEY);
                return inflightLimiter.transform(original);
            });
        }
        return messageHandler;
    }

    private Mono<Void> decoratePipeline(Mono<Void> pipeline) {
        if (this.maxInflight > 0) {
            Mono finalPipeline = pipeline;
            pipeline = Mono.using(() -> new InflightLimiter(this.maxInflight), inflightLimiter -> finalPipeline.contextWrite((ContextView)Context.of((Object)INFLIGHT_LIMITER_CONTEXT_KEY, (Object)inflightLimiter)), InflightLimiter::dispose);
        }
        if (this.pipelineRetrySpec != null) {
            return pipeline.retryWhen(this.pipelineRetrySpec);
        }
        return pipeline;
    }

    private Flux<MessageResult<Void>> createMessageConsumer(Flux<Message<T>> messageFlux) {
        if (this.messageHandler != null) {
            if (this.streamingMessageHandler != null) {
                throw new IllegalStateException("messageHandler and streamingMessageHandler cannot be set at the same time.");
            }
            if (this.concurrency > 1) {
                if (this.groupingFunction != null) {
                    return GroupOrderedMessageProcessors.processGroupsInOrderConcurrently(messageFlux, this.groupingFunction, this::handleMessage, Schedulers.parallel(), this.concurrency);
                }
                return messageFlux.flatMap(message -> this.handleMessage((Message<T>)message).subscribeOn(Schedulers.parallel()), this.concurrency);
            }
            return messageFlux.concatMap(this::handleMessage);
        }
        return Flux.from(Objects.requireNonNull(this.streamingMessageHandler, "streamingMessageHandler or messageHandler must be set").apply(messageFlux));
    }

    private Mono<MessageResult<Void>> handleMessage(Message<T> message) {
        return Mono.defer(() -> Mono.from(this.messageHandler.apply(message))).transform(this::decorateMessageHandler).thenReturn(MessageResult.acknowledge(message.getMessageId())).onErrorResume(throwable -> {
            if (this.errorLogger != null) {
                try {
                    this.errorLogger.accept((Message<Message>)message, (Throwable)throwable);
                }
                catch (Exception ex) {
                    LOG.error("Error in calling error logger", (Throwable)ex);
                }
            } else {
                LOG.error("Message handling for message id {} failed.", (Object)message.getMessageId(), throwable);
            }
            return Mono.just(MessageResult.negativeAcknowledge(message.getMessageId()));
        });
    }

    @Override
    public ReactiveMessagePipeline start() {
        if (this.killSwitch.get() != null) {
            throw new IllegalStateException("Message handler is already running.");
        }
        InternalConsumerListenerImpl consumerListener = new InternalConsumerListenerImpl();
        Disposable disposable = this.pipeline.contextWrite((ContextView)Context.of(InternalConsumerListener.class, (Object)consumerListener)).subscribe(null, this::logError, this::logUnexpectedCompletion);
        if (!this.killSwitch.compareAndSet(null, disposable)) {
            disposable.dispose();
            throw new IllegalStateException("Message handler was already running.");
        }
        this.consumerListener.set(consumerListener);
        return this;
    }

    @Override
    public Mono<Void> untilStarted() {
        if (!this.isRunning()) {
            throw new IllegalStateException("Pipeline isn't running. Call start first.");
        }
        InternalConsumerListenerImpl internalConsumerListener = this.consumerListener.get();
        return internalConsumerListener.waitForConsumerCreated();
    }

    private void logError(Throwable throwable) {
        LOG.error("ReactiveMessageHandler was unexpectedly terminated.", throwable);
    }

    private void logUnexpectedCompletion() {
        if (this.isRunning()) {
            LOG.error("ReactiveMessageHandler was unexpectedly completed.");
        }
    }

    @Override
    public ReactiveMessagePipeline stop() {
        Disposable disposable = this.killSwitch.getAndSet(null);
        if (disposable != null) {
            disposable.dispose();
        }
        return this;
    }

    @Override
    public Mono<Void> untilStopped() {
        if (this.isRunning()) {
            throw new IllegalStateException("Pipeline is running. Call stop first.");
        }
        CompletableFuture<Void> f = this.pipelineStoppedFuture.get();
        if (f != null) {
            return Mono.fromFuture(f, (boolean)true);
        }
        return Mono.empty();
    }

    @Override
    public boolean isRunning() {
        return this.killSwitch.get() != null;
    }

    private static final class InternalConsumerListenerImpl
    implements InternalConsumerListener {
        private final CompletableFuture<Void> createdFuture = new CompletableFuture();

        private InternalConsumerListenerImpl() {
        }

        @Override
        public void onConsumerCreated(Object nativeConsumer) {
            if (!this.createdFuture.isDone()) {
                this.createdFuture.complete(null);
            }
        }

        Mono<Void> waitForConsumerCreated() {
            return Mono.fromFuture(this.createdFuture, (boolean)true);
        }
    }
}

