/*
 * Decompiled with CFR 0.152.
 */
package org.apache.inlong.manager.plugin.poller;

import com.fasterxml.jackson.core.type.TypeReference;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.JobStatus;
import org.apache.inlong.manager.common.enums.SortStatus;
import org.apache.inlong.manager.common.util.JsonUtils;
import org.apache.inlong.manager.plugin.flink.FlinkService;
import org.apache.inlong.manager.pojo.sort.SortStatusInfo;
import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.workflow.plugin.sort.SortPoller;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SortStatusPoller
implements SortPoller {
    private static final Logger log = LoggerFactory.getLogger(SortStatusPoller.class);
    private static final Map<JobStatus, SortStatus> JOB_SORT_STATUS_MAP = new HashMap<JobStatus, SortStatus>(16);

    public List<SortStatusInfo> pollSortStatus(List<InlongStreamInfo> streamInfos, String credentials) {
        log.debug("begin to poll sort status for stream");
        if (CollectionUtils.isEmpty(streamInfos)) {
            log.debug("end to poll sort status, as the stream list is empty");
            return Collections.emptyList();
        }
        ArrayList<SortStatusInfo> statusInfos = new ArrayList<SortStatusInfo>(streamInfos.size());
        for (InlongStreamInfo streamInfo : streamInfos) {
            try {
                List extList = streamInfo.getExtList();
                log.debug("stream {} ext info: {}", (Object)streamInfo.getInlongStreamId(), (Object)extList);
                HashMap kvConf = new HashMap();
                extList.forEach(v -> kvConf.put(v.getKeyName(), v.getKeyValue()));
                String sortExt = (String)kvConf.get("sort.properties");
                if (StringUtils.isNotEmpty((CharSequence)sortExt)) {
                    Map result = (Map)JsonUtils.OBJECT_MAPPER.convertValue((Object)JsonUtils.OBJECT_MAPPER.readTree(sortExt), (TypeReference)new TypeReference<Map<String, String>>(){});
                    kvConf.putAll(result);
                }
                String jobId = (String)kvConf.get("sort.job.id");
                SortStatusInfo statusInfo = SortStatusInfo.builder().inlongGroupId(streamInfo.getInlongGroupId()).inlongStreamId(streamInfo.getInlongStreamId()).build();
                if (StringUtils.isBlank((CharSequence)jobId)) {
                    statusInfo.setSortStatus(SortStatus.NOT_EXISTS);
                    statusInfos.add(statusInfo);
                    continue;
                }
                String sortUrl = (String)kvConf.get("sort.url");
                FlinkService flinkService = new FlinkService(sortUrl);
                statusInfo.setSortStatus(JOB_SORT_STATUS_MAP.getOrDefault(flinkService.getJobStatus(jobId), SortStatus.UNKNOWN));
                statusInfos.add(statusInfo);
            }
            catch (Exception e) {
                log.error("polling sort status failed for groupId=" + streamInfo.getInlongGroupId() + " streamId=" + streamInfo.getInlongStreamId(), (Throwable)e);
            }
        }
        log.debug("success to get sort status: {}", statusInfos);
        return statusInfos;
    }

    static {
        JOB_SORT_STATUS_MAP.put(JobStatus.CREATED, SortStatus.NEW);
        JOB_SORT_STATUS_MAP.put(JobStatus.INITIALIZING, SortStatus.NEW);
        JOB_SORT_STATUS_MAP.put(JobStatus.RUNNING, SortStatus.RUNNING);
        JOB_SORT_STATUS_MAP.put(JobStatus.FAILED, SortStatus.FAILED);
        JOB_SORT_STATUS_MAP.put(JobStatus.CANCELED, SortStatus.STOPPED);
        JOB_SORT_STATUS_MAP.put(JobStatus.SUSPENDED, SortStatus.PAUSED);
        JOB_SORT_STATUS_MAP.put(JobStatus.FINISHED, SortStatus.FINISHED);
        JOB_SORT_STATUS_MAP.put(JobStatus.FAILING, SortStatus.OPERATING);
        JOB_SORT_STATUS_MAP.put(JobStatus.CANCELLING, SortStatus.OPERATING);
        JOB_SORT_STATUS_MAP.put(JobStatus.RESTARTING, SortStatus.OPERATING);
        JOB_SORT_STATUS_MAP.put(JobStatus.RECONCILING, SortStatus.OPERATING);
    }
}

