/*
 * Decompiled with CFR 0.152.
 */
package org.apache.inlong.audit.store.service;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.inlong.audit.protocol.AuditData;
import org.apache.inlong.audit.store.config.JdbcConfig;
import org.apache.inlong.audit.store.entities.JdbcDataPo;
import org.apache.inlong.audit.store.metric.MetricsManager;
import org.apache.inlong.audit.store.route.AuditRouteManager;
import org.apache.inlong.audit.store.service.InsertData;
import org.apache.inlong.audit.store.utils.PulsarUtils;
import org.apache.inlong.audit.utils.DataUtils;
import org.apache.inlong.audit.utils.RouteUtils;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.MessageId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class JdbcService
implements InsertData,
AutoCloseable {
    private static final Logger LOG = LoggerFactory.getLogger(JdbcService.class);
    private static final String INSERT_SQL = "insert into audit_data (ip, docker_id, thread_id, \r\n      sdk_ts, packet_id, log_ts, \r\n      inlong_group_id, inlong_stream_id, audit_id, audit_tag, audit_version, \r\n      count, size, delay, \r\n      update_time)\r\n    values (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)";
    private final JdbcConfig jdbcConfig;
    private final ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor();
    private LinkedBlockingQueue<JdbcDataPo> receiveQueue;
    private long lastCheckTime = System.currentTimeMillis();
    private Connection connection;
    private final List<JdbcDataPo> writeDataList = new LinkedList();

    public JdbcService(JdbcConfig jdbcConfig) {
        this.jdbcConfig = jdbcConfig;
    }

    public void start() {
        this.receiveQueue = new LinkedBlockingQueue(this.jdbcConfig.getDataQueueSize());
        try {
            Class.forName(this.jdbcConfig.getDriver());
            this.reconnect();
        }
        catch (Exception e) {
            LOG.error("Start failure!", (Throwable)e);
        }
        this.timerService.scheduleWithFixedDelay(() -> this.process(), this.jdbcConfig.getProcessIntervalMs(), this.jdbcConfig.getProcessIntervalMs(), TimeUnit.MILLISECONDS);
    }

    private void process() {
        if (this.receiveQueue.size() < this.jdbcConfig.getBatchThreshold() && System.currentTimeMillis() - this.lastCheckTime < (long)this.jdbcConfig.getBatchIntervalMs()) {
            return;
        }
        this.lastCheckTime = System.currentTimeMillis();
        if (this.writeDataList.size() > 0) {
            if (this.executeBatch(this.writeDataList)) {
                this.acknowledge(this.writeDataList);
                this.writeDataList.clear();
            } else {
                return;
            }
        }
        JdbcDataPo data = (JdbcDataPo)this.receiveQueue.poll();
        while (data != null) {
            this.writeDataList.add(data);
            if (this.writeDataList.size() > this.jdbcConfig.getBatchThreshold()) {
                if (!this.executeBatch(this.writeDataList)) break;
                this.acknowledge(this.writeDataList);
                this.writeDataList.clear();
            }
            data = (JdbcDataPo)this.receiveQueue.poll();
        }
    }

    private boolean executeBatch(List<JdbcDataPo> dataList) {
        boolean result = false;
        long currentTimestamp = System.currentTimeMillis();
        try (PreparedStatement statement = this.connection.prepareStatement(INSERT_SQL);){
            for (JdbcDataPo data : dataList) {
                statement.setString(1, data.getIp());
                statement.setString(2, data.getDockerId());
                statement.setString(3, data.getThreadId());
                statement.setTimestamp(4, data.getSdkTs());
                statement.setLong(5, data.getPacketId());
                statement.setTimestamp(6, data.getLogTs());
                statement.setString(7, data.getInLongGroupId());
                statement.setString(8, data.getInLongStreamId());
                statement.setString(9, data.getAuditId());
                statement.setString(10, data.getAuditTag());
                statement.setLong(11, data.getAuditVersion());
                statement.setLong(12, data.getCount());
                statement.setLong(13, data.getSize());
                statement.setLong(14, data.getDelay());
                statement.setTimestamp(15, data.getUpdateTime());
                statement.addBatch();
            }
            statement.executeBatch();
            this.connection.commit();
            result = true;
            MetricsManager.getInstance().addSendSuccess((long)dataList.size(), System.currentTimeMillis() - currentTimestamp);
        }
        catch (Exception exception) {
            MetricsManager.getInstance().addSendFailed((long)dataList.size(), System.currentTimeMillis() - currentTimestamp);
            LOG.error("Execute batch has failure!", (Throwable)exception);
            try {
                this.reconnect();
            }
            catch (SQLException sqlException) {
                LOG.error("Re-connect has  failure!", (Throwable)sqlException);
            }
        }
        return result;
    }

    private void reconnect() throws SQLException {
        if (this.connection != null) {
            try {
                this.connection.close();
            }
            catch (Exception e) {
                LOG.error("Reconnect has exception!", (Throwable)e);
            }
            this.connection = null;
        }
        this.connection = DriverManager.getConnection(this.jdbcConfig.getUrl(), this.jdbcConfig.getUserName(), this.jdbcConfig.getPassword());
        this.connection.setAutoCommit(false);
    }

    public void insert(AuditData msgBody) {
    }

    public void insert(AuditData msgBody, Consumer<byte[]> consumer, MessageId messageId) {
        if (!this.isAuditDataValid(msgBody)) {
            MetricsManager.getInstance().addInvalidData();
            PulsarUtils.acknowledge(consumer, (MessageId)messageId);
            LOG.error("Invalid audit data: {} ", (Object)msgBody);
            return;
        }
        if (!RouteUtils.matchesAuditRoute((String)msgBody.getAuditId(), (String)msgBody.getInlongGroupId(), (List)AuditRouteManager.getInstance().getAuditRoutes())) {
            MetricsManager.getInstance().filterSuccess();
            PulsarUtils.acknowledge(consumer, (MessageId)messageId);
            LOG.warn("The audit data does not match the routing rules and is filtered out: {} ", (Object)msgBody);
            return;
        }
        JdbcDataPo data = new JdbcDataPo();
        data.setConsumer(consumer);
        data.setMessageId(messageId);
        data.setIp(msgBody.getIp());
        data.setThreadId(msgBody.getThreadId());
        data.setDockerId(msgBody.getDockerId());
        data.setPacketId(Long.valueOf(msgBody.getPacketId()));
        data.setSdkTs(new Timestamp(msgBody.getSdkTs()));
        data.setLogTs(new Timestamp(msgBody.getLogTs()));
        data.setAuditId(msgBody.getAuditId());
        data.setAuditTag(msgBody.getAuditTag());
        data.setAuditVersion(msgBody.getAuditVersion());
        data.setCount(Long.valueOf(msgBody.getCount()));
        data.setDelay(Long.valueOf(msgBody.getDelay()));
        data.setInLongGroupId(msgBody.getInlongGroupId());
        data.setInLongStreamId(msgBody.getInlongStreamId());
        data.setSize(Long.valueOf(msgBody.getSize()));
        data.setUpdateTime(new Timestamp(System.currentTimeMillis()));
        try {
            this.receiveQueue.offer(data, Long.MAX_VALUE, TimeUnit.MILLISECONDS);
        }
        catch (InterruptedException exception) {
            LOG.error("Insert data has InterruptedException ", (Throwable)exception);
        }
    }

    @Override
    public void close() throws Exception {
        this.connection.close();
        this.timerService.shutdown();
    }

    private void acknowledge(List<JdbcDataPo> dataList) {
        Iterator<JdbcDataPo> iterator = dataList.iterator();
        while (iterator.hasNext()) {
            JdbcDataPo jdbcData = iterator.next();
            try {
                if (jdbcData.getConsumer() != null && jdbcData.getMessageId() != null) {
                    jdbcData.getConsumer().acknowledge(jdbcData.getMessageId());
                }
                iterator.remove();
            }
            catch (Exception exception) {
                LOG.error("Acknowledge has exception!", (Throwable)exception);
            }
        }
    }

    private boolean isAuditDataValid(AuditData auditData) {
        if (!this.isDataTimeValid(auditData)) {
            return false;
        }
        return this.isAuditItemValid(auditData);
    }

    private boolean isDataTimeValid(AuditData auditData) {
        long validDataTimeRangeMs = this.jdbcConfig.getValidDataTimeRangeMs();
        return DataUtils.isDataTimeValid((long)auditData.getLogTs(), (long)validDataTimeRangeMs) && DataUtils.isDataTimeValid((long)auditData.getSdkTs(), (long)validDataTimeRangeMs);
    }

    private boolean isAuditItemValid(AuditData auditData) {
        return DataUtils.isAuditItemValid((String)auditData.getInlongGroupId()) && DataUtils.isAuditItemValid((String)auditData.getInlongStreamId()) && DataUtils.isAuditItemValid((String)auditData.getAuditId()) && DataUtils.isAuditItemValid((String)auditData.getAuditTag()) && DataUtils.isAuditItemValid((String)auditData.getIp()) && DataUtils.isAuditItemValid((String)auditData.getDockerId()) && DataUtils.isAuditItemValid((String)auditData.getThreadId());
    }
}

