package org.springframework.cloud.dataflow.server.service.impl;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.dataflow.audit.service.AuditRecordService;
import org.springframework.cloud.dataflow.audit.service.AuditServiceUtils;
import org.springframework.cloud.dataflow.core.ApplicationType;
import org.springframework.cloud.dataflow.core.AuditActionType;
import org.springframework.cloud.dataflow.core.AuditOperationType;
import org.springframework.cloud.dataflow.core.DataFlowPropertyKeys;
import org.springframework.cloud.dataflow.core.StreamAppDefinition;
import org.springframework.cloud.dataflow.core.StreamDefinition;
import org.springframework.cloud.dataflow.core.StreamDefinitionService;
import org.springframework.cloud.dataflow.core.StreamDeployment;
import org.springframework.cloud.dataflow.core.dsl.ParseException;
import org.springframework.cloud.dataflow.core.dsl.StreamNode;
import org.springframework.cloud.dataflow.rest.SkipperStream;
import org.springframework.cloud.dataflow.rest.UpdateStreamRequest;
import org.springframework.cloud.dataflow.rest.util.DeploymentPropertiesUtils;
import org.springframework.cloud.dataflow.server.controller.StreamAlreadyDeployedException;
import org.springframework.cloud.dataflow.server.controller.StreamAlreadyDeployingException;
import org.springframework.cloud.dataflow.server.controller.support.InvalidStreamDefinitionException;
import org.springframework.cloud.dataflow.server.repository.DuplicateStreamDefinitionException;
import org.springframework.cloud.dataflow.server.repository.NoSuchStreamDefinitionException;
import org.springframework.cloud.dataflow.server.repository.StreamDefinitionRepository;
import org.springframework.cloud.dataflow.server.service.StreamService;
import org.springframework.cloud.dataflow.server.service.StreamValidationService;
import org.springframework.cloud.dataflow.server.service.ValidationStatus;
import org.springframework.cloud.dataflow.server.stream.SkipperStreamDeployer;
import org.springframework.cloud.dataflow.server.stream.StreamDeploymentRequest;
import org.springframework.cloud.deployer.spi.app.DeploymentState;
import org.springframework.cloud.deployer.spi.core.AppDeploymentRequest;
import org.springframework.cloud.skipper.domain.Deployer;
import org.springframework.cloud.skipper.domain.PackageIdentifier;
import org.springframework.cloud.skipper.domain.Release;
import org.springframework.cloud.skipper.domain.SpringCloudDeployerApplicationManifest;
import org.springframework.cloud.skipper.domain.SpringCloudDeployerApplicationManifestReader;
import org.springframework.cloud.skipper.domain.SpringCloudDeployerApplicationSpec;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.Pageable;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
import org.yaml.snakeyaml.DumperOptions;
import org.yaml.snakeyaml.LoaderOptions;
import org.yaml.snakeyaml.Yaml;
import org.yaml.snakeyaml.constructor.SafeConstructor;
import org.yaml.snakeyaml.representer.Representer;

@Transactional
/* loaded from: input_file:BOOT-INF/lib/spring-cloud-dataflow-server-core-2.11.0.jar:org/springframework/cloud/dataflow/server/service/impl/DefaultStreamService.class */
public class DefaultStreamService implements StreamService {
    public static final String DEFAULT_SKIPPER_PACKAGE_VERSION = "1.0.0";
    private static final Logger logger = LoggerFactory.getLogger((Class<?>) DefaultStreamService.class);
    private static final Pattern STREAM_NAME_PATTERN = Pattern.compile("[a-zA-Z]([-a-zA-Z0-9]*[a-zA-Z0-9])?");
    private static final String STREAM_NAME_VALIDATION_MSG = "Stream name must consist of alphanumeric characters or '-', start with an alphabetic character, and end with an alphanumeric character (e.g. 'my-name', or 'abc-123')";
    protected final StreamDefinitionRepository streamDefinitionRepository;
    protected final AuditRecordService auditRecordService;
    protected final AuditServiceUtils auditServiceUtils;
    protected final StreamValidationService streamValidationService;
    private final SkipperStreamDeployer skipperStreamDeployer;
    private final AppDeploymentRequestCreator appDeploymentRequestCreator;
    private final StreamDefinitionService streamDefinitionService;

