/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.xd.dirt.stream.zookeeper;

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import org.apache.commons.collections.CollectionUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.Pageable;
import org.springframework.data.domain.Sort;
import org.springframework.util.Assert;
import org.springframework.xd.dirt.core.DeploymentUnitStatus;
import org.springframework.xd.dirt.core.StreamDeploymentsPath;
import org.springframework.xd.dirt.stream.Stream;
import org.springframework.xd.dirt.stream.StreamDefinition;
import org.springframework.xd.dirt.stream.StreamRepository;
import org.springframework.xd.dirt.stream.zookeeper.RepositoryConnectionListener;
import org.springframework.xd.dirt.util.PagingUtility;
import org.springframework.xd.dirt.zookeeper.Paths;
import org.springframework.xd.dirt.zookeeper.ZooKeeperConnection;
import org.springframework.xd.dirt.zookeeper.ZooKeeperUtils;

public class ZooKeeperStreamRepository
implements StreamRepository,
InitializingBean {
    private static final Logger logger = LoggerFactory.getLogger(ZooKeeperStreamRepository.class);
    private final ZooKeeperConnection zkConnection;
    private final PagingUtility<Stream> pagingUtility = new PagingUtility();
    private final RepositoryConnectionListener connectionListener = new RepositoryConnectionListener();

    @Autowired
    public ZooKeeperStreamRepository(ZooKeeperConnection zkConnection) {
        this.zkConnection = zkConnection;
    }

    public void afterPropertiesSet() throws Exception {
        this.zkConnection.addListener(this.connectionListener);
        if (this.zkConnection.isConnected()) {
            this.connectionListener.onConnect(this.zkConnection.getClient());
        }
    }

    public Iterable<Stream> findAll(Sort sort) {
        return this.findAll();
    }

    public Page<Stream> findAll(Pageable pageable) {
        return this.pagingUtility.getPagedData(pageable, (List<Stream>)this.findAll());
    }

    public <S extends Stream> S save(S entity) {
        return entity;
    }

    public <S extends Stream> Iterable<S> save(Iterable<S> entities) {
        return entities;
    }

    public Stream findOne(String id) {
        CuratorFramework client = this.zkConnection.getClient();
        String path = Paths.build("streams", id);
        try {
            Stat definitionStat = (Stat)client.checkExists().forPath(path);
            if (definitionStat != null) {
                byte[] data = (byte[])client.getData().forPath(path);
                Map<String, String> map = ZooKeeperUtils.bytesToMap(data);
                Stream stream = new Stream(new StreamDefinition(id, map.get("definition")));
                Stat deployStat = (Stat)client.checkExists().forPath(Paths.build("deployments/streams", id));
                if (deployStat != null) {
                    stream.setStartedAt(new Date(deployStat.getCtime()));
                    stream.setStatus(this.getDeploymentStatus(id));
                    return stream;
                }
            }
        }
        catch (Exception e) {
            throw ZooKeeperUtils.wrapThrowable(e);
        }
        return null;
    }

    public boolean exists(String id) {
        return null != this.findOne(id);
    }

    public List<Stream> findAll() {
        try {
            return this.findAll((Iterable)this.zkConnection.getClient().getChildren().forPath("deployments/streams"));
        }
        catch (Exception e) {
            throw ZooKeeperUtils.wrapThrowable(e);
        }
    }

    public List<Stream> findAll(Iterable<String> ids) {
        ArrayList<Stream> results = new ArrayList<Stream>();
        try {
            for (String stream : ids) {
                Stream s = this.findOne(stream);
                if (s == null) continue;
                results.add(s);
            }
        }
        catch (Exception e) {
            throw ZooKeeperUtils.wrapThrowable(e);
        }
        return results;
    }

    public long count() {
        try {
            Stat stat = (Stat)this.zkConnection.getClient().checkExists().forPath("deployments/streams");
            return stat != null ? (long)stat.getNumChildren() : 0L;
        }
        catch (Exception e) {
            throw ZooKeeperUtils.wrapThrowable(e);
        }
    }

    public void delete(String id) {
        logger.info("Undeploying stream {}", (Object)id);
        String streamDeploymentPath = Paths.build("deployments/streams", id);
        String streamModuleDeploymentPath = Paths.build(streamDeploymentPath, "modules");
        CuratorFramework client = this.zkConnection.getClient();
        ArrayDeque<String> paths = new ArrayDeque<String>();
        try {
            client.setData().forPath(Paths.build("deployments/streams", id, "status"), ZooKeeperUtils.mapToBytes(new DeploymentUnitStatus(DeploymentUnitStatus.State.undeploying).toMap()));
        }
        catch (Exception e) {
            logger.warn("Exception while transitioning stream {} state to {}", new Object[]{id, DeploymentUnitStatus.State.undeploying, e});
        }
        TreeMap<Long, String> txMap = new TreeMap<Long, String>();
        try {
            List deployments = (List)client.getChildren().forPath(streamModuleDeploymentPath);
            for (String deployment : deployments) {
                String path = new StreamDeploymentsPath(Paths.build(streamModuleDeploymentPath, deployment)).build();
                Stat stat = (Stat)client.checkExists().forPath(path);
                Assert.notNull((Object)stat);
                txMap.put(stat.getCzxid(), path);
            }
        }
        catch (Exception e) {
            ZooKeeperUtils.wrapAndThrowIgnoring(e, KeeperException.NoNodeException.class);
        }
        for (String deployment : txMap.values()) {
            paths.add(deployment);
        }
        Iterator iterator = paths.descendingIterator();
        while (iterator.hasNext()) {
            try {
                String path = (String)iterator.next();
                logger.trace("removing path {}", (Object)path);
                client.delete().deletingChildrenIfNeeded().forPath(path);
            }
            catch (Exception e) {
                ZooKeeperUtils.wrapAndThrowIgnoring(e, KeeperException.NoNodeException.class);
            }
        }
        try {
            client.delete().deletingChildrenIfNeeded().forPath(streamDeploymentPath);
        }
        catch (KeeperException.NotEmptyException e) {
            ArrayList<String> children = new ArrayList<String>();
            try {
                children.addAll((Collection)client.getChildren().forPath(streamModuleDeploymentPath));
            }
            catch (Exception ex) {
                children.add("Could not load list of children due to " + ex);
            }
            throw new IllegalStateException(String.format("The following children were not deleted from %s: %s", streamModuleDeploymentPath, children), e);
        }
        catch (Exception e) {
            ZooKeeperUtils.wrapAndThrowIgnoring(e, KeeperException.NoNodeException.class);
        }
    }

    public void delete(Stream entity) {
        Assert.notNull((Object)entity, (String)"stream must not be null");
        this.delete(((StreamDefinition)entity.getDefinition()).getName());
    }

    public void delete(Iterable<? extends Stream> entities) {
        for (Stream stream : entities) {
            this.delete(stream);
        }
    }

    public void deleteAll() {
        try {
            List children = (List)this.zkConnection.getClient().getChildren().forPath("deployments/streams");
            for (String child : children) {
                this.delete(child);
            }
        }
        catch (Exception e) {
            ZooKeeperUtils.wrapAndThrowIgnoring(e, KeeperException.NoNodeException.class);
        }
    }

    public Iterable<Stream> findAllInRange(String from, boolean fromInclusive, String to, boolean toInclusive) {
        Stream stream;
        Iterable all = this.findAll();
        if (CollectionUtils.isEmpty((Collection)all)) {
            return Collections.emptyList();
        }
        Collections.sort(all);
        ArrayList<Stream> results = new ArrayList<Stream>();
        Iterator i$ = all.iterator();
        while (i$.hasNext() && ((StreamDefinition)(stream = (Stream)i$.next()).getDefinition()).getName().compareTo(to) <= 0) {
            if (((StreamDefinition)stream.getDefinition()).getName().compareTo(from) < 0) continue;
            results.add(stream);
        }
        return results;
    }

    @Override
    public DeploymentUnitStatus getDeploymentStatus(String id) {
        String path = Paths.build("deployments/streams", id, "status");
        byte[] statusBytes = null;
        try {
            statusBytes = (byte[])this.zkConnection.getClient().getData().forPath(path);
        }
        catch (Exception e) {
            ZooKeeperUtils.wrapAndThrowIgnoring(e, KeeperException.NoNodeException.class);
        }
        return statusBytes == null ? new DeploymentUnitStatus(DeploymentUnitStatus.State.undeployed) : new DeploymentUnitStatus(ZooKeeperUtils.bytesToMap(statusBytes));
    }
}

