/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.data.gemfire.listener;

import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Optional;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.function.Supplier;
import java.util.stream.StreamSupport;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.geode.cache.RegionService;
import org.apache.geode.cache.client.Pool;
import org.apache.geode.cache.client.PoolManager;
import org.apache.geode.cache.query.CqAttributes;
import org.apache.geode.cache.query.CqEvent;
import org.apache.geode.cache.query.CqException;
import org.apache.geode.cache.query.CqListener;
import org.apache.geode.cache.query.CqQuery;
import org.apache.geode.cache.query.QueryException;
import org.apache.geode.cache.query.QueryService;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanFactoryAware;
import org.springframework.beans.factory.BeanNameAware;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.context.SmartLifecycle;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.data.gemfire.GemfireQueryException;
import org.springframework.data.gemfire.client.support.DefaultableDelegatingPoolAdapter;
import org.springframework.data.gemfire.client.support.DelegatingPoolAdapter;
import org.springframework.data.gemfire.config.annotation.ContinuousQueryListenerContainerConfigurer;
import org.springframework.data.gemfire.listener.ContinuousQueryDefinition;
import org.springframework.data.gemfire.listener.ContinuousQueryListener;
import org.springframework.data.gemfire.util.ArrayUtils;
import org.springframework.data.gemfire.util.CollectionUtils;
import org.springframework.data.gemfire.util.RuntimeExceptionFactory;
import org.springframework.util.Assert;
import org.springframework.util.ErrorHandler;
import org.springframework.util.StringUtils;