    public DefaultStreamService(StreamDefinitionRepository streamDefinitionRepository, SkipperStreamDeployer skipperStreamDeployer, AppDeploymentRequestCreator appDeploymentRequestCreator, StreamValidationService streamValidationService, AuditRecordService auditRecordService, StreamDefinitionService streamDefinitionService) {
        Assert.notNull(skipperStreamDeployer, "SkipperStreamDeployer must not be null");
        Assert.notNull(appDeploymentRequestCreator, "AppDeploymentRequestCreator must not be null");
        Assert.notNull(streamDefinitionRepository, "StreamDefinitionRepository must not be null");
        Assert.notNull(streamValidationService, "StreamValidationService must not be null");
        Assert.notNull(auditRecordService, "AuditRecordService must not be null");
        Assert.notNull(streamDefinitionService, "StreamDefinitionService must not be null");
        this.skipperStreamDeployer = skipperStreamDeployer;
        this.appDeploymentRequestCreator = appDeploymentRequestCreator;
        this.streamDefinitionRepository = streamDefinitionRepository;
        this.streamValidationService = streamValidationService;
        this.auditRecordService = auditRecordService;
        this.auditServiceUtils = new AuditServiceUtils();
        this.streamDefinitionService = streamDefinitionService;
    }

    private Release doDeployStream(StreamDefinition streamDefinition, Map<String, String> map) {
        Map<String, String> skipperProperties = getSkipperProperties(map);
        if (!skipperProperties.containsKey(SkipperStream.SKIPPER_PACKAGE_VERSION)) {
            skipperProperties.put(SkipperStream.SKIPPER_PACKAGE_VERSION, DEFAULT_SKIPPER_PACKAGE_VERSION);
        }
        Map<String, String> map2 = (Map) map.entrySet().stream().filter(entry -> {
            return !((String) entry.getKey()).startsWith(SkipperStream.SKIPPER_KEY_PREFIX);
        }).collect(Collectors.toMap((v0) -> {
            return v0.getKey();
        }, (v0) -> {
            return v0.getValue();
        }));
        String orDefault = skipperProperties.getOrDefault(SkipperStream.SKIPPER_PLATFORM_NAME, "default");
        String str = (String) platformList().stream().filter(deployer -> {
            return deployer.getName().equalsIgnoreCase(orDefault);
        }).map((v0) -> {
            return v0.getType();
        }).findFirst().orElse("unknown");
        if (str.equals("kubernetes") && !STREAM_NAME_PATTERN.matcher(streamDefinition.getName()).matches()) {
            throw new InvalidStreamDefinitionException(String.format("Stream name %s is invalid. %s", streamDefinition.getName(), STREAM_NAME_VALIDATION_MSG));
        }
        List<AppDeploymentRequest> createRequests = this.appDeploymentRequestCreator.createRequests(streamDefinition, map2, str);
        DeploymentPropertiesUtils.validateSkipperDeploymentProperties(map2);
        Release deployStream = this.skipperStreamDeployer.deployStream(new StreamDeploymentRequest(streamDefinition.getName(), streamDefinition.getDslText(), createRequests, skipperProperties));
        if (deployStream != null) {
            updateStreamDefinitionFromReleaseManifest(streamDefinition.getName(), deployStream.getManifest().getData());
        } else {
            logger.error("Missing skipper release after Stream deploy!");
        }
        return deployStream;
    }

    public DeploymentState doCalculateStreamState(String str) {
        return this.skipperStreamDeployer.streamState(str);
    }

    @Override // org.springframework.cloud.dataflow.server.service.StreamService
    public void undeployStream(String str) {
        StreamDefinition orElseThrow = this.streamDefinitionRepository.findById(str).orElseThrow(() -> {
            return new NoSuchStreamDefinitionException(str);
        });
        this.skipperStreamDeployer.undeployStream(str);
        this.auditRecordService.populateAndSaveAuditRecord(AuditOperationType.STREAM, AuditActionType.UNDEPLOY, orElseThrow.getName(), this.streamDefinitionService.redactDsl(orElseThrow), null);
    }

