package com.alibaba.otter.node.etl.extract.extractor;

import com.alibaba.otter.node.etl.OtterConstants;
import com.alibaba.otter.node.etl.common.db.dialect.DbDialect;
import com.alibaba.otter.node.etl.common.db.dialect.oracle.OracleDialect;
import com.alibaba.otter.node.etl.common.db.utils.SqlUtils;
import com.alibaba.otter.node.etl.extract.exceptions.ExtractException;
import com.alibaba.otter.shared.common.model.config.ConfigHelper;
import com.alibaba.otter.shared.common.model.config.data.ColumnPair;
import com.alibaba.otter.shared.common.model.config.data.DataMedia;
import com.alibaba.otter.shared.common.model.config.data.DataMediaPair;
import com.alibaba.otter.shared.common.model.config.data.db.DbMediaSource;
import com.alibaba.otter.shared.common.model.config.pipeline.Pipeline;
import com.alibaba.otter.shared.common.utils.thread.NamedThreadFactory;
import com.alibaba.otter.shared.etl.model.DbBatch;
import com.alibaba.otter.shared.etl.model.EventColumn;
import com.alibaba.otter.shared.etl.model.EventColumnIndexComparable;
import com.alibaba.otter.shared.etl.model.EventData;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.SystemUtils;
import org.apache.commons.lang.exception.NestableRuntimeException;
import org.apache.ddlutils.model.Column;
import org.apache.ddlutils.model.Table;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.jdbc.core.RowMapper;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;

/* loaded from: input_file:com/alibaba/otter/node/etl/extract/extractor/DatabaseExtractor.class */
public class DatabaseExtractor extends AbstractExtractor<DbBatch> implements InitializingBean, DisposableBean {
    private static final String WORKER_NAME = "DataBaseExtractor";
    private static final String WORKER_NAME_FORMAT = "pipelineId = %s , pipelineName = %s , DataBaseExtractor";
    private static final Logger logger = LoggerFactory.getLogger(DatabaseExtractor.class);
    private static final int DEFAULT_POOL_SIZE = 5;
    private static final int retryTimes = 3;
    private int poolSize = 5;
    private ExecutorService executor;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/alibaba/otter/node/etl/extract/extractor/DatabaseExtractor$DataItem.class */
    /**
     * Wraps one {@link EventData} of a batch together with a flag telling
     * {@code extract} whether that event must be removed from the batch once all
     * extract workers have finished.
     */
    public class DataItem {
        /** The event this item carries. */
        private EventData eventData;
        /** {@code true} when the carried event should be dropped from the batch; starts as {@code false}. */
        private boolean filter;

        public DataItem(EventData eventData) {
            this.eventData = eventData;
        }

        /** @return whether the carried event is marked for removal */
        public boolean isFilter() {
            return this.filter;
        }

        /** @param z {@code true} to mark the carried event for removal */
        public void setFilter(boolean z) {
            this.filter = z;
        }

        /** @return the carried event */
        public EventData getEventData() {
            return this.eventData;
        }

