/*
 * Decompiled with CFR 0.152.
 */
package com.datastax.oss.pulsar.jms;

import com.datastax.oss.pulsar.jms.PulsarConnection;
import com.datastax.oss.pulsar.jms.PulsarDestination;
import com.datastax.oss.pulsar.jms.PulsarJMSContext;
import com.datastax.oss.pulsar.jms.PulsarQueue;
import com.datastax.oss.pulsar.jms.Utils;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import javax.jms.ConnectionFactory;
import javax.jms.IllegalStateException;
import javax.jms.InvalidClientIDException;
import javax.jms.JMSContext;
import javax.jms.JMSException;
import javax.jms.JMSSecurityException;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.ReaderBuilder;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionMode;
import org.apache.pulsar.client.api.SubscriptionType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PulsarConnectionFactory
implements ConnectionFactory,
QueueConnectionFactory,
TopicConnectionFactory,
AutoCloseable {
    private static final Logger log = LoggerFactory.getLogger(PulsarConnectionFactory.class);
    private static final Set<String> clientIdentifiers = new ConcurrentSkipListSet<String>();
    private final Map<String, Producer<byte[]>> producers = new ConcurrentHashMap<String, Producer<byte[]>>();
    private final Set<PulsarConnection> connections = Collections.synchronizedSet(new HashSet());
    private final List<Consumer<byte[]>> consumers = new CopyOnWriteArrayList<Consumer<byte[]>>();
    private final List<Reader<byte[]>> readers = new CopyOnWriteArrayList<Reader<byte[]>>();
    private PulsarClient pulsarClient;
    private PulsarAdmin pulsarAdmin;
    private Map<String, Object> producerConfiguration;
    private Map<String, Object> consumerConfiguration;
    private String systemNamespace = "public/default";
    private String defaultClientId = null;
    private boolean enableTransaction = false;
    private boolean enableClientSideEmulation = false;
    private boolean forceDeleteTemporaryDestinations = false;
    private boolean useExclusiveSubscriptionsForSimpleConsumers = false;
    private boolean acknowledgeRejectedMessages = false;
    private String tckUsername = "";
    private String tckPassword = "";
    private String queueSubscriptionName = "jms-queue";
    private long waitForServerStartupTimeout = 60000L;
    private boolean initialized;
    private Map<String, Object> configuration;

    public PulsarConnectionFactory() throws JMSException {
        this(new HashMap<String, Object>());
    }

    public PulsarConnectionFactory(Map<String, Object> properties) throws JMSException {
        this.configuration = new HashMap<String, Object>(properties);
    }

    public PulsarConnectionFactory(String configuration) throws JMSException {
        this();
        this.setJsonConfiguration(configuration);
    }

    public String getJsonConfiguration() {
        return Utils.runtimeException(() -> new ObjectMapper().writeValueAsString(this.getConfiguration()));
    }

    public void setJsonConfiguration(String json) {
        if (json == null || json.isEmpty()) {
            this.setConfiguration(Collections.emptyMap());
            return;
        }
        this.setConfiguration(Utils.runtimeException(() -> (Map)new ObjectMapper().readValue(json, Map.class)));
    }

    public synchronized Map<String, Object> getConfiguration() {
        return new HashMap<String, Object>(this.configuration);
    }

    public synchronized void setConfiguration(Map<String, Object> configuration) {
        this.configuration = new HashMap<String, Object>(configuration);
    }

    private synchronized Map<String, Object> getConsumerConfiguration() {
        return this.consumerConfiguration;
    }

    private synchronized Map<String, Object> getProducerConfiguration() {
        return this.producerConfiguration;
    }

    private synchronized void ensureInitialized() throws JMSException {
        if (this.initialized) {
            return;
        }
        try {
            Map producerConfiguration = (Map)this.configuration.remove("producerConfig");
            this.producerConfiguration = producerConfiguration != null ? new HashMap<String, Object>(producerConfiguration) : Collections.emptyMap();
            Map consumerConfigurationM = (Map)this.configuration.remove("consumerConfig");
            this.consumerConfiguration = consumerConfigurationM != null ? new HashMap<String, Object>(consumerConfigurationM) : Collections.emptyMap();
            this.systemNamespace = PulsarConnectionFactory.getAndRemoveString("jms.systemNamespace", "public/default", this.configuration);
            this.tckUsername = PulsarConnectionFactory.getAndRemoveString("jms.tckUsername", "", this.configuration);
            this.tckPassword = PulsarConnectionFactory.getAndRemoveString("jms.tckPassword", "", this.configuration);
            this.defaultClientId = PulsarConnectionFactory.getAndRemoveString("jms.clientId", null, this.configuration);
            this.queueSubscriptionName = PulsarConnectionFactory.getAndRemoveString("jms.queueSubscriptionName", "jms-queue", this.configuration);
            this.waitForServerStartupTimeout = Long.parseLong(PulsarConnectionFactory.getAndRemoveString("jms.waitForServerStartupTimeout", "60000", this.configuration));
            this.enableClientSideEmulation = Boolean.parseBoolean(PulsarConnectionFactory.getAndRemoveString("jms.enableClientSideEmulation", "false", this.configuration));
            this.useExclusiveSubscriptionsForSimpleConsumers = Boolean.parseBoolean(PulsarConnectionFactory.getAndRemoveString("jms.useExclusiveSubscriptionsForSimpleConsumers", "true", this.configuration));
            this.acknowledgeRejectedMessages = Boolean.parseBoolean(PulsarConnectionFactory.getAndRemoveString("jms.acknowledgeRejectedMessages", "false", this.configuration));
            this.forceDeleteTemporaryDestinations = Boolean.parseBoolean(PulsarConnectionFactory.getAndRemoveString("jms.forceDeleteTemporaryDestinations", "false", this.configuration));
            this.enableTransaction = Boolean.parseBoolean(this.configuration.getOrDefault("enableTransaction", "false").toString());
            String webServiceUrl = PulsarConnectionFactory.getAndRemoveString("webServiceUrl", "http://localhost:8080", this.configuration);
            String brokenServiceUrl = PulsarConnectionFactory.getAndRemoveString("brokerServiceUrl", "", this.configuration);
            PulsarClient pulsarClient = null;
            PulsarAdmin pulsarAdmin = null;
            try {
                String authPluginClassName = PulsarConnectionFactory.getAndRemoveString("authPlugin", "", this.configuration);
                String authParamsString = PulsarConnectionFactory.getAndRemoveString("authParams", "", this.configuration);
                Authentication authentication = AuthenticationFactory.create((String)authPluginClassName, (String)authParamsString);
                if (log.isDebugEnabled()) {
                    log.debug("Authentication {}", (Object)authentication);
                }
                boolean tlsAllowInsecureConnection = Boolean.parseBoolean(PulsarConnectionFactory.getAndRemoveString("tlsAllowInsecureConnection", "false", this.configuration));
                boolean tlsEnableHostnameVerification = Boolean.parseBoolean(PulsarConnectionFactory.getAndRemoveString("tlsEnableHostnameVerification", "false", this.configuration));
                String tlsTrustCertsFilePath = PulsarConnectionFactory.getAndRemoveString("tlsTrustCertsFilePath", "", this.configuration);
                boolean useKeyStoreTls = Boolean.parseBoolean(PulsarConnectionFactory.getAndRemoveString("useKeyStoreTls", "false", this.configuration));
                String tlsTrustStoreType = PulsarConnectionFactory.getAndRemoveString("tlsTrustStoreType", "JKS", this.configuration);
                String tlsTrustStorePath = PulsarConnectionFactory.getAndRemoveString("tlsTrustStorePath", "", this.configuration);
                String tlsTrustStorePassword = PulsarConnectionFactory.getAndRemoveString("tlsTrustStorePassword", "", this.configuration);
                pulsarAdmin = PulsarAdmin.builder().serviceHttpUrl(webServiceUrl).allowTlsInsecureConnection(tlsAllowInsecureConnection).enableTlsHostnameVerification(tlsEnableHostnameVerification).tlsTrustCertsFilePath(tlsTrustCertsFilePath).useKeyStoreTls(useKeyStoreTls).tlsTrustStoreType(tlsTrustStoreType).tlsTrustStorePath(tlsTrustStorePath).tlsTrustStorePassword(tlsTrustStorePassword).authentication(authentication).build();
                ClientBuilder clientBuilder = PulsarClient.builder().loadConf(this.configuration).tlsTrustStorePassword(tlsTrustStorePassword).tlsTrustStorePath(tlsTrustStorePath).tlsTrustCertsFilePath(tlsTrustCertsFilePath).tlsTrustStoreType(tlsTrustStoreType).useKeyStoreTls(useKeyStoreTls).enableTlsHostnameVerification(tlsEnableHostnameVerification).allowTlsInsecureConnection(tlsAllowInsecureConnection).serviceUrl(webServiceUrl).authentication(authentication);
                if (!brokenServiceUrl.isEmpty()) {
                    clientBuilder.serviceUrl(brokenServiceUrl);
                }
                pulsarClient = clientBuilder.build();
            }
            catch (PulsarClientException err) {
                if (pulsarAdmin != null) {
                    pulsarAdmin.close();
                }
                if (pulsarClient != null) {
                    pulsarClient.close();
                }
                throw err;
            }
            this.pulsarClient = pulsarClient;
            this.pulsarAdmin = pulsarAdmin;
            this.initialized = true;
        }
        catch (Throwable t) {
            throw Utils.handleException(t);
        }
    }

    private static String getAndRemoveString(String name, String defaultValue, Map<String, Object> properties) {
        Object value = properties.remove(name);
        return value != null ? value.toString() : defaultValue;
    }

    public synchronized boolean isEnableClientSideEmulation() {
        return this.enableClientSideEmulation;
    }

    synchronized String getDefaultClientId() {
        return this.defaultClientId;
    }

    public synchronized boolean isEnableTransaction() {
        return this.enableTransaction;
    }

    public synchronized PulsarClient getPulsarClient() {
        return this.pulsarClient;
    }

    public synchronized PulsarAdmin getPulsarAdmin() {
        return this.pulsarAdmin;
    }

    public synchronized String getSystemNamespace() {
        return this.systemNamespace;
    }

    public PulsarConnection createConnection() throws JMSException {
        this.ensureInitialized();
        PulsarConnection res = new PulsarConnection(this);
        this.connections.add(res);
        return res;
    }

    public PulsarConnection createConnection(String userName, String password) throws JMSException {
        this.ensureInitialized();
        this.validateDummyUserNamePassword(userName, password);
        return this.createConnection();
    }

    private synchronized void validateDummyUserNamePassword(String userName, String password) throws JMSSecurityException {
        if (!(this.tckUsername == null || this.tckUsername.isEmpty() || this.tckUsername.equals(userName) || this.tckPassword == null || this.tckPassword.equals(password))) {
            throw new JMSSecurityException("Unauthorized");
        }
    }

    public JMSContext createContext() {
        return this.createContext(1);
    }

    public JMSContext createContext(String userName, String password) {
        return this.createContext(userName, password, 1);
    }

    public JMSContext createContext(String userName, String password, int sessionMode) {
        Utils.runtimeException(this::ensureInitialized);
        Utils.runtimeException(() -> this.validateDummyUserNamePassword(userName, password));
        return this.createContext(sessionMode);
    }

    public JMSContext createContext(int sessionMode) {
        Utils.runtimeException(this::ensureInitialized);
        return new PulsarJMSContext(this, sessionMode);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void close() {
        Iterator<Producer<byte[]>> iterator = this;
        synchronized (iterator) {
            if (!this.initialized) {
                return;
            }
        }
        for (PulsarConnection con : new ArrayList<PulsarConnection>(this.connections)) {
            try {
                con.close();
            }
            catch (Exception ignore) {
                Utils.handleException(ignore);
            }
        }
        for (Producer<byte[]> producer : this.producers.values()) {
            try {
                producer.close();
            }
            catch (PulsarClientException ignore) {
                Utils.handleException(ignore);
            }
        }
        this.pulsarAdmin.close();
        try {
            this.pulsarClient.close();
        }
        catch (PulsarClientException err) {
            log.info("Error closing PulsarClient", (Throwable)err);
        }
    }

    Producer<byte[]> getProducerForDestination(PulsarDestination defaultDestination, boolean transactions) throws JMSException {
        try {
            String fullQualifiedTopicName = this.applySystemNamespace(defaultDestination.topicName);
            String key = transactions ? fullQualifiedTopicName + "-tx" : fullQualifiedTopicName;
            return this.producers.computeIfAbsent(key, d -> {
                try {
                    return Utils.invoke(() -> {
                        ProducerBuilder producerBuilder = this.pulsarClient.newProducer().topic(this.applySystemNamespace(fullQualifiedTopicName)).loadConf(this.getProducerConfiguration());
                        if (transactions) {
                            producerBuilder.sendTimeout(0, TimeUnit.MILLISECONDS);
                        }
                        return producerBuilder.create();
                    });
                }
                catch (JMSException err) {
                    throw new RuntimeException(err);
                }
            });
        }
        catch (RuntimeException err) {
            throw (JMSException)err.getCause();
        }
    }

    public void ensureQueueSubscription(PulsarDestination destination) throws JMSException {
        long start = System.currentTimeMillis();
        String fullQualifiedTopicName = this.applySystemNamespace(destination.topicName);
        while (true) {
            try {
                this.getPulsarAdmin().topics().createSubscription(fullQualifiedTopicName, this.getQueueSubscriptionName(), MessageId.earliest);
                break;
            }
            catch (PulsarAdminException.ConflictException exists) {
                log.debug("Subscription {} already exists for {}", (Object)this.getQueueSubscriptionName(), (Object)fullQualifiedTopicName);
                break;
            }
            catch (PulsarAdminException err) {
                long now = System.currentTimeMillis();
                if (now - start > this.getWaitForServerStartupTimeout()) {
                    throw Utils.handleException(err);
                }
                log.info("Got {} error while setting up subscription for queue {}, maybe the namespace/broker is still starting", (Object)err.toString(), (Object)fullQualifiedTopicName);
                try {
                    Thread.sleep(1000L);
                }
                catch (InterruptedException interruptedException) {
                    Thread.currentThread().interrupt();
                    throw Utils.handleException(err);
                }
            }
        }
    }

    public void ensureSubscription(PulsarDestination destination, String consumerName) throws JMSException {
        String fullQualifiedTopicName = this.applySystemNamespace(destination.topicName);
        String subscriptionName = destination.isQueue() ? this.queueSubscriptionName : consumerName;
        log.info("Creating subscription {} for destination {}", (Object)subscriptionName, (Object)fullQualifiedTopicName);
        try {
            this.pulsarAdmin.topics().createSubscription(fullQualifiedTopicName, subscriptionName, MessageId.latest);
        }
        catch (PulsarAdminException.ConflictException alreadyExists) {
            log.info("Subscription {} already exists, this is usually not a problem", (Object)subscriptionName);
        }
        catch (Exception err) {
            throw Utils.handleException(err);
        }
    }

    public Consumer<byte[]> createConsumer(PulsarDestination destination, String consumerName, int sessionMode, SubscriptionMode subscriptionMode, SubscriptionType subscriptionType) throws JMSException {
        String fullQualifiedTopicName = this.applySystemNamespace(destination.topicName);
        String subscriptionName = destination.isQueue() ? this.queueSubscriptionName : consumerName;
        SubscriptionInitialPosition initialPosition = destination.isTopic() ? SubscriptionInitialPosition.Latest : SubscriptionInitialPosition.Earliest;
        Object seekMessageId = null;
        if (destination.isQueue() && subscriptionMode != SubscriptionMode.Durable) {
            throw new IllegalStateException("only durable mode for queues");
        }
        if (destination.isQueue() && subscriptionType != SubscriptionType.Shared) {
            throw new IllegalStateException("only Shared SubscriptionType for queues");
        }
        log.debug("createConsumer {} {} {}", new Object[]{fullQualifiedTopicName, consumerName, subscriptionMode, subscriptionType});
        try {
            ConsumerBuilder builder = this.pulsarClient.newConsumer().negativeAckRedeliveryDelay(1L, TimeUnit.SECONDS).loadConf(this.getConsumerConfiguration()).subscriptionInitialPosition(initialPosition).subscriptionMode(subscriptionMode).subscriptionType(subscriptionType).subscriptionName(subscriptionName).topic(new String[]{fullQualifiedTopicName});
            Consumer newConsumer = builder.subscribe();
            this.consumers.add((Consumer<byte[]>)newConsumer);
            return newConsumer;
        }
        catch (PulsarClientException err) {
            throw Utils.handleException(err);
        }
    }

    public Reader<byte[]> createReaderForBrowser(PulsarQueue destination) throws JMSException {
        String fullQualifiedTopicName = this.applySystemNamespace(destination.topicName);
        try {
            List messages = this.getPulsarAdmin().topics().peekMessages(fullQualifiedTopicName, this.queueSubscriptionName, 1);
            MessageId seekMessageId = messages.isEmpty() ? MessageId.latest : ((Message)messages.get(0)).getMessageId();
            log.info("createBrowser {} at {}", (Object)fullQualifiedTopicName, (Object)seekMessageId);
            ReaderBuilder builder = this.pulsarClient.newReader().loadConf(this.getConsumerConfiguration()).readerName("jms-queue-browser-" + UUID.randomUUID()).startMessageId(seekMessageId).startMessageIdInclusive().topic(fullQualifiedTopicName);
            Reader newReader = builder.create();
            this.readers.add((Reader<byte[]>)newReader);
            return newReader;
        }
        catch (PulsarAdminException | PulsarClientException err) {
            throw Utils.handleException(err);
        }
    }

    public void removeConsumer(Consumer<byte[]> consumer) {
        this.consumers.remove(consumer);
    }

    public void removeReader(Reader<byte[]> reader) {
        this.readers.remove(reader);
    }

    public boolean deleteSubscription(PulsarDestination destination, String name) throws JMSException {
        String systemNamespace = this.getSystemNamespace();
        boolean somethingDone = false;
        try {
            if (destination != null) {
                String fullQualifiedTopicName = this.applySystemNamespace(destination.topicName);
                log.info("deleteSubscription topic {} name {}", (Object)fullQualifiedTopicName, (Object)name);
                try {
                    this.pulsarAdmin.topics().deleteSubscription(fullQualifiedTopicName, name, true);
                    somethingDone = true;
                }
                catch (PulsarAdminException.NotFoundException notFound) {
                    log.error("Cannot unsubscribe {} from {}: not found", (Object)name, (Object)fullQualifiedTopicName);
                }
            }
            if (!somethingDone) {
                List allTopics = this.pulsarAdmin.topics().getList(systemNamespace);
                for (String topic : allTopics) {
                    log.info("Scanning topic {}", (Object)topic);
                    List subscriptions = this.pulsarAdmin.topics().getSubscriptions(topic);
                    log.info("Subscriptions {}", (Object)subscriptions);
                    for (String subscription : subscriptions) {
                        log.info("Found subscription {} ", (Object)subscription);
                        if (!subscription.equals(name)) continue;
                        log.info("deleteSubscription topic {} name {}", (Object)topic, (Object)name);
                        this.pulsarAdmin.topics().deleteSubscription(topic, name, true);
                        somethingDone = true;
                    }
                }
            }
        }
        catch (Exception err) {
            throw Utils.handleException(err);
        }
        return somethingDone;
    }

    public void registerClientId(String clientID) throws InvalidClientIDException {
        log.info("registerClientId {}, existing {}", (Object)clientID, clientIdentifiers);
        if (!clientIdentifiers.add(clientID)) {
            throw new InvalidClientIDException("A connection with this client id '" + clientID + "'is already opened locally");
        }
    }

    public void unregisterConnection(PulsarConnection connection) {
        if (connection.clientId != null) {
            clientIdentifiers.remove(connection.clientId);
            log.info("unregisterClientId {} {}", (Object)connection.clientId, clientIdentifiers);
        }
        this.connections.remove(connection);
    }

    public QueueConnection createQueueConnection() throws JMSException {
        return this.createConnection();
    }

    public QueueConnection createQueueConnection(String s, String s1) throws JMSException {
        return this.createConnection(s, s1);
    }

    public TopicConnection createTopicConnection() throws JMSException {
        return this.createConnection();
    }

    public TopicConnection createTopicConnection(String s, String s1) throws JMSException {
        return this.createConnection(s, s1);
    }

    public synchronized boolean isForceDeleteTemporaryDestinations() {
        return this.forceDeleteTemporaryDestinations;
    }

    public synchronized String getQueueSubscriptionName() {
        return this.queueSubscriptionName;
    }

    public synchronized long getWaitForServerStartupTimeout() {
        return this.waitForServerStartupTimeout;
    }

    public synchronized SubscriptionType getExclusiveSubscriptionTypeForSimpleConsumers() {
        return this.useExclusiveSubscriptionsForSimpleConsumers ? SubscriptionType.Exclusive : SubscriptionType.Shared;
    }

    public String applySystemNamespace(String destination) {
        if (destination == null) {
            return null;
        }
        if (destination.startsWith("persistent://") || destination.startsWith("non-persistent://")) {
            return destination;
        }
        return "persistent://" + this.getSystemNamespace() + "/" + destination;
    }

    public boolean isAcknowledgeRejectedMessages() {
        return this.acknowledgeRejectedMessages;
    }
}