    private void updateStreamDefinitionFromReleaseManifest(String str, String str2) {
        List<SpringCloudDeployerApplicationManifest> read = new SpringCloudDeployerApplicationManifestReader().read(str2);
        HashMap hashMap = new HashMap();
        for (SpringCloudDeployerApplicationManifest springCloudDeployerApplicationManifest : read) {
            hashMap.put(springCloudDeployerApplicationManifest.getSpec().getApplicationProperties().get(DataFlowPropertyKeys.STREAM_APP_LABEL), springCloudDeployerApplicationManifest);
        }
        StreamDefinition orElseThrow = this.streamDefinitionRepository.findById(str).orElseThrow(() -> {
            return new NoSuchStreamDefinitionException(str);
        });
        LinkedList<StreamAppDefinition> linkedList = new LinkedList<>();
        Iterator<StreamAppDefinition> it = this.streamDefinitionService.getAppDefinitions(orElseThrow).iterator();
        while (it.hasNext()) {
            StreamAppDefinition next = it.next();
            StreamAppDefinition.Builder from = StreamAppDefinition.Builder.from(next);
            from.setProperties(((SpringCloudDeployerApplicationManifest) hashMap.get(next.getName())).getSpec().getApplicationProperties());
            linkedList.addLast(from.build(orElseThrow.getName()));
        }
        StreamDefinition streamDefinition = new StreamDefinition(str, this.streamDefinitionService.constructDsl(orElseThrow.getDslText(), linkedList), orElseThrow.getOriginalDslText(), orElseThrow.getDescription());
        logger.debug("Updated StreamDefinition: " + streamDefinition);
        this.streamDefinitionRepository.delete(streamDefinition);
        this.streamDefinitionRepository.save(streamDefinition);
        this.auditRecordService.populateAndSaveAuditRecord(AuditOperationType.STREAM, AuditActionType.UPDATE, str, this.streamDefinitionService.redactDsl(streamDefinition), null);
    }

    @Override // org.springframework.cloud.dataflow.server.service.StreamService
    public void scaleApplicationInstances(String str, String str2, int i, Map<String, String> map) {
        logger.info("Scale {}:{} to {} with properties: {}", str, str2, Integer.valueOf(i), map);
        this.skipperStreamDeployer.scale(str, str2, i, map);
    }

    @Override // org.springframework.cloud.dataflow.server.service.StreamService
    public void updateStream(String str, UpdateStreamRequest updateStreamRequest) {
        updateStream(str, updateStreamRequest.getReleaseName(), updateStreamRequest.getPackageIdentifier(), updateStreamRequest.getUpdateProperties(), updateStreamRequest.isForce(), updateStreamRequest.getAppNames());
    }

    public void updateStream(String str, String str2, PackageIdentifier packageIdentifier, Map<String, String> map, boolean z, List<String> list) {
        StreamDefinition orElseThrow = this.streamDefinitionRepository.findById(str).orElseThrow(() -> {
            return new NoSuchStreamDefinitionException(str);
        });
        Release upgradeStream = this.skipperStreamDeployer.upgradeStream(str2, packageIdentifier, convertPropertiesToSkipperYaml(orElseThrow, map), z, list);
        if (upgradeStream == null) {
            logger.error("Missing release after Stream Update!");
            return;
        }
        updateStreamDefinitionFromReleaseManifest(str, upgradeStream.getManifest().getData());
        String convertPropertiesToSkipperYaml = convertPropertiesToSkipperYaml(orElseThrow, this.auditServiceUtils.sanitizeProperties(map));
        HashMap hashMap = new HashMap(3);
        hashMap.put("releaseName", str2);
        hashMap.put("packageIdentifier", packageIdentifier);
        hashMap.put("updateYaml", convertPropertiesToSkipperYaml);
        this.auditRecordService.populateAndSaveAuditRecordUsingMapData(AuditOperationType.STREAM, AuditActionType.UPDATE, str, hashMap, upgradeStream.getPlatformName());
    }

    @Override // org.springframework.cloud.dataflow.server.service.StreamService
    public void rollbackStream(String str, int i) {
        Assert.isTrue(StringUtils.hasText(str), "Stream name must not be null");
        Release rollbackStream = this.skipperStreamDeployer.rollbackStream(str, i);
        String str2 = null;
        if (rollbackStream != null) {
            str2 = rollbackStream.getPlatformName();
            if (rollbackStream.getManifest() != null) {
                updateStreamDefinitionFromReleaseManifest(str, rollbackStream.getManifest().getData());
            }
        }
        this.auditRecordService.populateAndSaveAuditRecord(AuditOperationType.STREAM, AuditActionType.ROLLBACK, str, "Rollback to version: " + i, str2);
    }

