/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.reporting.sql;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.RemovalCause;
import java.io.IOException;
import java.sql.Connection;
import java.sql.Driver;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Properties;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Supplier;
import org.apache.avro.Schema;
import org.apache.calcite.config.CalciteConnectionProperty;
import org.apache.calcite.config.Lex;
import org.apache.calcite.jdbc.CalciteConnection;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.schema.Table;
import org.apache.nifi.avro.AvroTypeUtil;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.reporting.ReportingContext;
import org.apache.nifi.reporting.sql.CachedStatement;
import org.apache.nifi.reporting.sql.MetricsQueryService;
import org.apache.nifi.reporting.sql.QueryResult;
import org.apache.nifi.reporting.sql.bulletins.BulletinTable;
import org.apache.nifi.reporting.sql.connectionstatus.ConnectionStatusTable;
import org.apache.nifi.reporting.sql.connectionstatuspredictions.ConnectionStatusPredictionsTable;
import org.apache.nifi.reporting.sql.metrics.JvmMetricsTable;
import org.apache.nifi.reporting.sql.processgroupstatus.ProcessGroupStatusTable;
import org.apache.nifi.reporting.sql.processorstatus.ProcessorStatusTable;
import org.apache.nifi.reporting.sql.provenance.ProvenanceTable;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.ResultSetRecordSet;
import org.apache.nifi.util.db.JdbcCommon;

/**
 * {@link MetricsQueryService} implementation that runs SQL against a Calcite
 * connection whose root schema exposes the reporting tables registered in
 * {@link #buildCachedStatement(String, ReportingContext)}.
 *
 * <p>Prepared statements are pooled per SQL string: each distinct SQL text maps
 * to a queue of {@link CachedStatement}s held in a Caffeine cache limited to 25
 * entries. When a cache entry is removed, every statement still sitting in its
 * queue is closed together with its connection.
 */
