package org.apache.gobblin.publisher;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import java.io.IOException;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.configuration.WorkUnitState;
import org.apache.gobblin.source.extractor.JobCommitPolicy;
import org.apache.gobblin.util.ForkOperatorUtils;
import org.apache.gobblin.util.jdbc.DataSourceBuilder;
import org.apache.gobblin.writer.commands.JdbcWriterCommands;
import org.apache.gobblin.writer.commands.JdbcWriterCommandsFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/gobblin/publisher/JdbcPublisher.class */
public class JdbcPublisher extends DataPublisher {
    public static final String JDBC_PUBLISHER_PREFIX = "jdbc.publisher.";
    public static final String JDBC_PUBLISHER_DATABASE_NAME = "jdbc.publisher.database_name";
    public static final String JDBC_PUBLISHER_FINAL_TABLE_NAME = "jdbc.publisher.table_name";
    public static final String JDBC_PUBLISHER_REPLACE_FINAL_TABLE = "jdbc.publisher.replace_table";
    public static final String JDBC_PUBLISHER_USERNAME = "jdbc.publisher.username";
    public static final String JDBC_PUBLISHER_PASSWORD = "jdbc.publisher.password";
    public static final String JDBC_PUBLISHER_ENCRYPTION_KEY_LOC = "jdbc.publisher.encrypt_key_loc";
    public static final String JDBC_PUBLISHER_URL = "jdbc.publisher.url";
    public static final String JDBC_PUBLISHER_TIMEOUT = "jdbc.publisher.timeout";
    public static final String JDBC_PUBLISHER_DRIVER = "jdbc.publisher.driver";
    private static final Logger LOG = LoggerFactory.getLogger(JdbcPublisher.class);
    private final JdbcWriterCommandsFactory jdbcWriterCommandsFactory;

    @VisibleForTesting
    public JdbcPublisher(State state, JdbcWriterCommandsFactory jdbcWriterCommandsFactory) {
        super(state);
        this.jdbcWriterCommandsFactory = jdbcWriterCommandsFactory;
        validate(getState());
    }

    public JdbcPublisher(State state) {
        this(state, new JdbcWriterCommandsFactory());
        validate(getState());
    }

    private void validate(State state) {
        JobCommitPolicy commitPolicy = JobCommitPolicy.getCommitPolicy(getState().getProperties());
        if (JobCommitPolicy.COMMIT_ON_FULL_SUCCESS != commitPolicy) {
            throw new IllegalArgumentException(getClass().getSimpleName() + " won't publish as already commited by task. Job commit policy " + commitPolicy);
        }
        if (!state.getPropAsBoolean("publish.data.at.job.level", true)) {
            throw new IllegalArgumentException(getClass().getSimpleName() + " won't publish as publish.data.at.job.level is set as false");
        }
    }

    @VisibleForTesting
    public Connection createConnection() {
        try {
            return DataSourceBuilder.builder().url(this.state.getProp(JDBC_PUBLISHER_URL)).driver(this.state.getProp(JDBC_PUBLISHER_DRIVER)).userName(this.state.getProp(JDBC_PUBLISHER_USERNAME)).passWord(this.state.getProp(JDBC_PUBLISHER_PASSWORD)).cryptoKeyLocation(this.state.getProp(JDBC_PUBLISHER_ENCRYPTION_KEY_LOC)).maxActiveConnections(1).state(this.state).build().getConnection();
        } catch (SQLException e) {
            throw new RuntimeException(e);
        }
    }

    public void close() throws IOException {
    }

    public void initialize() throws IOException {
    }

    public void publishData(Collection<? extends WorkUnitState> collection) throws IOException {
        LOG.info("Start publishing data");
        int propAsInt = this.state.getPropAsInt("fork.branches", 1);
        HashSet newHashSet = Sets.newHashSet();
        Connection createConnection = createConnection();
        JdbcWriterCommands newInstance = this.jdbcWriterCommandsFactory.newInstance(this.state, createConnection);
        try {
            try {
                createConnection.setAutoCommit(false);
                for (int i = 0; i < propAsInt; i++) {
                    String prop = this.state.getProp(ForkOperatorUtils.getPropertyNameForBranch(JDBC_PUBLISHER_FINAL_TABLE_NAME, propAsInt, i));
                    String prop2 = this.state.getProp(ForkOperatorUtils.getPropertyNameForBranch(JDBC_PUBLISHER_DATABASE_NAME, propAsInt, i));
                    Preconditions.checkNotNull(prop);
                    if (this.state.getPropAsBoolean(ForkOperatorUtils.getPropertyNameForBranch(JDBC_PUBLISHER_REPLACE_FINAL_TABLE, propAsInt, i), false) && !newHashSet.contains(prop)) {
                        LOG.info("Deleting table " + prop);
                        newInstance.deleteAll(prop2, prop);
                        newHashSet.add(prop);
                    }
                    for (Map.Entry<String, List<WorkUnitState>> entry : getStagingTables(collection, propAsInt, i).entrySet()) {
                        String key = entry.getKey();
                        LOG.info("Copying data from staging table " + key + " into destination table " + prop);
                        newInstance.copyTable(prop2, key, prop);
                        Iterator<WorkUnitState> it = entry.getValue().iterator();
                        while (it.hasNext()) {
                            it.next().setWorkingState(WorkUnitState.WorkingState.COMMITTED);
                        }
                    }
                }
                LOG.info("Commit publish data");
                createConnection.commit();
                try {
                    createConnection.close();
                } catch (SQLException e) {
                    throw new RuntimeException(e);
                }
            } catch (Throwable th) {
                try {
                    createConnection.close();
                    throw th;
                } catch (SQLException e2) {
                    throw new RuntimeException(e2);
                }
            }
        } catch (Exception e3) {
            try {
                LOG.error("Failed publishing. Rolling back.");
                createConnection.rollback();
            } catch (SQLException e4) {
                LOG.error("Failed rolling back.", e4);
            }
            throw new RuntimeException("Failed publishing", e3);
        }
    }

    private static Map<String, List<WorkUnitState>> getStagingTables(Collection<? extends WorkUnitState> collection, int i, int i2) {
        HashMap newHashMap = Maps.newHashMap();
        for (WorkUnitState workUnitState : collection) {
            String str = (String) Preconditions.checkNotNull(workUnitState.getProp(ForkOperatorUtils.getPropertyNameForBranch("writer.staging.table", i, i2)));
            List list = (List) newHashMap.get(str);
            if (list == null) {
                list = Lists.newArrayList();
                newHashMap.put(str, list);
            }
            list.add(workUnitState);
        }
        return newHashMap;
    }

    public void publishMetadata(Collection<? extends WorkUnitState> collection) throws IOException {
    }
}
