/*
 * Decompiled with CFR 0.152.
 */
package uk.gov.dstl.baleen.services;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.annot8.api.components.Resource;
import io.annot8.api.context.Context;
import io.annot8.api.data.ItemFactory;
import io.annot8.api.pipelines.ErrorConfiguration;
import io.annot8.api.pipelines.ItemStatus;
import io.annot8.api.pipelines.PipelineDescriptor;
import io.annot8.api.pipelines.PipelineRunner;
import io.annot8.common.components.logging.Logging;
import io.annot8.common.components.metering.Metering;
import io.annot8.implementations.pipeline.InMemoryPipelineRunner;
import io.annot8.implementations.reference.factories.DefaultItemFactory;
import io.annot8.implementations.support.context.SimpleContext;
import io.micrometer.core.instrument.MeterRegistry;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.io.Reader;
import java.nio.charset.StandardCharsets;
import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.OpenOption;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.nio.file.StandardWatchEventKinds;
import java.nio.file.WatchEvent;
import java.nio.file.WatchKey;
import java.nio.file.WatchService;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;
import java.util.UUID;
import java.util.stream.Collectors;
import javax.annotation.PostConstruct;
import org.slf4j.ILoggerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import uk.gov.dstl.annot8.baleen.MutablePipelineDescriptor;
import uk.gov.dstl.annot8.baleen.RestApi;
import uk.gov.dstl.annot8.baleen.RestApiQueue;
import uk.gov.dstl.annot8.baleen.SubmittedData;
import uk.gov.dstl.baleen.data.PipelineHolder;
import uk.gov.dstl.baleen.data.PipelineMetadata;
import uk.gov.dstl.baleen.exceptions.AlreadyExistsException;
import uk.gov.dstl.baleen.exceptions.BadRequestException;
import uk.gov.dstl.baleen.exceptions.PipelineNotFoundException;
import uk.gov.dstl.baleen.logging.BaleenLoggerFactory;
import uk.gov.dstl.baleen.services.Annot8ComponentService;

@Service
public class PipelineService {
    private static final Logger LOGGER = LoggerFactory.getLogger(PipelineService.class);

    /** Folder in which pipeline descriptors are persisted as {@code .json} files. */
    @Value("${baleen.persistence}")
    private File persistenceFolder;

    /** File inside the persistence folder listing, one per line, the names of stopped pipelines. */
    @Value("${baleen.persistence}/${baleen.stopped}")
    private File stoppedState;

    /** Passed to every new {@link PipelineHolder}; presumably the log-entry cap - confirm there. */
    @Value("${baleen.logging.max}")
    private Integer loggingMax;

    /** Delay handed to the pipeline runner builder when a pipeline is started. */
    @Value("${baleen.pipeline.delay}")
    private Long pipelineDelay;

    @Autowired
    private ObjectMapper objectMapper;

    @Autowired
    private Annot8ComponentService annot8ComponentService;

    /** Watches the persistence folder; stays {@code null} when no watch service could be created. */
    private WatchService watchService;

    /** All known pipelines (running or stopped), keyed by pipeline name. */
    private final Map<String, PipelineHolder> pipelines = new HashMap<>();

    /** REST submission queues of started pipelines that declare a {@link RestApi} source. */
    private final Map<String, RestApiQueue> queues = new HashMap<>();

    /** Canonical descriptor file of each persisted pipeline, keyed by pipeline name. */
    private final Map<String, File> persistedPipelines = new HashMap<>();

    /**
     * Creates the service and tries to obtain a {@link WatchService} from the default
     * file system. If that fails with an {@link IOException} the failure is logged and
     * the service carries on without change detection ({@code watchService} is {@code null}).
     */
    public PipelineService() {
        WatchService created = null;
        try {
            created = FileSystems.getDefault().newWatchService();
        } catch (IOException e) {
            LOGGER.warn("Unable to create WatchService - files added/removed to the persistence folder will not be detected", e);
        }
        this.watchService = created;
    }

