/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.xd.dirt.server.admin.deployment.zk;

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.leader.LeaderSelector;
import org.apache.curator.framework.recipes.leader.LeaderSelectorListener;
import org.apache.curator.framework.recipes.leader.LeaderSelectorListenerAdapter;
import org.apache.curator.utils.ThreadUtils;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.embedded.EmbeddedServletContainerInitializedEvent;
import org.springframework.boot.context.embedded.EmbeddedWebApplicationContext;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.context.event.ContextStoppedEvent;
import org.springframework.util.StringUtils;
import org.springframework.xd.dirt.cluster.Admin;
import org.springframework.xd.dirt.cluster.AdminAttributes;
import org.springframework.xd.dirt.container.store.AdminRepository;
import org.springframework.xd.dirt.container.store.ContainerRepository;
import org.springframework.xd.dirt.job.JobFactory;
import org.springframework.xd.dirt.server.admin.deployment.ContainerMatcher;
import org.springframework.xd.dirt.server.admin.deployment.DeploymentUnitStateCalculator;
import org.springframework.xd.dirt.server.admin.deployment.zk.ContainerListener;
import org.springframework.xd.dirt.server.admin.deployment.zk.DeploymentMessageConsumer;
import org.springframework.xd.dirt.server.admin.deployment.zk.DeploymentQueue;
import org.springframework.xd.dirt.server.admin.deployment.zk.ModuleDeploymentWriter;
import org.springframework.xd.dirt.server.admin.deployment.zk.SupervisorElectedEvent;
import org.springframework.xd.dirt.server.admin.deployment.zk.SupervisorElectionListener;
import org.springframework.xd.dirt.stream.StreamFactory;
import org.springframework.xd.dirt.zookeeper.Paths;
import org.springframework.xd.dirt.zookeeper.ZooKeeperConnection;
import org.springframework.xd.dirt.zookeeper.ZooKeeperConnectionListener;
import org.springframework.xd.dirt.zookeeper.ZooKeeperUtils;

