package org.apache.gobblin.writer.initializer;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.Collection;
import java.util.HashSet;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.publisher.JdbcPublisher;
import org.apache.gobblin.source.extractor.JobCommitPolicy;
import org.apache.gobblin.source.workunit.WorkUnit;
import org.apache.gobblin.util.ForkOperatorUtils;
import org.apache.gobblin.util.jdbc.DataSourceBuilder;
import org.apache.gobblin.writer.Destination;
import org.apache.gobblin.writer.commands.JdbcWriterCommands;
import org.apache.gobblin.writer.commands.JdbcWriterCommandsFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/gobblin/writer/initializer/JdbcWriterInitializer.class */
public class JdbcWriterInitializer implements WriterInitializer {
    private static final String STAGING_TABLE_FORMAT = "stage_%d";
    private static final int NAMING_STAGING_TABLE_TRIAL = 10;
    private final int branches;
    private final int branchId;
    private final State state;
    private final Collection<WorkUnit> workUnits;
    private final JdbcWriterCommandsFactory jdbcWriterCommandsFactory;
    private final String database;
    private String userCreatedStagingTable;
    private Set<String> createdStagingTables;
    private static final Logger LOG = LoggerFactory.getLogger(JdbcWriterInitializer.class);
    private static final Random RANDOM = new Random();

    public JdbcWriterInitializer(State state, Collection<WorkUnit> collection) {
        this(state, collection, new JdbcWriterCommandsFactory(), 1, 0);
    }

    public JdbcWriterInitializer(State state, Collection<WorkUnit> collection, JdbcWriterCommandsFactory jdbcWriterCommandsFactory, int i, int i2) {
        validateInput(state);
        this.state = state;
        this.workUnits = Lists.newArrayList(collection);
        this.branches = i;
        this.branchId = i2;
        this.jdbcWriterCommandsFactory = jdbcWriterCommandsFactory;
        this.database = getProp(this.state, JdbcPublisher.JDBC_PUBLISHER_DATABASE_NAME, this.branches, this.branchId);
        this.createdStagingTables = Sets.newHashSet();
        state.setProp("cleanup.staging.data.by.initializer", Boolean.toString(true));
    }

