/*
 * Decompiled with CFR 0.152.
 */
package org.apache.inlong.manager.service.core.impl;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.annotation.PostConstruct;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.audit.AuditOperator;
import org.apache.inlong.audit.entity.AuditInformation;
import org.apache.inlong.audit.entity.AuditProxy;
import org.apache.inlong.audit.entity.FlowType;
import org.apache.inlong.common.enums.IndicatorType;
import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.util.HttpUtils;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
import org.apache.inlong.manager.dao.entity.StreamSourceEntity;
import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper;
import org.apache.inlong.manager.dao.mapper.StreamSinkEntityMapper;
import org.apache.inlong.manager.dao.mapper.StreamSourceEntityMapper;
import org.apache.inlong.manager.pojo.audit.AuditProxyResponse;
import org.apache.inlong.manager.pojo.audit.AuditRequest;
import org.apache.inlong.manager.pojo.audit.AuditVO;
import org.apache.inlong.manager.pojo.user.LoginUserUtils;
import org.apache.inlong.manager.service.audit.AuditRunnable;
import org.apache.inlong.manager.service.core.AuditService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Lazy;
import org.springframework.http.HttpMethod;
import org.springframework.stereotype.Service;
import org.springframework.web.client.RestTemplate;

@Lazy
@Service
public class AuditServiceImpl
implements AuditService {
    private static final Logger LOGGER = LoggerFactory.getLogger(AuditServiceImpl.class);
    private final Map<String, Map<Integer, AuditInformation>> auditIndicatorMap = new ConcurrentHashMap<String, Map<Integer, AuditInformation>>();
    private final Map<String, String> auditItemMap = new ConcurrentHashMap<String, String>();
    private ScheduledExecutorService executor = Executors.newScheduledThreadPool(10);
    @Value(value="#{'${audit.admin.ids:3,4,5,6}'.split(',')}")
    private List<String> auditIdListForAdmin;
    @Value(value="#{'${audit.user.ids:3,4,5,6}'.split(',')}")
    private List<String> auditIdListForUser;
    @Value(value="${audit.query.url:http://127.0.0.1:10080}")
    private String auditQueryUrl;
    @Autowired
    private StreamSinkEntityMapper sinkEntityMapper;
    @Autowired
    private StreamSourceEntityMapper sourceEntityMapper;
    @Autowired
    private InlongGroupEntityMapper inlongGroupMapper;
    @Autowired
    private RestTemplate restTemplate;

    @PostConstruct
    public void initialize() {
        LOGGER.info("init audit base item cache map for {}", (Object)AuditServiceImpl.class.getSimpleName());
        try {
            this.refreshBaseItemCache();
        }
        catch (Throwable t) {
            LOGGER.error("initialize audit base item cache error", t);
        }
    }

    @Override
    public Boolean refreshBaseItemCache() {
        LOGGER.debug("start to reload audit base item info");
        try {
            this.auditIndicatorMap.clear();
            List auditInformationList = AuditOperator.getInstance().getAllAuditInformation();
            List metricInformationList = AuditOperator.getInstance().getAllMetricInformation();
            List cdcMetricInformationList = AuditOperator.getInstance().getAllCdcIdInformation();
            cdcMetricInformationList.forEach(v -> this.auditItemMap.put(String.valueOf(v.getAuditId()), v.getNameInChinese()));
            auditInformationList.forEach(v -> this.auditItemMap.put(String.valueOf(v.getAuditId()), v.getNameInChinese()));
            metricInformationList.forEach(v -> this.auditItemMap.put(String.valueOf(v.getAuditId()), v.getNameInChinese()));
        }
        catch (Throwable t) {
            LOGGER.error("failed to reload audit base item info", t);
            return false;
        }
        LOGGER.debug("success to reload audit base item info");
        return true;
    }

    @Override
    public String getAuditId(String type, IndicatorType indicatorType) {
        if (StringUtils.isBlank((CharSequence)type)) {
            return null;
        }
        Map itemMap = this.auditIndicatorMap.computeIfAbsent(type, v -> new HashMap());
        AuditInformation auditInformation = (AuditInformation)itemMap.get(indicatorType.getCode());
        if (auditInformation != null) {
            return String.valueOf(auditInformation.getAuditId());
        }
        FlowType flowType = indicatorType.getCode() % 2 == 0 ? FlowType.INPUT : FlowType.OUTPUT;
        auditInformation = AuditOperator.getInstance().buildAuditInformation(type, flowType, IndicatorType.isSuccessType((IndicatorType)indicatorType).booleanValue(), true, IndicatorType.isDiscardType((IndicatorType)indicatorType).booleanValue(), IndicatorType.isRetryType((IndicatorType)indicatorType).booleanValue());
        Preconditions.expectNotNull((Object)auditInformation, (ErrorCodeEnum)ErrorCodeEnum.AUDIT_ID_TYPE_NOT_SUPPORTED, (String)String.format(ErrorCodeEnum.AUDIT_ID_TYPE_NOT_SUPPORTED.getMessage(), type));
        itemMap.put(indicatorType.getCode(), auditInformation);
        return String.valueOf(auditInformation.getAuditId());
    }

    @Override
    public List<AuditVO> listByCondition(AuditRequest request) throws Exception {
        LOGGER.info("begin query audit list request={}", (Object)request);
        Preconditions.expectNotNull((Object)request, (String)"request is null");
        String groupId = request.getInlongGroupId();
        String streamId = request.getInlongStreamId();
        String sinkNodeType = null;
        String sourceNodeType = null;
        Integer sinkId = request.getSinkId();
        StreamSinkEntity sinkEntity = null;
        if (StringUtils.isNotBlank((CharSequence)streamId)) {
            List sinkEntityList = this.sinkEntityMapper.selectByRelatedId(groupId, streamId);
            if (sinkId != null) {
                sinkEntity = this.sinkEntityMapper.selectByPrimaryKey(sinkId);
            } else if (CollectionUtils.isNotEmpty((Collection)sinkEntityList)) {
                sinkEntity = (StreamSinkEntity)sinkEntityList.get(0);
            }
            if (sinkEntity != null) {
                sinkNodeType = sinkEntity.getSinkType();
            }
        } else {
            sinkNodeType = request.getSinkType();
        }
        HashMap<String, String> auditIdMap = new HashMap<String, String>();
        if (StringUtils.isNotBlank((CharSequence)groupId)) {
            InlongGroupEntity groupEntity = this.inlongGroupMapper.selectByGroupId(groupId);
            List sourceEntityList = this.sourceEntityMapper.selectByRelatedId(groupId, streamId, null);
            if (CollectionUtils.isNotEmpty((Collection)sourceEntityList)) {
                sourceNodeType = ((StreamSourceEntity)sourceEntityList.get(0)).getSourceType();
            }
            auditIdMap.put(this.getAuditId(sinkNodeType, IndicatorType.SEND_SUCCESS), sinkNodeType);
            if (CollectionUtils.isEmpty((Collection)request.getAuditIds())) {
                if (InlongConstants.DATASYNC_REALTIME_MODE.equals(groupEntity.getInlongGroupMode()) || InlongConstants.DATASYNC_OFFLINE_MODE.equals(groupEntity.getInlongGroupMode())) {
                    List<AuditInformation> cdcAuditInfoList = this.getCdcAuditInfoList(sourceNodeType, IndicatorType.RECEIVED_SUCCESS);
                    List<String> cdcAuditIdList = cdcAuditInfoList.stream().map(v -> String.valueOf(v.getAuditId())).collect(Collectors.toList());
                    if (CollectionUtils.isNotEmpty(cdcAuditIdList)) {
                        String tempSourceNodeType = sourceNodeType;
                        cdcAuditIdList.forEach(v -> auditIdMap.put((String)v, tempSourceNodeType));
                    }
                    auditIdMap.put(this.getAuditId(sourceNodeType, IndicatorType.RECEIVED_SUCCESS), sourceNodeType);
                    request.setAuditIds(this.getAuditIds(groupId, streamId, sourceNodeType, sinkNodeType));
                } else {
                    auditIdMap.put(this.getAuditId(sinkNodeType, IndicatorType.RECEIVED_SUCCESS), sinkNodeType);
                    request.setAuditIds(this.getAuditIds(groupId, streamId, null, sinkNodeType));
                }
            }
        } else if (CollectionUtils.isEmpty((Collection)request.getAuditIds())) {
            throw new BusinessException("audits id is empty");
        }
        ArrayList<AuditVO> result = new ArrayList<AuditVO>();
        CountDownLatch latch = new CountDownLatch(request.getAuditIds().size());
        for (String auditId : request.getAuditIds()) {
            String auditName = this.auditItemMap.get(auditId);
            this.executor.execute(new AuditRunnable(request, auditId, auditName, result, latch, this.restTemplate, this.auditQueryUrl, auditIdMap, false));
        }
        latch.await(30L, TimeUnit.SECONDS);
        LOGGER.info("success to query audit list for request={}", (Object)request);
        return result;
    }

    @Override
    public List<AuditVO> listAll(AuditRequest request) throws Exception {
        ArrayList<AuditVO> result = new ArrayList<AuditVO>();
        CountDownLatch latch = new CountDownLatch(request.getAuditIds().size());
        for (String auditId : request.getAuditIds()) {
            String auditName = this.auditItemMap.get(auditId);
            this.executor.execute(new AuditRunnable(request, auditId, auditName, result, latch, this.restTemplate, this.auditQueryUrl, null, true));
        }
        latch.await(30L, TimeUnit.SECONDS);
        return result;
    }

    @Override
    public List<AuditInformation> getAuditBases(Boolean isMetric) {
        if (isMetric.booleanValue()) {
            return AuditOperator.getInstance().getAllMetricInformation();
        }
        return AuditOperator.getInstance().getAllAuditInformation();
    }

    private List<String> getAuditIds(String groupId, String streamId, String sourceNodeType, String sinkNodeType) {
        boolean dpReceivedNeeded;
        HashSet<String> auditSet;
        HashSet<String> hashSet = auditSet = LoginUserUtils.getLoginUser().getRoles().contains("TENANT_ADMIN") ? new HashSet<String>(this.auditIdListForAdmin) : new HashSet<String>(this.auditIdListForUser);
        if (sinkNodeType == null) {
            auditSet.add(this.getAuditId("DATAPROXY", IndicatorType.SEND_SUCCESS));
        } else {
            auditSet.add(this.getAuditId(sinkNodeType, IndicatorType.SEND_SUCCESS));
            InlongGroupEntity inlongGroup = this.inlongGroupMapper.selectByGroupId(groupId);
            if (InlongConstants.DATASYNC_REALTIME_MODE.equals(inlongGroup.getInlongGroupMode()) || InlongConstants.DATASYNC_OFFLINE_MODE.equals(inlongGroup.getInlongGroupMode())) {
                List<AuditInformation> cdcAuditInfoList = this.getCdcAuditInfoList(sourceNodeType, IndicatorType.RECEIVED_SUCCESS);
                if (CollectionUtils.isNotEmpty(cdcAuditInfoList)) {
                    List cdcAuditIdList = cdcAuditInfoList.stream().map(v -> String.valueOf(v.getAuditId())).collect(Collectors.toList());
                    auditSet.addAll(cdcAuditIdList);
                }
                auditSet.add(this.getAuditId(sourceNodeType, IndicatorType.RECEIVED_SUCCESS));
            } else {
                auditSet.add(this.getAuditId(sinkNodeType, IndicatorType.RECEIVED_SUCCESS));
            }
        }
        List sourceList = this.sourceEntityMapper.selectByRelatedId(groupId, streamId, null);
        if ((CollectionUtils.isEmpty((Collection)sourceList) || sourceList.stream().allMatch(s -> "AUTO_PUSH".equals(s.getSourceType()))) && (dpReceivedNeeded = auditSet.contains(this.getAuditId("AGENT", IndicatorType.RECEIVED_SUCCESS)))) {
            auditSet.add(this.getAuditId("DATAPROXY", IndicatorType.RECEIVED_SUCCESS));
        }
        return new ArrayList<String>(auditSet);
    }

    @Override
    public List<AuditProxy> getAuditProxy(String component) throws Exception {
        try {
            StringBuilder builder = new StringBuilder();
            builder.append(this.auditQueryUrl).append("/audit/query/getAuditProxy?").append("component=").append(component);
            String url = builder.toString();
            LOGGER.info("query audit url ={}", (Object)url);
            AuditProxyResponse result = (AuditProxyResponse)HttpUtils.request((RestTemplate)this.restTemplate, (String)url, (HttpMethod)HttpMethod.GET, null, null, AuditProxyResponse.class);
            LOGGER.info("success to query audit proxy url list for request url ={}", (Object)url);
            return result.getData();
        }
        catch (Exception e) {
            String errMsg = String.format("get audit proxy url failed for %s", component);
            LOGGER.info(errMsg, (Throwable)e);
            throw new BusinessException(errMsg);
        }
    }

    @Override
    public List<AuditInformation> getCdcAuditInfoList(String type, IndicatorType indicatorType) {
        if (StringUtils.isBlank((CharSequence)type)) {
            return null;
        }
        FlowType flowType = indicatorType.getCode() % 2 == 0 ? FlowType.INPUT : FlowType.OUTPUT;
        List cdcAuditInfo = AuditOperator.getInstance().getAllCdcIdInformation(type, flowType);
        Preconditions.expectNotNull((Object)cdcAuditInfo, (ErrorCodeEnum)ErrorCodeEnum.AUDIT_ID_TYPE_NOT_SUPPORTED, (String)String.format(ErrorCodeEnum.AUDIT_ID_TYPE_NOT_SUPPORTED.getMessage(), type));
        return cdcAuditInfo;
    }
}

