/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.reactive.client.internal.adapter;

import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.reactive.client.internal.api.PublisherTransformer;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/**
 * Cache entry wrapping a Pulsar {@link Producer} together with the
 * {@link PublisherTransformer} used to decorate actions executed against it.
 *
 * <p>The entry counts active leases. Once {@link #close()} has been called, the
 * producer is flushed and closed and the transformer is disposed as soon as no
 * lease is active. A producer that is found to be disconnected can be replaced via
 * {@link #recreateIfClosed(Mono)}.
 */
class ProducerCacheEntry implements AutoCloseable {
    private static final Logger log = LoggerFactory.getLogger(ProducerCacheEntry.class);

    /** How many times the replacement producer is re-checked for connectivity. */
    private static final int MAX_CONNECTED_CHECK_REPEATS = 5;

    /** Pause between two connectivity checks of the replacement producer. */
    private static final Duration CONNECTED_CHECK_DELAY = Duration.ofSeconds(1L);

    /** Current producer; {@code null} once it has been handed over for closing. */
    private final AtomicReference<Producer<?>> producer = new AtomicReference<>();

    /** Mono that yields the current (or the replacement) producer. */
    private final AtomicReference<Mono<? extends Producer<?>>> producerCreator = new AtomicReference<>();

    private final AtomicInteger activeLeases = new AtomicInteger(0);
    private final PublisherTransformer producerActionTransformer;

    /** Set by {@link #close()}; tells the last lease holder to clean up on release. */
    private volatile boolean removed;

    /**
     * Creates an entry for an already created producer.
     *
     * @param producer the producer held by this entry
     * @param producerActionTransformer supplier of the transformer applied to producer
     *        actions; when {@code null}, {@link PublisherTransformer#identity()} is used
     */
    ProducerCacheEntry(Producer<?> producer, Supplier<PublisherTransformer> producerActionTransformer) {
        this.producer.set(producer);
        this.producerCreator.set(Mono.fromSupplier(this.producer::get));
        this.producerActionTransformer = producerActionTransformer != null
                ? producerActionTransformer.get()
                : PublisherTransformer.identity();
    }

    /**
     * Flushes and then closes the given producer without blocking the caller.
     *
     * <p>The close is attempted even when the flush fails, so that a producer whose
     * pending messages cannot be flushed is still closed. Failures are only logged.
     *
     * @param producer the producer to flush and close
     */
    private static void flushAndCloseProducerAsync(Producer<?> producer) {
        CompletableFuture<Void> closed = producer.flushAsync()
                .exceptionally(flushError -> {
                    // Recover from the flush failure so the close below still runs.
                    log.error("Error flushing producer, closing it anyway", flushError);
                    return null;
                })
                .thenCompose(ignored -> producer.closeAsync());
        closed.whenComplete((result, error) -> {
            if (error != null) {
                log.error("Error flushing and closing producer", error);
            }
        });
    }

    /** Registers one more active user of this entry. */
    void activateLease() {
        this.activeLeases.incrementAndGet();
    }

    /**
     * Releases one lease. When this was the last active lease and the entry has
     * already been closed, the producer and the transformer are cleaned up.
     */
    void releaseLease() {
        int currentLeases = this.activeLeases.decrementAndGet();
        if (currentLeases == 0 && this.removed) {
            this.cleanupResources();
        }
    }

    /**
     * Returns the producer currently held by this entry.
     *
     * @param <T> message type expected by the caller; the cast is unchecked, so the
     *        caller is responsible for asking for the type the producer was created with
     * @return the current producer, or {@code null} when it has been handed over for
     *         closing and no replacement has been stored yet
     */
    @SuppressWarnings("unchecked")
    <T> Producer<T> getProducer() {
        return (Producer<T>) this.producer.get();
    }

