/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.data.gemfire.listener;

import java.util.LinkedHashSet;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.geode.cache.RegionService;
import org.apache.geode.cache.client.Pool;
import org.apache.geode.cache.client.PoolManager;
import org.apache.geode.cache.query.CqAttributes;
import org.apache.geode.cache.query.CqAttributesFactory;
import org.apache.geode.cache.query.CqEvent;
import org.apache.geode.cache.query.CqListener;
import org.apache.geode.cache.query.CqQuery;
import org.apache.geode.cache.query.QueryException;
import org.apache.geode.cache.query.QueryService;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanFactoryAware;
import org.springframework.beans.factory.BeanNameAware;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.context.SmartLifecycle;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.core.task.TaskExecutor;
import org.springframework.data.gemfire.GemfireQueryException;
import org.springframework.data.gemfire.client.support.DefaultableDelegatingPoolAdapter;
import org.springframework.data.gemfire.client.support.DelegatingPoolAdapter;
import org.springframework.data.gemfire.listener.ContinuousQueryDefinition;
import org.springframework.data.gemfire.listener.ContinuousQueryListener;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;
import org.springframework.util.ErrorHandler;
import org.springframework.util.StringUtils;

/**
 * Container that creates continuous queries ({@link CqQuery}) from {@link ContinuousQueryDefinition}s
 * using a {@link QueryService}, and hands every {@link CqEvent} received by those queries to the
 * definition's {@link ContinuousQueryListener} on an {@link Executor}.
 *
 * <p>Lifecycle as implemented here:
 * <ul>
 * <li>{@link #afterPropertiesSet()} resolves the {@link QueryService} and the {@link Executor}, creates the
 * queries for all configured definitions and, when {@link #isAutoStartup()} is {@code true}, calls
 * {@link #start()}.</li>
 * <li>{@link #start()} executes all registered queries; {@link #stop()} stops them.</li>
 * <li>{@link #destroy()} stops and closes all queries and destroys the Executor if this container created it.</li>
 * </ul>
 */