    String convertPropertiesToSkipperYaml(StreamDefinition streamDefinition, Map<String, String> map) {
        List<AppDeploymentRequest> createUpdateRequests = this.appDeploymentRequestCreator.createUpdateRequests(streamDefinition, map);
        HashMap hashMap = new HashMap();
        for (AppDeploymentRequest appDeploymentRequest : createUpdateRequests) {
            boolean z = false;
            String name = appDeploymentRequest.getDefinition().getName();
            HashMap hashMap2 = new HashMap();
            HashMap hashMap3 = new HashMap();
            if (!appDeploymentRequest.getDefinition().getProperties().isEmpty()) {
                z = true;
                hashMap3.put(SpringCloudDeployerApplicationSpec.APPLICATION_PROPERTIES_STRING, appDeploymentRequest.getDefinition().getProperties());
            }
            if (!appDeploymentRequest.getDeploymentProperties().isEmpty()) {
                z = true;
                hashMap3.put("deploymentProperties", appDeploymentRequest.getDeploymentProperties());
            }
            if (appDeploymentRequest.getCommandlineArguments().size() == 1) {
                z = true;
                String str = appDeploymentRequest.getCommandlineArguments().get(0);
                this.skipperStreamDeployer.validateAppVersionIsRegistered(streamDefinition, appDeploymentRequest, str);
                hashMap3.put("version", str);
            }
            if (z) {
                hashMap2.put("spec", hashMap3);
            }
            if (!hashMap2.isEmpty()) {
                hashMap.put(name, hashMap2);
            }
        }
        if (hashMap.isEmpty()) {
            return "";
        }
        DumperOptions dumperOptions = new DumperOptions();
        dumperOptions.setDefaultFlowStyle(DumperOptions.FlowStyle.BLOCK);
        dumperOptions.setPrettyFlow(true);
        dumperOptions.setLineBreak(DumperOptions.LineBreak.getPlatformLineBreak());
        return new Yaml(new SafeConstructor(new LoaderOptions()), new Representer(dumperOptions), dumperOptions).dump(hashMap);
    }

    private Map<String, String> getSkipperProperties(Map<String, String> map) {
        return (Map) map.entrySet().stream().filter(entry -> {
            return ((String) entry.getKey()).startsWith(SkipperStream.SKIPPER_KEY_PREFIX);
        }).collect(Collectors.toMap((v0) -> {
            return v0.getKey();
        }, (v0) -> {
            return v0.getValue();
        }));
    }

    @Override // org.springframework.cloud.dataflow.server.service.StreamService
    public Map<StreamDefinition, DeploymentState> state(List<StreamDefinition> list) {
        return this.skipperStreamDeployer.streamsStates(list);
    }

    @Override // org.springframework.cloud.dataflow.server.service.StreamService
    public String manifest(String str, int i) {
        return this.skipperStreamDeployer.manifest(str, i);
    }

    @Override // org.springframework.cloud.dataflow.server.service.StreamService
    public Collection<Release> history(String str) {
        return this.skipperStreamDeployer.history(str);
    }

    @Override // org.springframework.cloud.dataflow.server.service.StreamService
    public Collection<Deployer> platformList() {
        return this.skipperStreamDeployer.platformList();
    }

    @Override // org.springframework.cloud.dataflow.server.service.StreamService
    public StreamDeployment info(String str) {
        return this.skipperStreamDeployer.getStreamInfo(str);
    }

    @Override // org.springframework.cloud.dataflow.server.service.StreamService
    public StreamDefinition createStream(String str, String str2, String str3, boolean z, Map<String, String> map) {
        StreamDefinition createStreamDefinition = createStreamDefinition(str, str2, str3);
        ArrayList arrayList = new ArrayList();
        Iterator<StreamAppDefinition> it = this.streamDefinitionService.getAppDefinitions(createStreamDefinition).iterator();
        while (it.hasNext()) {
            StreamAppDefinition next = it.next();
            String registeredAppName = next.getRegisteredAppName();
            ApplicationType applicationType = next.getApplicationType();
            if (!this.streamValidationService.isRegistered(registeredAppName, applicationType)) {
                arrayList.add(String.format("Application name '%s' with type '%s' does not exist in the app registry.", registeredAppName, applicationType));
            }
        }
        if (!arrayList.isEmpty()) {
            throw new InvalidStreamDefinitionException(StringUtils.collectionToDelimitedString(arrayList, "\n"));
        }
        if (this.streamDefinitionRepository.existsById(str)) {
            throw new DuplicateStreamDefinitionException(String.format("Cannot create stream %s because another one has already been created with the same name", str));
        }
        if (z) {
            deployStream(str, map);
        }
        this.auditRecordService.populateAndSaveAuditRecord(AuditOperationType.STREAM, AuditActionType.CREATE, createStreamDefinition.getName(), this.streamDefinitionService.redactDsl(createStreamDefinition), null);
        return createStreamDefinition;
    }

    public StreamDefinition createStreamDefinition(String str, String str2, String str3) {
        try {
            StreamDefinition streamDefinition = new StreamDefinition(str, str2, str2, str3);
            this.streamDefinitionService.parse(streamDefinition);
            return streamDefinition;
        } catch (ParseException e) {
            throw new InvalidStreamDefinitionException(e.getMessage());
        }
    }

