/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.otter.node.etl.extract.extractor;

import com.alibaba.otter.node.etl.common.db.dialect.DbDialect;
import com.alibaba.otter.node.etl.common.db.dialect.oracle.OracleDialect;
import com.alibaba.otter.node.etl.common.db.utils.SqlUtils;
import com.alibaba.otter.node.etl.extract.exceptions.ExtractException;
import com.alibaba.otter.node.etl.extract.extractor.AbstractExtractor;
import com.alibaba.otter.shared.common.model.config.ConfigHelper;
import com.alibaba.otter.shared.common.model.config.data.ColumnPair;
import com.alibaba.otter.shared.common.model.config.data.DataMedia;
import com.alibaba.otter.shared.common.model.config.data.DataMediaPair;
import com.alibaba.otter.shared.common.model.config.data.db.DbMediaSource;
import com.alibaba.otter.shared.common.model.config.pipeline.Pipeline;
import com.alibaba.otter.shared.common.utils.thread.NamedThreadFactory;
import com.alibaba.otter.shared.etl.model.DbBatch;
import com.alibaba.otter.shared.etl.model.EventColumn;
import com.alibaba.otter.shared.etl.model.EventColumnIndexComparable;
import com.alibaba.otter.shared.etl.model.EventData;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.SystemUtils;
import org.apache.ddlutils.model.Column;
import org.apache.ddlutils.model.Table;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.jdbc.core.RowMapper;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;