public class DeploymentSupervisor
implements ApplicationListener<ApplicationEvent>,
DisposableBean {
    private static final Logger logger = LoggerFactory.getLogger(DeploymentSupervisor.class);
    @Autowired
    private ZooKeeperConnection zkConnection;
    @Autowired
    private AdminRepository adminRepository;
    @Autowired
    private DeploymentMessageConsumer deploymentMessageConsumer;
    @Autowired
    private StreamFactory streamFactory;
    @Autowired
    private JobFactory jobFactory;
    @Autowired
    private ContainerMatcher containerMatcher;
    @Autowired
    private ContainerRepository containerRepository;
    @Autowired
    private ModuleDeploymentWriter moduleDeploymentWriter;
    @Autowired
    private DeploymentUnitStateCalculator stateCalculator;
    private final AdminAttributes adminAttributes;
    private volatile DeploymentQueue deploymentQueueForConsumer = null;
    private volatile ApplicationContext applicationContext;
    private volatile LeaderSelector leaderSelector;
    private final LeaderSelectorListener leaderListener = new LeaderListener();
    private final ConnectionListener connectionListener = new ConnectionListener();
    private final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(ThreadUtils.newThreadFactory((String)"DeploymentSupervisor"));
    private static final String MGMT_CONTEXT_NAMESPACE = "management";
    private final AtomicLong quietPeriod = new AtomicLong(15000L);
    public static final String QUIET_PERIOD_PROPERTY = "xd.admin.quietPeriod";

    public DeploymentSupervisor(AdminAttributes adminAttributes) {
        this.adminAttributes = adminAttributes;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void onApplicationEvent(ApplicationEvent event) {
        if (event instanceof ContextRefreshedEvent) {
            String namespace = ((EmbeddedWebApplicationContext)event.getSource()).getNamespace();
            if (!MGMT_CONTEXT_NAMESPACE.equals(namespace)) {
                this.applicationContext = ((ContextRefreshedEvent)event).getApplicationContext();
                String delay = this.applicationContext.getEnvironment().getProperty(QUIET_PERIOD_PROPERTY);
                if (StringUtils.hasText((String)delay)) {
                    this.quietPeriod.set(Long.parseLong(delay));
                    logger.info("Set container quiet period to {} ms", (Object)delay);
                }
                if (this.zkConnection.isConnected()) {
                    this.registerWithZooKeeper(this.zkConnection.getClient());
                    this.requestLeadership(this.zkConnection.getClient());
                }
                this.zkConnection.addListener(this.connectionListener);
            }
        } else if (event instanceof ContextStoppedEvent) {
            if (this.leaderSelector != null) {
                this.leaderSelector.close();
            }
        } else if (event instanceof EmbeddedServletContainerInitializedEvent) {
            String namespace = ((EmbeddedServletContainerInitializedEvent)event).getApplicationContext().getNamespace();
            int port = ((EmbeddedServletContainerInitializedEvent)event).getEmbeddedServletContainer().getPort();
            AdminAttributes adminAttributes = this.adminAttributes;
            synchronized (adminAttributes) {
                if (MGMT_CONTEXT_NAMESPACE.equals(namespace)) {
                    this.adminAttributes.setManagementPort(port);
                } else {
                    this.adminAttributes.setPort(port);
                }
                if (this.zkConnection.isConnected() && this.adminRepository.exists(this.adminAttributes.getId())) {
                    this.adminRepository.update(new Admin(this.adminAttributes.getId(), this.adminAttributes));
                }
            }
        }
    }

    private String getId() {
        return this.applicationContext.getId();
    }

    private synchronized void requestLeadership(CuratorFramework client) {
        try {
            Paths.ensurePath(client, "deployments/modules");
            Paths.ensurePath(client, "deployments/streams");
            Paths.ensurePath(client, "deployments/jobs");
            Paths.ensurePath(client, "admins");
            Paths.ensurePath(client, "containers");
            Paths.ensurePath(client, "streams");
            Paths.ensurePath(client, "jobs");
            if (this.leaderSelector == null) {
                this.leaderSelector = new LeaderSelector(client, Paths.build("adminelection"), this.leaderListener);
                this.leaderSelector.setId(this.getId());
                this.leaderSelector.start();
            }
        }
        catch (Exception e) {
            throw ZooKeeperUtils.wrapThrowable(e);
        }
    }

    public void destroy() {
        if (this.leaderSelector != null) {
            this.leaderSelector.close();
            this.leaderSelector = null;
        }
    }

    private PathChildrenCache instantiatePathChildrenCache(CuratorFramework client, String path) {
        return new PathChildrenCache(client, path, true, false, (ExecutorService)this.executorService);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void registerWithZooKeeper(CuratorFramework client) {
        try {
            String containerId = this.adminAttributes.getId();
            String containerPath = Paths.build("admins", containerId);
            Stat containerPathStat = (Stat)client.checkExists().forPath(containerPath);
            if (containerPathStat != null) {
                long currSession;
                long prevSession = containerPathStat.getEphemeralOwner();
                if (prevSession == (currSession = client.getZookeeperClient().getZooKeeper().getSessionId())) {
                    logger.info(String.format("Existing registration for admin runtime %s with session 0x%x detected", containerId, currSession));
                    return;
                }
                logger.info(String.format("Trying to delete previous registration for admin runtime %s with session %x detected; current session: 0x%x; path: %s", containerId, prevSession, currSession, containerPath));
                try {
                    client.delete().forPath(containerPath);
                }
                catch (Exception e) {
                    ZooKeeperUtils.wrapAndThrowIgnoring(e, KeeperException.NoNodeException.class);
                }
            }
            AdminAttributes adminAttributes = this.adminAttributes;
            synchronized (adminAttributes) {
                this.adminRepository.save(new Admin(containerId, this.adminAttributes));
            }
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw ZooKeeperUtils.wrapThrowable(e);
        }
        catch (Exception e) {
            throw ZooKeeperUtils.wrapThrowable(e);
        }
    }

    class LeaderListener
    extends LeaderSelectorListenerAdapter {
        LeaderListener() {
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void takeLeadership(CuratorFramework client) throws Exception {
            logger.info("Leader Admin {} is watching for stream/job deployment requests.", (Object)DeploymentSupervisor.this.getId());
            PathChildrenCache containers = null;
            PathChildrenCache streamDeployments = null;
            PathChildrenCache jobDeployments = null;
            PathChildrenCache moduleDeploymentRequests = null;
            try {
                String requestedModulesPath = Paths.build("deployments/modules", "requested");
                Paths.ensurePath(client, requestedModulesPath);
                String allocatedModulesPath = Paths.build("deployments/modules", "allocated");
                Paths.ensurePath(client, allocatedModulesPath);
                moduleDeploymentRequests = DeploymentSupervisor.this.instantiatePathChildrenCache(client, requestedModulesPath);
                moduleDeploymentRequests.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
                streamDeployments = DeploymentSupervisor.this.instantiatePathChildrenCache(client, "deployments/streams");
                streamDeployments.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
                jobDeployments = DeploymentSupervisor.this.instantiatePathChildrenCache(client, "deployments/jobs");
                jobDeployments.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
                SupervisorElectedEvent supervisorElectedEvent = new SupervisorElectedEvent(moduleDeploymentRequests, streamDeployments, jobDeployments);
                Map listenersMap = DeploymentSupervisor.this.applicationContext.getBeansOfType(SupervisorElectionListener.class);
                for (Map.Entry entry : listenersMap.entrySet()) {
                    ((SupervisorElectionListener)entry.getValue()).onSupervisorElected(supervisorElectedEvent);
                }
                ContainerListener containerListener = new ContainerListener(DeploymentSupervisor.this.zkConnection, DeploymentSupervisor.this.containerRepository, DeploymentSupervisor.this.streamFactory, DeploymentSupervisor.this.jobFactory, streamDeployments, jobDeployments, moduleDeploymentRequests, DeploymentSupervisor.this.containerMatcher, DeploymentSupervisor.this.moduleDeploymentWriter, DeploymentSupervisor.this.stateCalculator, DeploymentSupervisor.this.executorService, DeploymentSupervisor.this.quietPeriod);
                containers = DeploymentSupervisor.this.instantiatePathChildrenCache(client, "containers");
                containers.getListenable().addListener((Object)containerListener);
                containers.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
                containerListener.scheduleDepartedContainerDeployer();
                DeploymentSupervisor.this.deploymentQueueForConsumer = new DeploymentQueue(client, DeploymentSupervisor.this.deploymentMessageConsumer, "queue/deployments", DeploymentSupervisor.this.executorService);
                DeploymentSupervisor.this.deploymentQueueForConsumer.start();
                Thread.sleep(Long.MAX_VALUE);
            }
            catch (InterruptedException e) {
                logger.info("Leadership canceled due to thread interrupt");
                Thread.currentThread().interrupt();
            }
            finally {
                if (containers != null) {
                    containers.close();
                }
                if (streamDeployments != null) {
                    streamDeployments.close();
                }
                if (jobDeployments != null) {
                    jobDeployments.close();
                }
                if (moduleDeploymentRequests != null) {
                    moduleDeploymentRequests.close();
                }
                if (DeploymentSupervisor.this.deploymentQueueForConsumer != null) {
                    try {
                        DeploymentSupervisor.this.deploymentQueueForConsumer.destroy();
                    }
                    catch (IOException e) {
                        logger.warn("Exception closing the distributed queue producer " + e);
                    }
                }
            }
        }
    }

    private class ConnectionListener
    implements ZooKeeperConnectionListener {
        private ConnectionListener() {
        }

        @Override
        public void onConnect(CuratorFramework client) {
            logger.info("Admin {} connection established", (Object)DeploymentSupervisor.this.getId());
            DeploymentSupervisor.this.registerWithZooKeeper(client);
            DeploymentSupervisor.this.requestLeadership(client);
        }

        @Override
        public void onResume(CuratorFramework client) {
            logger.info("Admin {} connection resumed, client state: {}", (Object)DeploymentSupervisor.this.getId(), (Object)client.getState());
            DeploymentSupervisor.this.registerWithZooKeeper(client);
            DeploymentSupervisor.this.requestLeadership(client);
        }

        @Override
        public void onDisconnect(CuratorFramework client) {
            logger.info("Admin {} connection terminated", (Object)DeploymentSupervisor.this.getId());
            try {
                DeploymentSupervisor.this.destroy();
            }
            catch (Exception e) {
                logger.warn("exception occurred while closing leader selector", (Throwable)e);
            }
        }

        @Override
        public void onSuspend(CuratorFramework client) {
            logger.info("Admin {} connection suspended", (Object)DeploymentSupervisor.this.getId());
            try {
                DeploymentSupervisor.this.destroy();
            }
            catch (Exception e) {
                logger.warn("exception occurred while closing leader selector", (Throwable)e);
            }
        }
    }
}