    /**
     * Recreates every pipeline whose descriptor is stored as a {@code .json} file in the
     * persistence folder, and registers the folder with the watch service (when one is
     * available) so later modifications and deletions can be picked up.
     *
     * <p>Nothing is loaded if the folder cannot be created, is not a directory, cannot be
     * read, or cannot be listed. A read-only folder only produces a warning.
     */
    @PostConstruct
    public void startPersistedPipelines() {
        if (!this.createPersistenceFolder()) {
            return;
        }
        if (!this.persistenceFolder.isDirectory()) {
            LOGGER.warn("Persistence folder {} is not a directory", this.persistenceFolder);
            return;
        }
        if (!this.persistenceFolder.canRead()) {
            LOGGER.warn("Can not read from persistence folder {}", this.persistenceFolder);
            return;
        }

        this.registerPersistenceFolderWatch();

        if (!this.persistenceFolder.canWrite()) {
            LOGGER.warn("Can not write to persistence folder {} - existing pipelines will be created but new pipelines will not be persisted", this.persistenceFolder);
        }

        File[] descriptorFiles = this.persistenceFolder.listFiles((dir, name) -> name.toLowerCase().endsWith(".json"));
        if (descriptorFiles == null) {
            LOGGER.warn("Can not retrieve JSON files from persistence folder {}", this.persistenceFolder);
            return;
        }

        for (File descriptorFile : descriptorFiles) {
            LOGGER.info("Recreating persisted pipeline from file {}", descriptorFile);
            this.createPipelineFromFile(descriptorFile);
        }
    }

    /**
     * Registers the persistence folder for modify and delete events, if a watch service
     * exists. A registration failure is logged and otherwise ignored.
     */
    private void registerPersistenceFolderWatch() {
        if (this.watchService == null) {
            return;
        }
        try {
            this.persistenceFolder.toPath().register(this.watchService, StandardWatchEventKinds.ENTRY_MODIFY, StandardWatchEventKinds.ENTRY_DELETE);
        } catch (IOException e) {
            LOGGER.warn("Unable to create WatchKey - files added/removed to the persistence folder will not be detected", e);
        }
    }

    /**
     * Parses a persisted descriptor file, creates the pipeline it describes and records
     * the file as that pipeline's persisted form.
     *
     * <p>The file is handed straight to the {@link ObjectMapper} rather than being wrapped
     * in a {@code FileReader}: a {@code FileReader} decodes with the platform default
     * charset, whereas {@link #save(PipelineDescriptor)} always writes UTF-8, so non-ASCII
     * content could be corrupted on platforms whose default charset is not UTF-8.
     *
     * @param f the JSON descriptor file to load
     * @return {@code true} if the pipeline was created, {@code false} if the file could not
     *     be parsed or the pipeline could not be created (both cases are logged)
     */
    private boolean createPipelineFromFile(File f) {
        PipelineDescriptor configuration;
        try {
            configuration = (PipelineDescriptor) this.objectMapper.readValue(f, MutablePipelineDescriptor.class);
        } catch (Exception e) {
            LOGGER.error("Unable to parse file {}", f, e);
            return false;
        }

        try {
            // Resolve the canonical file first so an I/O failure here cannot leave a
            // created pipeline that is missing from persistedPipelines.
            File canonicalFile = f.getCanonicalFile();
            this.createPipeline(configuration);
            this.persistedPipelines.put(configuration.getName(), canonicalFile);
        } catch (Exception e) {
            LOGGER.error("Unable to create pipeline {} from file {}", configuration.getName(), f, e);
            return false;
        }
        return true;
    }

    /**
     * Polls the watch service for pending events on the persistence folder and applies
     * them: a modified {@code .json} file causes its pipeline to be (re)created, a deleted
     * one causes its pipeline to be stopped. Does nothing when no watch service exists.
     *
     * <p>Each watch key is always reset, even if handling an event fails; a key that is
     * not reset delivers no further events. A failure while handling one event is logged
     * and does not prevent the remaining events from being handled.
     */
    @Scheduled(fixedDelay=5000L)
    public void detectChanges() {
        if (this.watchService == null) {
            return;
        }
        LOGGER.debug("Checking persistence folder for changes");
        WatchKey key;
        while ((key = this.watchService.poll()) != null) {
            try {
                for (WatchEvent<?> event : key.pollEvents()) {
                    try {
                        this.handleWatchEvent(event);
                    } catch (RuntimeException e) {
                        LOGGER.error("Failed to handle {} event on persistence folder", event.kind(), e);
                    }
                }
            } finally {
                key.reset();
            }
        }
    }

