/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.xd.dirt.stream;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectWriter;
import java.util.List;
import java.util.Map;
import javax.annotation.PostConstruct;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.api.CreateBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.xd.dirt.server.admin.deployment.DeploymentHandler;
import org.springframework.xd.dirt.stream.AbstractInstancePersistingDeployer;
import org.springframework.xd.dirt.stream.ParsingContext;
import org.springframework.xd.dirt.stream.Stream;
import org.springframework.xd.dirt.stream.StreamDefinition;
import org.springframework.xd.dirt.stream.StreamDefinitionRepository;
import org.springframework.xd.dirt.stream.StreamRepository;
import org.springframework.xd.dirt.stream.XDParser;
import org.springframework.xd.dirt.zookeeper.Paths;
import org.springframework.xd.dirt.zookeeper.ZooKeeperConnection;
import org.springframework.xd.dirt.zookeeper.ZooKeeperUtils;
import org.springframework.xd.module.ModuleDefinition;
import org.springframework.xd.module.ModuleDescriptor;

public class StreamDeployer
extends AbstractInstancePersistingDeployer<StreamDefinition, Stream> {
    private static final Logger logger = LoggerFactory.getLogger(StreamDeployer.class);
    private static final TypeReference<List<ModuleDefinition>> MODULE_DEFINITIONS_LIST = new TypeReference<List<ModuleDefinition>>(){};
    private final ObjectWriter objectWriter = new ObjectMapper().writerWithType(MODULE_DEFINITIONS_LIST);
    private static final String DEFINITION_KEY = "definition";
    private static final String MODULE_DEFINITIONS_KEY = "moduleDefinitions";
    private final ZooKeeperConnection zkConnection;
    private final XDParser parser;

    public StreamDeployer(ZooKeeperConnection zkConnection, StreamDefinitionRepository repository, StreamRepository streamRepository, XDParser parser, DeploymentHandler deploymentHandler) {
        super(zkConnection, repository, streamRepository, parser, deploymentHandler, ParsingContext.stream);
        this.zkConnection = zkConnection;
        this.parser = parser;
    }

    @Override
    protected Stream makeInstance(StreamDefinition definition) {
        return new Stream(definition);
    }

    @Override
    protected StreamDefinition createDefinition(String name, String definition) {
        return new StreamDefinition(name, definition);
    }

    @Override
    protected String getDeploymentPath(StreamDefinition definition) {
        return Paths.build("deployments/streams", definition.getName());
    }

    @PostConstruct
    private void updateModuleDefinitions() {
        if (this.zkConnection.getClient() != null) {
            try {
                CuratorFramework client = this.zkConnection.getClient();
                if (client.checkExists().forPath("streams") != null) {
                    for (StreamDefinition definition : this.findAll()) {
                        this.setModuleDefinitions(client, definition);
                    }
                }
            }
            catch (Exception e) {
                logger.error("Exception migrating stream definitions. This migration is done when the existing stream definitions that don't have module definitions set.", (Throwable)e);
            }
        }
    }

    private void setModuleDefinitions(CuratorFramework client, StreamDefinition definition) {
        String streamName = definition.getName();
        String path = Paths.build("streams", streamName);
        try {
            List<ModuleDescriptor> moduleDescriptors;
            List<ModuleDefinition> moduleDefinitions;
            Map<String, String> map;
            byte[] bytes = (byte[])client.getData().forPath(path);
            if (bytes != null && (map = ZooKeeperUtils.bytesToMap(bytes)).get(MODULE_DEFINITIONS_KEY) == null && !(moduleDefinitions = this.createModuleDefinitions(moduleDescriptors = this.parser.parse(streamName, definition.getDefinition(), this.definitionKind))).isEmpty()) {
                map.put(DEFINITION_KEY, definition.getDefinition());
                try {
                    map.put(MODULE_DEFINITIONS_KEY, this.objectWriter.writeValueAsString(moduleDefinitions));
                    byte[] binary = ZooKeeperUtils.mapToBytes(map);
                    CreateBuilder op = client.checkExists().forPath(path) == null ? client.create() : client.setData();
                    op.forPath(path, binary);
                }
                catch (JsonProcessingException jpe) {
                    logger.error("Exception writing module definitions " + moduleDefinitions + " for the stream " + streamName, (Throwable)jpe);
                }
            }
        }
        catch (Exception e) {
            logger.error("Exception when updating module definitions for the stream " + streamName, (Throwable)e);
        }
    }
}