        /** @param eventData the event to carry from now on */
        public void setEventData(EventData eventData) {
            this.eventData = eventData;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/alibaba/otter/node/etl/extract/extractor/DatabaseExtractor$DatabaseExtractWorker.class */
    public class DatabaseExtractWorker implements Runnable {
        private String eventData_format;
        private Pipeline pipeline;
        private DataItem item;
        private EventData eventData;
        private final int event_default_capacity = 1024;
        private final String SEP = SystemUtils.LINE_SEPARATOR;

        public DatabaseExtractWorker(Pipeline pipeline, DataItem dataItem) {
            this.eventData_format = null;
            this.eventData_format = "-----------------" + this.SEP;
            this.eventData_format += "- PairId: {0} , TableId: {1} " + this.SEP;
            this.eventData_format += "-----------------" + this.SEP;
            this.eventData_format += "---START" + this.SEP;
            this.eventData_format += "---Pks" + this.SEP;
            this.eventData_format += "{2}" + this.SEP;
            this.eventData_format += "---Sql" + this.SEP;
            this.eventData_format += "{3}" + this.SEP;
            this.eventData_format += "---END" + this.SEP;
            this.pipeline = pipeline;
            this.item = dataItem;
            this.eventData = dataItem.getEventData();
        }

        /**
         * Re-reads the current row of the event from the source database and merges
         * the freshly selected values into the event's columns.
         *
         * <p>Steps:
         * <ol>
         * <li>tag the thread (MDC key + thread name) with the pipeline;</li>
         * <li>build the key lookup data and the list of columns to select;</li>
         * <li>run the select; when no row comes back, mark the item as filtered
         * (only if the event is a remedy event or the pipeline skips missing rows),
         * unless the primary key itself changed;</li>
         * <li>otherwise replace the event's columns with the selected ones plus any
         * pre-existing column that was not selected, ordered by column index.</li>
         * </ol>
         * The thread name and MDC entry are always restored, whatever the outcome.
         * An {@link InterruptedException} raised by {@code select} ends the work
         * silently; {@code select} only tests the interrupt flag without clearing it.
         */
        @Override // java.lang.Runnable
        public void run() {
            try {
                MDC.put(OtterConstants.splitPipelineLogFileKey, String.valueOf(this.pipeline.getId()));
                Thread.currentThread().setName(String.format(DatabaseExtractor.WORKER_NAME_FORMAT, this.pipeline.getId(), this.pipeline.getName()));

                DataMedia dataMedia = ConfigHelper.findDataMedia(this.pipeline, Long.valueOf(this.eventData.getTableId()));
                DbDialect dbDialect = DatabaseExtractor.this.dbDialectFactory.getDbDialect(this.pipeline.getId(), (DbMediaSource) dataMedia.getSource());
                Table table = dbDialect.findTable(this.eventData.getSchemaName(), this.eventData.getTableName());

                TableData keyTableData = buildTableData(table, this.eventData.getKeys());
                if (dbDialect instanceof OracleDialect) {
                    // For Oracle the key types are taken again from the table definition.
                    keyTableData.columnTypes = getOraclePkTypes(table, keyTableData.columnNames);
                }

                // Select every column in row mode, or when an Oracle event carries no updated columns.
                boolean needAll = this.pipeline.getParameters().getSyncMode().isRow()
                                  || (this.eventData.getSyncMode() != null && this.eventData.getSyncMode().isRow());
                needAll |= CollectionUtils.isEmpty(this.eventData.getUpdatedColumns())
                           && dataMedia.getSource().getType().isOracle();

                List<String> viewColumnNames = buildMaxColumnsFromColumnPairs(
                    ConfigHelper.findDataMediaPairByMediaId(this.pipeline, dataMedia.getId()),
                    this.eventData.getKeys());
                TableData columnTableData = buildTableData(table, this.eventData.getUpdatedColumns(), needAll, viewColumnNames);
                if (columnTableData.columnNames.length == 0) {
                    // Nothing to select; the finally block still restores the thread state.
                    return;
                }

                List<String> selectedValues = select(dbDialect, this.eventData.getSchemaName(), this.eventData.getTableName(), keyTableData, columnTableData);
                if (selectedValues == null) {
                    handleMissingRow();
                } else {
                    mergeSelectedValues(columnTableData, selectedValues);
                }
            } catch (InterruptedException e) {
                // Deliberately ignored: the worker simply stops.
            } finally {
                Thread.currentThread().setName(DatabaseExtractor.WORKER_NAME);
                MDC.remove(OtterConstants.splitPipelineLogFileKey);
            }
        }

        /**
         * Decides whether an event whose row no longer exists must be filtered out.
         * The event is filtered when it is a remedy event or the pipeline is
         * configured to skip missing rows, but never when its primary key changed.
         */
        private void handleMissingRow() {
            this.item.setFilter(this.eventData.isRemedy() || this.pipeline.getParameters().getSkipNoRow().booleanValue());

            // Compare old and new keys position by position. The previous code compared
            // every old key against the FIRST new key only, so any composite key looked changed.
            List<EventColumn> keys = this.eventData.getKeys();
            int position = 0;
            for (EventColumn oldKey : this.eventData.getOldKeys()) {
                if (position >= keys.size() || !oldKey.equals(keys.get(position))) {
                    this.item.setFilter(false);
                    break;
                }
                position++;
            }
        }

        /**
         * Replaces the event's columns with the values just read from the database,
         * keeps every pre-existing column whose name (case-insensitive) was not
         * selected, and sorts the result with {@link EventColumnIndexComparable}.
         *
         * @param columnTableData metadata (index, name, type) of the selected columns
         * @param selectedValues  selected values, in the same order as the metadata
         */
        private void mergeSelectedValues(TableData columnTableData, List<String> selectedValues) {
            List<EventColumn> merged = new ArrayList<EventColumn>();
            for (int i = 0; i < selectedValues.size(); i++) {
                String value = selectedValues.get(i);
                EventColumn column = new EventColumn();
                column.setIndex(columnTableData.indexs[i]);
                column.setColumnName(columnTableData.columnNames[i]);
                column.setColumnType(columnTableData.columnTypes[i]);
                column.setNull(value == null);
                column.setColumnValue(value);
                column.setUpdate(true);
                merged.add(column);
            }
            for (EventColumn existing : this.eventData.getColumns()) {
                if (!containsColumnName(merged, existing.getColumnName())) {
                    merged.add(existing);
                }
            }
            Collections.sort(merged, new EventColumnIndexComparable());
            this.eventData.setColumns(merged);
        }

        /** Tells whether {@code columns} already holds a column with the given name, ignoring case. */
        private boolean containsColumnName(List<EventColumn> columns, String columnName) {
            for (EventColumn candidate : columns) {
                if (StringUtils.equalsIgnoreCase(candidate.getColumnName(), columnName)) {
                    return true;
                }
            }
            return false;
        }

        /**
         * Collects the source column names configured in the given media pairs,
         * leaving out the names that match one of the key columns (compared in
         * lower case).
         *
         * @param list  media pairs to inspect
         * @param list2 key columns of the event
         * @return the distinct configured non-key source column names; an empty list
         *         as soon as one pair has no column pairs or uses exclude mode
         */
        private List<String> buildMaxColumnsFromColumnPairs(List<DataMediaPair> list, List<EventColumn> list2) {
            HashSet<String> keyNames = new HashSet<String>();
            for (EventColumn key : list2) {
                keyNames.add(StringUtils.lowerCase(key.getColumnName()));
            }

            HashSet<String> viewNames = new HashSet<String>();
            for (DataMediaPair pair : list) {
                List<ColumnPair> configured = pair.getColumnPairs();
                boolean restrictsColumns = !CollectionUtils.isEmpty(configured) && !pair.getColumnPairMode().isExclude();
                if (!restrictsColumns) {
                    return new ArrayList<String>();
                }
                for (ColumnPair columnPair : configured) {
                    String sourceName = columnPair.getSourceColumn().getName();
                    if (!keyNames.contains(StringUtils.lowerCase(sourceName))) {
                        viewNames.add(sourceName);
                    }
                }
            }
            return new ArrayList<String>(viewNames);
        }

        /**
         * Selects the requested columns of the row identified by the key data,
         * trying up to {@code retryTimes} times when the query throws.
         *
         * @param dbDialect  dialect providing the SQL template and JDBC template
         * @param str        schema name
         * @param str2       table name
         * @param tableData  key columns (names, values and types) used to locate the row
         * @param tableData2 columns to read
         * @return the first matching row as strings, or {@code null} when no row matched
         * @throws InterruptedException when the current thread is found interrupted before an attempt
         * @throws RuntimeException     when every attempt failed; the last failure is the cause
         */
        private List<String> select(DbDialect dbDialect, String str, String str2, TableData tableData, TableData tableData2) throws InterruptedException {
            String sql = dbDialect.getSqlTemplate().getSelectSql(str, str2, tableData.columnNames, tableData2.columnNames);
            Exception lastFailure = null;
            for (int attempt = 1; attempt <= DatabaseExtractor.retryTimes; attempt++) {
                if (Thread.currentThread().isInterrupted()) {
                    throw new InterruptedException();
                }
                try {
                    List rows = dbDialect.getJdbcTemplate().query(sql, tableData.columnValues, tableData.columnTypes, new RowDataMapper(tableData2.columnTypes));
                    if (CollectionUtils.isEmpty(rows)) {
                        DatabaseExtractor.logger.warn("the mediaName = {}.{} not has rowdate in db \n {}", new Object[]{str, str2, dumpEventData(this.eventData, sql)});
                        return null;
                    }
                    return (List) rows.get(0);
                } catch (Exception e) {
                    // Remember the failure and try again until the attempts run out.
                    lastFailure = e;
                    DatabaseExtractor.logger.warn("retry [" + attempt + "] failed", e);
                }
            }
            throw new RuntimeException("db extract failed , data:\n " + dumpEventData(this.eventData, sql), lastFailure);
        }

        /**
         * Looks up the type code of each named column in the table definition.
         * Names are matched ignoring case; a name without a matching column adds
         * nothing to the result.
         *
         * @param table   table definition to search
         * @param strArr  column names, in the order the types are wanted
         * @return the type codes of the matched columns, in lookup order
         */
        private int[] getOraclePkTypes(Table table, String[] strArr) {
            Column[] tableColumns = table.getColumns();
            List<Integer> typeCodes = new ArrayList<Integer>();
            for (int n = 0; n < strArr.length; n++) {
                String wanted = strArr[n];
                for (int c = 0; c < tableColumns.length; c++) {
                    Column candidate = tableColumns[c];
                    if (candidate.getName().equalsIgnoreCase(wanted)) {
                        typeCodes.add(Integer.valueOf(candidate.getTypeCode()));
                    }
                }
            }
            int[] result = new int[typeCodes.size()];
            int slot = 0;
            for (Integer typeCode : typeCodes) {
                result[slot++] = typeCode.intValue();
            }
            return result;
        }

        /**
         * Tells whether the event lacks some of the columns expected in row mode.
         *
         * <p>Without an explicit column list, the event is incomplete when the
         * table's column count differs from the event's columns plus keys. With a
         * column list, it is incomplete when the sizes differ or when one of the
         * event's column names is not in the list.
         *
         * @param table     table definition
         * @param list      expected column names; may be empty
         * @param eventData event to check
         * @return {@code true} when the event does not carry the expected columns
         */
        private boolean checkNeedDbForRowMode(Table table, List<String> list, EventData eventData) {
            List<EventColumn> eventColumns = eventData.getColumns();
            if (list.isEmpty()) {
                int carried = eventColumns.size() + eventData.getKeys().size();
                return table.getColumnCount() != carried;
            }
            if (list.size() != eventColumns.size()) {
                return true;
            }
            for (EventColumn column : eventColumns) {
                if (!list.contains(column.getColumnName())) {
                    return true;
                }
            }
            return false;
        }

        /**
         * Builds the lookup data (position, name, type, SQL value) for the given
         * key columns by matching each of them, ignoring case, against the table's
         * columns.
         *
         * @param table table definition the keys belong to
         * @param list  key columns of the event
         * @return key data whose arrays follow the order of {@code list}
         * @throws ExtractException when at least one key has no matching table column
         */
        private TableData buildTableData(Table table, List<EventColumn> list) {
            Column[] columns = table.getColumns();
            int keyCount = list.size();
            TableData tableData = new TableData();
            tableData.indexs = new int[keyCount];
            tableData.columnNames = new String[keyCount];
            tableData.columnTypes = new int[keyCount];
            tableData.columnValues = new Object[keyCount];

            int matched = 0;
            for (EventColumn key : list) {
                // The scan is bounded by the column array: an unknown key falls through
                // to the size check below instead of looping forever. The position starts
                // from 0 for every key so indexs holds the real position in the table.
                for (int position = 0; position < columns.length; position++) {
                    Column column = columns[position];
                    if (StringUtils.equalsIgnoreCase(key.getColumnName(), column.getName())) {
                        tableData.indexs[matched] = position;
                        tableData.columnNames[matched] = column.getName();
                        tableData.columnTypes[matched] = column.getTypeCode();
                        tableData.columnValues[matched] = SqlUtils.stringToSqlValue(key.getColumnValue(), column.getTypeCode(), column.isRequired(), false);
                        matched++;
                        break;
                    }
                }
            }
            if (matched != keyCount) {
                throw new ExtractException("keys is not found in table " + table.toString() + " keys : " + dumpEventColumn(list));
            }
            return tableData;
        }

        /**
         * Builds the metadata (position, name, type) of the columns to select.
         *
         * <ul>
         * <li>{@code z} false: exactly the given event columns, matched ignoring case;</li>
         * <li>{@code z} true and {@code list2} non-empty: the table columns whose name
         * is contained in {@code list2} (exact-case {@code contains});</li>
         * <li>{@code z} true and {@code list2} empty: every non primary key column.</li>
         * </ul>
         * {@code columnValues} is allocated but never filled here.
         *
         * @param table table definition
         * @param list  updated columns of the event
         * @param z     whether all (or all configured) columns are wanted
         * @param list2 configured column names restricting the selection; may be empty
         * @return the metadata of the columns to select
         * @throws ExtractException when {@code z} is false and an event column has no
         *                          matching table column
         */
        private TableData buildTableData(Table table, List<EventColumn> list, boolean z, List<String> list2) {
            Column[] columns = table.getColumns();
            boolean restrictToView = z && list2.size() != 0;

            int size;
            if (!z) {
                size = list.size();
            } else if (restrictToView) {
                size = list2.size();
            } else {
                size = 0;
                for (Column column : columns) {
                    if (!column.isPrimaryKey()) {
                        size++;
                    }
                }
            }

            TableData tableData = new TableData();
            tableData.indexs = new int[size];
            tableData.columnNames = new String[size];
            tableData.columnTypes = new int[size];
            tableData.columnValues = new Object[size];

            int filled = 0;
            if (z) {
                // NOTE(review): when list2 names a column the table does not have, the
                // trailing array slots stay unfilled (null name, type 0) - confirm callers cope.
                for (int position = 0; position < columns.length; position++) {
                    Column column = columns[position];
                    boolean wanted = restrictToView ? list2.contains(column.getName()) : !column.isPrimaryKey();
                    if (wanted) {
                        tableData.indexs[filled] = position;
                        tableData.columnNames[filled] = column.getName();
                        tableData.columnTypes[filled] = column.getTypeCode();
                        filled++;
                    }
                }
                return tableData;
            }

            for (EventColumn eventColumn : list) {
                // Bounded scan: an unknown column reaches the size check below
                // instead of looping forever.
                for (int position = 0; position < columns.length; position++) {
                    Column column = columns[position];
                    if (StringUtils.equalsIgnoreCase(eventColumn.getColumnName(), column.getName())) {
                        tableData.indexs[filled] = position;
                        tableData.columnNames[filled] = column.getName();
                        tableData.columnTypes[filled] = column.getTypeCode();
                        filled++;
                        break;
                    }
                }
            }
            if (filled != list.size()) {
                throw new ExtractException("columns is not found in table " + table.toString() + " columns : " + dumpEventColumn(list));
            }
            return tableData;
        }

        /**
         * Renders the event's pair id, table id and keys together with the SQL text
         * through the template prepared in the constructor.
         *
         * @param eventData event to describe
         * @param str       SQL text; a tab is prepended before it is inserted
         * @return the formatted multi-line description
         */
        private String dumpEventData(EventData eventData, String str) {
            Object[] arguments = new Object[] {
                Long.valueOf(eventData.getPairId()),
                Long.valueOf(eventData.getTableId()),
                dumpEventColumn(eventData.getKeys()),
                "\t" + str
            };
            return MessageFormat.format(this.eventData_format, arguments);
        }

        /**
         * Renders each column on its own tab-indented line, joined by the platform
         * line separator (no separator after the last one).
         *
         * @param list columns to render
         * @return the rendered text; empty for an empty list
         */
        private String dumpEventColumn(List<EventColumn> list) {
            StringBuilder text = new StringBuilder(this.event_default_capacity);
            int last = list.size() - 1;
            int position = 0;
            while (position <= last) {
                text.append("\t").append(list.get(position).toString());
                if (position != last) {
                    text.append(this.SEP);
                }
                position++;
            }
            return text.toString();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/alibaba/otter/node/etl/extract/extractor/DatabaseExtractor$RowDataMapper.class */
    /**
     * {@link RowMapper} that turns one result-set row into a list of strings,
     * reading result column {@code k + 1} with the SQL type stored at position
     * {@code k} of the type array.
     */
    public class RowDataMapper implements RowMapper {
        /** SQL type of each selected column, in select order. */
        private int[] columnTypes;

        public RowDataMapper(int[] iArr) {
            this.columnTypes = iArr;
        }

        /**
         * Converts the current row to strings via {@code SqlUtils.sqlValueToString}.
         *
         * @return a list with one entry per configured column type
         * @throws ExtractException wrapping any failure raised while reading a column
         */
        public Object mapRow(ResultSet resultSet, int i) throws SQLException {
            List<String> rowValues = new ArrayList<String>();
            int resultColumn = 1; // JDBC result columns are addressed from 1
            for (int sqlType : this.columnTypes) {
                try {
                    rowValues.add(SqlUtils.sqlValueToString(resultSet, resultColumn, sqlType));
                } catch (Exception e) {
                    throw new ExtractException("ERROR ## get columnName has an error", e);
                }
                resultColumn++;
            }
            return rowValues;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/alibaba/otter/node/etl/extract/extractor/DatabaseExtractor$TableData.class */
    /**
     * Plain holder of parallel arrays describing a set of table columns; entry
     * {@code k} of every array refers to the same column.
     */
    public class TableData {
        /** Position of each column within the table's column array. */
        int[] indexs;
        /** Column names as reported by the table definition. */
        String[] columnNames;
        /** Type code of each column as reported by the table definition. */
        int[] columnTypes;
        /** SQL values of the columns; only filled by the key variant of buildTableData. */
        Object[] columnValues;

        TableData() {
        }
    }

    /**
     * Completes the events of a batch by re-reading their rows from the source
     * database where needed.
     *
     * <p>DDL events are skipped. An insert/update event is sent to a
     * {@link DatabaseExtractWorker} when a database read is required, i.e. when
     * <ul>
     * <li>the pipeline or the event uses media consistency, or</li>
     * <li>the event has no updated columns and its source is Oracle (the event is
     * then also flagged as remedy), or</li>
     * <li>the pipeline is in row mode and the event does not carry every column.</li>
     * </ul>
     * After all workers have finished, events whose item was marked as filtered
     * are removed from the batch.
     *
     * @param dbBatch batch to process; it and its row batch must not be null
     * @throws ExtractException when a worker fails or the wait is interrupted; the
     *                          remaining workers are cancelled first
     */
    @Override // com.alibaba.otter.node.etl.extract.extractor.OtterExtractor
    public void extract(DbBatch dbBatch) throws ExtractException {
        Assert.notNull(dbBatch);
        Assert.notNull(dbBatch.getRowBatch());

        Pipeline pipeline = getPipeline(Long.valueOf(dbBatch.getRowBatch().getIdentity().getPipelineId()));
        boolean mediaConsistency = pipeline.getParameters().getSyncConsistency().isMedia();
        boolean rowMode = pipeline.getParameters().getSyncMode().isRow();
        adjustPoolSize(pipeline.getParameters().getExtractPoolSize().intValue());

        ExecutorCompletionService<Object> completionService = new ExecutorCompletionService<Object>(this.executor);
        List<DataItem> items = new ArrayList<DataItem>();
        List<Future> futures = new ArrayList<Future>();
        List<EventData> datas = dbBatch.getRowBatch().getDatas();

        for (EventData eventData : datas) {
            if (eventData.getEventType().isDdl()) {
                continue;
            }
            DataItem item = new DataItem(eventData);

            boolean needDb = mediaConsistency
                             || (eventData.getSyncConsistency() != null && eventData.getSyncConsistency().isMedia());
            if (!needDb && CollectionUtils.isEmpty(eventData.getUpdatedColumns())
                && ConfigHelper.findDataMedia(pipeline, Long.valueOf(eventData.getTableId())).getSource().getType().isOracle()) {
                needDb = true;
                eventData.setRemedy(true);
            }
            if (rowMode && !needDb) {
                needDb = checkNeedDbForRowMode(pipeline, eventData);
            }

            if (needDb && (eventData.getEventType().isInsert() || eventData.getEventType().isUpdate())) {
                Future<Object> future = completionService.submit(new DatabaseExtractWorker(pipeline, item), null);
                if (future.isDone()) {
                    // The caller-runs policy may have executed the task inline;
                    // surface its failure right away.
                    try {
                        future.get();
                    } catch (InterruptedException e) {
                        cancel(futures);
                        throw new ExtractException(e);
                    } catch (ExecutionException e) {
                        cancel(futures);
                        throw new ExtractException(e);
                    }
                }
                futures.add(future);
            }
            items.add(item);
        }

        // Wait for every submitted worker. Stop at the first failure: looping on would
        // call take() more often than there are futures and block forever.
        ExtractException failure = null;
        int completed = 0;
        while (completed < futures.size()) {
            try {
                completionService.take().get();
            } catch (InterruptedException e) {
                failure = new ExtractException(e);
                break;
            } catch (ExecutionException e) {
                failure = new ExtractException(e);
                break;
            }
            completed++;
        }
        if (failure != null) {
            cancel(futures);
            throw failure;
        }

        for (DataItem item : items) {
            if (item.isFilter()) {
                datas.remove(item.getEventData());
            }
        }
    }

    /**
     * Tells whether a row-mode event is missing columns: true when the column
     * count of its source table differs from the number of columns plus keys the
     * event carries.
     *
     * @param pipeline  pipeline used to resolve the event's data media and dialect
     * @param eventData event to check
     * @return {@code true} when the row has to be read from the database
     */
    private boolean checkNeedDbForRowMode(Pipeline pipeline, EventData eventData) {
        DataMedia dataMedia = ConfigHelper.findDataMedia(pipeline, Long.valueOf(eventData.getTableId()));
        DbDialect dbDialect = this.dbDialectFactory.getDbDialect(pipeline.getId(), (DbMediaSource) dataMedia.getSource());
        Table table = dbDialect.findTable(eventData.getSchemaName(), eventData.getTableName());
        int carried = eventData.getColumns().size() + eventData.getKeys().size();
        return table.getColumnCount() != carried;
    }

    /**
     * Cancels, with interruption, every future of the list that is not done yet.
     *
     * @param list futures of the submitted extract workers
     */
    private void cancel(List<Future> list) {
        for (Future pending : list) {
            if (pending.isDone()) {
                continue;
            }
            pending.cancel(true);
        }
    }

    /**
     * Resizes the worker pool when the requested size differs from the recorded
     * one. Core and maximum size are both set to the new value.
     *
     * <p>The two setters are called in an order that keeps {@code core <= max}
     * at every step: when growing, the maximum is raised first; when shrinking,
     * the core is lowered first. {@link ThreadPoolExecutor} rejects a core size
     * above the maximum (and a maximum below the core) with an
     * {@link IllegalArgumentException}.
     *
     * @param i requested pool size
     */
    private void adjustPoolSize(int i) {
        if (i == this.poolSize) {
            return;
        }
        this.poolSize = i;
        if (this.executor instanceof ThreadPoolExecutor) {
            ThreadPoolExecutor pool = (ThreadPoolExecutor) this.executor;
            if (i > pool.getMaximumPoolSize()) {
                pool.setMaximumPoolSize(i);
                pool.setCorePoolSize(i);
            } else {
                pool.setCorePoolSize(i);
                pool.setMaximumPoolSize(i);
            }
        }
    }

    /**
     * Creates the worker pool: a fixed number of threads equal to the configured
     * pool size, a bounded queue four times that size, named threads, and a
     * caller-runs policy for tasks that cannot be queued.
     */
    @Override
    public void afterPropertiesSet() throws Exception {
        int workers = this.poolSize;
        ArrayBlockingQueue<Runnable> backlog = new ArrayBlockingQueue<Runnable>(workers * 4);
        this.executor = new ThreadPoolExecutor(
            workers,
            workers,
            0L,
            TimeUnit.MILLISECONDS,
            backlog,
            new NamedThreadFactory(WORKER_NAME),
            new ThreadPoolExecutor.CallerRunsPolicy());
    }

    /**
     * Stops the worker pool immediately via {@code shutdownNow()}. Does nothing
     * when the pool was never created, so calling this before
     * {@code afterPropertiesSet()} no longer fails with a NullPointerException.
     */
    @Override
    public void destroy() throws Exception {
        if (this.executor != null) {
            this.executor.shutdownNow();
        }
    }

    /**
     * Records the pool size that afterPropertiesSet() uses when it creates the
     * executor. This setter only stores the value; it does not resize an
     * executor that already exists.
     *
     * @param i number of worker threads
     */
    public void setPoolSize(int i) {
        this.poolSize = i;
    }
}