    /** Dispatches a single watch event on a {@code .json} file to the matching handler. */
    private void handleWatchEvent(WatchEvent<?> event) {
        Object context = event.context();
        if (!(context instanceof Path)) {
            // OVERFLOW events can be delivered for any registration and carry no path.
            LOGGER.warn("Ignoring {} event without a path on persistence folder", event.kind());
            return;
        }
        Path path = (Path) context;
        if (!path.toString().toLowerCase().endsWith(".json")) {
            return;
        }
        LOGGER.info("{} event detected on path {}", event.kind(), path);
        File f = new File(this.persistenceFolder, path.getFileName().toString());
        if (StandardWatchEventKinds.ENTRY_MODIFY.equals(event.kind())) {
            this.handleModifiedFile(f);
        } else if (StandardWatchEventKinds.ENTRY_DELETE.equals(event.kind())) {
            this.handleDeletedFile(f);
        }
    }

    /**
     * Replaces the pipeline backed by a modified file with one built from the new content.
     *
     * <p>The old pipeline is stopped and unregistered before the file is loaded again,
     * because {@link #createPipeline(PipelineDescriptor)} rejects a name that is still
     * registered. If the old pipeline was running, the stopped-state file is rewritten
     * after unregistering so the replacement is started rather than treated as stopped;
     * a pipeline that was already stopped stays recorded as stopped.
     */
    private void handleModifiedFile(File f) {
        String pipelineName = this.getPipelineName(f);
        PipelineHolder existing = pipelineName == null ? null : this.pipelines.get(pipelineName);
        if (existing != null) {
            LOGGER.info("Stopping existing pipeline {} for modified file {}", pipelineName, f.getName());
            boolean wasRunning = existing.isRunning();
            this.stopPipeline(pipelineName);
            this.pipelines.remove(pipelineName);
            // The descriptor may now carry a different name; the mapping is re-added on load.
            this.persistedPipelines.remove(pipelineName);
            if (wasRunning) {
                this.updatePipelineState();
            }
        }
        LOGGER.info("Creating pipeline for file {}", f.getName());
        this.createPipelineFromFile(f);
    }

    /** Stops the pipeline whose persisted file was deleted and forgets the file mapping. */
    private void handleDeletedFile(File f) {
        String pipelineName = this.getPipelineName(f);
        if (pipelineName == null) {
            LOGGER.info("Could not determine pipeline name for file {}", f.getName());
            return;
        }
        LOGGER.info("Stopping pipeline {} as persisted file {} was deleted", pipelineName, f.getName());
        this.persistedPipelines.remove(pipelineName);
        // stopPipeline throws for unknown names; the pipeline may already have been removed.
        if (this.pipelines.containsKey(pipelineName)) {
            this.stopPipeline(pipelineName);
        }
    }

    /**
     * Looks up the name of the persisted pipeline whose descriptor file has the same
     * file name (directory ignored) as the given file.
     *
     * @param f the file to match by name
     * @return the pipeline name, or {@code null} if no persisted pipeline uses that file name
     */
    private String getPipelineName(File f) {
        String fileName = f.getName();
        return this.persistedPipelines.entrySet().stream()
                .filter(entry -> entry.getValue().getName().equals(fileName))
                .findFirst()
                .map(Map.Entry::getKey)
                .orElse(null);
    }

    /**
     * Registers a new pipeline under the descriptor's name and starts it, unless the
     * stopped-state file lists that name, in which case it is registered but left stopped.
     *
     * @param descriptor the pipeline to create
     * @throws AlreadyExistsException if a pipeline with the same name is already registered
     */
    public void createPipeline(PipelineDescriptor descriptor) {
        final String name = descriptor.getName();
        if (this.pipelines.containsKey(name)) {
            throw new AlreadyExistsException("Pipeline '" + name + "' already exists");
        }

        LOGGER.info("Creating pipeline {}", name);
        this.pipelines.put(name, new PipelineHolder(descriptor, this.loggingMax));

        boolean previouslyStopped = this.getPipelineState().contains(name);
        if (!previouslyStopped) {
            this.startPipeline(name);
            return;
        }
        LOGGER.info("Pipeline {} was previously in stopped state, and will not be started", name);
    }

    /**
     * Returns the names of all registered pipelines.
     *
     * @return a new, naturally ordered set that is independent of the internal map
     */
    public Set<String> getPipelineNames() {
        Set<String> sortedNames = new TreeSet<>();
        sortedNames.addAll(this.pipelines.keySet());
        return sortedNames;
    }

