/*
 * Decompiled with CFR 0.152.
 */
package org.apache.gobblin.writer.initializer;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Collection;
import java.util.HashSet;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import javax.sql.DataSource;
import org.apache.commons.lang3.StringUtils;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.source.extractor.JobCommitPolicy;
import org.apache.gobblin.source.workunit.WorkUnit;
import org.apache.gobblin.util.ForkOperatorUtils;
import org.apache.gobblin.util.jdbc.DataSourceBuilder;
import org.apache.gobblin.writer.Destination;
import org.apache.gobblin.writer.commands.JdbcWriterCommands;
import org.apache.gobblin.writer.commands.JdbcWriterCommandsFactory;
import org.apache.gobblin.writer.initializer.WriterInitializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Prepares the staging tables a JDBC writer writes into, and cleans them up afterwards.
 *
 * <p>For each work unit of one fork branch, {@link #initialize()} stores the name of the table to
 * write into under the branch-specific {@code writer.staging.table} property:
 * <ul>
 *   <li>the destination table itself when the job commit policy is not
 *       {@code COMMIT_ON_FULL_SUCCESS} (staging is skipped);</li>
 *   <li>the staging table supplied in the state, which must be empty;</li>
 *   <li>otherwise a freshly created staging table with the structure of the destination table.</li>
 * </ul>
 * {@link #close()} drops the staging tables created by this instance and truncates the
 * user-supplied one.
 */
public class JdbcWriterInitializer
implements WriterInitializer {
    private static final Logger LOG = LoggerFactory.getLogger(JdbcWriterInitializer.class);
    private static final String STAGING_TABLE_FORMAT = "stage_%d";
    /** Maximum number of attempts to find an unused name for, and create, a staging table. */
    private static final int NAMING_STAGING_TABLE_TRIAL = 10;
    private static final Random RANDOM = new Random();
    private final int branches;
    private final int branchId;
    private final State state;
    private final Collection<WorkUnit> workUnits;
    private final JdbcWriterCommandsFactory jdbcWriterCommandsFactory;
    private final String database;
    /** Staging table supplied through the state; set by {@link #initialize()}, truncated on close. */
    private String userCreatedStagingTable;
    /** Staging tables created by this instance; dropped on close. */
    private final Set<String> createdStagingTables;

    /**
     * Creates an initializer for a single, un-forked branch (one branch, branch id 0) using a
     * default {@link JdbcWriterCommandsFactory}.
     *
     * @param state job state holding the JDBC publisher and writer properties
     * @param workUnits work units that will receive the staging table name
     */
    public JdbcWriterInitializer(State state, Collection<WorkUnit> workUnits) {
        this(state, workUnits, new JdbcWriterCommandsFactory(), 1, 0);
    }

    /**
     * Creates an initializer for one fork branch.
     *
     * <p>Validates the state first, copies {@code workUnits} into a new list, and sets
     * {@code cleanup.staging.data.by.initializer} to {@code "true"} on {@code state}.
     *
     * @param state job state holding the JDBC publisher and writer properties
     * @param workUnits work units that will receive the staging table name
     * @param jdbcWriterCommandsFactory factory used to obtain {@link JdbcWriterCommands}
     * @param branches total number of fork branches
     * @param branchId id of the branch this initializer serves
     * @throws NullPointerException if a publish table name is missing for any branch
     * @throws IllegalArgumentException if table names are duplicated across branches or the commit
     *         policy is inconsistent with {@code publish.data.at.job.level}
     */
    public JdbcWriterInitializer(State state, Collection<WorkUnit> workUnits, JdbcWriterCommandsFactory jdbcWriterCommandsFactory, int branches, int branchId) {
        JdbcWriterInitializer.validateInput(state);
        this.state = state;
        this.workUnits = Lists.newArrayList(workUnits);
        this.branches = branches;
        this.branchId = branchId;
        this.jdbcWriterCommandsFactory = jdbcWriterCommandsFactory;
        this.database = JdbcWriterInitializer.getProp(this.state, "jdbc.publisher.database_name", this.branches, this.branchId);
        this.createdStagingTables = Sets.newHashSet();
        state.setProp("cleanup.staging.data.by.initializer", Boolean.toString(true));
    }

    /**
     * Drops every staging table created by this instance and truncates the user-supplied staging
     * table, if one was registered by {@link #initialize()}.
     *
     * @throws RuntimeException wrapping any {@link SQLException} raised during cleanup
     */
    public void close() {
        LOG.info("Closing " + this.getClass().getSimpleName());
        try (Connection conn = this.createConnection()) {
            JdbcWriterCommands commands = this.createJdbcWriterCommands(conn);
            for (String stagingTable : this.createdStagingTables) {
                // Log the table being dropped, not the whole set.
                LOG.info("Dropping staging table " + stagingTable);
                commands.drop(this.database, stagingTable);
            }
            if (this.userCreatedStagingTable != null) {
                LOG.info("Truncating staging table " + this.userCreatedStagingTable);
                commands.truncate(this.database, this.userCreatedStagingTable);
            }
        }
        catch (SQLException e) {
            throw new RuntimeException("Failed to close", e);
        }
    }

    /**
     * Builds a data source from the {@code jdbc.publisher.*} properties of the state and returns
     * a connection from it. The caller is responsible for closing the connection.
     *
     * @return a new JDBC connection
     * @throws SQLException if the connection cannot be obtained
     */
    @VisibleForTesting
    public Connection createConnection() throws SQLException {
        DataSource dataSource = DataSourceBuilder.builder()
                .url(this.state.getProp("jdbc.publisher.url"))
                .driver(this.state.getProp("jdbc.publisher.driver"))
                .userName(this.state.getProp("jdbc.publisher.username"))
                .passWord(this.state.getProp("jdbc.publisher.password"))
                .cryptoKeyLocation(this.state.getProp("jdbc.publisher.encrypt_key_loc"))
                .maxActiveConnections(1)
                .state(this.state)
                .build();
        return dataSource.getConnection();
    }

    /**
     * Creates a staging table with the structure of this branch's destination table.
     *
     * <p>Generates a candidate name from {@link System#nanoTime()}, checks through the connection
     * metadata that no such table exists, then creates it, drops it (to prove it can be dropped
     * later) and creates it again. On a name collision or an {@link SQLException} during
     * creation, sleeps a random time below one second and retries, up to
     * {@link #NAMING_STAGING_TABLE_TRIAL} attempts in total.
     *
     * <p>If the thread is interrupted while sleeping, the interrupt flag is restored and no
     * further attempts are made.
     *
     * @param conn connection used for the metadata lookup
     * @param commands commands used to create and drop the table
     * @return the name of the created staging table
     * @throws SQLException if the metadata lookup fails
     * @throws IllegalArgumentException if the destination table name is not configured
     * @throws RuntimeException if no staging table could be created
     */
    private String createStagingTable(Connection conn, JdbcWriterCommands commands) throws SQLException {
        String destinationTable = JdbcWriterInitializer.getProp(this.state, "jdbc.publisher.table_name", this.branches, this.branchId);
        if (StringUtils.isEmpty(destinationTable)) {
            throw new IllegalArgumentException("jdbc.publisher.table_name is required for " + this.getClass().getSimpleName() + " for branch " + this.branchId);
        }
        for (int attempt = 0; attempt < NAMING_STAGING_TABLE_TRIAL; ++attempt) {
            String candidate = String.format(STAGING_TABLE_FORMAT, System.nanoTime());
            LOG.info("Check if staging table " + candidate + " exists.");
            boolean exists;
            // Close the metadata result set on every attempt so cursors are not leaked.
            try (ResultSet res = conn.getMetaData().getTables(null, this.database, candidate, new String[]{"TABLE"})) {
                exists = res.next();
            }
            if (!exists) {
                LOG.info("Staging table " + candidate + " does not exist. Creating.");
                try {
                    commands.createTableStructure(this.database, destinationTable, candidate);
                    LOG.info("Test if staging table can be dropped. Test by dropping and Creating staging table.");
                    commands.drop(this.database, candidate);
                    commands.createTableStructure(this.database, destinationTable, candidate);
                    return candidate;
                }
                catch (SQLException e) {
                    LOG.warn("Failed to create table. Retrying up to " + NAMING_STAGING_TABLE_TRIAL + " times", e);
                }
            } else {
                LOG.info("Staging table " + candidate + " exists.");
            }
            try {
                // Random back-off before the next attempt.
                TimeUnit.MILLISECONDS.sleep(RANDOM.nextInt(1000));
            }
            catch (InterruptedException e) {
                LOG.info("Sleep has been interrupted.", e);
                // Preserve the interruption for callers and stop retrying.
                Thread.currentThread().interrupt();
                break;
            }
        }
        throw new RuntimeException("Failed to create staging table");
    }

    /**
     * Reads a property using its branch-specific name.
     *
     * @return the property value, or whatever {@code State.getProp} returns when it is absent
     */
    private static String getProp(State state, String key, int branches, int branchId) {
        String forkedKey = ForkOperatorUtils.getPropertyNameForBranch(key, branches, branchId);
        return state.getProp(forkedKey);
    }

    /**
     * Reads a branch-specific property and parses it with {@link Boolean#parseBoolean(String)},
     * so a missing value yields {@code false}.
     */
    private static boolean getPropAsBoolean(State state, String key, int branches, int branchId) {
        return Boolean.parseBoolean(JdbcWriterInitializer.getProp(state, key, branches, branchId));
    }

    /**
     * Assigns the table to write into to every work unit of this branch.
     *
     * <ul>
     *   <li>If the commit policy is not {@code COMMIT_ON_FULL_SUCCESS}, staging is skipped: the
     *       destination table is used, and it is truncated once (on the first work unit) when
     *       {@code jdbc.publisher.replace_table} is true.</li>
     *   <li>Else, if a staging table is configured, it is used for all work units. On the first
     *       work unit it is optionally truncated ({@code writer.truncate.staging.table}) and must
     *       then be empty.</li>
     *   <li>Otherwise a new staging table is created per work unit and remembered for cleanup.</li>
     * </ul>
     *
     * @throws IllegalArgumentException if the configured staging table is not empty
     * @throws RuntimeException wrapping any {@link SQLException}
     */
    public void initialize() {
        try (Connection conn = this.createConnection()) {
            JdbcWriterCommands commands = this.createJdbcWriterCommands(conn);
            JobCommitPolicy jobCommitPolicy = JobCommitPolicy.getCommitPolicy(this.state);
            boolean isSkipStaging = !JobCommitPolicy.COMMIT_ON_FULL_SUCCESS.equals(jobCommitPolicy);
            if (isSkipStaging) {
                LOG.info("Writer will write directly to destination table as JobCommitPolicy is " + jobCommitPolicy);
            }
            String publishTable = JdbcWriterInitializer.getProp(this.state, "jdbc.publisher.table_name", this.branches, this.branchId);
            String stagingTableKey = ForkOperatorUtils.getPropertyNameForBranch("writer.staging.table", this.branches, this.branchId);
            String stagingTable = this.state.getProp(stagingTableKey);
            int index = -1;
            for (WorkUnit wu : this.workUnits) {
                ++index;
                // Table-level preparation (truncate / emptiness check) runs once, on the first work unit.
                boolean isFirst = index == 0;
                if (isSkipStaging) {
                    LOG.info("User chose to skip staing table on branch " + this.branchId + " workunit " + index);
                    wu.setProp(stagingTableKey, publishTable);
                    if (isFirst && JdbcWriterInitializer.getPropAsBoolean(this.state, "jdbc.publisher.replace_table", this.branches, this.branchId)) {
                        LOG.info("User chose to replace final table " + publishTable + " on branch " + this.branchId + " workunit " + index);
                        commands.truncate(this.database, publishTable);
                    }
                } else if (!StringUtils.isEmpty(stagingTable)) {
                    LOG.info("Staging table for branch " + this.branchId + " from user: " + stagingTable);
                    wu.setProp(stagingTableKey, stagingTable);
                    if (isFirst) {
                        String truncateKey = ForkOperatorUtils.getPropertyNameForBranch("writer.truncate.staging.table", this.branches, this.branchId);
                        if (this.state.getPropAsBoolean(truncateKey, false)) {
                            LOG.info("Truncating staging table " + stagingTable + " as requested.");
                            commands.truncate(this.database, stagingTable);
                        }
                        if (!commands.isEmpty(this.database, stagingTable)) {
                            LOG.error("Staging table " + stagingTable + " is not empty. Failing.");
                            throw new IllegalArgumentException("Staging table " + stagingTable + " should be empty.");
                        }
                        this.userCreatedStagingTable = stagingTable;
                    }
                } else {
                    LOG.info("Staging table has not been passed from user for branch " + this.branchId + " workunit " + index + " . Creating.");
                    String createdStagingTable = this.createStagingTable(conn, commands);
                    wu.setProp(stagingTableKey, createdStagingTable);
                    this.createdStagingTables.add(createdStagingTable);
                    LOG.info("Staging table " + createdStagingTable + " has been created for branchId " + this.branchId + " workunit " + index);
                }
            }
        }
        catch (SQLException e) {
            throw new RuntimeException("Failed with SQL", e);
        }
    }

    /**
     * Resolves the branch's {@code writer.destination.type} and asks the factory for the matching
     * {@link JdbcWriterCommands} bound to {@code conn}.
     *
     * @throws NullPointerException if the destination type is not configured
     * @throws IllegalArgumentException if the destination type is not a known
     *         {@code Destination.DestinationType}
     */
    private JdbcWriterCommands createJdbcWriterCommands(Connection conn) {
        String destKey = ForkOperatorUtils.getPropertyNameForBranch("writer.destination.type", this.branches, this.branchId);
        String destType = Preconditions.checkNotNull(this.state.getProp(destKey), destKey + " is required for underlying JDBC product name");
        Destination dest = Destination.of(Destination.DestinationType.valueOf(destType.toUpperCase()), this.state);
        return this.jdbcWriterCommandsFactory.newInstance(dest, conn);
    }

    /**
     * Validates the job-wide configuration across all fork branches.
     *
     * <ol>
     *   <li>Every branch must define {@code jdbc.publisher.table_name}, and no two branches may
     *       share the same value.</li>
     *   <li>Non-empty {@code writer.staging.table} values must be unique across branches.</li>
     *   <li>The commit policy must be {@code COMMIT_ON_FULL_SUCCESS} exactly when
     *       {@code publish.data.at.job.level} is true (default true).</li>
     * </ol>
     *
     * @throws NullPointerException if a publish table name is missing
     * @throws IllegalArgumentException if any of the rules above is violated
     */
    private static void validateInput(State state) {
        int branches = state.getPropAsInt("fork.branches", 1);
        Set<String> publishTables = Sets.newHashSet();
        for (int branchId = 0; branchId < branches; ++branchId) {
            String publishTable = Preconditions.checkNotNull(JdbcWriterInitializer.getProp(state, "jdbc.publisher.table_name", branches, branchId), "jdbc.publisher.table_name should not be null.");
            // Set.add returns false when the name was already registered by another branch.
            if (!publishTables.add(publishTable)) {
                throw new IllegalArgumentException("Duplicate jdbc.publisher.table_name is not allowed across branches");
            }
        }
        Set<String> stagingTables = Sets.newHashSet();
        for (int branchId = 0; branchId < branches; ++branchId) {
            String stagingTable = JdbcWriterInitializer.getProp(state, "writer.staging.table", branches, branchId);
            if (StringUtils.isEmpty(stagingTable)) {
                // Staging table is optional; branches without one cannot collide.
                continue;
            }
            if (!stagingTables.add(stagingTable)) {
                throw new IllegalArgumentException("Duplicate writer.staging.table is not allowed across branches");
            }
        }
        JobCommitPolicy policy = JobCommitPolicy.getCommitPolicy(state);
        boolean isPublishJobLevel = state.getPropAsBoolean("publish.data.at.job.level", true);
        boolean isFullSuccessPolicy = JobCommitPolicy.COMMIT_ON_FULL_SUCCESS.equals(policy);
        if (isFullSuccessPolicy != isPublishJobLevel) {
            throw new IllegalArgumentException("Job commit policy should be only " + JobCommitPolicy.COMMIT_ON_FULL_SUCCESS + " when " + "publish.data.at.job.level" + " is true. Or Job commit policy should not be " + JobCommitPolicy.COMMIT_ON_FULL_SUCCESS + " and " + "publish.data.at.job.level" + " is false.");
        }
    }

    /** Returns a string listing every field of this initializer. */
    @Override
    public String toString() {
        return "JdbcWriterInitializer(branches=" + this.branches + ", branchId=" + this.branchId + ", state=" + this.state + ", workUnits=" + this.workUnits + ", jdbcWriterCommandsFactory=" + this.jdbcWriterCommandsFactory + ", database=" + this.database + ", userCreatedStagingTable=" + this.userCreatedStagingTable + ", createdStagingTables=" + this.createdStagingTables + ")";
    }
}

