/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.engine.server.rest.service;

import com.hazelcast.cluster.Address;
import com.hazelcast.cluster.Cluster;
import com.hazelcast.instance.impl.Node;
import com.hazelcast.internal.json.JsonArray;
import com.hazelcast.internal.json.JsonObject;
import com.hazelcast.internal.json.JsonValue;
import com.hazelcast.internal.serialization.Data;
import com.hazelcast.internal.util.JsonUtil;
import com.hazelcast.map.IMap;
import com.hazelcast.spi.impl.NodeEngine;
import com.hazelcast.spi.impl.NodeEngineImpl;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.options.EnvCommonOptions;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.common.utils.DateTimeUtils;
import org.apache.seatunnel.engine.common.config.JobConfig;
import org.apache.seatunnel.engine.common.job.JobStatus;
import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
import org.apache.seatunnel.engine.core.classloader.ClassLoaderService;
import org.apache.seatunnel.engine.core.dag.logical.LogicalDag;
import org.apache.seatunnel.engine.core.job.ExecutionAddress;
import org.apache.seatunnel.engine.core.job.JobDAGInfo;
import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
import org.apache.seatunnel.engine.core.job.JobInfo;
import org.apache.seatunnel.engine.server.CoordinatorService;
import org.apache.seatunnel.engine.server.SeaTunnelServer;
import org.apache.seatunnel.engine.server.dag.DAGUtils;
import org.apache.seatunnel.engine.server.master.JobHistoryService;
import org.apache.seatunnel.engine.server.operation.CancelJobOperation;
import org.apache.seatunnel.engine.server.operation.GetClusterHealthMetricsOperation;
import org.apache.seatunnel.engine.server.operation.GetJobMetricsOperation;
import org.apache.seatunnel.engine.server.operation.GetJobStatusOperation;
import org.apache.seatunnel.engine.server.operation.SavePointJobOperation;
import org.apache.seatunnel.engine.server.operation.SubmitJobOperation;
import org.apache.seatunnel.engine.server.rest.RestJobExecutionEnvironment;
import org.apache.seatunnel.engine.server.utils.NodeEngineUtil;
import org.apache.seatunnel.engine.server.utils.RestUtil;
import org.apache.seatunnel.shade.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.JsonNode;
import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class BaseService {
    private static final Logger log = LoggerFactory.getLogger(BaseService.class);
    protected final NodeEngineImpl nodeEngine;

    public BaseService(NodeEngineImpl nodeEngine) {
        this.nodeEngine = nodeEngine;
    }

    protected SeaTunnelServer getSeaTunnelServer(boolean shouldBeMaster) {
        Map extensionServices = this.nodeEngine.getNode().getNodeExtension().createExtensionServices();
        SeaTunnelServer seaTunnelServer = (SeaTunnelServer)extensionServices.get("st:impl:seaTunnelServer");
        if (shouldBeMaster && !seaTunnelServer.isMasterNode()) {
            return null;
        }
        return seaTunnelServer;
    }

    protected JsonObject convertToJson(JobInfo jobInfo, long jobId) {
        JobStatus jobStatus;
        String jobMetrics;
        JsonObject jobInfoJson = new JsonObject();
        JobImmutableInformation jobImmutableInformation = (JobImmutableInformation)this.nodeEngine.getSerializationService().toObject(this.nodeEngine.getSerializationService().toObject((Object)jobInfo.getJobImmutableInformation()));
        SeaTunnelServer seaTunnelServer = this.getSeaTunnelServer(true);
        ClassLoaderService classLoaderService = seaTunnelServer == null ? this.getSeaTunnelServer(false).getClassLoaderService() : seaTunnelServer.getClassLoaderService();
        LogicalDag logicalDag = DAGUtils.restoreLogicalDag(jobImmutableInformation, this.nodeEngine.getSerializationService(), classLoaderService);
        if (seaTunnelServer == null) {
            jobMetrics = (String)NodeEngineUtil.sendOperationToMasterNode((NodeEngine)this.nodeEngine, new GetJobMetricsOperation(jobId)).join();
            jobStatus = JobStatus.values()[(Integer)NodeEngineUtil.sendOperationToMasterNode((NodeEngine)this.nodeEngine, new GetJobStatusOperation(jobId)).join()];
        } else {
            jobMetrics = seaTunnelServer.getCoordinatorService().getJobMetrics(jobId).toJsonString();
            jobStatus = seaTunnelServer.getCoordinatorService().getJobStatus(jobId);
        }
        JobDAGInfo jobDAGInfo = DAGUtils.getJobDAGInfo(logicalDag, jobImmutableInformation, this.getSeaTunnelServer(false).getSeaTunnelConfig().getEngineConfig(), true, new ExecutionAddress(this.nodeEngine.getMasterAddress().getHost(), this.nodeEngine.getMasterAddress().getPort()), new HashSet<ExecutionAddress>());
        jobInfoJson.add("jobId", String.valueOf(jobId)).add("jobName", logicalDag.getJobConfig().getName()).add("jobStatus", jobStatus.toString()).add("envOptions", (JsonValue)JsonUtil.toJsonObject((Map)logicalDag.getJobConfig().getEnvOptions())).add("createTime", DateTimeUtils.toString((long)jobImmutableInformation.getCreateTime(), (DateTimeUtils.Formatter)DateTimeUtils.Formatter.YYYY_MM_DD_HH_MM_SS)).add("startTime", this.getJobStartTime(jobId)).add("jobDag", (JsonValue)jobDAGInfo.toJsonObject()).add("pluginJarsUrls", (JsonValue)jobImmutableInformation.getPluginJarsUrls().stream().map(url -> {
            JsonObject jarUrl = new JsonObject();
            jarUrl.add("jarPath", url.toString());
            return jarUrl;
        }).collect(JsonArray::new, JsonArray::add, JsonArray::add)).add("isStartWithSavePoint", jobImmutableInformation.isStartWithSavePoint()).add("metrics", (JsonValue)this.metricsToJsonObject(this.getJobMetrics(jobMetrics)));
        return jobInfoJson;
    }

    private String getJobStartTime(long jobId) {
        IMap stateTimestamps = this.nodeEngine.getHazelcastInstance().getMap("engine_stateTimestamps");
        Long[] jobnStateTimestamps = (Long[])stateTimestamps.get((Object)jobId);
        if (jobnStateTimestamps != null) {
            Long startTimestamp = jobnStateTimestamps[JobStatus.SCHEDULED.ordinal()];
            return DateTimeUtils.toString((long)startTimestamp, (DateTimeUtils.Formatter)DateTimeUtils.Formatter.YYYY_MM_DD_HH_MM_SS);
        }
        return null;
    }

    protected JsonObject getJobInfoJson(JobHistoryService.JobState jobState, String jobMetrics, JobDAGInfo jobDAGInfo) {
        return new JsonObject().add("jobId", String.valueOf(jobState.getJobId())).add("jobName", jobState.getJobName()).add("jobStatus", jobState.getJobStatus().toString()).add("errorMsg", jobState.getErrorMessage()).add("createTime", DateTimeUtils.toString((long)jobState.getSubmitTime(), (DateTimeUtils.Formatter)DateTimeUtils.Formatter.YYYY_MM_DD_HH_MM_SS)).add("startTime", DateTimeUtils.toString((long)jobState.getStartTime(), (DateTimeUtils.Formatter)DateTimeUtils.Formatter.YYYY_MM_DD_HH_MM_SS)).add("finishTime", DateTimeUtils.toString((long)jobState.getFinishTime(), (DateTimeUtils.Formatter)DateTimeUtils.Formatter.YYYY_MM_DD_HH_MM_SS)).add("jobDag", (JsonValue)jobDAGInfo.toJsonObject()).add("pluginJarsUrls", (JsonValue)new JsonArray()).add("metrics", (JsonValue)this.metricsToJsonObject(this.getJobMetrics(jobMetrics)));
    }

    private Map<String, Object> getJobMetrics(String jobMetrics) {
        HashMap<String, Object> metricsMap = new HashMap<String, Object>();
        Object[] countMetricsNames = new String[]{"SourceReceivedCount", "SinkWriteCount", "SourceReceivedBytes", "SinkWriteBytes", "IntermediateQueueSize"};
        Object[] rateMetricsNames = new String[]{"SourceReceivedQPS", "SinkWriteQPS", "SourceReceivedBytesPerSeconds", "SinkWriteBytesPerSeconds"};
        Object[] tableCountMetricsNames = new String[]{"TableSourceReceivedCount", "TableSinkWriteCount", "TableSourceReceivedBytes", "TableSinkWriteBytes"};
        Object[] tableRateMetricsNames = new String[]{"TableSourceReceivedQPS", "TableSinkWriteQPS", "TableSourceReceivedBytesPerSeconds", "TableSinkWriteBytesPerSeconds"};
        Long[] metricsSums = (Long[])Stream.generate(() -> 0L).limit(countMetricsNames.length).toArray(Long[]::new);
        Double[] metricsRates = (Double[])Stream.generate(() -> 0.0).limit(rateMetricsNames.length).toArray(Double[]::new);
        Object[] tableMetricsMaps = new Map[]{new HashMap(), new HashMap(), new HashMap(), new HashMap(), new HashMap(), new HashMap(), new HashMap(), new HashMap()};
        try {
            JsonNode jobMetricsStr = new ObjectMapper().readTree(jobMetrics);
            jobMetricsStr.fieldNames().forEachRemaining(arg_0 -> this.lambda$getJobMetrics$5(jobMetricsStr, (Map[])tableMetricsMaps, arg_0));
            this.aggregateMetrics(jobMetricsStr, metricsSums, metricsRates, (String[])ArrayUtils.addAll((Object[])countMetricsNames, (Object[])rateMetricsNames));
        }
        catch (JsonProcessingException e) {
            return metricsMap;
        }
        this.populateMetricsMap(metricsMap, tableMetricsMaps, (String[])ArrayUtils.addAll((Object[])tableCountMetricsNames, (Object[])tableRateMetricsNames), countMetricsNames.length);
        this.populateMetricsMap(metricsMap, Stream.concat(Arrays.stream(metricsSums), Arrays.stream(metricsRates)).toArray(Number[]::new), (String[])ArrayUtils.addAll((Object[])countMetricsNames, (Object[])rateMetricsNames), metricsSums.length);
        return metricsMap;
    }

    private void processMetric(String metricName, String tableName, JsonNode metricNode, Map<String, JsonNode>[] tableMetricsMaps) {
        if (metricNode == null) {
            return;
        }
        boolean SOURCE_COUNT_IDX = false;
        boolean SINK_COUNT_IDX = true;
        int SOURCE_BYTES_IDX = 2;
        int SINK_BYTES_IDX = 3;
        int SOURCE_QPS_IDX = 4;
        int SINK_QPS_IDX = 5;
        int SOURCE_BYTES_SEC_IDX = 6;
        int SINK_BYTES_SEC_IDX = 7;
        if (metricName.startsWith("SourceReceivedCount#")) {
            tableMetricsMaps[0].put(tableName, metricNode);
        } else if (metricName.startsWith("SinkWriteCount#")) {
            tableMetricsMaps[1].put(tableName, metricNode);
        } else if (metricName.startsWith("SourceReceivedBytes#")) {
            tableMetricsMaps[2].put(tableName, metricNode);
        } else if (metricName.startsWith("SinkWriteBytes#")) {
            tableMetricsMaps[3].put(tableName, metricNode);
        } else if (metricName.startsWith("SourceReceivedQPS#")) {
            tableMetricsMaps[4].put(tableName, metricNode);
        } else if (metricName.startsWith("SinkWriteQPS#")) {
            tableMetricsMaps[5].put(tableName, metricNode);
        } else if (metricName.startsWith("SourceReceivedBytesPerSeconds#")) {
            tableMetricsMaps[6].put(tableName, metricNode);
        } else if (metricName.startsWith("SinkWriteBytesPerSeconds#")) {
            tableMetricsMaps[7].put(tableName, metricNode);
        }
    }

    private void aggregateMetrics(JsonNode jobMetricsStr, Long[] metricsSums, Double[] metricsRates, String[] metricsNames) {
        for (int i = 0; i < metricsNames.length; ++i) {
            JsonNode metricNode = jobMetricsStr.get(metricsNames[i]);
            if (metricNode == null || !metricNode.isArray()) continue;
            for (JsonNode node : metricNode) {
                int n;
                Number[] numberArray;
                if (i < metricsSums.length) {
                    numberArray = metricsSums;
                    n = i;
                    Long.valueOf((Long)numberArray[n] + node.path("value").asLong());
                    continue;
                }
                numberArray = metricsRates;
                n = i - metricsSums.length;
                Double.valueOf((Double)numberArray[n] + node.path("value").asDouble());
            }
        }
    }

    private void populateMetricsMap(Map<String, Object> metricsMap, Object[] metrics, String[] metricNames, int countMetricNames) {
        for (int i = 0; i < metrics.length; ++i) {
            if (metrics[i] == null) continue;
            if (metrics[i] instanceof Map) {
                metricsMap.put(metricNames[i], this.aggregateMap((Map)metrics[i], i >= countMetricNames));
                continue;
            }
            metricsMap.put(metricNames[i], metrics[i]);
        }
    }

    private Map<String, Object> aggregateMap(Map<String, JsonNode> inputMap, boolean isRate) {
        return isRate ? inputMap.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, entry -> StreamSupport.stream(((JsonNode)entry.getValue()).spliterator(), false).mapToDouble(node -> node.path("value").asDouble()).sum())) : inputMap.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, entry -> StreamSupport.stream(((JsonNode)entry.getValue()).spliterator(), false).mapToLong(node -> node.path("value").asLong()).sum()));
    }

    private JsonObject metricsToJsonObject(Map<String, Object> jobMetrics) {
        JsonObject members = new JsonObject();
        jobMetrics.forEach((key, value) -> {
            if (value instanceof Map) {
                members.add(key, (JsonValue)this.metricsToJsonObject((Map)value));
            } else {
                members.add(key, value.toString());
            }
        });
        return members;
    }

    protected JsonNode requestHandle(byte[] requestBody) {
        JsonNode requestBodyJsonNode;
        if (requestBody.length == 0) {
            throw new IllegalArgumentException("Request body is empty.");
        }
        try {
            requestBodyJsonNode = RestUtil.convertByteToJsonNode(requestBody);
        }
        catch (IOException e) {
            throw new IllegalArgumentException("Invalid JSON format in request body.");
        }
        return requestBodyJsonNode;
    }

    protected void handleStopJob(Map<String, Object> map, SeaTunnelServer seaTunnelServer, Node node) {
        boolean isStopWithSavePoint = false;
        if (map.get("jobId") == null) {
            throw new IllegalArgumentException("jobId cannot be empty.");
        }
        long jobId = Long.parseLong(map.get("jobId").toString());
        if (map.get("isStopWithSavePoint") != null) {
            isStopWithSavePoint = Boolean.parseBoolean(map.get("isStopWithSavePoint").toString());
        }
        if (!seaTunnelServer.isMasterNode()) {
            if (isStopWithSavePoint) {
                NodeEngineUtil.sendOperationToMasterNode((NodeEngine)node.nodeEngine, new SavePointJobOperation(jobId)).join();
            } else {
                NodeEngineUtil.sendOperationToMasterNode((NodeEngine)node.nodeEngine, new CancelJobOperation(jobId)).join();
            }
        } else {
            CoordinatorService coordinatorService = seaTunnelServer.getCoordinatorService();
            if (isStopWithSavePoint) {
                coordinatorService.savePoint(jobId);
            } else {
                coordinatorService.cancelJob(jobId);
            }
        }
    }

    protected String mapToUrlParams(Map<String, String> params) {
        return params.entrySet().stream().map(entry -> (String)entry.getKey() + "=" + (String)entry.getValue()).collect(Collectors.joining("&", "?", ""));
    }

    protected JsonObject submitJobInternal(Config config, Map<String, String> requestParams, SeaTunnelServer seaTunnelServer, Node node) {
        ReadonlyConfig envOptions = ReadonlyConfig.fromConfig((Config)config.getConfig("env"));
        String jobName = (String)envOptions.get(EnvCommonOptions.JOB_NAME);
        JobConfig jobConfig = new JobConfig();
        jobConfig.setName(StringUtils.isEmpty((CharSequence)requestParams.get("jobName")) ? jobName : requestParams.get("jobName"));
        boolean startWithSavePoint = Boolean.parseBoolean(requestParams.get("isStartWithSavePoint"));
        String jobIdStr = requestParams.get("jobId");
        Long finalJobId = StringUtils.isNotBlank((CharSequence)jobIdStr) ? Long.valueOf(Long.parseLong(jobIdStr)) : null;
        RestJobExecutionEnvironment restJobExecutionEnvironment = new RestJobExecutionEnvironment(seaTunnelServer, jobConfig, config, node, startWithSavePoint, finalJobId);
        JobImmutableInformation jobImmutableInformation = restJobExecutionEnvironment.build();
        long jobId = jobImmutableInformation.getJobId();
        if (!seaTunnelServer.isMasterNode()) {
            NodeEngineUtil.sendOperationToMasterNode((NodeEngine)node.nodeEngine, new SubmitJobOperation(jobId, node.nodeEngine.toData((Object)jobImmutableInformation), jobImmutableInformation.isStartWithSavePoint())).join();
        } else {
            this.submitJob(node, seaTunnelServer, jobImmutableInformation, jobConfig);
        }
        return new JsonObject().add("jobId", String.valueOf(jobId)).add("jobName", jobConfig.getName());
    }

    private void submitJob(Node node, SeaTunnelServer seaTunnelServer, JobImmutableInformation jobImmutableInformation, JobConfig jobConfig) {
        CoordinatorService coordinatorService = seaTunnelServer.getCoordinatorService();
        Data data = node.nodeEngine.getSerializationService().toData((Object)jobImmutableInformation);
        PassiveCompletableFuture<Void> voidPassiveCompletableFuture = coordinatorService.submitJob(Long.parseLong(jobConfig.getJobContext().getJobId()), data, jobImmutableInformation.isStartWithSavePoint());
        voidPassiveCompletableFuture.join();
    }

    protected JsonArray getSystemMonitoringInformationJsonValues() {
        Cluster cluster = this.nodeEngine.getHazelcastInstance().getCluster();
        Set members = cluster.getMembers();
        JsonArray jsonValues = members.stream().map(member -> {
            Address address = member.getAddress();
            String input = null;
            try {
                input = (String)NodeEngineUtil.sendOperationToMemberNode((NodeEngine)this.nodeEngine, new GetClusterHealthMetricsOperation(), address).get();
            }
            catch (InterruptedException | ExecutionException e) {
                log.error("Failed to get cluster health metrics", (Throwable)e);
            }
            String[] parts = input.split(", ");
            JsonObject jobInfo = new JsonObject();
            Arrays.stream(parts).forEach(part -> {
                String[] keyValue = part.split("=");
                jobInfo.add(keyValue[0], keyValue[1]);
            });
            return jobInfo;
        }).collect(JsonArray::new, JsonArray::add, JsonArray::add);
        return jsonValues;
    }

    private /* synthetic */ void lambda$getJobMetrics$5(JsonNode jobMetricsStr, Map[] tableMetricsMaps, String metricName) {
        if (metricName.contains("#")) {
            String tableName = TablePath.of((String)metricName.split("#")[1]).getFullName();
            JsonNode metricNode = jobMetricsStr.get(metricName);
            this.processMetric(metricName, tableName, metricNode, tableMetricsMaps);
        }
    }
}