    /**
     * Builds a {@link PipelineMetadata} for every registered pipeline.
     *
     * @return the metadata, sorted by pipeline name
     */
    public List<PipelineMetadata> getPipelinesMetadata() {
        Comparator<PipelineMetadata> byName = Comparator.comparing(PipelineMetadata::getName);
        return this.pipelines.values().stream()
                .map(PipelineMetadata::new)
                .sorted(byName)
                .collect(Collectors.toList());
    }

    /**
     * Returns the holder of a registered pipeline.
     *
     * @param pipelineName name of the pipeline
     * @return the holder registered under that name
     * @throws PipelineNotFoundException if no pipeline is registered under that name
     */
    public PipelineHolder getPipeline(String pipelineName) {
        // Holders stored in the map are never null, so a null lookup means "not registered".
        PipelineHolder holder = this.pipelines.get(pipelineName);
        if (holder == null) {
            throw new PipelineNotFoundException();
        }
        return holder;
    }

    /**
     * Starts a registered pipeline on a new thread; does nothing if it is already running.
     *
     * <p>Builds the item factory and context, resolves the error configuration (falling
     * back to a default {@link ErrorConfiguration} when the descriptor has none), creates
     * an {@link InMemoryPipelineRunner}, attaches it to the holder, starts it, and finally
     * rewrites the stopped-state file.
     *
     * @param pipelineName name of the pipeline to start
     * @throws PipelineNotFoundException if no pipeline is registered under that name
     */
    public void startPipeline(String pipelineName) {
        PipelineHolder holder = this.pipelines.get(pipelineName);
        if (holder == null) {
            throw new PipelineNotFoundException();
        }
        if (holder.isRunning()) {
            return;
        }

        LOGGER.info("Starting pipeline {}", pipelineName);
        PipelineDescriptor descriptor = holder.getDescriptor();
        String name = descriptor.getName();

        LOGGER.debug("Creating resources and context for pipeline {}", name);
        DefaultItemFactory itemFactory = new DefaultItemFactory(this.annot8ComponentService.getContentBuilderFactoryRegistry());
        SimpleContext context = this.buildContext(descriptor, holder);

        LOGGER.debug("Determining error configuration for pipeline {}", name);
        ErrorConfiguration errorConfiguration =
                Optional.ofNullable(descriptor.getErrorConfiguration()).orElseGet(ErrorConfiguration::new);
        LOGGER.info("Error configuration for pipeline {} on source errors is {}", name, errorConfiguration.getOnSourceError());
        LOGGER.info("Error configuration for pipeline {} on processing errors is {}", name, errorConfiguration.getOnProcessorError());
        LOGGER.info("Error configuration for pipeline {} on item errors is {}", name, errorConfiguration.getOnItemError());

        LOGGER.debug("Creating runner for pipeline {}", name);
        InMemoryPipelineRunner runner = new InMemoryPipelineRunner.Builder()
                .withPipelineDescriptor(descriptor)
                .withItemFactory((ItemFactory) itemFactory)
                .withContext((Context) context)
                .withDelay(this.pipelineDelay.longValue())
                .withErrorConfiguration(errorConfiguration)
                .withItemState(holder.getPipelineItemState())
                .build();
        holder.setPipelineRunner((PipelineRunner) runner);

        Thread runnerThread = new Thread((Runnable) runner);
        runnerThread.start();
        LOGGER.info("Pipeline {} started on thread {}", name, runnerThread.getName());

        this.updatePipelineState();
    }

    /**
     * Builds the runner context holding the pipeline's logging and metering resources.
     * When any source of the descriptor is a {@link RestApi}, a fresh {@link RestApiQueue}
     * is added to the context and registered in {@code queues} under the pipeline name.
     */
    private SimpleContext buildContext(PipelineDescriptor descriptor, PipelineHolder holder) {
        Logging logging = Logging.useILoggerFactory((ILoggerFactory) new BaleenLoggerFactory(holder.getLogEntries()));
        Metering metering = Metering.useMeterRegistry((MeterRegistry) holder.getMeterRegistry(), null);

        boolean hasRestApiSource = descriptor.getSources().stream().anyMatch(RestApi.class::isInstance);
        if (!hasRestApiSource) {
            return new SimpleContext(new Resource[]{logging, metering});
        }

        RestApiQueue queue = new RestApiQueue();
        this.queues.put(descriptor.getName(), queue);
        return new SimpleContext(new Resource[]{logging, metering, queue});
    }

