/*
 * Decompiled with CFR 0.152.
 */
package com.rabbitmq.stream.impl;

import com.rabbitmq.stream.Codec;
import com.rabbitmq.stream.ConfirmationHandler;
import com.rabbitmq.stream.ConfirmationStatus;
import com.rabbitmq.stream.Message;
import com.rabbitmq.stream.MessageBuilder;
import com.rabbitmq.stream.Producer;
import com.rabbitmq.stream.RoutingStrategy;
import com.rabbitmq.stream.impl.StreamEnvironment;
import com.rabbitmq.stream.impl.StreamProducerBuilder;
import com.rabbitmq.stream.impl.Utils;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link Producer} that publishes to a super stream by delegating each message to the
 * producer(s) of the partition stream(s) chosen by a {@link RoutingStrategy}.
 *
 * <p>One delegate producer is created lazily per partition stream, the first time a message is
 * routed to it, and is kept until this producer is closed.
 */
class SuperStreamProducer
implements Producer {
    private static final Logger LOGGER = LoggerFactory.getLogger(SuperStreamProducer.class);
    /** Confirmation code reported when the routing strategy returns no stream for a message. */
    private static final int CODE_NO_ROUTE_FOUND = 10005;
    /** Confirmation code reported when {@code send} is called after {@code close}. */
    private static final int CODE_PRODUCER_CLOSED = 10003;
    private final RoutingStrategy routingStrategy;
    private final Codec codec;
    private final String superStream;
    /** Delegate producers, keyed by partition stream name. */
    private final Map<String, Producer> producers = new ConcurrentHashMap<String, Producer>();
    private final StreamProducerBuilder producerBuilder;
    private final StreamEnvironment environment;
    private final String name;
    private final RoutingStrategy.Metadata superStreamMetadata;
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final MessageInterceptor messageInterceptor;

    /**
     * Creates the producer and eagerly looks up the partitions of the super stream.
     *
     * @param producerBuilder template for the per-partition producers; it is duplicated, so the
     *     caller's instance is not modified by this constructor
     * @param name publisher name used for sequence queries, may be {@code null} or empty
     * @param superStream name of the super stream
     * @param routingStrategy strategy that maps a message to partition stream(s)
     * @param streamEnvironment environment providing the codec, locator operations and
     *     observation collector
     * @throws IllegalArgumentException if the super stream has no partition
     */
    SuperStreamProducer(StreamProducerBuilder producerBuilder, String name, String superStream, RoutingStrategy routingStrategy, StreamEnvironment streamEnvironment) {
        this.routingStrategy = routingStrategy;
        this.codec = streamEnvironment.codec();
        this.name = name;
        this.superStream = superStream;
        this.environment = streamEnvironment;
        this.superStreamMetadata = new DefaultSuperStreamMetadata(this.superStream, this.environment);
        // Work on a private copy of the builder with stream and routing settings cleared;
        // the target stream is set per partition in producer(String).
        this.producerBuilder = producerBuilder.duplicate();
        this.producerBuilder.stream(null);
        this.producerBuilder.resetRouting();
        if (this.environment.observationCollector().isNoop()) {
            this.messageInterceptor = (index, msg) -> msg;
        } else {
            // With an active observation collector, only the first partition gets the original
            // message; the others get a copy.
            // NOTE(review): presumably so each publish is observed on a distinct instance - confirm.
            this.messageInterceptor = (index, msg) -> index == 0 ? msg : msg.copy();
        }
    }

    /** Returns a message builder obtained from the environment's codec. */
    @Override
    public MessageBuilder messageBuilder() {
        return this.codec.messageBuilder();
    }

    /**
     * Queries the publisher sequence of this named producer on every partition and returns the
     * smallest one, using unsigned comparison.
     *
     * @return the lowest publishing ID across partitions, or {@code 0} if the partition lookup
     *     returns no stream
     * @throws IllegalStateException if the producer has no name
     */
    @Override
    public long getLastPublishingId() {
        if (this.name == null || this.name.isEmpty()) {
            throw new IllegalStateException("The producer has no name");
        }
        List<String> streams = this.environment.locatorOperation(Utils.namedFunction(c -> c.partitions(this.superStream), "Partition lookup for super stream '%s'", this.superStream));
        long publishingId = 0L;
        boolean first = true;
        for (String partition : streams) {
            long pubId = this.environment.locatorOperation(Utils.namedFunction(c -> c.queryPublisherSequence(this.name, partition), "Publisher sequence query for on partition '%s' of super stream '%s', publisher name '%s'", partition, this.superStream, this.name));
            // Keep the first value as-is, then only replace it with a strictly smaller one.
            if (first || Long.compareUnsigned(publishingId, pubId) > 0) {
                publishingId = pubId;
                first = false;
            }
        }
        return publishingId;
    }

    /**
     * Routes the message and sends it to the producer of each selected partition stream.
     *
     * <p>If the producer is closed or no stream is selected, nothing is sent and the
     * confirmation handler is called immediately with a failed status.
     *
     * @param message the message to publish
     * @param confirmationHandler callback that receives the confirmation status; it is passed
     *     to every partition producer the message is sent to
     */
    @Override
    public void send(Message message, ConfirmationHandler confirmationHandler) {
        if (!this.canSend()) {
            confirmationHandler.handle(new ConfirmationStatus(message, false, CODE_PRODUCER_CLOSED));
            return;
        }
        List<String> streams = this.routingStrategy.route(message, this.superStreamMetadata);
        if (streams.isEmpty()) {
            confirmationHandler.handle(new ConfirmationStatus(message, false, CODE_NO_ROUTE_FOUND));
        } else if (streams.size() == 1) {
            // Single target: the interceptor is bypassed and the message is sent as-is.
            this.producer(streams.get(0)).send(message, confirmationHandler);
        } else {
            for (int i = 0; i < streams.size(); ++i) {
                Producer producer = this.producer(streams.get(i));
                producer.send(this.messageInterceptor.apply(i, message), confirmationHandler);
            }
        }
    }

    /** Returns the producer for the given partition stream, building it on first use. */
    private Producer producer(String stream) {
        return this.producers.computeIfAbsent(stream, partition -> this.producerBuilder.duplicate().superStream(null).stream(partition).build());
    }

    /** Returns {@code true} as long as {@link #close()} has not been called. */
    private boolean canSend() {
        return !this.closed.get();
    }

    /**
     * Closes every partition producer created so far. Only the first call has an effect.
     *
     * <p>A failure to close one producer is logged and does not prevent closing the others.
     */
    @Override
    public void close() {
        if (this.closed.compareAndSet(false, true)) {
            for (Map.Entry<String, Producer> entry : this.producers.entrySet()) {
                try {
                    entry.getValue().close();
                }
                catch (Exception e) {
                    // Best effort: keep closing the remaining producers.
                    LOGGER.info("Error while closing producer for partition {} of super stream {}: {}", entry.getKey(), this.superStream, e.getMessage());
                }
            }
        }
    }

    /** Hook applied to a message before it is sent to the partition at a given index. */
    @FunctionalInterface
    private static interface MessageInterceptor {
        /**
         * @param var1 zero-based index of the target stream in the routing result
         * @param var2 the message being sent
         * @return the message instance to hand to the partition producer
         */
        public Message apply(int var1, Message var2);
    }

    /**
     * {@link RoutingStrategy.Metadata} backed by locator operations on the environment.
     *
     * <p>Partitions are looked up once at construction; route lookups are cached per routing key.
     */
    private static final class DefaultSuperStreamMetadata
    implements RoutingStrategy.Metadata {
        private final String superStream;
        private final StreamEnvironment environment;
        private final List<String> partitions;
        /** Route lookup results, keyed by routing key. */
        private final Map<String, List<String>> routes = new ConcurrentHashMap<String, List<String>>();

        /**
         * @throws IllegalArgumentException if the partition lookup returns an empty list
         */
        private DefaultSuperStreamMetadata(String superStream, StreamEnvironment environment) {
            this.superStream = superStream;
            this.environment = environment;
            List<String> ps = environment.locatorOperation(Utils.namedFunction(c -> c.partitions(superStream), "Partition lookup for super stream '%s'", superStream));
            if (ps.isEmpty()) {
                throw new IllegalArgumentException("Super stream '" + superStream + "' has no partition");
            }
            this.partitions = new CopyOnWriteArrayList<String>(ps);
        }

        /** Returns the partition streams found when this metadata was created. */
        @Override
        public List<String> partitions() {
            return this.partitions;
        }

        /** Returns the streams bound to the routing key, querying the locator on first use only. */
        @Override
        public List<String> route(String routingKey) {
            return this.routes.computeIfAbsent(routingKey, key -> this.environment.locatorOperation(Utils.namedFunction(c -> c.route(key, this.superStream), "Route lookup on super stream '%s' for key '%s'", this.superStream, key)));
        }
    }
}