    /* JADX WARN: Finally extract failed */
    public void close() {
        LOG.info("Closing " + getClass().getSimpleName());
        try {
            Connection createConnection = createConnection();
            Throwable th = null;
            try {
                JdbcWriterCommands createJdbcWriterCommands = createJdbcWriterCommands(createConnection);
                if (!this.createdStagingTables.isEmpty()) {
                    for (String str : this.createdStagingTables) {
                        LOG.info("Dropping staging table " + this.createdStagingTables);
                        createJdbcWriterCommands.drop(this.database, str);
                    }
                }
                if (this.userCreatedStagingTable != null) {
                    LOG.info("Truncating staging table " + this.userCreatedStagingTable);
                    createJdbcWriterCommands.truncate(this.database, this.userCreatedStagingTable);
                }
                if (createConnection != null) {
                    if (0 != 0) {
                        try {
                            createConnection.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        createConnection.close();
                    }
                }
            } catch (Throwable th3) {
                if (createConnection != null) {
                    if (0 != 0) {
                        try {
                            createConnection.close();
                        } catch (Throwable th4) {
                            th.addSuppressed(th4);
                        }
                    } else {
                        createConnection.close();
                    }
                }
                throw th3;
            }
        } catch (SQLException e) {
            throw new RuntimeException("Failed to close", e);
        }
    }

    @VisibleForTesting
    public Connection createConnection() throws SQLException {
        return DataSourceBuilder.builder().url(this.state.getProp(JdbcPublisher.JDBC_PUBLISHER_URL)).driver(this.state.getProp(JdbcPublisher.JDBC_PUBLISHER_DRIVER)).userName(this.state.getProp(JdbcPublisher.JDBC_PUBLISHER_USERNAME)).passWord(this.state.getProp(JdbcPublisher.JDBC_PUBLISHER_PASSWORD)).cryptoKeyLocation(this.state.getProp(JdbcPublisher.JDBC_PUBLISHER_ENCRYPTION_KEY_LOC)).maxActiveConnections(1).state(this.state).build().getConnection();
    }

    private String createStagingTable(Connection connection, JdbcWriterCommands jdbcWriterCommands) throws SQLException {
        String prop = this.state.getProp(ForkOperatorUtils.getPropertyNameForBranch(JdbcPublisher.JDBC_PUBLISHER_FINAL_TABLE_NAME, this.branches, this.branchId));
        if (StringUtils.isEmpty(prop)) {
            throw new IllegalArgumentException("jdbc.publisher.table_name is required for " + getClass().getSimpleName() + " for branch " + this.branchId);
        }
        String str = null;
        for (int i = 0; i < NAMING_STAGING_TABLE_TRIAL; i++) {
            String format = String.format(STAGING_TABLE_FORMAT, Long.valueOf(System.nanoTime()));
            LOG.info("Check if staging table " + format + " exists.");
            if (connection.getMetaData().getTables(null, this.database, format, new String[]{"TABLE"}).next()) {
                LOG.info("Staging table " + format + " exists.");
            } else {
                LOG.info("Staging table " + format + " does not exist. Creating.");
                try {
                    jdbcWriterCommands.createTableStructure(this.database, prop, format);
                    LOG.info("Test if staging table can be dropped. Test by dropping and Creating staging table.");
                    jdbcWriterCommands.drop(this.database, format);
                    jdbcWriterCommands.createTableStructure(this.database, prop, format);
                    str = format;
                    break;
                } catch (SQLException e) {
                    LOG.warn("Failed to create table. Retrying up to 10 times", e);
                }
            }
            try {
                TimeUnit.MILLISECONDS.sleep(RANDOM.nextInt(1000));
            } catch (InterruptedException e2) {
                LOG.info("Sleep has been interrupted.", e2);
            }
        }
        if (StringUtils.isEmpty(str)) {
            throw new RuntimeException("Failed to create staging table");
        }
        return str;
    }

    private static String getProp(State state, String str, int i, int i2) {
        return state.getProp(ForkOperatorUtils.getPropertyNameForBranch(str, i, i2));
    }

    private static boolean getPropAsBoolean(State state, String str, int i, int i2) {
        return Boolean.parseBoolean(getProp(state, str, i, i2));
    }

    /* JADX WARN: Finally extract failed */
    public void initialize() {
        try {
            Connection createConnection = createConnection();
            Throwable th = null;
            try {
                JdbcWriterCommands createJdbcWriterCommands = createJdbcWriterCommands(createConnection);
                JobCommitPolicy commitPolicy = JobCommitPolicy.getCommitPolicy(this.state);
                boolean z = !JobCommitPolicy.COMMIT_ON_FULL_SUCCESS.equals(commitPolicy);
                if (z) {
                    LOG.info("Writer will write directly to destination table as JobCommitPolicy is " + commitPolicy);
                }
                String prop = getProp(this.state, JdbcPublisher.JDBC_PUBLISHER_FINAL_TABLE_NAME, this.branches, this.branchId);
                String propertyNameForBranch = ForkOperatorUtils.getPropertyNameForBranch("writer.staging.table", this.branches, this.branchId);
                String prop2 = this.state.getProp(propertyNameForBranch);
                int i = -1;
                for (WorkUnit workUnit : this.workUnits) {
                    i++;
                    if (z) {
                        LOG.info("User chose to skip staing table on branch " + this.branchId + " workunit " + i);
                        workUnit.setProp(propertyNameForBranch, prop);
                        if (i == 0 && getPropAsBoolean(this.state, JdbcPublisher.JDBC_PUBLISHER_REPLACE_FINAL_TABLE, this.branches, this.branchId)) {
                            LOG.info("User chose to replace final table " + prop + " on branch " + this.branchId + " workunit " + i);
                            createJdbcWriterCommands.truncate(this.database, prop);
                        }
                    } else if (StringUtils.isEmpty(prop2)) {
                        LOG.info("Staging table has not been passed from user for branch " + this.branchId + " workunit " + i + " . Creating.");
                        String createStagingTable = createStagingTable(createConnection, createJdbcWriterCommands);
                        workUnit.setProp(propertyNameForBranch, createStagingTable);
                        this.createdStagingTables.add(createStagingTable);
                        LOG.info("Staging table " + createStagingTable + " has been created for branchId " + this.branchId + " workunit " + i);
                    } else {
                        LOG.info("Staging table for branch " + this.branchId + " from user: " + prop2);
                        workUnit.setProp(propertyNameForBranch, prop2);
                        if (i != 0) {
                            continue;
                        } else {
                            if (this.state.getPropAsBoolean(ForkOperatorUtils.getPropertyNameForBranch("writer.truncate.staging.table", this.branches, this.branchId), false)) {
                                LOG.info("Truncating staging table " + prop2 + " as requested.");
                                createJdbcWriterCommands.truncate(this.database, prop2);
                            }
                            if (!createJdbcWriterCommands.isEmpty(this.database, prop2)) {
                                LOG.error("Staging table " + prop2 + " is not empty. Failing.");
                                throw new IllegalArgumentException("Staging table " + prop2 + " should be empty.");
                            }
                            this.userCreatedStagingTable = prop2;
                        }
                    }
                }
                if (createConnection != null) {
                    if (0 != 0) {
                        try {
                            createConnection.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        createConnection.close();
                    }
                }
            } catch (Throwable th3) {
                if (createConnection != null) {
                    if (0 != 0) {
                        try {
                            createConnection.close();
                        } catch (Throwable th4) {
                            th.addSuppressed(th4);
                        }
                    } else {
                        createConnection.close();
                    }
                }
                throw th3;
            }
        } catch (SQLException e) {
            throw new RuntimeException("Failed with SQL", e);
        }
    }

    private JdbcWriterCommands createJdbcWriterCommands(Connection connection) {
        String propertyNameForBranch = ForkOperatorUtils.getPropertyNameForBranch("writer.destination.type", this.branches, this.branchId);
        return this.jdbcWriterCommandsFactory.newInstance(Destination.of(Destination.DestinationType.valueOf(((String) Preconditions.checkNotNull(this.state.getProp(propertyNameForBranch), propertyNameForBranch + " is required for underlying JDBC product name")).toUpperCase()), this.state), connection);
    }

    private static void validateInput(State state) {
        int propAsInt = state.getPropAsInt("fork.branches", 1);
        HashSet newHashSet = Sets.newHashSet();
        for (int i = 0; i < propAsInt; i++) {
            String str = (String) Preconditions.checkNotNull(getProp(state, JdbcPublisher.JDBC_PUBLISHER_FINAL_TABLE_NAME, propAsInt, i), "jdbc.publisher.table_name should not be null.");
            if (newHashSet.contains(str)) {
                throw new IllegalArgumentException("Duplicate jdbc.publisher.table_name is not allowed across branches");
            }
            newHashSet.add(str);
        }
        HashSet newHashSet2 = Sets.newHashSet();
        for (int i2 = 0; i2 < propAsInt; i2++) {
            String prop = getProp(state, "writer.staging.table", propAsInt, i2);
            if (!StringUtils.isEmpty(prop) && newHashSet2.contains(prop)) {
                throw new IllegalArgumentException("Duplicate writer.staging.table is not allowed across branches");
            }
            newHashSet2.add(prop);
        }
        if (JobCommitPolicy.COMMIT_ON_FULL_SUCCESS.equals(JobCommitPolicy.getCommitPolicy(state)) ^ state.getPropAsBoolean("publish.data.at.job.level", true)) {
            throw new IllegalArgumentException("Job commit policy should be only " + JobCommitPolicy.COMMIT_ON_FULL_SUCCESS + " when publish.data.at.job.level is true. Or Job commit policy should not be " + JobCommitPolicy.COMMIT_ON_FULL_SUCCESS + " and publish.data.at.job.level is false.");
        }
    }

    public String toString() {
        return "JdbcWriterInitializer(branches=" + this.branches + ", branchId=" + this.branchId + ", state=" + this.state + ", workUnits=" + this.workUnits + ", jdbcWriterCommandsFactory=" + this.jdbcWriterCommandsFactory + ", database=" + this.database + ", userCreatedStagingTable=" + this.userCreatedStagingTable + ", createdStagingTables=" + this.createdStagingTables + ")";
    }
}
