/*
 * Decompiled with CFR 0.152.
 */
package com.rabbitmq.stream.impl;

import com.rabbitmq.stream.Address;
import com.rabbitmq.stream.AddressResolver;
import com.rabbitmq.stream.BackOffDelayPolicy;
import com.rabbitmq.stream.Codec;
import com.rabbitmq.stream.ConsumerBuilder;
import com.rabbitmq.stream.Environment;
import com.rabbitmq.stream.MessageHandler;
import com.rabbitmq.stream.OffsetSpecification;
import com.rabbitmq.stream.ProducerBuilder;
import com.rabbitmq.stream.StreamCreator;
import com.rabbitmq.stream.StreamException;
import com.rabbitmq.stream.SubscriptionListener;
import com.rabbitmq.stream.compression.CompressionCodecFactory;
import com.rabbitmq.stream.impl.AsyncRetry;
import com.rabbitmq.stream.impl.Client;
import com.rabbitmq.stream.impl.Clock;
import com.rabbitmq.stream.impl.Codecs;
import com.rabbitmq.stream.impl.ConsumersCoordinator;
import com.rabbitmq.stream.impl.OffsetTrackingCoordinator;
import com.rabbitmq.stream.impl.ProducersCoordinator;
import com.rabbitmq.stream.impl.StreamConsumer;
import com.rabbitmq.stream.impl.StreamConsumerBuilder;
import com.rabbitmq.stream.impl.StreamEnvironmentBuilder;
import com.rabbitmq.stream.impl.StreamProducer;
import com.rabbitmq.stream.impl.StreamProducerBuilder;
import com.rabbitmq.stream.impl.StreamStreamCreator;
import com.rabbitmq.stream.impl.Utils;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import java.io.IOException;
import java.net.URI;
import java.net.URLDecoder;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.LongConsumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import javax.net.ssl.SSLException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class StreamEnvironment
implements Environment {
    private static final Logger LOGGER = LoggerFactory.getLogger(StreamEnvironment.class);
    private final Random random = new Random();
    private final EventLoopGroup eventLoopGroup;
    private final ScheduledExecutorService scheduledExecutorService;
    private final boolean privateScheduleExecutorService;
    private final Client.ClientParameters clientParametersPrototype;
    private final List<Address> addresses;
    private final List<StreamProducer> producers = new CopyOnWriteArrayList<StreamProducer>();
    private final List<StreamConsumer> consumers = new CopyOnWriteArrayList<StreamConsumer>();
    private final Codec codec;
    private final BackOffDelayPolicy recoveryBackOffDelayPolicy;
    private final BackOffDelayPolicy topologyUpdateBackOffDelayPolicy;
    private final ConsumersCoordinator consumersCoordinator;
    private final ProducersCoordinator producersCoordinator;
    private final OffsetTrackingCoordinator offsetTrackingCoordinator;
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final AddressResolver addressResolver;
    private final Clock clock = new Clock();
    private final ScheduledFuture<?> clockRefreshFuture;
    private final ByteBufAllocator byteBufAllocator;
    private final AtomicBoolean locatorInitialized = new AtomicBoolean(false);
    private final Runnable locatorInitializationSequence;
    private volatile Client locator;

    StreamEnvironment(ScheduledExecutorService scheduledExecutorService, Client.ClientParameters clientParametersPrototype, List<URI> uris, BackOffDelayPolicy recoveryBackOffDelayPolicy, BackOffDelayPolicy topologyBackOffDelayPolicy, AddressResolver addressResolver, int maxProducersByConnection, int maxTrackingConsumersByConnection, int maxConsumersByConnection, StreamEnvironmentBuilder.DefaultTlsConfiguration tlsConfiguration, ByteBufAllocator byteBufAllocator, boolean lazyInit, Function<Utils.ClientConnectionType, String> connectionNamingStrategy) {
        this(scheduledExecutorService, clientParametersPrototype, uris, recoveryBackOffDelayPolicy, topologyBackOffDelayPolicy, addressResolver, maxProducersByConnection, maxTrackingConsumersByConnection, maxConsumersByConnection, tlsConfiguration, byteBufAllocator, lazyInit, connectionNamingStrategy, cp -> new Client((Client.ClientParameters)cp));
    }

    StreamEnvironment(ScheduledExecutorService scheduledExecutorService, Client.ClientParameters clientParametersPrototype, List<URI> uris, BackOffDelayPolicy recoveryBackOffDelayPolicy, BackOffDelayPolicy topologyBackOffDelayPolicy, AddressResolver addressResolver, int maxProducersByConnection, int maxTrackingConsumersByConnection, int maxConsumersByConnection, StreamEnvironmentBuilder.DefaultTlsConfiguration tlsConfiguration, ByteBufAllocator byteBufAllocator, boolean lazyInit, Function<Utils.ClientConnectionType, String> connectionNamingStrategy, Function<Client.ClientParameters, Client> clientFactory) {
        boolean tls;
        this.recoveryBackOffDelayPolicy = recoveryBackOffDelayPolicy;
        this.topologyUpdateBackOffDelayPolicy = topologyBackOffDelayPolicy;
        this.byteBufAllocator = byteBufAllocator;
        clientParametersPrototype.byteBufAllocator(byteBufAllocator);
        clientParametersPrototype = this.maybeSetUpClientParametersFromUris(uris, clientParametersPrototype);
        this.addressResolver = addressResolver;
        if (tlsConfiguration != null && tlsConfiguration.enabled()) {
            tls = true;
            try {
                SslContext sslContext = tlsConfiguration.sslContext() == null ? SslContextBuilder.forClient().build() : tlsConfiguration.sslContext();
                clientParametersPrototype.sslContext(sslContext);
                clientParametersPrototype.tlsHostnameVerification(tlsConfiguration.hostnameVerificationEnabled());
            }
            catch (SSLException e) {
                throw new StreamException("Error while creating Netty SSL context", e);
            }
        } else {
            tls = false;
        }
        if (uris.isEmpty()) {
            this.addresses = Collections.singletonList(new Address(clientParametersPrototype.host, clientParametersPrototype.port));
        } else {
            int defaultPort = tls ? 5551 : 5552;
            this.addresses = uris.stream().map(uriItem -> new Address(uriItem.getHost() == null ? "localhost" : uriItem.getHost(), uriItem.getPort() == -1 ? defaultPort : uriItem.getPort())).collect(Collectors.toList());
        }
        if (clientParametersPrototype.eventLoopGroup == null) {
            this.eventLoopGroup = new NioEventLoopGroup();
            this.clientParametersPrototype = clientParametersPrototype.duplicate().eventLoopGroup(this.eventLoopGroup);
        } else {
            this.eventLoopGroup = null;
            this.clientParametersPrototype = clientParametersPrototype.duplicate().eventLoopGroup(clientParametersPrototype.eventLoopGroup);
        }
        if (scheduledExecutorService == null) {
            this.scheduledExecutorService = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors());
            this.privateScheduleExecutorService = true;
        } else {
            this.scheduledExecutorService = scheduledExecutorService;
            this.privateScheduleExecutorService = false;
        }
        this.producersCoordinator = new ProducersCoordinator(this, maxProducersByConnection, maxTrackingConsumersByConnection, connectionNamingStrategy, Utils.coordinatorClientFactory(this));
        this.consumersCoordinator = new ConsumersCoordinator(this, maxConsumersByConnection, connectionNamingStrategy, Utils.coordinatorClientFactory(this));
        this.offsetTrackingCoordinator = new OffsetTrackingCoordinator(this);
        AtomicReference<Client.ShutdownListener> shutdownListenerReference = new AtomicReference<Client.ShutdownListener>();
        Client.ShutdownListener shutdownListener = shutdownContext -> {
            if (shutdownContext.isShutdownUnexpected()) {
                this.locator = null;
                LOGGER.debug("Unexpected locator disconnection, trying to reconnect");
                Client.ClientParameters newLocatorParameters = this.clientParametersPrototype.duplicate().shutdownListener((Client.ShutdownListener)shutdownListenerReference.get());
                AsyncRetry.asyncRetry(() -> {
                    Address address = this.addresses.size() == 1 ? this.addresses.get(0) : this.addresses.get(this.random.nextInt(this.addresses.size()));
                    address = addressResolver.resolve(address);
                    LOGGER.debug("Trying to reconnect locator on {}", (Object)address);
                    Client newLocator = (Client)clientFactory.apply(newLocatorParameters.host(address.host()).port(address.port()).clientProperty("connection_name", (String)connectionNamingStrategy.apply(Utils.ClientConnectionType.LOCATOR)));
                    LOGGER.debug("Locator connected on {}", (Object)address);
                    return newLocator;
                }).description("Locator recovery").scheduler(this.scheduledExecutorService).delayPolicy(recoveryBackOffDelayPolicy).build().thenAccept(newLocator -> {
                    this.locator = newLocator;
                });
            }
        };
        shutdownListenerReference.set(shutdownListener);
        Client.ClientParameters clientParametersForInit = clientParametersPrototype.duplicate();
        Runnable locatorInitSequence = () -> {
            RuntimeException lastException = null;
            for (Address address : this.addresses) {
                address = addressResolver.resolve(address);
                Client.ClientParameters locatorParameters = clientParametersForInit.duplicate().host(address.host()).port(address.port()).clientProperty("connection_name", (String)connectionNamingStrategy.apply(Utils.ClientConnectionType.LOCATOR)).shutdownListener((Client.ShutdownListener)shutdownListenerReference.get());
                try {
                    this.locator = (Client)clientFactory.apply(locatorParameters);
                    LOGGER.debug("Locator connected to {}", (Object)address);
                    break;
                }
                catch (RuntimeException e) {
                    LOGGER.debug("Error while try to connect to {}: {}", (Object)address, (Object)e.getMessage());
                    lastException = e;
                }
            }
            if (this.locator == null) {
                throw lastException;
            }
        };
        if (lazyInit) {
            this.locatorInitializationSequence = locatorInitSequence;
        } else {
            locatorInitSequence.run();
            this.locatorInitialized.set(true);
            this.locatorInitializationSequence = () -> {};
        }
        this.codec = clientParametersPrototype.codec == null ? Codecs.DEFAULT : clientParametersPrototype.codec;
        this.clockRefreshFuture = this.scheduledExecutorService.scheduleAtFixedRate(() -> this.clock.refresh(), 1L, 1L, TimeUnit.SECONDS);
    }

    private static String uriDecode(String s) {
        try {
            return URLDecoder.decode(s.replace("+", "%2B"), "US-ASCII");
        }
        catch (IOException e) {
            throw new IllegalArgumentException(e);
        }
    }

    Client.ClientParameters maybeSetUpClientParametersFromUris(List<URI> uris, Client.ClientParameters clientParametersPrototype) {
        String path;
        String userInfo;
        int port;
        if (uris.isEmpty()) {
            return clientParametersPrototype;
        }
        URI uri = uris.get(0);
        clientParametersPrototype = clientParametersPrototype.duplicate();
        String host = uri.getHost();
        if (host != null) {
            clientParametersPrototype.host(host);
        }
        if ((port = uri.getPort()) != -1) {
            clientParametersPrototype.port(port);
        }
        if ((userInfo = uri.getRawUserInfo()) != null) {
            String[] userPassword = userInfo.split(":");
            if (userPassword.length > 2) {
                throw new IllegalArgumentException("Bad user info in URI " + userInfo);
            }
            clientParametersPrototype.username(StreamEnvironment.uriDecode(userPassword[0]));
            if (userPassword.length == 2) {
                clientParametersPrototype.password(StreamEnvironment.uriDecode(userPassword[1]));
            }
        }
        if ((path = uri.getRawPath()) != null && path.length() > 0) {
            if (path.indexOf(47, 1) != -1) {
                throw new IllegalArgumentException("Multiple segments in path of URI: " + path);
            }
            clientParametersPrototype.virtualHost(StreamEnvironment.uriDecode(uri.getPath().substring(1)));
        }
        return clientParametersPrototype;
    }

    public ByteBufAllocator byteBufAllocator() {
        return this.byteBufAllocator;
    }

    void maybeInitializeLocator() {
        if (this.locatorInitialized.compareAndSet(false, true)) {
            try {
                this.locatorInitializationSequence.run();
            }
            catch (RuntimeException e) {
                this.locatorInitialized.set(false);
                throw e;
            }
        }
    }

    @Override
    public StreamCreator streamCreator() {
        return new StreamStreamCreator(this);
    }

    @Override
    public void deleteStream(String stream) {
        this.maybeInitializeLocator();
        Client.Response response = this.locator().delete(stream);
        if (!response.isOk()) {
            throw new StreamException("Error while deleting stream " + stream + " (" + Utils.formatConstant(response.getResponseCode()) + ")", response.getResponseCode());
        }
    }

    @Override
    public ProducerBuilder producerBuilder() {
        return new StreamProducerBuilder(this);
    }

    void addProducer(StreamProducer producer) {
        this.producers.add(producer);
    }

    void removeProducer(StreamProducer producer) {
        this.producers.remove(producer);
    }

    void addConsumer(StreamConsumer consumer) {
        this.consumers.add(consumer);
    }

    void removeConsumer(StreamConsumer consumer) {
        this.consumers.remove(consumer);
    }

    @Override
    public ConsumerBuilder consumerBuilder() {
        return new StreamConsumerBuilder(this);
    }

    @Override
    public void close() {
        if (this.closed.compareAndSet(false, true)) {
            for (StreamProducer producer : this.producers) {
                try {
                    producer.closeFromEnvironment();
                }
                catch (Exception e) {
                    LOGGER.warn("Error while closing producer, moving on to the next one", (Throwable)e);
                }
            }
            for (StreamConsumer consumer : this.consumers) {
                try {
                    consumer.closeFromEnvironment();
                }
                catch (Exception e) {
                    LOGGER.warn("Error while closing consumer, moving on to the next one", (Throwable)e);
                }
            }
            this.producersCoordinator.close();
            this.consumersCoordinator.close();
            this.offsetTrackingCoordinator.close();
            try {
                if (this.locator != null && this.locator.isOpen()) {
                    this.locator.close();
                    this.locator = null;
                }
            }
            catch (Exception e) {
                LOGGER.warn("Error while closing locator client", (Throwable)e);
            }
            this.clockRefreshFuture.cancel(false);
            if (this.privateScheduleExecutorService) {
                this.scheduledExecutorService.shutdownNow();
            }
            try {
                if (!(this.eventLoopGroup == null || this.eventLoopGroup.isShuttingDown() && this.eventLoopGroup.isShutdown())) {
                    LOGGER.debug("Closing Netty event loop group");
                    this.eventLoopGroup.shutdownGracefully(1L, 10L, TimeUnit.SECONDS).get(10L, TimeUnit.SECONDS);
                }
            }
            catch (InterruptedException e) {
                LOGGER.info("Event loop group closing has been interrupted");
                Thread.currentThread().interrupt();
            }
            catch (ExecutionException e) {
                LOGGER.info("Event loop group closing failed", (Throwable)e);
            }
            catch (TimeoutException e) {
                LOGGER.info("Could not close event loop group in 10 seconds");
            }
        }
    }

    ScheduledExecutorService scheduledExecutorService() {
        return this.scheduledExecutorService;
    }

    BackOffDelayPolicy recoveryBackOffDelayPolicy() {
        return this.recoveryBackOffDelayPolicy;
    }

    BackOffDelayPolicy topologyUpdateBackOffDelayPolicy() {
        return this.topologyUpdateBackOffDelayPolicy;
    }

    CompressionCodecFactory compressionCodecFactory() {
        return this.clientParametersPrototype.compressionCodecFactory;
    }

    Runnable registerConsumer(StreamConsumer consumer, String stream, OffsetSpecification offsetSpecification, String trackingReference, SubscriptionListener subscriptionListener, MessageHandler messageHandler) {
        Runnable closingCallback = this.consumersCoordinator.subscribe(consumer, stream, offsetSpecification, trackingReference, subscriptionListener, messageHandler);
        return closingCallback;
    }

    Runnable registerProducer(StreamProducer producer, String reference, String stream) {
        return this.producersCoordinator.registerProducer(producer, reference, stream);
    }

    Client locator() {
        if (this.locator == null) {
            throw new LocatorNotAvailableException();
        }
        return this.locator;
    }

    <T> T locatorOperation(Function<Client, T> operation) {
        return StreamEnvironment.locatorOperation(operation, () -> this.locator(), this.recoveryBackOffDelayPolicy);
    }

    static <T> T locatorOperation(Function<Client, T> operation, Supplier<Client> clientSupplier, BackOffDelayPolicy backOffDelayPolicy) {
        int maxAttempt = 3;
        boolean executed = false;
        InterruptedException lastException = null;
        T result = null;
        for (int attempt = 0; attempt < maxAttempt; ++attempt) {
            try {
                result = operation.apply(clientSupplier.get());
                executed = true;
            }
            catch (LocatorNotAvailableException e) {
                try {
                    Thread.sleep(backOffDelayPolicy.delay(attempt).toMillis());
                    continue;
                }
                catch (InterruptedException ex) {
                    lastException = ex;
                    Thread.currentThread().interrupt();
                }
            }
            break;
        }
        if (!executed) {
            if (lastException == null) {
                throw new LocatorNotAvailableException();
            }
            throw new StreamException("Could not execute operation after " + maxAttempt + " attempts", lastException);
        }
        return result;
    }

    Clock clock() {
        return this.clock;
    }

    AddressResolver addressResolver() {
        return this.addressResolver;
    }

    Codec codec() {
        return this.codec;
    }

    Client.ClientParameters clientParametersCopy() {
        return this.clientParametersPrototype.duplicate();
    }

    TrackingConsumerRegistration registerTrackingConsumer(StreamConsumer streamConsumer, StreamConsumerBuilder.TrackingConfiguration configuration) {
        Runnable closingCallable = this.producersCoordinator.registerTrackingConsumer(streamConsumer);
        OffsetTrackingCoordinator.Registration offsetTrackingRegistration = this.offsetTrackingCoordinator.needTrackingRegistration(configuration) ? this.offsetTrackingCoordinator.registerTrackingConsumer(streamConsumer, configuration) : null;
        Runnable closingSequence = offsetTrackingRegistration == null ? closingCallable : () -> {
            try {
                LOGGER.debug("Executing offset tracking registration closing sequence");
                offsetTrackingRegistration.closingCallback().run();
                LOGGER.debug("Offset tracking registration closing sequence executed");
            }
            catch (Exception e) {
                LOGGER.warn("Error while executing offset tracking registration closing sequence: {}", (Object)e.getMessage());
            }
            closingCallable.run();
        };
        return new TrackingConsumerRegistration(closingSequence, offsetTrackingRegistration == null ? null : offsetTrackingRegistration.postMessageProcessingCallback(), offsetTrackingRegistration == null ? Utils.NO_OP_LONG_CONSUMER : offsetTrackingRegistration.trackingCallback());
    }

    public String toString() {
        Client locator = this.locator;
        return "{ \"locator\" : " + (locator == null ? "null" : "\"" + locator.connectionName() + "\"") + ", \"producers\" : " + this.producersCoordinator + ", \"consumers\" : " + this.consumersCoordinator + ", \"offset_tracking\" : " + this.offsetTrackingCoordinator + "}";
    }

    static class LocatorNotAvailableException
    extends StreamException {
        public LocatorNotAvailableException() {
            super("Locator not available");
        }
    }

    static class TrackingConsumerRegistration {
        private final Runnable closingCallback;
        private final Consumer<MessageHandler.Context> postMessageProcessingCallback;
        private final LongConsumer trackingCallback;

        TrackingConsumerRegistration(Runnable closingCallback, Consumer<MessageHandler.Context> postMessageProcessingCallback, LongConsumer trackingCallback) {
            this.closingCallback = closingCallback;
            this.postMessageProcessingCallback = postMessageProcessingCallback;
            this.trackingCallback = trackingCallback;
        }

        public Runnable closingCallback() {
            return this.closingCallback;
        }

        public LongConsumer trackingCallback() {
            return this.trackingCallback;
        }

        public Consumer<MessageHandler.Context> postMessageProcessingCallback() {
            return this.postMessageProcessingCallback;
        }
    }
}