public class ContinuousQueryListenerContainer
        implements BeanFactoryAware, BeanNameAware, InitializingBean, DisposableBean, SmartLifecycle {

    /** Thread name prefix used by the default task executor when no bean name is set: short class name plus "-". */
    public static final String DEFAULT_THREAD_NAME_PREFIX = String.format("%1$s-", ClassUtils.getShortName(ContinuousQueryListenerContainer.class));

    private boolean autoStartup = true;

    /** Set at the end of {@link #afterPropertiesSet()} and cleared by {@link #destroy()}. */
    private volatile boolean initialized = false;

    /** {@code true} when the Executor was created by {@link #initExecutor()} rather than injected. */
    private volatile boolean manageExecutor = false;

    private volatile boolean running = false;

    private int phase = Integer.MAX_VALUE;

    private BeanFactory beanFactory;

    private ErrorHandler errorHandler;

    private Executor taskExecutor;

    protected final Log logger = LogFactory.getLog(this.getClass());

    /** Queries created by this container; iterated by start/stop/close. */
    private final Queue<CqQuery> continuousQueries = new ConcurrentLinkedQueue<CqQuery>();

    private QueryService queryService;

    /** Definitions configured via {@link #setQueryListeners(Set)}; turned into queries on initialization. */
    private final Set<ContinuousQueryDefinition> continuousQueryDefinitions = new LinkedHashSet<ContinuousQueryDefinition>();

    private String beanName;

    private String poolName;

    /**
     * Initializes the container: resolves the {@link QueryService} and task {@link Executor}, creates the
     * continuous queries for all configured definitions, marks the container active and starts it
     * when auto-startup is enabled.
     *
     * @throws IllegalStateException if no {@link QueryService} could be resolved.
     */
    @Override
    public void afterPropertiesSet() {
        this.initQueryService(this.eagerlyInitializePool(this.resolvePoolName()));

        // Fail fast with a meaningful message: creating the queries below dereferences the QueryService,
        // so checking afterwards would only ever surface a wrapped NullPointerException.
        Assert.state(this.queryService != null, "QueryService was not properly initialized");

        this.initExecutor();
        this.initContinuousQueries(this.continuousQueryDefinitions);
        this.initialized = true;

        if (this.isAutoStartup()) {
            this.start();
        }
    }

    /**
     * Determines the name of the Pool to use.
     *
     * @return the configured pool name if it has text; otherwise {@code "gemfirePool"} when the
     * {@link BeanFactory} contains a bean of that name, else {@code "DEFAULT"}.
     */
    String resolvePoolName() {
        String resolvedPoolName = this.poolName;

        if (!StringUtils.hasText(resolvedPoolName)) {
            String defaultPoolName = "gemfirePool";

            resolvedPoolName = this.beanFactory != null && this.beanFactory.containsBean(defaultPoolName)
                ? defaultPoolName
                : "DEFAULT";
        }

        return resolvedPoolName;
    }

    /**
     * Fetches the {@link Pool} bean with the given name from the {@link BeanFactory}, if the factory has
     * a bean of that name and type. The returned bean is discarded; the lookup is presumably done only to
     * force the bean's creation.
     *
     * <p>If the bean lookup raises a {@link BeansException}, the pool must already be known to
     * {@link PoolManager}.
     *
     * @param poolName name of the Pool to initialize.
     * @return the given {@code poolName}.
     * @throws IllegalArgumentException if the bean lookup failed and {@link PoolManager} has no Pool
     * with the given name.
     */
    String eagerlyInitializePool(String poolName) {
        try {
            if (this.beanFactory != null && this.beanFactory.isTypeMatch(poolName, Pool.class)) {
                this.beanFactory.getBean(poolName, Pool.class);
            }
        }
        catch (BeansException ignore) {
            Assert.notNull(PoolManager.find(poolName),
                String.format("No GemFire Pool with name [%1$s] was found", poolName));
        }

        return poolName;
    }

    /**
     * Resolves the {@link QueryService} when none has been configured or when a pool name is given.
     * The currently configured {@link QueryService} is passed to the pool adapter's
     * {@code getQueryService(..)}; presumably it serves as the fallback when the Pool cannot supply one
     * (confirm against {@link DefaultableDelegatingPoolAdapter}).
     *
     * @param poolName name of the Pool looked up through {@link PoolManager}.
     * @return the {@link QueryService} now held by this container.
     */
    QueryService initQueryService(String poolName) {
        if (this.queryService == null || StringUtils.hasText(poolName)) {
            this.queryService = DefaultableDelegatingPoolAdapter
                .from(DelegatingPoolAdapter.from(PoolManager.find(poolName)))
                .preferPool()
                .getQueryService(this.queryService);
        }

        return this.queryService;
    }

    /**
     * Creates the default task {@link Executor} if none was configured and records that this container
     * owns it.
     *
     * @return the {@link Executor} used to dispatch events.
     */
    Executor initExecutor() {
        if (this.taskExecutor == null) {
            this.taskExecutor = this.createDefaultTaskExecutor();
            this.manageExecutor = true;
        }

        return this.taskExecutor;
    }

    /**
     * Creates the default {@link TaskExecutor}: a {@link SimpleAsyncTaskExecutor} whose thread name prefix
     * is the bean name followed by "-", or {@link #DEFAULT_THREAD_NAME_PREFIX} when no bean name is set.
     *
     * @return a new {@link SimpleAsyncTaskExecutor}.
     */
    protected TaskExecutor createDefaultTaskExecutor() {
        String threadNamePrefix = this.beanName != null
            ? String.format("%1$s-", this.beanName)
            : DEFAULT_THREAD_NAME_PREFIX;

        return new SimpleAsyncTaskExecutor(threadNamePrefix);
    }

    /**
     * Stops the container if it is running, closes and discards all existing queries, then creates a
     * query for each of the given definitions.
     */
    private void initContinuousQueries(Set<ContinuousQueryDefinition> continuousQueryDefinitions) {
        if (this.isRunning()) {
            this.stop();
        }

        this.closeQueries();

        for (ContinuousQueryDefinition definition : continuousQueryDefinitions) {
            this.addContinuousQuery(definition);
        }
    }

    /**
     * Executes all registered continuous queries and marks the container as running.
     * Does nothing if the container is already running.
     *
     * @throws GemfireQueryException if a query cannot be executed; the container is then not marked running.
     */
    @Override
    public synchronized void start() {
        if (!this.isRunning()) {
            this.doStart();
            this.running = true;

            if (this.logger.isDebugEnabled()) {
                this.logger.debug("Started ContinuousQueryListenerContainer");
            }
        }
    }

    /** Executes every registered query, aborting on the first failure. */
    private void doStart() {
        for (CqQuery cq : this.continuousQueries) {
            this.executeQuery(cq);
        }
    }

    /**
     * Executes the given query, translating failures into {@link GemfireQueryException}.
     */
    private void executeQuery(CqQuery cq) {
        // The two catch blocks are deliberately not merged: the exception's static type selects the
        // GemfireQueryException constructor.
        try {
            cq.execute();
        }
        catch (QueryException ex) {
            throw new GemfireQueryException(this.describeExecutionFailure(cq), ex);
        }
        catch (RuntimeException ex) {
            throw new GemfireQueryException(this.describeExecutionFailure(cq), ex);
        }
    }

    /** Builds the error message for a query that failed to execute, including its name and current state. */
    private String describeExecutionFailure(CqQuery cq) {
        return String.format("Could not execute query [%1$s]; state is [%2$s].", cq.getName(), cq.getState());
    }

    /**
     * Stops all registered continuous queries and marks the container as not running.
     * Does nothing if the container is not running.
     */
    @Override
    public synchronized void stop() {
        if (this.isRunning()) {
            this.doStop();
            this.running = false;

            // Only report a stop that actually happened (mirrors start()).
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("Stopped ContinuousQueryListenerContainer");
            }
        }
    }

    /**
     * Stops the container and then runs the given callback. The callback is run even if stopping fails.
     *
     * @param callback {@link Runnable} to invoke once {@link #stop()} has returned.
     */
    @Override
    public void stop(Runnable callback) {
        try {
            this.stop();
        }
        finally {
            callback.run();
        }
    }

    /** Stops every registered query; a failure is logged as a warning and does not prevent stopping the rest. */
    private void doStop() {
        for (CqQuery cq : this.continuousQueries) {
            try {
                cq.stop();
            }
            catch (Exception e) {
                this.logger.warn(String.format("Cannot stop query '%1$s'; state is '%2$s'.",
                    cq.getName(), cq.getState()), e);
            }
        }
    }

    /**
     * Stops the container, closes all queries, destroys the task executor when it is managed by this
     * container, and marks the container inactive.
     *
     * @throws Exception if destroying the internally managed executor fails.
     */
    @Override
    public void destroy() throws Exception {
        try {
            this.stop();
            this.closeQueries();
            this.destroyExecutor();
        }
        finally {
            // The container must not report itself active after destruction, even if cleanup failed.
            this.initialized = false;
        }
    }

    /**
     * Closes every registered query that is not already closed and then forgets all of them.
     * A failure is logged as a warning and does not prevent closing the rest.
     */
    private void closeQueries() {
        for (CqQuery cq : this.continuousQueries) {
            try {
                if (!cq.isClosed()) {
                    cq.close();
                }
            }
            catch (Exception e) {
                this.logger.warn(String.format("Cannot close query '%1$s'; state is '%2$s'.",
                    cq.getName(), cq.getState()), e);
            }
        }

        this.continuousQueries.clear();
    }

    /**
     * Destroys the task executor, but only if this container created it and it is a {@link DisposableBean}.
     */
    private void destroyExecutor() throws Exception {
        if (this.manageExecutor && this.taskExecutor instanceof DisposableBean) {
            ((DisposableBean) this.taskExecutor).destroy();

            if (this.logger.isDebugEnabled()) {
                this.logger.debug("Stopped internally-managed Task Executor.");
            }
        }
    }

    /**
     * @return {@code true} once {@link #afterPropertiesSet()} has completed and until {@link #destroy()} is called.
     */
    public final boolean isActive() {
        return this.initialized;
    }

    /**
     * @param autoStartup whether {@link #afterPropertiesSet()} should call {@link #start()}; defaults to {@code true}.
     */
    public void setAutoStartup(boolean autoStartup) {
        this.autoStartup = autoStartup;
    }

    @Override
    public boolean isAutoStartup() {
        return this.autoStartup;
    }

    /**
     * @return {@code true} between a successful {@link #start()} and the next {@link #stop()}.
     */
    @Override
    public synchronized boolean isRunning() {
        return this.running;
    }

    @Override
    public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
        this.beanFactory = beanFactory;
    }

    /**
     * @param name bean name; used as the thread name prefix of the default task executor.
     */
    @Override
    public void setBeanName(String name) {
        this.beanName = name;
    }

    /**
     * Configures the {@link QueryService} from the given cache.
     *
     * @param cache {@link RegionService} whose {@link QueryService} is used; must not be {@code null}.
     */
    public void setCache(RegionService cache) {
        this.setQueryService(cache.getQueryService());
    }

    /**
     * @param errorHandler handler invoked when a listener throws while the container is active; when none
     * is set, the failure is logged as a warning.
     */
    public void setErrorHandler(ErrorHandler errorHandler) {
        this.errorHandler = errorHandler;
    }

    /**
     * @param phase lifecycle phase of this container; defaults to {@link Integer#MAX_VALUE}.
     */
    public void setPhase(int phase) {
        this.phase = phase;
    }

    @Override
    public int getPhase() {
        return this.phase;
    }

    /**
     * @param poolName name of the Pool used to resolve the {@link QueryService}.
     */
    public void setPoolName(String poolName) {
        this.poolName = poolName;
    }

    /**
     * Replaces the configured query definitions. The definitions are turned into queries by
     * {@link #afterPropertiesSet()}.
     *
     * @param queries the new definitions; {@code null} is treated as an empty set.
     */
    public void setQueryListeners(Set<ContinuousQueryDefinition> queries) {
        this.continuousQueryDefinitions.clear();

        if (queries != null) {
            this.continuousQueryDefinitions.addAll(queries);
        }
    }

    public void setQueryService(QueryService service) {
        this.queryService = service;
    }

    /**
     * @param taskExecutor {@link Executor} used to dispatch events; when none is set, a default one is
     * created (and owned) by this container during initialization.
     */
    public void setTaskExecutor(Executor taskExecutor) {
        this.taskExecutor = taskExecutor;
    }

    /**
     * Creates and registers a continuous query for the given definition. If the container is running,
     * the query is executed immediately.
     *
     * @param definition definition of the query and its listener.
     * @throws GemfireQueryException if the query cannot be created or executed.
     */
    public void addListener(ContinuousQueryDefinition definition) {
        this.doAddListener(definition);
    }

    private void doAddListener(ContinuousQueryDefinition definition) {
        CqQuery cq = this.addContinuousQuery(definition);

        if (this.isRunning()) {
            this.executeQuery(cq);
        }
    }

    /**
     * Creates a {@link CqQuery} for the given definition, with an {@link EventDispatcherAdapter} wrapping
     * the definition's listener, and registers it with this container. The named {@code newCq} variant is
     * used when the definition's name has text.
     */
    private CqQuery addContinuousQuery(ContinuousQueryDefinition definition) {
        // The two catch blocks are deliberately not merged: the exception's static type selects the
        // GemfireQueryException constructor.
        try {
            CqAttributesFactory attributesFactory = new CqAttributesFactory();

            attributesFactory.addCqListener(new EventDispatcherAdapter(definition.getListener()));

            CqAttributes attributes = attributesFactory.create();

            CqQuery cq = StringUtils.hasText(definition.getName())
                ? this.queryService.newCq(definition.getName(), definition.getQuery(), attributes, definition.isDurable())
                : this.queryService.newCq(definition.getQuery(), attributes, definition.isDurable());

            this.continuousQueries.add(cq);

            return cq;
        }
        catch (RuntimeException ex) {
            throw new GemfireQueryException("Cannot create query ", ex);
        }
        catch (QueryException ex) {
            throw new GemfireQueryException("Cannot create query ", ex);
        }
    }

    /** Submits a task to the task executor that delivers the event to the listener. */
    private void dispatchEvent(final ContinuousQueryListener listener, final CqEvent event) {
        this.taskExecutor.execute(new Runnable() {

            @Override
            public void run() {
                ContinuousQueryListenerContainer.this.executeListener(listener, event);
            }
        });
    }

    /**
     * Invokes the listener with the event; anything thrown by the listener is passed to
     * {@link #handleListenerException(Throwable)} instead of propagating.
     *
     * @param listener listener to notify.
     * @param event event to deliver.
     */
    protected void executeListener(ContinuousQueryListener listener, CqEvent event) {
        try {
            listener.onEvent(event);
        }
        catch (Throwable ex) {
            this.handleListenerException(ex);
        }
    }

    /**
     * Handles a failure raised by a listener: delegates to {@link #invokeErrorHandler(Throwable)} while the
     * container is active, otherwise only logs it at debug level.
     *
     * @param e the failure raised by the listener.
     */
    protected void handleListenerException(Throwable e) {
        if (this.isActive()) {
            this.invokeErrorHandler(e);
        }
        else {
            this.logger.debug("Listener exception after container shutdown", e);
        }
    }

    /**
     * Passes the failure to the configured {@link ErrorHandler}, or logs a warning when none is set.
     *
     * @param e the failure raised by the listener.
     */
    protected void invokeErrorHandler(Throwable e) {
        if (this.errorHandler != null) {
            this.errorHandler.handleError(e);
        }
        else if (this.logger.isWarnEnabled()) {
            this.logger.warn("Execution of the CQ event listener failed, and no ErrorHandler has been set.", e);
        }
    }

    /**
     * {@link CqListener} that forwards both regular and error events to the wrapped
     * {@link ContinuousQueryListener} through the container's task executor.
     */
    private class EventDispatcherAdapter implements CqListener {

        private final ContinuousQueryListener delegate;

        private EventDispatcherAdapter(ContinuousQueryListener delegate) {
            this.delegate = delegate;
        }

        // Error events take the same path as regular events; the delegate receives them via onEvent(..).
        @Override
        public void onError(CqEvent event) {
            ContinuousQueryListenerContainer.this.dispatchEvent(this.delegate, event);
        }

        @Override
        public void onEvent(CqEvent event) {
            ContinuousQueryListenerContainer.this.dispatchEvent(this.delegate, event);
        }

        @Override
        public void close() {
        }
    }
}

