/*
 * Decompiled with CFR 0.152.
 */
package com.amazonaws.services.kinesis.connectors.redshift;

import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.AWSSessionCredentials;
import com.amazonaws.services.kinesis.connectors.KinesisConnectorConfiguration;
import com.amazonaws.services.kinesis.connectors.UnmodifiableBuffer;
import com.amazonaws.services.kinesis.connectors.interfaces.IEmitter;
import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.model.PutObjectRequest;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.TreeSet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

public class RedshiftManifestEmitter
implements IEmitter<String> {
    private static final Log LOG = LogFactory.getLog(RedshiftManifestEmitter.class);
    private final String s3Bucket;
    private final String dataTable;
    private final String fileTable;
    private final String fileKeyColumn;
    private final char dataDelimiter;
    private final AWSCredentialsProvider credentialsProvider;
    private final String s3Endpoint;
    private final AmazonS3Client s3Client;
    private final boolean copyMandatory;
    private final Properties loginProps;
    private final String redshiftURL;
    private static final String MANIFEST_PREFIX = "manifests/";

    public RedshiftManifestEmitter(KinesisConnectorConfiguration configuration) {
        this.dataTable = configuration.REDSHIFT_DATA_TABLE;
        this.fileTable = configuration.REDSHIFT_FILE_TABLE;
        this.fileKeyColumn = configuration.REDSHIFT_FILE_KEY_COLUMN;
        this.dataDelimiter = configuration.REDSHIFT_DATA_DELIMITER.charValue();
        this.copyMandatory = configuration.REDSHIFT_COPY_MANDATORY;
        this.s3Bucket = configuration.S3_BUCKET;
        this.s3Endpoint = configuration.S3_ENDPOINT;
        this.s3Client = new AmazonS3Client(configuration.AWS_CREDENTIALS_PROVIDER);
        if (this.s3Endpoint != null) {
            this.s3Client.setEndpoint(this.s3Endpoint);
        }
        this.credentialsProvider = configuration.AWS_CREDENTIALS_PROVIDER;
        this.loginProps = new Properties();
        this.loginProps.setProperty("user", configuration.REDSHIFT_USERNAME);
        this.loginProps.setProperty("password", configuration.REDSHIFT_PASSWORD);
        this.redshiftURL = configuration.REDSHIFT_URL;
    }

    @Override
    public List<String> emit(UnmodifiableBuffer<String> buffer) throws IOException {
        List<String> records = buffer.getRecords();
        Connection conn = null;
        String manifestFileName = this.getManifestFile(records);
        try {
            conn = DriverManager.getConnection(this.redshiftURL, this.loginProps);
            conn.setAutoCommit(false);
            List<String> deduplicatedRecords = this.checkForExistingFiles(conn, records);
            if (deduplicatedRecords.isEmpty()) {
                LOG.info((Object)"All the files in this set were already copied to Redshift.");
                this.rollbackAndCloseConnection(conn);
                return Collections.emptyList();
            }
            if (deduplicatedRecords.size() != records.size()) {
                manifestFileName = this.getManifestFile(deduplicatedRecords);
            }
            try {
                this.writeManifestToS3(manifestFileName, deduplicatedRecords);
            }
            catch (Exception e) {
                LOG.error((Object)("Error writing file " + manifestFileName + " to S3. Failing this emit attempt."), (Throwable)e);
                return buffer.getRecords();
            }
            LOG.info((Object)("Inserting " + deduplicatedRecords.size() + " rows into the files table."));
            this.insertRecords(conn, deduplicatedRecords);
            LOG.info((Object)("Initiating Amazon Redshift manifest copy of " + deduplicatedRecords.size() + " files."));
            this.redshiftCopy(conn, manifestFileName);
            conn.commit();
            LOG.info((Object)("Successful Amazon Redshift manifest copy of " + this.getNumberOfCopiedRecords(conn) + " records from " + deduplicatedRecords.size() + " files using manifest s3://" + this.s3Bucket + "/" + this.getManifestFile(records)));
            this.closeConnection(conn);
            return Collections.emptyList();
        }
        catch (IOException | SQLException e) {
            LOG.error((Object)("Error copying data from manifest file " + manifestFileName + " into Amazon Redshift. Failing this emit attempt."), (Throwable)e);
            this.rollbackAndCloseConnection(conn);
            return buffer.getRecords();
        }
        catch (Exception e) {
            LOG.error((Object)("Error copying data from manifest file " + manifestFileName + " into Redshift. Failing this emit attempt."), (Throwable)e);
            this.rollbackAndCloseConnection(conn);
            return buffer.getRecords();
        }
    }

    private void rollbackAndCloseConnection(Connection conn) {
        try {
            if (conn != null && !conn.isClosed()) {
                conn.rollback();
            }
        }
        catch (Exception e) {
            LOG.error((Object)"Unable to rollback Amazon Redshift transaction.", (Throwable)e);
        }
        this.closeConnection(conn);
    }

    private void closeConnection(Connection conn) {
        try {
            if (conn != null && !conn.isClosed()) {
                conn.close();
            }
        }
        catch (Exception e) {
            LOG.error((Object)"Unable to close Amazon Redshift connection.", (Throwable)e);
        }
    }

    @Override
    public void fail(List<String> records) {
        for (String record : records) {
            LOG.error((Object)("Record failed: " + record));
        }
    }

    private String writeManifestToS3(String fileName, List<String> records) throws IOException {
        String fileContents = this.generateManifestFile(records);
        PutObjectRequest putObjectRequest = new PutObjectRequest(this.s3Bucket, fileName, (InputStream)new ByteArrayInputStream(fileContents.getBytes()), null);
        this.s3Client.putObject(putObjectRequest);
        return fileName;
    }

    private void insertRecords(Connection conn, Collection<String> records) throws IOException {
        String toInsert = this.getCollectionString(records, "(", "),(", ")");
        StringBuilder insertSQL = new StringBuilder();
        insertSQL.append("INSERT INTO ");
        insertSQL.append(this.fileTable);
        insertSQL.append(" VALUES ");
        insertSQL.append(toInsert);
        insertSQL.append(";");
        this.executeStatement(conn, insertSQL.toString());
    }

    private List<String> checkForExistingFiles(Connection conn, List<String> records) throws IOException {
        TreeSet<String> recordSet = new TreeSet<String>(records);
        String files = this.getCollectionString(recordSet, "(", ",", ")");
        StringBuilder selectExisting = new StringBuilder();
        selectExisting.append("SELECT " + this.fileKeyColumn + " FROM ");
        selectExisting.append(this.fileTable);
        selectExisting.append(" WHERE ");
        selectExisting.append(this.fileKeyColumn);
        selectExisting.append(" IN ");
        selectExisting.append(files);
        selectExisting.append(";");
        Statement stmt = null;
        ResultSet resultSet = null;
        try {
            stmt = conn.createStatement();
            String query = selectExisting.toString();
            resultSet = stmt.executeQuery(query);
            while (resultSet.next()) {
                String existingFile = resultSet.getString(1);
                LOG.info((Object)("File " + existingFile + " has already been copied. Leaving it out."));
                recordSet.remove(existingFile);
            }
            resultSet.close();
            stmt.close();
            return new ArrayList<String>(recordSet);
        }
        catch (SQLException e) {
            try {
                resultSet.close();
            }
            catch (Exception exception) {
                // empty catch block
            }
            try {
                stmt.close();
            }
            catch (Exception exception) {
                // empty catch block
            }
            throw new IOException(e);
        }
    }

    private int getNumberOfCopiedRecords(Connection conn) throws IOException {
        String cmd = "select pg_last_copy_count();";
        Statement stmt = null;
        ResultSet resultSet = null;
        try {
            stmt = conn.createStatement();
            resultSet = stmt.executeQuery(cmd);
            resultSet.next();
            int numCopiedRecords = resultSet.getInt(1);
            resultSet.close();
            stmt.close();
            return numCopiedRecords;
        }
        catch (SQLException e) {
            try {
                resultSet.close();
            }
            catch (Exception exception) {
                // empty catch block
            }
            try {
                stmt.close();
            }
            catch (Exception exception) {
                // empty catch block
            }
            throw new IOException(e);
        }
    }

    protected void redshiftCopy(Connection conn, String manifestFile) throws IOException {
        AWSCredentials credentials = this.credentialsProvider.getCredentials();
        StringBuilder redshiftCopy = new StringBuilder();
        redshiftCopy.append("COPY " + this.dataTable + " ");
        redshiftCopy.append("FROM 's3://" + this.s3Bucket + "/" + manifestFile + "' ");
        redshiftCopy.append("CREDENTIALS '");
        redshiftCopy.append("aws_access_key_id=" + credentials.getAWSAccessKeyId());
        redshiftCopy.append(";");
        redshiftCopy.append("aws_secret_access_key=" + credentials.getAWSSecretKey());
        if (credentials instanceof AWSSessionCredentials) {
            redshiftCopy.append(";");
            redshiftCopy.append("token=" + ((AWSSessionCredentials)credentials).getSessionToken());
        }
        redshiftCopy.append("' ");
        redshiftCopy.append("DELIMITER '" + this.dataDelimiter + "' ");
        redshiftCopy.append("MANIFEST");
        redshiftCopy.append(";");
        this.executeStatement(conn, redshiftCopy.toString());
    }

    private void executeStatement(Connection conn, String statement) throws IOException {
        try {
            Statement stmt = conn.createStatement();
            stmt.execute(statement);
            stmt.close();
            return;
        }
        catch (SQLException e) {
            LOG.error((Object)("Amazon S3 endpoint set to: " + this.s3Endpoint));
            LOG.error((Object)("Error executing statement: " + statement), (Throwable)e);
            throw new IOException(e);
        }
    }

    private String getCollectionString(Collection<String> members, String prepend, String delimiter, String append) {
        StringBuilder s = new StringBuilder();
        s.append(prepend);
        for (String m : members) {
            s.append("'");
            s.append(m);
            s.append("'");
            s.append(delimiter);
        }
        s.replace(s.length() - delimiter.length(), s.length(), "");
        s.append(append);
        return s.toString();
    }

    private String getManifestFile(List<String> records) {
        return MANIFEST_PREFIX + records.get(0) + "-" + records.get(records.size() - 1);
    }

    private String generateManifestFile(List<String> files) {
        StringBuilder s = new StringBuilder();
        s.append("{\n");
        s.append("\t\"entries\": [\n");
        for (String file : files) {
            s.append("\t\t{");
            s.append("\"url\":\"s3://");
            s.append(this.s3Bucket);
            s.append("/");
            s.append(file);
            s.append("\"");
            s.append(",");
            s.append("\"mandatory\":" + Boolean.toString(this.copyMandatory));
            s.append("},\n");
        }
        s.replace(s.length() - 2, s.length() - 1, "");
        s.append("\t]\n");
        s.append("}\n");
        return s.toString();
    }

    @Override
    public void shutdown() {
        this.s3Client.shutdown();
    }
}

