/*
 * Decompiled with CFR 0.152.
 */
package io.atleon.rabbitmq;

import io.atleon.core.Alo;
import io.atleon.core.AloFlux;
import io.atleon.rabbitmq.BodySerializer;
import io.atleon.rabbitmq.RabbitMQConfig;
import io.atleon.rabbitmq.RabbitMQConfigSource;
import io.atleon.rabbitmq.RabbitMQMessage;
import io.atleon.rabbitmq.RabbitMQMessageCreator;
import io.atleon.rabbitmq.RabbitMQMessageSendInterceptor;
import io.atleon.rabbitmq.RabbitMQSenderResult;
import io.atleon.rabbitmq.SerializedBody;
import java.io.Closeable;
import java.util.List;
import java.util.function.Function;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.rabbitmq.CorrelableOutboundMessage;
import reactor.rabbitmq.SendOptions;
import reactor.rabbitmq.Sender;
import reactor.rabbitmq.SenderOptions;

public class AloRabbitMQSender<T>
implements Closeable {
    public static final String CONFIG_PREFIX = "rabbitmq.sender.";
    public static final String INTERCEPTORS_CONFIG = "rabbitmq.sender.send.interceptors";
    public static final String BODY_SERIALIZER_CONFIG = "rabbitmq.sender.body.serializer";
    private static final Logger LOGGER = LoggerFactory.getLogger(AloRabbitMQSender.class);
    private static final SendOptions SEND_OPTIONS = new SendOptions();
    private static final SendOptions ALO_SEND_OPTIONS = new SendOptions().exceptionHandler(AloRabbitMQSender::handleAloSendException);
    private final Mono<SendResources<T>> futureResources;
    private final Sinks.Many<Long> closeSink = Sinks.many().multicast().directBestEffort();

    private AloRabbitMQSender(RabbitMQConfigSource configSource) {
        this.futureResources = ((Mono)configSource.create()).map(SendResources::fromConfig).cacheInvalidateWhen(resources -> this.closeSink.asFlux().next().then(), SendResources::close);
    }

    public static <T> AloRabbitMQSender<T> from(RabbitMQConfigSource configSource) {
        return new AloRabbitMQSender<T>(configSource);
    }

    public Function<Publisher<T>, Flux<RabbitMQSenderResult<T>>> sendBodies(RabbitMQMessageCreator<T> messageCreator) {
        return bodies -> this.sendBodies((Publisher<T>)bodies, messageCreator);
    }

    public Flux<RabbitMQSenderResult<T>> sendBodies(Publisher<T> bodies, RabbitMQMessageCreator<T> messageCreator) {
        return this.futureResources.flatMapMany(resources -> resources.send(bodies, messageCreator));
    }

    public Flux<RabbitMQSenderResult<RabbitMQMessage<T>>> sendMessages(Publisher<RabbitMQMessage<T>> messages) {
        return this.futureResources.flatMapMany(resources -> resources.send(messages, Function.identity()));
    }

    public Function<Publisher<Alo<T>>, AloFlux<RabbitMQSenderResult<T>>> sendAloBodies(RabbitMQMessageCreator<T> messageCreator) {
        return aloBodies -> this.sendAloBodies((Publisher<Alo<T>>)aloBodies, messageCreator);
    }

    public AloFlux<RabbitMQSenderResult<T>> sendAloBodies(Publisher<Alo<T>> aloBodies, RabbitMQMessageCreator<T> messageCreator) {
        return (AloFlux)this.futureResources.flatMapMany(resources -> resources.sendAlos(aloBodies, messageCreator)).as(AloFlux::wrap);
    }

    public AloFlux<RabbitMQSenderResult<RabbitMQMessage<T>>> sendAloMessages(Publisher<Alo<RabbitMQMessage<T>>> aloMessages) {
        return (AloFlux)this.futureResources.flatMapMany(resources -> resources.sendAlos(aloMessages, Function.identity())).as(AloFlux::wrap);
    }

    public void close(Object reason) {
        LOGGER.info("Closing AloRabbitMQSender due to reason={}", reason);
        this.close();
    }

    @Override
    public void close() {
        this.closeSink.tryEmitNext((Object)System.currentTimeMillis());
    }

    private static void handleAloSendException(Sender.SendContext sendContext, Exception error) {
        CorrelableOutboundMessage message = (CorrelableOutboundMessage)CorrelableOutboundMessage.class.cast(sendContext.getMessage());
        Alo.nacknowledge((Alo)((Alo)Alo.class.cast(message.getCorrelationMetadata())), (Throwable)error);
    }

    private static final class SendResources<T> {
        private final Sender sender;
        private final List<RabbitMQMessageSendInterceptor<T>> interceptors;
        private final BodySerializer<T> bodySerializer;

        public SendResources(Sender sender, List<RabbitMQMessageSendInterceptor<T>> interceptors, BodySerializer<T> bodySerializer) {
            this.sender = sender;
            this.bodySerializer = bodySerializer;
            this.interceptors = interceptors;
        }

        public static <T> SendResources<T> fromConfig(RabbitMQConfig config) {
            SenderOptions senderOptions = new SenderOptions().connectionFactory(config.getConnectionFactory());
            return new SendResources<T>(new Sender(senderOptions), config.loadListOfConfigured(AloRabbitMQSender.INTERCEPTORS_CONFIG), (BodySerializer)config.loadConfiguredOrThrow(AloRabbitMQSender.BODY_SERIALIZER_CONFIG));
        }

        public <R> Flux<RabbitMQSenderResult<R>> send(Publisher<R> items, Function<R, RabbitMQMessage<T>> messageCreator) {
            return Flux.from(items).map(item -> this.toCorrelableOutboundMessage(item, messageCreator)).transform(outboundMessages -> this.sender.sendWithTypedPublishConfirms((Publisher)outboundMessages, SEND_OPTIONS)).map(RabbitMQSenderResult::fromMessageResult);
        }

        public <R> Flux<Alo<RabbitMQSenderResult<R>>> sendAlos(Publisher<Alo<R>> aloItems, Function<R, RabbitMQMessage<T>> messageCreator) {
            return AloFlux.toFlux(aloItems).map(aloItem -> this.toCorrelableOutboundMessage(aloItem, messageCreator.compose(Alo::get))).transform(outboundMessages -> this.sender.sendWithTypedPublishConfirms((Publisher)outboundMessages, ALO_SEND_OPTIONS)).map(RabbitMQSenderResult::fromMessageResultOfAlo);
        }

        public void close() {
            this.sender.close();
        }

        private <R> CorrelableOutboundMessage<R> toCorrelableOutboundMessage(R data, Function<R, RabbitMQMessage<T>> dataToRabbitMQMessage) {
            RabbitMQMessage<T> message = dataToRabbitMQMessage.apply(data);
            SerializedBody serializedBody = this.bodySerializer.serialize(message.getBody());
            for (RabbitMQMessageSendInterceptor<T> interceptor : this.interceptors) {
                message = interceptor.onSend(message, serializedBody);
            }
            return new CorrelableOutboundMessage(message.getExchange(), message.getRoutingKey(), message.getProperties(), serializedBody.bytes(), data);
        }
    }
}

