/*
 * Decompiled with CFR 0.152.
 */
package io.digdag.core.agent;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.inject.Inject;
import io.digdag.client.config.Config;
import io.digdag.client.config.ConfigException;
import io.digdag.client.config.ConfigFactory;
import io.digdag.core.ErrorReporter;
import io.digdag.core.Limits;
import io.digdag.core.agent.AgentConfig;
import io.digdag.core.agent.AgentId;
import io.digdag.core.agent.CheckedConfig;
import io.digdag.core.agent.ConfigEvalEngine;
import io.digdag.core.agent.DefaultOperatorContext;
import io.digdag.core.agent.DefaultSecretProvider;
import io.digdag.core.agent.EditDistance;
import io.digdag.core.agent.GrantedPrivilegedVariables;
import io.digdag.core.agent.OperatorRegistry;
import io.digdag.core.agent.RuntimeParams;
import io.digdag.core.agent.SetThreadName;
import io.digdag.core.agent.TaskCallbackApi;
import io.digdag.core.agent.WorkspaceManager;
import io.digdag.core.log.LogLevel;
import io.digdag.core.log.LogMarkers;
import io.digdag.core.log.TaskContextLogging;
import io.digdag.core.log.TaskLogger;
import io.digdag.metrics.DigdagTimed;
import io.digdag.spi.ImmutableTaskRequest;
import io.digdag.spi.Operator;
import io.digdag.spi.OperatorContext;
import io.digdag.spi.OperatorFactory;
import io.digdag.spi.SecretAccessContext;
import io.digdag.spi.SecretStore;
import io.digdag.spi.SecretStoreManager;
import io.digdag.spi.TaskExecutionException;
import io.digdag.spi.TaskRequest;
import io.digdag.spi.TaskResult;
import io.digdag.spi.TemplateException;
import io.digdag.spi.metrics.DigdagMetrics;
import java.io.IOException;
import java.nio.file.Path;
import java.time.Instant;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class OperatorManager {
    private static Logger logger = LoggerFactory.getLogger(OperatorManager.class);
    protected final AgentConfig agentConfig;
    protected final AgentId agentId;
    protected final TaskCallbackApi callback;
    private final WorkspaceManager workspaceManager;
    private final ConfigFactory cf;
    private final ConfigEvalEngine evalEngine;
    private final OperatorRegistry registry;
    private final SecretStoreManager secretStoreManager;
    private final ScheduledExecutorService heartbeatScheduler;
    private final ConcurrentHashMap<Long, TaskRequest> runningTaskMap = new ConcurrentHashMap();
    @Inject(optional=true)
    private ErrorReporter errorReporter = ErrorReporter.empty();
    @Inject
    private DigdagMetrics metrics;
    private final Limits limits;
    private static final String[] CONFIG_KEYS_FOR_LOGGING = new String[]{"project_id", "session_id", "session_time", "attempt_id", "task_name", "last_session_time", "last_executed_session_time", "next_session_time", "session_uuid", "timezone"};

    @Inject
    public OperatorManager(AgentConfig agentConfig, AgentId agentId, TaskCallbackApi callback, WorkspaceManager workspaceManager, ConfigFactory cf, ConfigEvalEngine evalEngine, OperatorRegistry registry, SecretStoreManager secretStoreManager, Limits limits) {
        this.agentConfig = agentConfig;
        this.agentId = agentId;
        this.callback = callback;
        this.workspaceManager = workspaceManager;
        this.cf = cf;
        this.evalEngine = evalEngine;
        this.registry = registry;
        this.secretStoreManager = secretStoreManager;
        this.limits = limits;
        this.heartbeatScheduler = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setDaemon(true).setNameFormat("heartbeat-%d").build());
    }

    @PostConstruct
    public void start() {
        this.heartbeatScheduler.scheduleAtFixedRate(() -> this.heartbeat(), this.agentConfig.getHeartbeatInterval(), this.agentConfig.getHeartbeatInterval(), TimeUnit.SECONDS);
    }

    @PreDestroy
    public void shutdown() {
        this.heartbeatScheduler.shutdown();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @DigdagTimed(value="opm_", category="agent", appendMethodName=true)
    public void run(TaskRequest request) {
        long taskId = request.getTaskId();
        String origThreadName = String.format("[%d:%s:%d:%d]%s", request.getSiteId(), request.getProjectName().or((Object)"----"), request.getSessionId(), request.getAttemptId(), request.getTaskName());
        try (SetThreadName threadName = new SetThreadName(origThreadName);
             TaskLogger taskLogger = this.callback.newTaskLogger(request);){
            TaskContextLogging.enter(LogLevel.DEBUG, taskLogger);
            try {
                this.runningTaskMap.put(taskId, request);
                try {
                    this.runWithHeartbeat(request);
                }
                finally {
                    this.runningTaskMap.remove(taskId);
                }
            }
            finally {
                TaskContextLogging.leave();
            }
        }
    }

    @DigdagTimed(value="opm_", category="agent", appendMethodName=true)
    protected void runWithHeartbeat(TaskRequest request) {
        try {
            this.workspaceManager.withExtractedArchive(request, () -> this.callback.openArchive(request), projectPath -> {
                block12: {
                    try {
                        this.runWithWorkspace(projectPath, request);
                    }
                    catch (TaskExecutionException ex) {
                        if (ex.getRetryInterval().isPresent()) {
                            if (!ex.getError(this.cf).isPresent()) {
                                logger.debug("Retrying task {}", (Object)ex.toString());
                            } else {
                                logger.error("Task failed, retrying", (Throwable)ex);
                            }
                            this.callback.retryTask(request, this.agentId, (Integer)ex.getRetryInterval().get(), (Config)ex.getStateParams(this.cf).get(), (com.google.common.base.Optional<Config>)ex.getError(this.cf));
                        } else {
                            if (request.isCancelRequested()) {
                                logger.warn("Task {} is canceled.", (Object)request.getTaskName());
                            } else {
                                logger.error("Task {} failed.\n{}", (Object)request.getTaskName(), (Object)OperatorManager.formatExceptionMessage(ex));
                                logger.debug("", (Throwable)ex);
                            }
                            this.callback.taskFailed(request, this.agentId, (Config)ex.getError(this.cf).get());
                        }
                    }
                    catch (AssertionError | RuntimeException ex) {
                        if (ex instanceof ConfigException) {
                            logger.error("Configuration error at task {}: {}", (Object)request.getTaskName(), (Object)OperatorManager.formatExceptionMessage((Throwable)ex));
                        } else {
                            logger.error(LogMarkers.UNEXPECTED_SERVER_ERROR, "Task failed with unexpected error: {}", (Object)((Throwable)ex).getMessage(), ex);
                        }
                        this.callback.taskFailed(request, this.agentId, TaskExecutionException.buildExceptionErrorConfig((Throwable)ex).toConfig(this.cf));
                    }
                    catch (Throwable ex) {
                        logger.error(LogMarkers.UNEXPECTED_SERVER_ERROR, "Task failed with unexpected error: {}", (Object)ex.getMessage(), (Object)ex);
                        if (!request.isCancelRequested()) break block12;
                        logger.warn("This task will be canceled since it's already requested to be canceled");
                        this.callback.taskFailed(request, this.agentId, TaskExecutionException.buildExceptionErrorConfig((Throwable)ex).toConfig(this.cf));
                    }
                }
                return true;
            });
        }
        catch (IOException | RuntimeException ex) {
            logger.error(LogMarkers.UNEXPECTED_SERVER_ERROR, "Task failed with unexpected error: {}", (Object)ex.getMessage(), (Object)ex);
            this.callback.taskFailed(request, this.agentId, TaskExecutionException.buildExceptionErrorConfig((Throwable)ex).toConfig(this.cf));
        }
    }

    @DigdagTimed(value="opm_", category="agent", appendMethodName=true)
    protected Config evalConfig(TaskRequest request) throws RuntimeException, AssertionError {
        try {
            Config all = this.cf.create();
            all.merge(request.getConfig());
            Config runtimeParams = RuntimeParams.buildRuntimeParams(request.getConfig().getFactory(), request).deepCopy();
            all.merge(runtimeParams);
            Config evalParams = all.deepCopy();
            all.merge(request.getLocalConfig());
            return this.evalEngine.eval(all, evalParams);
        }
        catch (TemplateException te) {
            throw new ConfigException(te.getMessage(), (Throwable)te);
        }
    }

    @DigdagTimed(value="opm_", category="agent", appendMethodName=true)
    protected void runWithWorkspace(Path projectPath, TaskRequest request) throws TaskExecutionException {
        String type;
        Config config;
        try {
            config = this.evalConfig(request);
        }
        catch (ConfigException ex) {
            throw ex;
        }
        catch (RuntimeException ex) {
            throw new RuntimeException("Failed to process variables", ex);
        }
        catch (AssertionError ex) {
            throw new RuntimeException("Unexpected error happened in ConfigEvalEngine: " + ((Throwable)((Object)ex)).getMessage(), (Throwable)((Object)ex));
        }
        logger.debug("evaluated config: {}", (Object)this.filterConfigForLogging(config));
        HashSet<String> shouldBeUsedKeys = new HashSet<String>(request.getLocalConfig().getKeys());
        if (config.has("_type")) {
            type = (String)config.get("_type", String.class);
            if (OperatorManager.checkTaskLogPrintable(request)) {
                logger.info("type: {}", (Object)type);
            } else {
                logger.debug("(retry: {}) type: {}", (Object)request.getRetryCount(), (Object)type);
            }
            shouldBeUsedKeys.remove("_type");
        } else {
            Optional<String> operatorKey = config.getKeys().stream().filter(key -> key.endsWith(">")).findFirst();
            if (!operatorKey.isPresent()) {
                this.callback.taskSucceeded(request, this.agentId, TaskResult.empty((ConfigFactory)this.cf));
                return;
            }
            type = operatorKey.get().substring(0, operatorKey.get().length() - 1);
            Object command = config.getOptional(operatorKey.get(), Object.class).orNull();
            config.set("_type", (Object)type);
            config.set("_command", command);
            if (OperatorManager.checkTaskLogPrintable(request)) {
                logger.info("{}>: {}", (Object)type, com.google.common.base.Optional.fromNullable((Object)command).or((Object)""));
            } else {
                logger.debug("(retry: {}) {}>: {}", new Object[]{request.getRetryCount(), type, com.google.common.base.Optional.fromNullable((Object)command).or((Object)"")});
            }
            shouldBeUsedKeys.remove(operatorKey.get());
        }
        Config localConfig = config.getFactory().create();
        for (String localKey : request.getLocalConfig().getKeys()) {
            localConfig.set(localKey, config.getOptional(localKey, JsonNode.class).transform(JsonNode::deepCopy).orNull());
        }
        CheckedConfig.UsedKeysSet usedKeys = new CheckedConfig.UsedKeysSet();
        ImmutableTaskRequest mergedRequest = TaskRequest.builder().from(request).localConfig((Config)new CheckedConfig(localConfig, usedKeys)).config((Config)new CheckedConfig(config, usedKeys)).build();
        TaskResult result = this.callExecutor(projectPath, type, (TaskRequest)mergedRequest);
        if (!usedKeys.isAllUsed()) {
            shouldBeUsedKeys.removeAll(usedKeys);
            if (!shouldBeUsedKeys.isEmpty()) {
                this.warnUnusedKeys(request, shouldBeUsedKeys, usedKeys);
            }
        }
        this.callback.taskSucceeded(request, this.agentId, result);
    }

    @VisibleForTesting
    static boolean checkTaskLogPrintable(TaskRequest request) {
        boolean print = false;
        if (request.getRetryCount() == 0) {
            print = true;
        } else if (request.getStartedAt().isPresent() && Instant.now().isAfter(((Instant)request.getStartedAt().get()).plusSeconds(1800L)) && request.getRetryCount() % 10 == 0) {
            print = true;
        }
        return print;
    }

    private void warnUnusedKeys(TaskRequest request, Set<String> shouldBeUsedButNotUsedKeys, Collection<String> candidateKeys) {
        for (String key : shouldBeUsedButNotUsedKeys) {
            logger.error("Parameter '{}' is not used at task {}.", (Object)key, (Object)request.getTaskName());
            List<String> suggestions = EditDistance.suggest(key, candidateKeys, 0.5);
            if (suggestions.isEmpty()) continue;
            logger.error("  > Did you mean {}?", suggestions);
        }
    }

    @DigdagTimed(value="opm_", category="agent", appendMethodName=true)
    protected TaskResult callExecutor(Path projectPath, String type, TaskRequest mergedRequest) {
        OperatorFactory factory = this.registry.get(mergedRequest, type);
        if (factory == null) {
            throw new ConfigException("Unknown task type: " + type);
        }
        SecretStore secretStore = this.secretStoreManager.getSecretStore(mergedRequest.getSiteId());
        SecretAccessContext secretContext = SecretAccessContext.builder().siteId(mergedRequest.getSiteId()).projectId(mergedRequest.getProjectId()).revision((String)mergedRequest.getRevision().get()).workflowName(mergedRequest.getWorkflowName()).taskName(mergedRequest.getTaskName()).operatorType(type).build();
        Config secretMounts = mergedRequest.getConfig().getNestedOrGetEmpty("_secrets");
        mergedRequest.getConfig().remove("_secrets");
        DefaultSecretProvider secretProvider = new DefaultSecretProvider(secretContext, secretMounts, secretStore);
        GrantedPrivilegedVariables privilegedVariables = GrantedPrivilegedVariables.build(mergedRequest.getLocalConfig().getNestedOrGetEmpty("_env"), GrantedPrivilegedVariables.privilegedSecretProvider(secretContext, secretStore));
        DefaultOperatorContext context = new DefaultOperatorContext(projectPath, mergedRequest, secretProvider, privilegedVariables, this.limits);
        Operator operator = factory.newOperator((OperatorContext)context);
        if (mergedRequest.isCancelRequested()) {
            operator.cleanup(mergedRequest);
            throw new TaskExecutionException(String.format(Locale.ENGLISH, "Got a cancel-requested: attempt_id=%d, task_id=%d", mergedRequest.getAttemptId(), mergedRequest.getTaskId()));
        }
        return operator.run();
    }

    private void heartbeat() {
        try {
            Map sites = this.runningTaskMap.values().stream().collect(Collectors.groupingBy(TaskRequest::getSiteId, Collectors.mapping(Function.identity(), Collectors.toList())));
            for (Map.Entry pair : sites.entrySet()) {
                int siteId = pair.getKey();
                List<TaskRequest> taskRequests = pair.getValue();
                this.callback.taskHeartbeat(siteId, taskRequests, this.agentId, this.agentConfig.getLockRetentionTime());
            }
        }
        catch (Throwable t) {
            logger.error(LogMarkers.UNEXPECTED_SERVER_ERROR, "Uncaught exception during sending task heartbeats to a server. Ignoring. Heartbeat thread will be retried.", t);
            this.errorReporter.reportUncaughtError(t);
            this.metrics.increment(DigdagMetrics.Category.AGENT, "uncaughtErrors");
        }
    }

    public static String formatExceptionMessage(Throwable ex) {
        StringBuilder sb = new StringBuilder();
        OperatorManager.collectExceptionMessage(sb, ex, new StringBuffer());
        return sb.toString();
    }

    public static void collectExceptionMessage(StringBuilder sb, Throwable ex, StringBuffer used) {
        String message = ex.getMessage();
        if (Strings.isNullOrEmpty((String)message)) {
            message = ex.getClass().getSimpleName();
        }
        if (used.indexOf(message) == -1) {
            used.append("\n").append(message);
            if (sb.length() > 0) {
                sb.append("\n> ");
            }
            sb.append(message);
            if (!(ex instanceof TaskExecutionException)) {
                sb.append(" (");
                sb.append(ex.getClass().getSimpleName().replaceFirst("(?:Exception|Error)$", "").replaceAll("([A-Z]+)([A-Z][a-z])", "$1 $2").replaceAll("([a-z])([A-Z])", "$1 $2").toLowerCase());
                sb.append(")");
            }
        }
        if (ex.getCause() != null) {
            OperatorManager.collectExceptionMessage(sb, ex.getCause(), used);
        }
        for (Throwable t : ex.getSuppressed()) {
            OperatorManager.collectExceptionMessage(sb, t, used);
        }
    }

    @VisibleForTesting
    public Config filterConfigForLogging(Config src) {
        Config dst = this.cf.create();
        for (String key : CONFIG_KEYS_FOR_LOGGING) {
            com.google.common.base.Optional v = src.getOptional(key, Object.class);
            if (!v.isPresent()) continue;
            dst.set(key, v.get());
        }
        return dst;
    }
}