    /**
     * Stops a running pipeline; does nothing if it is not running.
     *
     * <p>Stops and detaches the runner, drops the pipeline's REST queue, clears its log
     * entries and meter registry, and rewrites the stopped-state file. The pipeline stays
     * registered.
     *
     * @param pipelineName name of the pipeline to stop
     * @throws PipelineNotFoundException if no pipeline is registered under that name
     */
    public void stopPipeline(String pipelineName) {
        PipelineHolder holder = this.pipelines.get(pipelineName);
        if (holder == null) {
            throw new PipelineNotFoundException();
        }
        if (holder.isRunning()) {
            LOGGER.info("Stopping pipeline {}", pipelineName);

            holder.getPipelineRunner().stop();
            holder.setPipelineRunner(null);
            this.queues.remove(pipelineName);

            holder.getLogEntries().clear();
            holder.getMeterRegistry().clear();

            LOGGER.info("Pipeline {} stopped", pipelineName);
            this.updatePipelineState();
        }
    }

    /**
     * Stops and unregisters a pipeline and deletes its persisted descriptor file, if any.
     * A failed file deletion is only logged. The stopped-state file is rewritten afterwards.
     *
     * @param pipelineName name of the pipeline to delete
     * @throws PipelineNotFoundException if no pipeline is registered under that name
     */
    public void deletePipeline(String pipelineName) {
        LOGGER.info("Deleting pipeline {}", pipelineName);
        this.stopPipeline(pipelineName);
        this.pipelines.remove(pipelineName);

        // Stored files are never null, so null here means the pipeline was not persisted.
        File persistedFile = this.persistedPipelines.remove(pipelineName);
        if (persistedFile != null && !persistedFile.delete()) {
            LOGGER.warn("Failed to delete persisted file for {} - file may have already been deleted", pipelineName);
        }

        LOGGER.info("Pipeline {} deleted", pipelineName);
        this.updatePipelineState();
    }

    /**
     * Reports whether a pipeline is registered under the given name, running or not.
     *
     * @param pipelineName name to look up
     * @return {@code true} if a pipeline with that name is registered
     */
    public boolean pipelineExists(String pipelineName) {
        final boolean registered = this.pipelines.containsKey(pipelineName);
        return registered;
    }

    /**
     * Persists a pipeline descriptor as UTF-8 JSON in a new, randomly named
     * {@code <uuid>.json} file inside the persistence folder and records that file as the
     * pipeline's persisted form.
     *
     * @param descriptor the pipeline to persist
     * @return {@code true} if the descriptor was written; {@code false} if the persistence
     *     folder is unavailable or read-only, the descriptor cannot be serialized, or the
     *     write fails (every failure is logged)
     */
    public boolean save(PipelineDescriptor descriptor) {
        String name = descriptor.getName();
        if (!this.createPersistenceFolder()) {
            LOGGER.error("Pipeline {} will not be persisted as persistence folder does not exist", name);
            return false;
        }
        if (!this.persistenceFolder.canWrite()) {
            LOGGER.error("Can not write to persistence folder {} - pipeline {} will not be persisted", this.persistenceFolder, name);
            return false;
        }

        String json;
        try {
            json = this.objectMapper.writeValueAsString(descriptor);
        } catch (JsonProcessingException e) {
            LOGGER.error("Could not serialize pipeline", e);
            return false;
        }

        LOGGER.info("Persisting pipeline {}", name);
        try {
            File f = new File(this.persistenceFolder, UUID.randomUUID().toString() + ".json");
            Files.writeString(f.toPath(), json, StandardCharsets.UTF_8);
            LOGGER.info("Pipeline {} persisted to {}", name, f.getPath());
            this.persistedPipelines.put(name, f.getCanonicalFile());
        } catch (IOException ioe) {
            // Pass the exception as the last argument so the cause of the failure is logged.
            LOGGER.error("Can not persist pipeline {}", name, ioe);
            return false;
        }
        return true;
    }

    /**
     * Adds data received over the REST API to the queue of the named pipeline.
     *
     * @param pipelineName name of the target pipeline
     * @param data the submitted data
     * @throws PipelineNotFoundException if no pipeline is registered under that name
     * @throws BadRequestException if the pipeline currently has no REST queue
     */
    public void submitData(String pipelineName, SubmittedData data) {
        if (!this.pipelines.containsKey(pipelineName)) {
            throw new PipelineNotFoundException();
        }

        // Queues stored in the map are never null, so null means no queue is registered.
        RestApiQueue queue = this.queues.get(pipelineName);
        if (queue == null) {
            throw new BadRequestException("Pipeline is not configured to support REST API");
        }

        LOGGER.debug("Data received via REST API for pipeline {}", pipelineName);
        queue.addToQueue(data);
    }