public class ContinuousQueryListenerContainer
implements BeanFactoryAware,
BeanNameAware,
InitializingBean,
DisposableBean,
SmartLifecycle {
    public static final String DEFAULT_THREAD_NAME_PREFIX = String.format("%s-", ContinuousQueryListenerContainer.class.getSimpleName());
    private boolean autoStartup = true;
    private volatile boolean initialized = false;
    private volatile boolean manageExecutor = false;
    private volatile boolean running = false;
    private int phase = Integer.MAX_VALUE;
    private BeanFactory beanFactory;
    private ErrorHandler errorHandler;
    private Executor taskExecutor;
    private List<ContinuousQueryListenerContainerConfigurer> cqListenerContainerConfigurers = Collections.emptyList();
    private ContinuousQueryListenerContainerConfigurer compositeCqListenerContainerConfigurer = (beanName, container) -> CollectionUtils.nullSafeList(this.cqListenerContainerConfigurers).forEach(configurer -> configurer.configure(beanName, container));
    protected final Log logger = LogFactory.getLog(this.getClass());
    private Queue<CqQuery> continuousQueries = new ConcurrentLinkedQueue<CqQuery>();
    private QueryService queryService;
    private Set<ContinuousQueryDefinition> continuousQueryDefinitions = new LinkedHashSet<ContinuousQueryDefinition>();
    private String beanName;
    private String poolName;

    public void afterPropertiesSet() {
        this.applyContinuousQueryListenerContainerConfigurers();
        this.validateQueryService(this.initQueryService(this.eagerlyInitializePool(this.resolvePoolName())));
        this.initExecutor();
        this.initContinuousQueries();
        this.initialized = true;
    }

    private void applyContinuousQueryListenerContainerConfigurers() {
        this.applyContinuousQueryListenerContainerConfigurers(this.getCompositeContinuousQueryListenerContainerConfigurer());
    }

    private QueryService validateQueryService(QueryService queryService) {
        Assert.state((queryService != null ? 1 : 0) != 0, (String)"QueryService was not properly initialized");
        return queryService;
    }

    protected void applyContinuousQueryListenerContainerConfigurers(ContinuousQueryListenerContainerConfigurer ... configurers) {
        this.applyContinuousQueryListenerContainerConfigurers(Arrays.asList(ArrayUtils.nullSafeArray(configurers, ContinuousQueryListenerContainerConfigurer.class)));
    }

    protected void applyContinuousQueryListenerContainerConfigurers(Iterable<ContinuousQueryListenerContainerConfigurer> configurers) {
        StreamSupport.stream(CollectionUtils.nullSafeIterable(configurers).spliterator(), false).forEach(configurer -> configurer.configure(this.getBeanName(), this));
    }

    String resolvePoolName() {
        return Optional.ofNullable(this.getPoolName()).filter(StringUtils::hasText).orElseGet(() -> Optional.ofNullable(this.getBeanFactory()).filter(it -> it.containsBean("gemfirePool")).map(it -> "gemfirePool").orElse("DEFAULT"));
    }

    String eagerlyInitializePool(String poolName) {
        Supplier<String> poolNameResolver = () -> {
            Assert.notNull((Object)PoolManager.find((String)poolName), (String)String.format("No Pool with name [%s] was found", poolName));
            return poolName;
        };
        return Optional.ofNullable(this.getBeanFactory()).filter(it -> it.containsBean(poolName)).filter(it -> it.isTypeMatch(poolName, Pool.class)).map(it -> {
            try {
                it.getBean(poolName, Pool.class);
                return poolName;
            }
            catch (BeansException ignore) {
                return (String)poolNameResolver.get();
            }
        }).orElseGet(poolNameResolver);
    }

    QueryService initQueryService(String poolName) {
        QueryService queryService = this.getQueryService();
        if (queryService == null || StringUtils.hasText((String)poolName)) {
            this.setQueryService(DefaultableDelegatingPoolAdapter.from(DelegatingPoolAdapter.from(PoolManager.find((String)poolName))).preferPool().getQueryService(queryService));
        }
        return this.getQueryService();
    }

    Executor initExecutor() {
        if (this.getTaskExecutor() == null) {
            this.setTaskExecutor(this.createDefaultTaskExecutor());
            this.manageExecutor = true;
        }
        return this.getTaskExecutor();
    }

    protected Executor createDefaultTaskExecutor() {
        String threadNamePrefix = Optional.ofNullable(this.getBeanName()).filter(StringUtils::hasText).map(it -> String.format("%s-", it)).orElse(DEFAULT_THREAD_NAME_PREFIX);
        return new SimpleAsyncTaskExecutor(threadNamePrefix);
    }

    private void initContinuousQueries() {
        this.initContinuousQueries(this.continuousQueryDefinitions);
    }

    private void initContinuousQueries(Set<ContinuousQueryDefinition> continuousQueryDefinitions) {
        if (this.isRunning()) {
            this.stop();
        }
        this.closeQueries();
        for (ContinuousQueryDefinition definition : continuousQueryDefinitions) {
            this.addContinuousQuery(definition);
        }
    }

    public boolean isActive() {
        return this.initialized;
    }

    public void setAutoStartup(boolean autoStartup) {
        this.autoStartup = autoStartup;
    }

    public boolean isAutoStartup() {
        return this.autoStartup;
    }

    public synchronized boolean isRunning() {
        return this.running;
    }

    public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
        this.beanFactory = beanFactory;
    }

    protected BeanFactory getBeanFactory() {
        return this.beanFactory;
    }

    public void setBeanName(String name) {
        this.beanName = name;
    }

    protected String getBeanName() {
        return this.beanName;
    }

    public void setCache(RegionService cache) {
        this.setQueryService(cache.getQueryService());
    }

    protected Queue<CqQuery> getContinuousQueries() {
        return this.continuousQueries;
    }

    public void setContinuousQueryListenerContainerConfigurers(ContinuousQueryListenerContainerConfigurer ... configurers) {
        this.setContinuousQueryListenerContainerConfigurers(Arrays.asList(ArrayUtils.nullSafeArray(configurers, ContinuousQueryListenerContainerConfigurer.class)));
    }

    public void setContinuousQueryListenerContainerConfigurers(List<ContinuousQueryListenerContainerConfigurer> configurers) {
        this.cqListenerContainerConfigurers = Optional.ofNullable(configurers).orElseGet(Collections::emptyList);
    }

    protected ContinuousQueryListenerContainerConfigurer getCompositeContinuousQueryListenerContainerConfigurer() {
        return this.compositeCqListenerContainerConfigurer;
    }

    public void setErrorHandler(ErrorHandler errorHandler) {
        this.errorHandler = errorHandler;
    }

    protected Optional<ErrorHandler> getErrorHandler() {
        return Optional.ofNullable(this.errorHandler);
    }

    public void setPhase(int phase) {
        this.phase = phase;
    }

    public int getPhase() {
        return this.phase;
    }

    public void setPoolName(String poolName) {
        this.poolName = poolName;
    }

    protected String getPoolName() {
        return this.poolName;
    }

    public void setQueryListeners(Set<ContinuousQueryDefinition> queries) {
        this.continuousQueryDefinitions.clear();
        this.continuousQueryDefinitions.addAll(CollectionUtils.nullSafeSet(queries));
    }

    public void setQueryService(QueryService queryService) {
        this.queryService = queryService;
    }

    protected QueryService getQueryService() {
        return this.queryService;
    }

    public void setTaskExecutor(Executor taskExecutor) {
        this.taskExecutor = taskExecutor;
    }

    protected Executor getTaskExecutor() {
        return this.taskExecutor;
    }

    public void addListener(ContinuousQueryDefinition definition) {
        CqQuery query = this.addContinuousQuery(definition);
        if (this.isRunning()) {
            this.execute(query);
        }
    }

    CqQuery addContinuousQuery(ContinuousQueryDefinition definition) {
        try {
            CqAttributes attributes = definition.toCqAttributes(this::newCqListener);
            CqQuery query = definition.isNamed() ? this.newNamedContinuousQuery(definition, attributes) : this.newUnnamedContinuousQuery(definition, attributes);
            return this.manage(query);
        }
        catch (QueryException cause) {
            throw new GemfireQueryException(String.format("Unable to create query [%s]", definition.getQuery()), cause);
        }
    }

    protected CqListener newCqListener(ContinuousQueryListener listener) {
        return new EventDispatcherAdapter(listener);
    }

    private CqQuery newNamedContinuousQuery(ContinuousQueryDefinition definition, CqAttributes attributes) throws QueryException {
        return this.getQueryService().newCq(definition.getName(), definition.getQuery(), attributes, definition.isDurable());
    }

    private CqQuery newUnnamedContinuousQuery(ContinuousQueryDefinition definition, CqAttributes attributes) throws CqException {
        return this.getQueryService().newCq(definition.getQuery(), attributes, definition.isDurable());
    }

    private CqQuery manage(CqQuery query) {
        this.getContinuousQueries().add(query);
        return query;
    }

    public synchronized void start() {
        if (!this.isRunning()) {
            this.doStart();
            this.running = true;
            if (this.logger.isDebugEnabled()) {
                this.logger.debug((Object)"Started ContinuousQueryListenerContainer");
            }
        }
    }

    void doStart() {
        this.getContinuousQueries().forEach(this::execute);
    }

    private void execute(CqQuery query) {
        try {
            query.execute();
        }
        catch (QueryException cause) {
            throw new GemfireQueryException(String.format("Could not execute query [%1$s]; state is [%2$s]", query.getName(), query.getState()), cause);
        }
    }

    protected void dispatchEvent(ContinuousQueryListener listener, CqEvent event) {
        this.getTaskExecutor().execute(() -> this.notify(listener, event));
    }

    private void notify(ContinuousQueryListener listener, CqEvent event) {
        try {
            listener.onEvent(event);
        }
        catch (Throwable cause) {
            this.handleListenerError(cause);
        }
    }

    private void handleListenerError(Throwable cause) {
        this.getErrorHandler().filter(errorHandler -> {
            boolean active = this.isActive();
            if (!active && this.logger.isDebugEnabled()) {
                this.logger.debug((Object)"A CQ listener exception occurred after container shutdown; ErrorHandler will not be invoked", cause);
            }
            return active;
        }).ifPresent(errorHandler -> errorHandler.handleError(cause));
        if (!this.getErrorHandler().isPresent() && this.logger.isWarnEnabled()) {
            this.logger.warn((Object)"Execution of CQ listener failed; No ErrorHandler was configured", cause);
        }
    }

    public void stop(Runnable callback) {
        this.stop();
        callback.run();
    }

    public synchronized void stop() {
        if (this.isRunning()) {
            this.doStop();
            this.running = false;
        }
        if (this.logger.isDebugEnabled()) {
            this.logger.debug((Object)"Stopped ContinuousQueryListenerContainer");
        }
    }

    void doStop() {
        this.getContinuousQueries().forEach(query -> {
            block2: {
                try {
                    query.stop();
                }
                catch (Exception cause) {
                    if (!this.logger.isWarnEnabled()) break block2;
                    this.logger.warn((Object)String.format("Cannot stop query [%1$s]; state is [%2$s]", query.getName(), query.getState()), (Throwable)cause);
                }
            }
        });
    }

    public void destroy() throws Exception {
        this.stop();
        this.closeQueries();
        this.destroyExecutor();
        this.initialized = false;
    }

    private void closeQueries() {
        this.getContinuousQueries().stream().filter(query -> !query.isClosed()).forEach(query -> {
            block2: {
                try {
                    query.close();
                }
                catch (Exception cause) {
                    if (!this.logger.isWarnEnabled()) break block2;
                    this.logger.warn((Object)String.format("Cannot close query [%1$s]; state is [%2$s]", query.getName(), query.getState()), (Throwable)cause);
                }
            }
        });
        this.getContinuousQueries().clear();
    }

    private void destroyExecutor() {
        Optional.ofNullable(this.getTaskExecutor()).filter(it -> this.manageExecutor).filter(it -> it instanceof DisposableBean).ifPresent(it -> {
            try {
                ((DisposableBean)it).destroy();
                if (this.logger.isDebugEnabled()) {
                    this.logger.debug((Object)String.format("Stopped internally-managed TaskExecutor [%s]", it));
                }
            }
            catch (Exception ignore) {
                this.logger.warn((Object)String.format("Failed to properly destroy the managed TaskExecutor [%s]", it));
            }
        });
    }

    protected class EventDispatcherAdapter
    implements CqListener {
        private final ContinuousQueryListener listener;

        protected EventDispatcherAdapter(ContinuousQueryListener listener) {
            this.listener = Optional.ofNullable(listener).orElseThrow(() -> RuntimeExceptionFactory.newIllegalArgumentException("ContinuousQueryListener is required", new Object[0]));
        }

        protected ContinuousQueryListener getListener() {
            return this.listener;
        }

        public void onError(CqEvent event) {
            ContinuousQueryListenerContainer.this.dispatchEvent(this.getListener(), event);
        }

        public void onEvent(CqEvent event) {
            ContinuousQueryListenerContainer.this.dispatchEvent(this.getListener(), event);
        }

        public void close() {
        }
    }
}