    @Override // org.springframework.cloud.dataflow.server.service.StreamService
    public void deployStream(String str, Map<String, String> map) {
        if (map == null) {
            map = new HashMap();
        }
        StreamDefinition orElseThrow = this.streamDefinitionRepository.findById(str).orElseThrow(() -> {
            return new NoSuchStreamDefinitionException(str);
        });
        DeploymentState doCalculateStreamState = doCalculateStreamState(str);
        if (DeploymentState.deployed == doCalculateStreamState) {
            throw new StreamAlreadyDeployedException(str);
        }
        if (DeploymentState.deploying == doCalculateStreamState) {
            throw new StreamAlreadyDeployingException(str);
        }
        Release doDeployStream = doDeployStream(orElseThrow, map);
        this.auditRecordService.populateAndSaveAuditRecordUsingMapData(AuditOperationType.STREAM, AuditActionType.DEPLOY, orElseThrow.getName(), this.auditServiceUtils.convertStreamDefinitionToAuditData(this.streamDefinitionService.redactDsl(orElseThrow), map), doDeployStream == null ? null : doDeployStream.getPlatformName());
    }

    @Override // org.springframework.cloud.dataflow.server.service.StreamService
    public void deleteStream(String str) {
        StreamDefinition orElseThrow = this.streamDefinitionRepository.findById(str).orElseThrow(() -> {
            return new NoSuchStreamDefinitionException(str);
        });
        undeployStream(str);
        this.streamDefinitionRepository.deleteById(str);
        this.auditRecordService.populateAndSaveAuditRecord(AuditOperationType.STREAM, AuditActionType.DELETE, orElseThrow.getName(), this.streamDefinitionService.redactDsl(orElseThrow), null);
    }

    @Override // org.springframework.cloud.dataflow.server.service.StreamService
    public void deleteAll() {
        Iterable<StreamDefinition> findAll = this.streamDefinitionRepository.findAll();
        Iterator<StreamDefinition> it = findAll.iterator();
        while (it.hasNext()) {
            undeployStream(it.next().getName());
        }
        this.streamDefinitionRepository.deleteAll();
        for (StreamDefinition streamDefinition : findAll) {
            this.auditRecordService.populateAndSaveAuditRecord(AuditOperationType.STREAM, AuditActionType.DELETE, streamDefinition.getName(), this.streamDefinitionService.redactDsl(streamDefinition), null);
        }
    }

    @Override // org.springframework.cloud.dataflow.server.service.StreamService
    public List<StreamDefinition> findRelatedStreams(String str, boolean z) {
        return new ArrayList(findRelatedDefinitions(this.streamDefinitionRepository.findById(str).orElseThrow(() -> {
            return new NoSuchStreamDefinitionException(str);
        }), this.streamDefinitionRepository.findAll(), new LinkedHashSet(), z));
    }

    private Set<StreamDefinition> findRelatedDefinitions(StreamDefinition streamDefinition, Iterable<StreamDefinition> iterable, Set<StreamDefinition> set, boolean z) {
        set.add(streamDefinition);
        String name = streamDefinition.getName();
        String str = name + ".";
        for (StreamDefinition streamDefinition2 : iterable) {
            StreamNode parse = this.streamDefinitionService.parse(streamDefinition2);
            if (parse.getSourceDestinationNode() != null) {
                String destinationName = parse.getSourceDestinationNode().getDestinationName();
                if (destinationName.equals(name) || destinationName.startsWith(str)) {
                    boolean add = set.add(streamDefinition2);
                    if (z && add) {
                        findRelatedDefinitions(streamDefinition2, iterable, set, true);
                    }
                }
            }
        }
        return set;
    }

    @Override // org.springframework.cloud.dataflow.server.service.StreamService
    public Page<StreamDefinition> findDefinitionByNameContains(Pageable pageable, String str) {
        return StringUtils.hasLength(str) ? this.streamDefinitionRepository.findByNameContains(str, pageable) : this.streamDefinitionRepository.findAll(pageable);
    }

    @Override // org.springframework.cloud.dataflow.server.service.StreamService
    public StreamDefinition findOne(String str) {
        return this.streamDefinitionRepository.findById(str).orElseThrow(() -> {
            return new NoSuchStreamDefinitionException(str);
        });
    }

    @Override // org.springframework.cloud.dataflow.server.service.StreamService
    public ValidationStatus validateStream(String str) {
        return this.streamValidationService.validateStream(str);
    }
}
