/*
 * Decompiled with CFR 0.152.
 */
package org.apache.inlong.audit.service.cache;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import javax.sql.DataSource;
import org.apache.commons.dbcp.BasicDataSource;
import org.apache.inlong.audit.service.config.Configuration;
import org.apache.inlong.audit.service.entities.JdbcConfig;
import org.apache.inlong.audit.service.entities.StatData;
import org.apache.inlong.audit.service.node.ConfigService;
import org.apache.inlong.audit.service.utils.AuditUtils;
import org.apache.inlong.audit.service.utils.CacheUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Runs audit statistics queries directly against the audit source databases.
 *
 * <p>One pooled {@link BasicDataSource} is created at construction time for every entry returned by
 * {@code ConfigService.getInstance().getAllAuditSource()}. {@link #queryLogTs} and
 * {@link #queryAuditData} query all data sources in parallel on a fixed thread pool and merge the
 * results; {@link #queryIdsByIp} and {@link #queryIpsById} try the data sources one after another
 * and return the first non-empty result.
 *
 * <p>All SQL statements are read from {@link Configuration}; the constants below are only the
 * defaults used when no override is configured.
 */
public class RealTimeQuery {

    private static final Logger LOGGER = LoggerFactory.getLogger(RealTimeQuery.class);

    /**
     * Stream id wildcard. When it is passed as {@code inlongStreamId}, the stream id condition is
     * stripped from the SQL via {@code AuditUtils.removeStreamIdCondition} and no stream id
     * parameter is bound.
     */
    private static final String ALL_STREAMS = "*";

    /** Default for {@code source.query.minute.sql}. */
    private static final String DEFAULT_QUERY_MINUTE_SQL = "SELECT log_ts, inlong_group_id, inlong_stream_id, audit_id, audit_tag\n\t, sum(count) AS cnt, sum(size) AS size\n\t, sum(delay) AS delay, audit_version\nFROM (\n\tSELECT audit_version, docker_id, thread_id, sdk_ts, packet_id\n\t\t, log_ts, ip, inlong_group_id, inlong_stream_id, audit_id\n\t\t,    CASE \n        WHEN audit_tag ='' THEN '-1'\n        ELSE audit_tag\n    END AS audit_tag , count, size, delay\n\tFROM audit_data\n\tWHERE log_ts >= ? AND log_ts < ? \n\t\tAND inlong_group_id = ?\n\t\tAND inlong_stream_id = ?\n\t\tAND audit_id = ?\n\tGROUP BY audit_version, docker_id, thread_id, sdk_ts, packet_id, log_ts, ip, inlong_group_id, inlong_stream_id, audit_id, audit_tag, count, size, delay\n) t_distinct\nGROUP BY audit_version, log_ts, inlong_group_id, inlong_stream_id, audit_id, audit_tag\nLIMIT 1440";

    /** Default for {@code source.query.ids.sql}. */
    private static final String DEFAULT_QUERY_IDS_SQL = "SELECT inlong_group_id, inlong_stream_id, audit_id, CASE \n    WHEN audit_tag = '' THEN '-1'\n    ELSE audit_tag\nEND AS audit_tag \n\t, sum(count) AS cnt, sum(size) AS size\n\t, sum(delay) AS delay\nFROM audit_data\nWHERE log_ts >= ? AND log_ts < ? \n\tAND audit_id = ? \n\tAND ip = ? \nGROUP BY inlong_group_id, inlong_stream_id, audit_id, audit_tag";

    /** Default for {@code source.query.ips.sql}. */
    private static final String DEFAULT_QUERY_IPS_SQL = "SELECT ip, sum(count) AS cnt, sum(size) AS size\n\t, sum(delay) AS delay\nFROM audit_data\nWHERE log_ts >= ? AND log_ts < ? \n\tAND inlong_group_id = ? \n\tAND inlong_stream_id =  ? \n\tAND audit_id =  ? \nGROUP BY ip ";

    /** Default for {@code audit.reconciliation.distinct.sql}. */
    private static final String DEFAULT_RECONCILIATION_DISTINCT_SQL = "SELECT\n    audit_version,\n    sum(count) AS cnt\nFROM\n    (\n        SELECT\n            audit_version,\n            docker_id,\n            thread_id,\n            sdk_ts,\n            packet_id,\n            log_ts,\n            ip,\n            inlong_group_id,\n            inlong_stream_id,\n            audit_id,\n            CASE\n                WHEN audit_tag = '' THEN '-1'\n                ELSE audit_tag\n            END AS audit_tag,\n            count,\n            size,\n            delay\n        FROM\n            audit_data\n        where\n            log_ts >= ? \n            and log_ts < ? \n            and audit_id = ? \n            and inlong_group_id = ? \n            and inlong_stream_id = ? \n            and (\n                audit_tag = ? \n                or audit_tag = ''\n            )\n        GROUP BY\n            audit_version,\n            docker_id,\n            thread_id,\n            sdk_ts,\n            packet_id,\n            log_ts,\n            ip,\n            inlong_group_id,\n            inlong_stream_id,\n            audit_id,\n            audit_tag,\n            count,\n            size,\n            delay\n    ) t_distinct\nGROUP BY\n    audit_version";

    /** Default for {@code audit.reconciliation.sql}. */
    private static final String DEFAULT_RECONCILIATION_SQL = "select\naudit_version,\nsum(count) cnt\nfrom\n    audit_data\nwhere\n    log_ts >= ? \n    and log_ts < ? \n    and audit_id = ? \n    and inlong_group_id = ? \n    and inlong_stream_id = ? \n    and (\n        audit_tag = ? \n        or audit_tag = '' \n    )\ngroup by\naudit_version";

    private static volatile RealTimeQuery realTimeQuery = null;

    /** One connection pool per configured audit source; filled once in the constructor. */
    private final List<BasicDataSource> dataSourceList = new LinkedList<>();

    /** Pool on which the per-data-source queries of the fan-out methods are executed. */
    private final ExecutorService executor =
            Executors.newFixedThreadPool(Configuration.getInstance().get("api.thread.pool.size", 10));

    /**
     * Creates one connection pool for every audit source known to {@code ConfigService}.
     */
    private RealTimeQuery() {
        Configuration config = Configuration.getInstance();
        List<JdbcConfig> jdbcConfigList = ConfigService.getInstance().getAllAuditSource();
        for (JdbcConfig jdbcConfig : jdbcConfigList) {
            dataSourceList.add(createDataSource(config, jdbcConfig));
        }
    }

    /**
     * Builds a connection pool for a single audit source.
     *
     * @param config configuration providing the pool sizing and eviction settings
     * @param jdbcConfig driver class, JDBC URL and credentials of the audit source
     * @return the configured, not yet used, data source
     */
    private static BasicDataSource createDataSource(Configuration config, JdbcConfig jdbcConfig) {
        BasicDataSource dataSource = new BasicDataSource();
        dataSource.setDriverClassName(jdbcConfig.getDriverClass());
        dataSource.setUrl(jdbcConfig.getJdbcUrl());
        dataSource.setUsername(jdbcConfig.getUserName());
        dataSource.setPassword(jdbcConfig.getPassword());
        // The initial pool size deliberately reuses the min-idle setting.
        dataSource.setInitialSize(config.get("datasource.min.idle.connections", 1));
        dataSource.setMaxActive(config.get("datasource.max.total.connections", 10));
        dataSource.setMaxIdle(config.get("datasource.max.idle.connections", 2));
        dataSource.setMinIdle(config.get("datasource.min.idle.connections", 1));
        // Validate every borrowed connection so that stale connections are not handed out.
        dataSource.setTestOnBorrow(true);
        dataSource.setValidationQuery("SELECT 1");
        dataSource.setTimeBetweenEvictionRunsMillis((long) config.get("datasource.detect.interval.ms", 60000));
        return dataSource;
    }

    /**
     * Returns the process-wide instance, creating it on first use.
     *
     * @return the shared {@code RealTimeQuery}
     */
    public static RealTimeQuery getInstance() {
        if (realTimeQuery == null) {
            // NOTE(review): the monitor is Configuration.class, not RealTimeQuery.class. It is kept
            // unchanged because other singletons may share this lock; confirm before narrowing it.
            synchronized (Configuration.class) {
                if (realTimeQuery == null) {
                    realTimeQuery = new RealTimeQuery();
                }
            }
        }
        return realTimeQuery;
    }

    /**
     * Queries per-{@code log_ts} statistics from all data sources in parallel and keeps, for every
     * distinct record key, only the entry with the highest audit version.
     *
     * @param startTime first bound parameter of the query (lower {@code log_ts} bound in the default SQL)
     * @param endTime second bound parameter of the query (upper {@code log_ts} bound in the default SQL)
     * @param inlongGroupId group id to query
     * @param inlongStreamId stream id to query, or {@code "*"} to query the whole group; in that
     *        case the filtered result is additionally passed through
     *        {@code AuditUtils.aggregateStatData}
     * @param auditId audit id to query
     * @return the filtered statistics; an empty list when no data source is configured
     */
    public List<StatData> queryLogTs(String startTime, String endTime, String inlongGroupId,
            String inlongStreamId, String auditId) {
        long currentTime = System.currentTimeMillis();
        // Written concurrently by the query tasks below, hence a thread-safe list.
        List<StatData> statDataList = new CopyOnWriteArrayList<>();
        if (dataSourceList.isEmpty()) {
            return statDataList;
        }
        List<CompletableFuture<Void>> futures = new ArrayList<>(dataSourceList.size());
        for (DataSource dataSource : dataSourceList) {
            futures.add(CompletableFuture.runAsync(
                    () -> statDataList.addAll(
                            doQueryLogTs(dataSource, startTime, endTime, inlongGroupId, inlongStreamId, auditId)),
                    executor));
        }
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
        LOGGER.info("Query log ts by params: {} {} {} {} {}, total cost {} ms",
                startTime, endTime, inlongGroupId, inlongStreamId, auditId,
                System.currentTimeMillis() - currentTime);
        List<StatData> maxAuditVersion = filterMaxAuditVersion(statDataList);
        if (ALL_STREAMS.equals(inlongStreamId)) {
            return AuditUtils.aggregateStatData(maxAuditVersion, ALL_STREAMS);
        }
        return maxAuditVersion;
    }

    /**
     * Reduces the given statistics to one entry per record key, where the key is built by
     * {@code CacheUtils.buildCacheKey} from log time, group id, stream id, audit id and audit tag.
     * For each key the first entry carrying the highest audit version is kept.
     *
     * @param allStatData statistics to filter
     * @return a new list with one entry per key; its order follows {@link HashMap} iteration order
     */
    public List<StatData> filterMaxAuditVersion(List<StatData> allStatData) {
        Map<String, StatData> newestByKey = new HashMap<>();
        for (StatData statData : allStatData) {
            String dataKey = CacheUtils.buildCacheKey(statData.getLogTs(), statData.getInlongGroupId(),
                    statData.getInlongStreamId(), statData.getAuditId(), statData.getAuditTag());
            StatData current = newestByKey.get(dataKey);
            // Strictly greater: on equal versions the entry seen first wins.
            if (current == null || statData.getAuditVersion() > current.getAuditVersion()) {
                newestByKey.put(dataKey, statData);
            }
        }
        return new LinkedList<>(newestByKey.values());
    }

    /**
     * Runs the {@code source.query.minute.sql} query against a single data source.
     *
     * <p>Failures are logged and not propagated; rows read before a failure are still returned.
     *
     * @return the rows read from {@code dataSource}, possibly empty, never {@code null}
     */
    private List<StatData> doQueryLogTs(DataSource dataSource, String startTime, String endTime,
            String inlongGroupId, String inlongStreamId, String auditId) {
        long currentTime = System.currentTimeMillis();
        List<StatData> result = new LinkedList<>();
        boolean allStreams = ALL_STREAMS.equals(inlongStreamId);
        String querySQL = Configuration.getInstance().get("source.query.minute.sql", DEFAULT_QUERY_MINUTE_SQL);
        if (allStreams) {
            querySQL = AuditUtils.removeStreamIdCondition(querySQL);
            querySQL = AuditUtils.removeStreamIdColumn(querySQL);
        }
        // Placeholder order of the SQL: start, end, group id, [stream id], audit id.
        List<String> paramList = new ArrayList<>();
        paramList.add(startTime);
        paramList.add(endTime);
        paramList.add(inlongGroupId);
        if (!allStreams) {
            paramList.add(inlongStreamId);
        }
        paramList.add(auditId);
        try (Connection connection = dataSource.getConnection();
                PreparedStatement pstat = connection.prepareStatement(querySQL)) {
            bindParameters(pstat, paramList);
            try (ResultSet resultSet = pstat.executeQuery()) {
                while (resultSet.next()) {
                    StatData data = new StatData();
                    data.setLogTs(resultSet.getString("log_ts"));
                    data.setInlongGroupId(resultSet.getString("inlong_group_id"));
                    data.setAuditId(resultSet.getString("audit_id"));
                    data.setAuditTag(resultSet.getString("audit_tag"));
                    long count = resultSet.getLong("cnt");
                    data.setCount(count);
                    data.setDelay(CacheUtils.calculateAverageDelay(count, resultSet.getLong("delay")));
                    data.setSize(resultSet.getLong("size"));
                    data.setAuditVersion(resultSet.getLong("audit_version"));
                    // For the wildcard the stream id column was removed from the SQL above.
                    data.setInlongStreamId(allStreams ? ALL_STREAMS : resultSet.getString("inlong_stream_id"));
                    result.add(data);
                }
            } catch (SQLException sqlException) {
                LOGGER.error("Query log time has SQL exception!, datasource={} ", dataSource, sqlException);
            }
        } catch (Exception exception) {
            LOGGER.error("Query log time has exception!, datasource={} ", dataSource, exception);
        }
        LOGGER.info("Query log ts by params: {} {} {} {} {}, cost {} ms",
                startTime, endTime, inlongGroupId, inlongStreamId, auditId,
                System.currentTimeMillis() - currentTime);
        return result;
    }

    /**
     * Queries the group/stream ids reported by the given ip. Data sources are tried in order and
     * the first non-empty result is returned.
     *
     * @param startTime first bound parameter of the query (lower {@code log_ts} bound in the default SQL)
     * @param endTime second bound parameter of the query (upper {@code log_ts} bound in the default SQL)
     * @param ip ip to query
     * @param auditId audit id to query
     * @return the first non-empty result, or an empty list when no data source returned rows
     */
    public List<StatData> queryIdsByIp(String startTime, String endTime, String ip, String auditId) {
        List<StatData> statDataList = new LinkedList<>();
        for (DataSource dataSource : dataSourceList) {
            statDataList = doQueryIdsByIp(dataSource, startTime, endTime, ip, auditId);
            if (!statDataList.isEmpty()) {
                break;
            }
        }
        LOGGER.info("Query ids by params:{} {} {} {}, result size:{} ",
                startTime, endTime, ip, auditId, statDataList.size());
        return statDataList;
    }

    /**
     * Runs the {@code source.query.ids.sql} query against a single data source.
     *
     * <p>Failures are logged and not propagated; rows read before a failure are still returned.
     *
     * @return the rows read from {@code dataSource}, possibly empty, never {@code null}
     */
    private List<StatData> doQueryIdsByIp(DataSource dataSource, String startTime, String endTime,
            String ip, String auditId) {
        List<StatData> result = new LinkedList<>();
        String querySQL = Configuration.getInstance().get("source.query.ids.sql", DEFAULT_QUERY_IDS_SQL);
        // Placeholder order of the SQL: start, end, audit id, ip.
        List<String> paramList = new ArrayList<>();
        paramList.add(startTime);
        paramList.add(endTime);
        paramList.add(auditId);
        paramList.add(ip);
        try (Connection connection = dataSource.getConnection();
                PreparedStatement pstat = connection.prepareStatement(querySQL)) {
            bindParameters(pstat, paramList);
            try (ResultSet resultSet = pstat.executeQuery()) {
                while (resultSet.next()) {
                    StatData data = new StatData();
                    data.setInlongGroupId(resultSet.getString("inlong_group_id"));
                    data.setInlongStreamId(resultSet.getString("inlong_stream_id"));
                    data.setAuditId(resultSet.getString("audit_id"));
                    data.setAuditTag(resultSet.getString("audit_tag"));
                    long count = resultSet.getLong("cnt");
                    data.setCount(count);
                    data.setSize(resultSet.getLong("size"));
                    data.setDelay(CacheUtils.calculateAverageDelay(count, resultSet.getLong("delay")));
                    result.add(data);
                }
            } catch (SQLException sqlException) {
                LOGGER.error("Query inLongGroupIds has SQL exception!, datasource={} ", dataSource, sqlException);
            }
        } catch (Exception exception) {
            LOGGER.error("Query inLongGroupIds has exception!, datasource={} ", dataSource, exception);
        }
        return result;
    }

    /**
     * Queries the per-ip statistics of a group/stream. Data sources are tried in order and the
     * first non-empty result is returned.
     *
     * @param startTime first bound parameter of the query (lower {@code log_ts} bound in the default SQL)
     * @param endTime second bound parameter of the query (upper {@code log_ts} bound in the default SQL)
     * @param inlongGroupId group id to query
     * @param inlongStreamId stream id to query, or {@code "*"} to query the whole group
     * @param auditId audit id to query
     * @return the first non-empty result, or an empty list when no data source returned rows
     */
    public List<StatData> queryIpsById(String startTime, String endTime, String inlongGroupId,
            String inlongStreamId, String auditId) {
        List<StatData> statDataList = new LinkedList<>();
        for (DataSource dataSource : dataSourceList) {
            statDataList = doQueryIpsById(dataSource, startTime, endTime, inlongGroupId, inlongStreamId, auditId);
            if (!statDataList.isEmpty()) {
                break;
            }
        }
        LOGGER.info("Query ips by params:{} {} {} {} {}, result size:{} ",
                startTime, endTime, inlongGroupId, inlongStreamId, auditId, statDataList.size());
        return statDataList;
    }

    /**
     * Runs the {@code source.query.ips.sql} query against a single data source. Log time, group
     * id, stream id and audit id of every returned entry are copied from the arguments, not read
     * from the result set.
     *
     * <p>Failures are logged and not propagated; rows read before a failure are still returned.
     *
     * @return the rows read from {@code dataSource}, possibly empty, never {@code null}
     */
    private List<StatData> doQueryIpsById(DataSource dataSource, String startTime, String endTime,
            String inlongGroupId, String inlongStreamId, String auditId) {
        List<StatData> result = new LinkedList<>();
        String querySQL = Configuration.getInstance().get("source.query.ips.sql", DEFAULT_QUERY_IPS_SQL);
        boolean allStreams = ALL_STREAMS.equals(inlongStreamId);
        if (allStreams) {
            querySQL = AuditUtils.removeStreamIdCondition(querySQL);
        }
        // Placeholder order of the SQL: start, end, group id, [stream id], audit id.
        List<String> paramList = new ArrayList<>();
        paramList.add(startTime);
        paramList.add(endTime);
        paramList.add(inlongGroupId);
        if (!allStreams) {
            paramList.add(inlongStreamId);
        }
        paramList.add(auditId);
        try (Connection connection = dataSource.getConnection();
                PreparedStatement pstat = connection.prepareStatement(querySQL)) {
            bindParameters(pstat, paramList);
            try (ResultSet resultSet = pstat.executeQuery()) {
                while (resultSet.next()) {
                    StatData data = new StatData();
                    data.setIp(resultSet.getString("ip"));
                    long count = resultSet.getLong("cnt");
                    data.setSize(resultSet.getLong("size"));
                    data.setLogTs(startTime);
                    data.setInlongGroupId(inlongGroupId);
                    data.setInlongStreamId(inlongStreamId);
                    data.setAuditId(auditId);
                    data.setCount(count);
                    // Read by label like the sibling queries; a column index would silently pick
                    // the wrong column if the configurable SQL reorders its select list.
                    data.setDelay(CacheUtils.calculateAverageDelay(count, resultSet.getLong("delay")));
                    result.add(data);
                }
            } catch (SQLException sqlException) {
                LOGGER.error("Query ips has SQL exception!, datasource={} ", dataSource, sqlException);
            }
        } catch (Exception exception) {
            LOGGER.error("Query ips has exception! ", exception);
        }
        return result;
    }

    /**
     * Queries reconciliation data from all data sources in parallel and collects the non-null
     * per-data-source results.
     *
     * @param startTime first bound parameter of the query (lower {@code log_ts} bound in the default SQL)
     * @param endTime second bound parameter of the query (upper {@code log_ts} bound in the default SQL)
     * @param inlongGroupId group id to query
     * @param inlongStreamId stream id to query, or {@code "*"} to query the whole group
     * @param auditId audit id to query
     * @param auditTag audit tag to query
     * @param distinct whether to use the de-duplicating SQL ({@code audit.reconciliation.distinct.sql})
     *        instead of {@code audit.reconciliation.sql}
     * @return at most one entry per data source; {@code null} when no data source is configured
     *         (unlike {@link #queryLogTs}, which returns an empty list in that case)
     */
    public List<StatData> queryAuditData(String startTime, String endTime, String inlongGroupId,
            String inlongStreamId, String auditId, String auditTag, boolean distinct) {
        long currentTime = System.currentTimeMillis();
        // Written concurrently by the query tasks below, hence a thread-safe list.
        List<StatData> statDataList = new CopyOnWriteArrayList<>();
        if (dataSourceList.isEmpty()) {
            // Kept as null for backward compatibility with existing callers.
            return null;
        }
        List<CompletableFuture<Void>> futures = new ArrayList<>(dataSourceList.size());
        for (DataSource dataSource : dataSourceList) {
            futures.add(CompletableFuture.runAsync(() -> {
                StatData statData = doQueryAuditData(dataSource, startTime, endTime, inlongGroupId,
                        inlongStreamId, auditId, auditTag, distinct);
                if (statData != null) {
                    statDataList.add(statData);
                }
            }, executor));
        }
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
        LOGGER.info("Query audit data by params: {} {} {} {} {}, total cost {} ms",
                startTime, endTime, inlongGroupId, inlongStreamId, auditId,
                System.currentTimeMillis() - currentTime);
        return statDataList;
    }

    /**
     * Runs the reconciliation query against a single data source. Every row yields one entry
     * holding the row's audit version and count plus the query arguments; the entries are then
     * reduced by {@code AuditUtils.getMaxAuditVersionAuditData}.
     *
     * <p>Failures are logged and not propagated; rows read before a failure still take part in the
     * reduction.
     *
     * @param dataSource data source to query
     * @param distinct whether to use the de-duplicating SQL
     * @return the result of {@code AuditUtils.getMaxAuditVersionAuditData}; callers in this class
     *         treat it as nullable
     */
    public StatData doQueryAuditData(DataSource dataSource, String startTime, String endTime,
            String inlongGroupId, String inlongStreamId, String auditId, String auditTag, boolean distinct) {
        LinkedList<StatData> result = new LinkedList<>();
        String querySQL = distinct
                ? Configuration.getInstance().get("audit.reconciliation.distinct.sql",
                        DEFAULT_RECONCILIATION_DISTINCT_SQL)
                : Configuration.getInstance().get("audit.reconciliation.sql", DEFAULT_RECONCILIATION_SQL);
        boolean allStreams = ALL_STREAMS.equals(inlongStreamId);
        if (allStreams) {
            querySQL = AuditUtils.removeStreamIdCondition(querySQL);
        }
        // Placeholder order of the SQL: start, end, audit id, group id, [stream id], audit tag.
        List<String> paramList = new ArrayList<>();
        paramList.add(startTime);
        paramList.add(endTime);
        paramList.add(auditId);
        paramList.add(inlongGroupId);
        if (!allStreams) {
            paramList.add(inlongStreamId);
        }
        paramList.add(auditTag);
        try (Connection connection = dataSource.getConnection();
                PreparedStatement pstat = connection.prepareStatement(querySQL)) {
            bindParameters(pstat, paramList);
            try (ResultSet resultSet = pstat.executeQuery()) {
                while (resultSet.next()) {
                    StatData data = new StatData();
                    data.setAuditVersion(resultSet.getLong("audit_version"));
                    data.setCount(resultSet.getLong("cnt"));
                    data.setLogTs(startTime);
                    data.setInlongGroupId(inlongGroupId);
                    data.setInlongStreamId(inlongStreamId);
                    data.setAuditId(auditId);
                    data.setAuditTag(auditTag);
                    result.add(data);
                }
            } catch (SQLException sqlException) {
                LOGGER.error("Query audit data has SQL exception!, datasource={} ", dataSource, sqlException);
            }
        } catch (Exception exception) {
            LOGGER.error("Query audit data has exception! ", exception);
        }
        return AuditUtils.getMaxAuditVersionAuditData(result);
    }

    /**
     * Binds the given values, in order, to the statement's placeholders as strings.
     *
     * @param statement statement whose placeholders are to be filled
     * @param params values for placeholders 1..n
     * @throws SQLException if the driver rejects a parameter
     */
    private static void bindParameters(PreparedStatement statement, List<String> params) throws SQLException {
        for (int i = 0; i < params.size(); i++) {
            // JDBC parameter indexes are 1-based.
            statement.setString(i + 1, params.get(i));
        }
    }
}