public class DatabaseExtractor
extends AbstractExtractor<DbBatch>
implements InitializingBean,
DisposableBean {
    private static final String WORKER_NAME = "DataBaseExtractor";
    private static final String WORKER_NAME_FORMAT = "pipelineId = %s , pipelineName = %s , DataBaseExtractor";
    private static final Logger logger = LoggerFactory.getLogger(DatabaseExtractor.class);
    private static final int DEFAULT_POOL_SIZE = 5;
    private static final int retryTimes = 3;
    private int poolSize = 5;
    private ExecutorService executor;

    /**
     * Completes the row events of a batch with values read back from the source database.
     *
     * <p>DDL events are skipped entirely. For every other event a lookup is considered necessary when the
     * pipeline or the event itself requests media consistency, when the event carries no updated columns and
     * its source is Oracle (the event is then also flagged as remedy), or when the pipeline is in row mode and
     * {@link #checkNeedDbForRowMode(Pipeline, EventData)} reports a column count mismatch. A lookup is only
     * submitted for insert and update events. After all submitted workers have completed, events whose item was
     * flagged as filtered are removed from the batch.
     *
     * @param dbBatch the batch to process; neither it nor its row batch may be null
     * @throws ExtractException if a worker fails or the calling thread is interrupted while waiting
     */
    @Override
    public void extract(DbBatch dbBatch) throws ExtractException {
        Assert.notNull((Object)dbBatch);
        Assert.notNull((Object)dbBatch.getRowBatch());
        Pipeline pipeline = this.getPipeline(dbBatch.getRowBatch().getIdentity().getPipelineId());
        boolean mustDb = pipeline.getParameters().getSyncConsistency().isMedia();
        boolean isRow = pipeline.getParameters().getSyncMode().isRow();
        // Apply the extract pool size currently configured on the pipeline before submitting work.
        this.adjustPoolSize(pipeline.getParameters().getExtractPoolSize());
        ExecutorCompletionService<Object> completionService = new ExecutorCompletionService<Object>(this.executor);
        List<DataItem> items = new ArrayList<DataItem>();
        List<Future> futures = new ArrayList<Future>();
        List<EventData> eventDatas = dbBatch.getRowBatch().getDatas();
        for (EventData eventData : eventDatas) {
            if (eventData.getEventType().isDdl()) {
                continue;
            }
            DataItem item = new DataItem(eventData);
            boolean needDb = mustDb || (eventData.getSyncConsistency() != null && eventData.getSyncConsistency().isMedia());
            if (!needDb && CollectionUtils.isEmpty((Collection)eventData.getUpdatedColumns())) {
                DataMedia dataMedia = ConfigHelper.findDataMedia((Pipeline)pipeline, (Long)eventData.getTableId());
                if (dataMedia.getSource().getType().isOracle()) {
                    // An Oracle event without updated columns is always looked up and marked as remedy.
                    needDb = true;
                    eventData.setRemedy(true);
                }
            }
            if (isRow && !needDb) {
                needDb = this.checkNeedDbForRowMode(pipeline, eventData);
            }
            if (needDb && (eventData.getEventType().isInsert() || eventData.getEventType().isUpdate())) {
                Future<Object> future = completionService.submit(new DatabaseExtractWorker(pipeline, item), null);
                if (future.isDone()) {
                    // The executor is built with CallerRunsPolicy, so the worker may already have run on this
                    // thread; report its failure immediately instead of after the remaining submissions.
                    try {
                        future.get();
                    }
                    catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        this.cancel(futures);
                        throw new ExtractException(e);
                    }
                    catch (ExecutionException e) {
                        this.cancel(futures);
                        throw new ExtractException(e);
                    }
                }
                futures.add(future);
            }
            // Items are recorded in batch order, whether or not a lookup was submitted.
            items.add(item);
        }
        ExtractException failure = null;
        for (int finished = 0; finished < futures.size(); ++finished) {
            try {
                completionService.take().get();
            }
            catch (InterruptedException e) {
                // Keep the interrupt visible to the caller; take() cleared the flag when it threw.
                Thread.currentThread().interrupt();
                failure = new ExtractException(e);
                break;
            }
            catch (ExecutionException e) {
                failure = new ExtractException(e);
                break;
            }
        }
        if (failure != null) {
            this.cancel(futures);
            throw failure;
        }
        for (DataItem item : items) {
            if (item.isFilter()) {
                eventDatas.remove(item.getEventData());
            }
        }
    }

    /**
     * Row-mode pre-check: reports whether the event has to be completed from the database.
     *
     * @return true when the column count of the source table differs from the number of columns plus keys
     *         carried by the event
     */
    private boolean checkNeedDbForRowMode(Pipeline pipeline, EventData eventData) {
        DataMedia media = ConfigHelper.findDataMedia((Pipeline)pipeline, (Long)eventData.getTableId());
        DbDialect dialect = this.dbDialectFactory.getDbDialect(pipeline.getId(), (DbMediaSource)media.getSource());
        int tableColumnCount = dialect.findTable(eventData.getSchemaName(), eventData.getTableName()).getColumnCount();
        int eventColumnCount = eventData.getColumns().size() + eventData.getKeys().size();
        return tableColumnCount != eventColumnCount;
    }

    /**
     * Cancels, with interruption, every future in the list that has not completed yet.
     */
    private void cancel(List<Future> futures) {
        for (Future pending : futures) {
            if (!pending.isDone()) {
                pending.cancel(true);
            }
        }
    }

    /**
     * Resizes the worker pool when the requested size differs from the one currently recorded.
     *
     * <p>Core and maximum size are both set to the new value. The two setters are called in an order that
     * keeps {@code core <= max} at every step: since Java 9 {@link ThreadPoolExecutor#setCorePoolSize(int)}
     * rejects a core size above the current maximum, so when growing the maximum has to be raised first,
     * and when shrinking the core has to be lowered first.
     *
     * @param newPoolSize the desired number of worker threads
     */
    private void adjustPoolSize(int newPoolSize) {
        if (newPoolSize == this.poolSize) {
            return;
        }
        this.poolSize = newPoolSize;
        if (!(this.executor instanceof ThreadPoolExecutor)) {
            return;
        }
        ThreadPoolExecutor pool = (ThreadPoolExecutor)this.executor;
        if (newPoolSize > pool.getMaximumPoolSize()) {
            pool.setMaximumPoolSize(newPoolSize);
            pool.setCorePoolSize(newPoolSize);
        } else {
            pool.setCorePoolSize(newPoolSize);
            pool.setMaximumPoolSize(newPoolSize);
        }
    }

    /**
     * Creates the worker pool: a fixed number of {@code poolSize} threads, a bounded queue holding four
     * times that many tasks, and a caller-runs policy for tasks that do not fit into the queue.
     */
    public void afterPropertiesSet() throws Exception {
        int threads = this.poolSize;
        ArrayBlockingQueue<Runnable> backlog = new ArrayBlockingQueue<Runnable>(threads * 4);
        ThreadFactory workerFactory = (ThreadFactory)new NamedThreadFactory(WORKER_NAME);
        ThreadPoolExecutor.CallerRunsPolicy whenSaturated = new ThreadPoolExecutor.CallerRunsPolicy();
        this.executor = new ThreadPoolExecutor(threads, threads, 0L, TimeUnit.MILLISECONDS, backlog, workerFactory, whenSaturated);
    }

    /**
     * Shuts the worker pool down immediately via {@link ExecutorService#shutdownNow()}.
     */
    public void destroy() throws Exception {
        ExecutorService workers = this.executor;
        workers.shutdownNow();
    }

    /**
     * Sets the pool size recorded by this extractor. Only the field is updated here; the size is used when
     * the executor is created in {@code afterPropertiesSet()}.
     *
     * @param poolSize number of worker threads
     */
    public void setPoolSize(int poolSize) {
        this.poolSize = poolSize;
    }

    /**
     * Row mapper that turns one result set row into a list of strings, one entry per configured column type.
     */
    class RowDataMapper
    implements RowMapper {
        /** JDBC type codes of the selected columns, in select order. */
        private int[] columnTypes;

        public RowDataMapper(int[] columnTypes) {
            this.columnTypes = columnTypes;
        }

        /**
         * Reads columns {@code 1..columnTypes.length} of the current row as strings.
         *
         * @return the column values as a list of strings
         * @throws ExtractException if a value cannot be read or converted
         */
        public Object mapRow(ResultSet rs, int rowNum) throws SQLException {
            ArrayList<String> values = new ArrayList<String>();
            int jdbcIndex = 0;
            for (int columnType : this.columnTypes) {
                // JDBC column indexes are 1-based.
                ++jdbcIndex;
                values.add(this.readAsString(rs, jdbcIndex, columnType));
            }
            return values;
        }

        /** Converts a single column to its string form, wrapping any failure in an ExtractException. */
        private String readAsString(ResultSet rs, int jdbcIndex, int columnType) {
            try {
                return SqlUtils.sqlValueToString(rs, jdbcIndex, columnType);
            }
            catch (Exception e) {
                throw new ExtractException("ERROR ## get columnName has an error", e);
            }
        }
    }

    /**
     * Plain holder of parallel arrays describing a set of table columns; entry {@code i} of every array
     * refers to the same column.
     */
    class TableData {
        /** Index recorded for each column while scanning the table's column array. */
        int[] indexs;
        /** Column names as reported by the table metadata. */
        String[] columnNames;
        /** Type codes of the columns. */
        int[] columnTypes;
        /** Column values; in this file only the key variant of buildTableData fills these. */
        Object[] columnValues;
    }

    class DatabaseExtractWorker
    implements Runnable {
        private final int event_default_capacity = 1024;
        private String eventData_format = null;
        private final String SEP = SystemUtils.LINE_SEPARATOR;
        private Pipeline pipeline;
        private DataItem item;
        private EventData eventData;

        public DatabaseExtractWorker(Pipeline pipeline, DataItem item) {
            this.eventData_format = "-----------------" + this.SEP;
            this.eventData_format = this.eventData_format + "- PairId: {0} , TableId: {1} " + this.SEP;
            this.eventData_format = this.eventData_format + "-----------------" + this.SEP;
            this.eventData_format = this.eventData_format + "---START" + this.SEP;
            this.eventData_format = this.eventData_format + "---Pks" + this.SEP;
            this.eventData_format = this.eventData_format + "{2}" + this.SEP;
            this.eventData_format = this.eventData_format + "---Sql" + this.SEP;
            this.eventData_format = this.eventData_format + "{3}" + this.SEP;
            this.eventData_format = this.eventData_format + "---END" + this.SEP;
            this.pipeline = pipeline;
            this.item = item;
            this.eventData = item.getEventData();
        }

        /**
         * Looks the event's row up in the source database and merges the values into the event.
         *
         * <p>The columns to read are the event's updated columns, or all view/non-key columns when row mode
         * applies or an Oracle event carries no updated columns. If nothing has to be read the event is left
         * untouched. If the row no longer exists, the item is flagged as filtered when the event is a remedy
         * event or the pipeline skips missing rows, unless the old keys differ from the current keys.
         * Otherwise the freshly read columns replace same-named columns of the event and the merged list is
         * sorted with {@link EventColumnIndexComparable}.
         *
         * <p>While running, the thread name and the MDC key {@code "otter"} are set for logging and reset
         * afterwards. NOTE(review): the executor uses CallerRunsPolicy, so this can also run on the submitting
         * thread, whose name and MDC entry are then overwritten as well - confirm this is acceptable.
         */
        @Override
        public void run() {
            try {
                MDC.put((String)"otter", (String)String.valueOf(this.pipeline.getId()));
                Thread.currentThread().setName(String.format(DatabaseExtractor.WORKER_NAME_FORMAT, this.pipeline.getId(), this.pipeline.getName()));
                DataMedia dataMedia = ConfigHelper.findDataMedia((Pipeline)this.pipeline, (Long)this.eventData.getTableId());
                DbDialect dbDialect = DatabaseExtractor.this.dbDialectFactory.getDbDialect(this.pipeline.getId(), (DbMediaSource)dataMedia.getSource());
                Table table = dbDialect.findTable(this.eventData.getSchemaName(), this.eventData.getTableName());
                TableData keyTableData = this.buildTableData(table, this.eventData.getKeys());
                if (dbDialect instanceof OracleDialect) {
                    // For Oracle the key types are taken from the table metadata via getOraclePkTypes.
                    keyTableData.columnTypes = this.getOraclePkTypes(table, keyTableData.columnNames);
                }
                boolean rowMode = this.pipeline.getParameters().getSyncMode().isRow() || (this.eventData.getSyncMode() != null && this.eventData.getSyncMode().isRow());
                boolean oracleWithoutUpdatedColumns = CollectionUtils.isEmpty((Collection)this.eventData.getUpdatedColumns()) && dataMedia.getSource().getType().isOracle();
                boolean needAll = rowMode || oracleWithoutUpdatedColumns;
                List<DataMediaPair> mediaPairs = ConfigHelper.findDataMediaPairByMediaId((Pipeline)this.pipeline, (Long)dataMedia.getId());
                List<String> viewColumnNames = this.buildMaxColumnsFromColumnPairs(mediaPairs, this.eventData.getKeys());
                TableData columnTableData = this.buildTableData(table, this.eventData.getUpdatedColumns(), needAll, viewColumnNames);
                if (columnTableData.columnNames.length == 0) {
                    // Nothing to read back.
                    return;
                }
                List<String> newColumnValues = this.select(dbDialect, this.eventData.getSchemaName(), this.eventData.getTableName(), keyTableData, columnTableData);
                if (newColumnValues == null) {
                    this.item.setFilter(this.shouldFilterMissingRow());
                    return;
                }
                List<EventColumn> mergedColumns = new ArrayList<EventColumn>();
                for (int i = 0; i < newColumnValues.size(); ++i) {
                    String value = newColumnValues.get(i);
                    EventColumn fresh = new EventColumn();
                    fresh.setIndex(columnTableData.indexs[i]);
                    fresh.setColumnName(columnTableData.columnNames[i]);
                    fresh.setColumnType(columnTableData.columnTypes[i]);
                    fresh.setNull(value == null);
                    fresh.setColumnValue(value);
                    fresh.setUpdate(true);
                    mergedColumns.add(fresh);
                }
                // Keep the event's own columns that were not re-read from the database.
                for (EventColumn existing : this.eventData.getColumns()) {
                    boolean overridden = false;
                    for (EventColumn candidate : mergedColumns) {
                        if (StringUtils.equalsIgnoreCase((String)candidate.getColumnName(), (String)existing.getColumnName())) {
                            overridden = true;
                            break;
                        }
                    }
                    if (!overridden) {
                        mergedColumns.add(existing);
                    }
                }
                Collections.sort(mergedColumns, new EventColumnIndexComparable());
                this.eventData.setColumns(mergedColumns);
            }
            catch (InterruptedException interrupted) {
                // The lookup was abandoned; re-assert the interrupt so it stays observable on this thread.
                Thread.currentThread().interrupt();
            }
            finally {
                Thread.currentThread().setName(DatabaseExtractor.WORKER_NAME);
                MDC.remove((String)"otter");
            }
        }

        /**
         * Decides whether an event whose row is missing in the database is dropped: only for remedy events
         * or pipelines that skip missing rows, and never when an old key differs from the current key at the
         * same position (or the key lists differ in length).
         */
        private boolean shouldFilterMissingRow() {
            boolean needFilter = this.eventData.isRemedy() || this.pipeline.getParameters().getSkipNoRow();
            List<EventColumn> oldKeys = this.eventData.getOldKeys();
            List<EventColumn> keys = this.eventData.getKeys();
            for (int i = 0; i < oldKeys.size(); ++i) {
                // Compare position by position; the previous loop never advanced its key index.
                if (i >= keys.size() || !oldKeys.get(i).equals(keys.get(i))) {
                    return false;
                }
            }
            return needFilter;
        }

        /**
         * Collects the union of source column names configured across the given media pairs, leaving out
         * the primary key columns (matched by lower-cased name).
         *
         * @return the distinct column names in no particular order, or an empty list as soon as one pair has
         *         no column pairs or uses exclude mode
         */
        private List<String> buildMaxColumnsFromColumnPairs(List<DataMediaPair> mediaPairs, List<EventColumn> pks) {
            HashSet<String> pkNames = new HashSet<String>();
            for (EventColumn pk : pks) {
                pkNames.add(StringUtils.lowerCase((String)pk.getColumnName()));
            }
            HashSet<String> sourceColumns = new HashSet<String>();
            for (DataMediaPair mediaPair : mediaPairs) {
                List<ColumnPair> columnPairs = mediaPair.getColumnPairs();
                boolean unrestricted = CollectionUtils.isEmpty((Collection)columnPairs) || mediaPair.getColumnPairMode().isExclude();
                if (unrestricted) {
                    return new ArrayList<String>();
                }
                for (ColumnPair columnPair : columnPairs) {
                    String sourceName = columnPair.getSourceColumn().getName();
                    if (!pkNames.contains(StringUtils.lowerCase((String)sourceName))) {
                        sourceColumns.add(columnPair.getSourceColumn().getName());
                    }
                }
            }
            return new ArrayList<String>(sourceColumns);
        }

        /**
         * Reads the requested columns of the row identified by the key data, making up to
         * {@code retryTimes} attempts.
         *
         * @return the column values of the first matching row, or null when the query returns no row
         * @throws InterruptedException if the current thread is found interrupted before an attempt
         * @throws RuntimeException if every attempt failed; the last failure is attached as the cause
         */
        private List<String> select(DbDialect dbDialect, String schemaName, String tableName, TableData keyTableData, TableData columnTableData) throws InterruptedException {
            String selectSql = dbDialect.getSqlTemplate().getSelectSql(schemaName, tableName, keyTableData.columnNames, columnTableData.columnNames);
            Exception lastFailure = null;
            int attempt = 1;
            while (attempt <= DatabaseExtractor.retryTimes) {
                if (Thread.currentThread().isInterrupted()) {
                    throw new InterruptedException();
                }
                try {
                    List rows = dbDialect.getJdbcTemplate().query(selectSql, keyTableData.columnValues, keyTableData.columnTypes, (RowMapper)new RowDataMapper(columnTableData.columnTypes));
                    if (!CollectionUtils.isEmpty((Collection)rows)) {
                        return (List)rows.get(0);
                    }
                    logger.warn("the mediaName = {}.{} not has rowdate in db \n {}", new Object[]{schemaName, tableName, this.dumpEventData(this.eventData, selectSql)});
                    return null;
                }
                catch (Exception e) {
                    lastFailure = e;
                    logger.warn("retry [" + attempt + "] failed", (Throwable)e);
                }
                ++attempt;
            }
            throw new RuntimeException("db extract failed , data:\n " + this.dumpEventData(this.eventData, selectSql), lastFailure);
        }

        /**
         * Returns the type codes of the table columns whose names match the given primary key names
         * (case-insensitively), in the order of {@code pkNames}.
         */
        private int[] getOraclePkTypes(Table table, String[] pkNames) {
            Column[] tableColumns = table.getColumns();
            List<Integer> matchedTypes = new ArrayList<Integer>();
            for (int p = 0; p < pkNames.length; ++p) {
                for (int c = 0; c < tableColumns.length; ++c) {
                    if (tableColumns[c].getName().equalsIgnoreCase(pkNames[p])) {
                        matchedTypes.add(tableColumns[c].getTypeCode());
                    }
                }
            }
            int[] typeCodes = new int[matchedTypes.size()];
            int slot = 0;
            for (Integer typeCode : matchedTypes) {
                typeCodes[slot++] = typeCode;
            }
            return typeCodes;
        }

        /**
         * Row-mode check that takes view columns into account.
         *
         * <p>Without view columns the event is incomplete when the table's column count differs from the
         * event's columns plus keys. With view columns it is incomplete when the counts differ or when any
         * event column name is not contained in the view column list.
         *
         * @return true when a database lookup is needed
         */
        private boolean checkNeedDbForRowMode(Table table, List<String> viewColumns, EventData eventData) {
            if (viewColumns.size() == 0) {
                return table.getColumnCount() != eventData.getColumns().size() + eventData.getKeys().size();
            }
            if (viewColumns.size() != eventData.getColumns().size()) {
                return true;
            }
            for (EventColumn eventColumn : eventData.getColumns()) {
                if (!viewColumns.contains(eventColumn.getColumnName())) {
                    return true;
                }
            }
            return false;
        }

        /**
         * Builds the key part of a lookup: for every key column of the event, the matching table column's
         * position, name, type code and the key value converted to its SQL representation.
         *
         * <p>The recorded index is the column's position in {@code table.getColumns()}, the same convention
         * the column overload of this method uses. The scan position is restarted for every key; previously
         * it carried over from one key to the next, so later keys were given accumulated positions.
         *
         * @param table metadata of the source table
         * @param keys  key columns of the event; matched against table columns by name, ignoring case
         * @return the populated key data
         * @throws ExtractException if at least one key has no matching table column
         */
        private TableData buildTableData(Table table, List<EventColumn> keys) {
            Column[] tableColumns = table.getColumns();
            int keyCount = keys.size();
            TableData data = new TableData();
            data.indexs = new int[keyCount];
            data.columnNames = new String[keyCount];
            data.columnTypes = new int[keyCount];
            data.columnValues = new Object[keyCount];
            int matched = 0;
            for (EventColumn keyColumn : keys) {
                for (int position = 0; position < tableColumns.length; ++position) {
                    Column tableColumn = tableColumns[position];
                    if (!StringUtils.equalsIgnoreCase((String)keyColumn.getColumnName(), (String)tableColumn.getName())) {
                        continue;
                    }
                    data.indexs[matched] = position;
                    data.columnNames[matched] = tableColumn.getName();
                    data.columnTypes[matched] = tableColumn.getTypeCode();
                    data.columnValues[matched] = SqlUtils.stringToSqlValue(keyColumn.getColumnValue(), tableColumn.getTypeCode(), tableColumn.isRequired(), false);
                    ++matched;
                    // Only the first matching table column is used for a key.
                    break;
                }
            }
            if (matched != keyCount) {
                throw new ExtractException("keys is not found in table " + table.toString() + " keys : " + this.dumpEventColumn(keys));
            }
            return data;
        }

        /**
         * Builds the column part of a lookup, i.e. which columns are selected.
         *
         * <p>With {@code needAll} the selection is every table column named in {@code viewColumnNames}
         * (exact name match), or, when that list is empty, every non-primary-key column. Without
         * {@code needAll} the selection is the given event columns, matched to table columns by name ignoring
         * case. The recorded index is the column's position in {@code table.getColumns()}; column values are
         * allocated but not filled.
         *
         * @throws ExtractException if, without {@code needAll}, an event column has no matching table column
         */
        private TableData buildTableData(Table table, List<EventColumn> columns, boolean needAll, List<String> viewColumnNames) {
            Column[] tableColumns = table.getColumns();
            int nonPkCount = 0;
            for (Column tableColumn : tableColumns) {
                if (!tableColumn.isPrimaryKey()) {
                    ++nonPkCount;
                }
            }
            int size = columns.size();
            boolean restrictToView = false;
            if (needAll) {
                restrictToView = viewColumnNames.size() != 0;
                size = restrictToView ? viewColumnNames.size() : nonPkCount;
            }
            TableData data = new TableData();
            data.indexs = new int[size];
            data.columnNames = new String[size];
            data.columnTypes = new int[size];
            data.columnValues = new Object[size];
            int filled = 0;
            if (needAll) {
                for (int position = 0; position < tableColumns.length; ++position) {
                    Column tableColumn = tableColumns[position];
                    boolean selected = restrictToView ? viewColumnNames.contains(tableColumn.getName()) : !tableColumn.isPrimaryKey();
                    if (!selected) {
                        continue;
                    }
                    data.indexs[filled] = position;
                    data.columnNames[filled] = tableColumn.getName();
                    data.columnTypes[filled] = tableColumn.getTypeCode();
                    ++filled;
                }
                return data;
            }
            for (EventColumn column : columns) {
                for (int position = 0; position < tableColumns.length; ++position) {
                    Column tableColumn = tableColumns[position];
                    if (!StringUtils.equalsIgnoreCase((String)column.getColumnName(), (String)tableColumn.getName())) {
                        continue;
                    }
                    data.indexs[filled] = position;
                    data.columnNames[filled] = tableColumn.getName();
                    data.columnTypes[filled] = tableColumn.getTypeCode();
                    ++filled;
                    break;
                }
            }
            if (filled != columns.size()) {
                throw new ExtractException("columns is not found in table " + table.toString() + " columns : " + this.dumpEventColumn(columns));
            }
            return data;
        }

        /**
         * Renders the event's pair id, table id, primary keys and the select statement with the worker's
         * log template.
         *
         * <p>The ids are converted to strings before formatting: {@link MessageFormat} applies the locale's
         * number format to numeric arguments, which would insert grouping separators into the ids.
         *
         * @param eventData the event being dumped
         * @param selectSql the select statement issued for the event
         * @return the formatted, multi-line dump
         */
        private String dumpEventData(EventData eventData, String selectSql) {
            String pairId = String.valueOf(eventData.getPairId());
            String tableId = String.valueOf(eventData.getTableId());
            return MessageFormat.format(this.eventData_format, pairId, tableId, this.dumpEventColumn(eventData.getKeys()), "\t" + selectSql);
        }

        /**
         * Renders the columns one per line, each prefixed with a tab; lines are joined with the platform
         * line separator and there is no trailing separator.
         */
        private String dumpEventColumn(List<EventColumn> columns) {
            StringBuilder dump = new StringBuilder(1024);
            int last = columns.size() - 1;
            for (int position = 0; position <= last; ++position) {
                dump.append("\t").append(columns.get(position).toString());
                if (position < last) {
                    dump.append(this.SEP);
                }
            }
            return dump.toString();
        }
    }

    /**
     * Pairs an event with a flag telling whether the event is to be removed from the batch after extraction.
     */
    class DataItem {
        /** The event this item wraps. */
        private EventData eventData;
        /** True when the event should be dropped from the batch; starts out false. */
        private boolean filter;

        public DataItem(EventData eventData) {
            this.eventData = eventData;
        }

        public EventData getEventData() {
            return this.eventData;
        }

        public void setEventData(EventData eventData) {
            this.eventData = eventData;
        }

        public boolean isFilter() {
            return this.filter;
        }

        public void setFilter(boolean filter) {
            this.filter = filter;
        }
    }
}