    /**
     * Returns a Mono that emits this entry once it holds a connected producer,
     * replacing the producer when it is no longer connected.
     *
     * <p>If the current producer is connected, this entry is emitted immediately.
     * Otherwise a cached replacement Mono built from {@code producerMono} is installed
     * (only one caller wins the compare-and-set), the stale producer is flushed and
     * closed asynchronously, and the installed Mono is subscribed and re-checked up to
     * {@value #MAX_CONNECTED_CHECK_REPEATS} times, one second apart, until it yields a
     * connected producer.
     *
     * @param producerMono Mono creating a new producer
     * @param <T> message type of the producer
     * @return a Mono emitting this entry; it terminates with the error signalled by
     *         {@code repeatWhenEmpty} when no connected producer shows up in time
     */
    <T> Mono<ProducerCacheEntry> recreateIfClosed(Mono<Producer<T>> producerMono) {
        return Mono.defer(() -> {
            Producer<?> p = this.producer.get();
            if (p != null) {
                if (p.isConnected()) {
                    return Mono.just(this);
                }
                Mono<? extends Producer<?>> previousUpdater = this.producerCreator.get();
                if (this.producerCreator.compareAndSet(previousUpdater, createCachedProducerMono(producerMono))) {
                    // This caller installed the replacement, so it also retires the stale producer.
                    this.producer.compareAndSet(p, null);
                    flushAndCloseProducerAsync(p);
                }
            }
            // Re-read producerCreator on every subscription so that a replacement
            // installed by another caller is picked up.
            return Mono.defer(() -> this.producerCreator.get())
                    .filter(Producer::isConnected)
                    .repeatWhenEmpty(MAX_CONNECTED_CHECK_REPEATS,
                            flux -> flux.delayElements(CONNECTED_CHECK_DELAY))
                    .thenReturn(this);
        });
    }

    /**
     * Wraps {@code producerMono} so that the producer it emits is stored in this entry
     * and the result is cached for later subscribers.
     */
    private <T> Mono<Producer<T>> createCachedProducerMono(Mono<Producer<T>> producerMono) {
        return producerMono.doOnNext(newProducer -> {
            log.info("Replaced closed producer for topic {}", newProducer.getTopic());
            this.producer.set(newProducer);
        }).cache();
    }

    /**
     * Marks this entry as removed. Resources are cleaned up immediately when no lease
     * is active; otherwise the cleanup is left to the final {@link #releaseLease()}.
     */
    @Override
    public void close() {
        this.removed = true;
        if (this.activeLeases.get() == 0) {
            this.cleanupResources();
        }
    }

    /**
     * Closes the producer and disposes the transformer, the latter even when closing
     * the producer throws.
     *
     * <p>NOTE(review): {@link #close()} and {@link #releaseLease()} can both reach this
     * method; the producer close is guarded by a compare-and-set, but {@code dispose()}
     * may be invoked more than once - presumably it is idempotent, confirm.
     */
    private void cleanupResources() {
        try {
            this.closeProducer();
        }
        finally {
            this.producerActionTransformer.dispose();
        }
    }

    /**
     * Detaches the current producer from this entry and starts flushing and closing it.
     * The compare-and-set ensures a given producer instance is handed over only once.
     */
    private void closeProducer() {
        Producer<?> p = this.producer.get();
        if (p != null && this.producer.compareAndSet(p, null)) {
            // Logged when the asynchronous close is initiated, not when it completes.
            log.info("Closed producer {} for topic {}", p.getProducerName(), p.getTopic());
            flushAndCloseProducerAsync(p);
        }
    }

    /**
     * Applies this entry's transformer to a producer action expressed as a Flux.
     *
     * @param source the action to decorate
     * @param <R> element type of the action
     * @return the transformed publisher
     */
    <R> Publisher<? extends R> decorateProducerAction(Flux<R> source) {
        return this.producerActionTransformer.transform(source);
    }

    /**
     * Applies this entry's transformer to a producer action expressed as a Mono.
     *
     * @param source the action to decorate
     * @param <R> element type of the action
     * @return the transformed publisher, adapted back to a Mono
     */
    <R> Mono<? extends R> decorateProducerAction(Mono<R> source) {
        return Mono.from(this.producerActionTransformer.transform(source));
    }

    /** Returns the transformer applied to producer actions of this entry. */
    PublisherTransformer getProducerActionTransformer() {
        return this.producerActionTransformer;
    }
}