public class MetricsSqlQueryService
implements MetricsQueryService {
    private final ComponentLog logger;
    // Keyed by SQL text; the removal listener closes whatever is still queued for a removed key.
    private final Cache<String, BlockingQueue<CachedStatement>> statementQueues = Caffeine.newBuilder().maximumSize(25L).removalListener(this::onCacheEviction).build();

    /**
     * Creates the service and registers a Calcite JDBC driver with {@link DriverManager}.
     *
     * @param logger logger used for diagnostics and handed to the tables this service creates
     * @throws ProcessException if the Calcite driver cannot be registered
     */
    public MetricsSqlQueryService(ComponentLog logger) {
        try {
            DriverManager.registerDriver(new org.apache.calcite.jdbc.Driver());
        }
        catch (SQLException e) {
            throw new ProcessException("Failed to load Calcite JDBC Driver", e);
        }
        this.logger = logger;
    }

    /**
     * @return the logger supplied at construction time
     */
    public ComponentLog getLogger() {
        return this.logger;
    }

    /**
     * Executes {@code sql}, reusing a pooled prepared statement for that SQL text
     * when one is available and building a new one otherwise.
     *
     * <p>Closing the returned {@link QueryResult} offers the statement back to the
     * pool; if the pool entry for this SQL no longer exists (or rejects the offer),
     * the statement's connection is closed instead.
     *
     * @param context reporting context passed to the tables when a new statement is built
     * @param sql     the query text; also the pool key
     * @return a handle exposing the {@link ResultSet}; its {@code getRecordsRead()} always returns 0
     * @throws Exception if the statement cannot be built or the query fails to execute
     */
    @Override
    public QueryResult query(ReportingContext context, final String sql) throws Exception {
        Supplier<CachedStatement> statementBuilder = () -> {
            try {
                return this.buildCachedStatement(sql, context);
            }
            catch (Exception e) {
                // Supplier.get() cannot throw checked exceptions, so tunnel the failure out unchecked.
                throw new PreparedStatementException(e);
            }
        };
        final CachedStatement cachedStatement = this.getStatement(sql, statementBuilder, this.statementQueues);
        PreparedStatement stmt = cachedStatement.getStatement();
        final ResultSet rs;
        try {
            rs = stmt.executeQuery();
        }
        catch (SQLException | RuntimeException e) {
            // No QueryResult will be handed out, so nothing would ever return this
            // statement to the pool or close it; release it here before propagating.
            this.closeQuietly(stmt, cachedStatement.getConnection());
            throw e;
        }
        return new QueryResult(){

            @Override
            public void close() throws IOException {
                BlockingQueue<CachedStatement> statementQueue = MetricsSqlQueryService.this.statementQueues.getIfPresent(sql);
                if (statementQueue == null || !statementQueue.offer(cachedStatement)) {
                    // The statement could not be pooled again, so release its connection.
                    try {
                        cachedStatement.getConnection().close();
                    }
                    catch (SQLException e) {
                        throw new IOException("Failed to close statement", e);
                    }
                }
            }

            @Override
            public ResultSet getResultSet() {
                return rs;
            }

            @Override
            public int getRecordsRead() {
                return 0;
            }
        };
    }

    /**
     * Wraps the result set of {@code queryResult} in a {@link ResultSetRecordSet}
     * whose schema is derived from the result set itself.
     *
     * @param queryResult result previously obtained from {@link #query(ReportingContext, String)}
     * @return the record set, or {@code null} if constructing it raised a
     *         {@link SQLException} (the error is logged, not rethrown)
     * @throws Exception if the schema cannot be derived from the result set
     */
    @Override
    public ResultSetRecordSet getResultSetRecordSet(QueryResult queryResult) throws Exception {
        ResultSet rs = queryResult.getResultSet();
        ResultSetRecordSet recordSet = null;
        RecordSchema writerSchema = AvroTypeUtil.createSchema(JdbcCommon.createSchema(rs));
        try {
            recordSet = new ResultSetRecordSet(rs, writerSchema);
        }
        catch (SQLException e) {
            this.getLogger().error("Error creating record set from query results due to {}", new Object[]{e.getMessage()}, (Throwable)e);
        }
        return recordSet;
    }

    /**
     * Takes a pooled statement for {@code sql} if one is queued, otherwise asks
     * {@code statementBuilder} for a new one. The queue for {@code sql} is created
     * in the cache on first use.
     */
    private synchronized CachedStatement getStatement(String sql, Supplier<CachedStatement> statementBuilder, Cache<String, BlockingQueue<CachedStatement>> statementQueues) {
        BlockingQueue<CachedStatement> statementQueue = statementQueues.get(sql, key -> new LinkedBlockingQueue<>());
        if (statementQueue != null) {
            CachedStatement cachedStmt = statementQueue.poll();
            if (cachedStmt != null) {
                return cachedStmt;
            }
        }
        return statementBuilder.get();
    }

    /**
     * Opens a new Calcite connection, registers the reporting tables on its root
     * schema, and prepares {@code sql} against it.
     *
     * @throws Exception if a table cannot be created or the SQL cannot be prepared;
     *                   the connection opened for this attempt is closed before rethrowing
     */
    private CachedStatement buildCachedStatement(String sql, ReportingContext context) throws Exception {
        CalciteConnection connection = this.createConnection();
        try {
            SchemaPlus rootSchema = this.createRootSchema(connection);
            rootSchema.add("CONNECTION_STATUS", new ConnectionStatusTable(context, this.getLogger()));
            if (context.isAnalyticsEnabled()) {
                rootSchema.add("CONNECTION_STATUS_PREDICTIONS", new ConnectionStatusPredictionsTable(context, this.getLogger()));
            } else {
                this.getLogger().debug("Analytics is not enabled, CONNECTION_STATUS_PREDICTIONS table is not available for querying");
            }
            rootSchema.add("PROCESSOR_STATUS", new ProcessorStatusTable(context, this.getLogger()));
            rootSchema.add("PROCESS_GROUP_STATUS", new ProcessGroupStatusTable(context, this.getLogger()));
            rootSchema.add("JVM_METRICS", new JvmMetricsTable(context, this.getLogger()));
            rootSchema.add("BULLETINS", new BulletinTable(context, this.getLogger()));
            rootSchema.add("PROVENANCE", new ProvenanceTable(context, this.getLogger()));
            rootSchema.setCacheEnabled(false);
            PreparedStatement stmt = connection.prepareStatement(sql);
            return new CachedStatement(stmt, connection);
        }
        catch (Exception e) {
            // The caller never sees this connection, so it must be released here.
            this.closeQuietly(connection);
            throw e;
        }
    }

    /**
     * @return the root schema of {@code calciteConnection}
     */
    private SchemaPlus createRootSchema(CalciteConnection calciteConnection) {
        return calciteConnection.getRootSchema();
    }

    /**
     * Opens a {@code jdbc:calcite:} connection with the {@link Lex#MYSQL_ANSI}
     * lexical policy and unwraps it to {@link CalciteConnection}.
     *
     * @throws ProcessException wrapping any failure to open or unwrap the connection
     */
    private CalciteConnection createConnection() {
        Properties properties = new Properties();
        properties.put(CalciteConnectionProperty.LEX.camelName(), Lex.MYSQL_ANSI.name());
        Connection connection = null;
        try {
            connection = DriverManager.getConnection("jdbc:calcite:", properties);
            return connection.unwrap(CalciteConnection.class);
        }
        catch (Exception e) {
            // If unwrap failed after the connection was opened, do not leave it dangling.
            this.closeQuietly(connection);
            throw new ProcessException(e);
        }
    }

    /**
     * Drains {@code statementQueue}, closing each statement and its connection.
     */
    private void clearQueue(BlockingQueue<CachedStatement> statementQueue) {
        CachedStatement stmt;
        while ((stmt = statementQueue.poll()) != null) {
            this.closeQuietly(stmt.getStatement(), stmt.getConnection());
        }
    }

    /**
     * Closes each non-null resource in order. A failure to close one resource is
     * logged at WARN level and does not prevent the remaining ones from being closed.
     *
     * @param closeables resources to close; the array and its elements may be {@code null}
     */
    @Override
    public void closeQuietly(AutoCloseable ... closeables) {
        if (closeables == null) {
            return;
        }
        for (AutoCloseable closeable : closeables) {
            if (closeable == null) {
                continue;
            }
            try {
                closeable.close();
            }
            catch (Exception e) {
                this.getLogger().warn("Failed to close SQL resource", (Throwable)e);
            }
        }
    }

    /**
     * Cache removal listener: closes every statement still queued under the removed key.
     */
    private void onCacheEviction(String key, BlockingQueue<CachedStatement> queue, RemovalCause cause) {
        this.clearQueue(queue);
    }

    /**
     * Unchecked wrapper used to carry a statement-building failure out of the
     * {@link Supplier} used by {@link #query(ReportingContext, String)}.
     * Declared {@code static} so thrown instances do not retain the enclosing service.
     */
    private static class PreparedStatementException
    extends RuntimeException {
        public PreparedStatementException() {
        }

        public PreparedStatementException(String message) {
            super(message);
        }

        public PreparedStatementException(String message, Throwable cause) {
            super(message, cause);
        }

        public PreparedStatementException(Throwable cause) {
            super(cause);
        }

        public PreparedStatementException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
            super(message, cause, enableSuppression, writableStackTrace);
        }
    }
}