    /**
     * Reports the status of an item within a pipeline.
     *
     * <p>An item still waiting in the pipeline's REST queue is {@code QUEUED}. Otherwise
     * the pipeline's item state is consulted: a missing entry yields {@code NOT_FOUND},
     * a known annot8 status is mapped to its Baleen equivalent, and any other status
     * yields {@code UNKNOWN}.
     *
     * @param pipelineName name of the pipeline
     * @param id identifier of the item
     * @return the Baleen status of the item
     * @throws PipelineNotFoundException if no pipeline is registered under that name
     */
    public uk.gov.dstl.baleen.data.ItemStatus getItemStatus(String pipelineName, String id) {
        PipelineHolder holder = this.pipelines.get(pipelineName);
        if (holder == null) {
            throw new PipelineNotFoundException();
        }

        RestApiQueue queue = this.queues.get(pipelineName);
        if (queue != null && queue.inQueue(id)) {
            return uk.gov.dstl.baleen.data.ItemStatus.QUEUED;
        }

        Optional<?> status = holder.getPipelineItemState().getItemStatus(id);
        if (!status.isPresent()) {
            return uk.gov.dstl.baleen.data.ItemStatus.NOT_FOUND;
        }

        switch ((ItemStatus) status.get()) {
            case PROCESSING:
                return uk.gov.dstl.baleen.data.ItemStatus.PROCESSING;
            case PROCESSED_OK:
                return uk.gov.dstl.baleen.data.ItemStatus.PROCESSED_OK;
            case PROCESSED_ITEM_ERROR:
                return uk.gov.dstl.baleen.data.ItemStatus.PROCESSED_ITEM_ERROR;
            case PROCESSED_PROCESSOR_ERROR:
                return uk.gov.dstl.baleen.data.ItemStatus.PROCESSED_PROCESSOR_ERROR;
            default:
                return uk.gov.dstl.baleen.data.ItemStatus.UNKNOWN;
        }
    }

    /**
     * Reads the names of the stopped pipelines from the stopped-state file, one per line.
     *
     * @return the lines of the file, or an empty list if the file does not exist or cannot
     *     be read (only the latter is logged)
     */
    private List<String> getPipelineState() {
        List<String> stoppedNames = Collections.emptyList();
        try {
            stoppedNames = Files.readAllLines(this.stoppedState.toPath());
        } catch (FileNotFoundException | NoSuchFileException missing) {
            // No state has been written yet: nothing is recorded as stopped.
        } catch (IOException e) {
            LOGGER.error("Could not read pipeline state from disk", e);
        }
        return stoppedNames;
    }

    /**
     * Rewrites the stopped-state file so that it lists, one per line, the name of every
     * registered pipeline that is not currently running. The persistence folder is created
     * first if necessary; a write failure is logged and otherwise ignored.
     */
    private void updatePipelineState() {
        List<String> stoppedPipelines = this.pipelines.keySet().stream()
                .filter(name -> !this.pipelines.get(name).isRunning())
                .collect(Collectors.toList());

        this.createPersistenceFolder();
        try {
            Files.write(
                    this.stoppedState.toPath(),
                    stoppedPipelines,
                    StandardOpenOption.CREATE,
                    StandardOpenOption.TRUNCATE_EXISTING,
                    StandardOpenOption.WRITE);
        } catch (IOException e) {
            LOGGER.error("Unable to persist pipeline state to disk", e);
        }
    }

    /**
     * Ensures the persistence folder exists, creating it (and any missing parents) if needed.
     *
     * @return {@code true} if the folder already existed or was created, {@code false} if
     *     it could not be created
     */
    private boolean createPersistenceFolder() {
        if (this.persistenceFolder.exists()) {
            return true;
        }

        LOGGER.info("Creating persistence folder {}", this.persistenceFolder);
        if (this.persistenceFolder.mkdirs()) {
            return true;
        }

        LOGGER.warn("Unable to create persistence folder {}", this.persistenceFolder);
        return false;
    }
}

