/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.otter.node.etl.load.loader.db;

import com.alibaba.otter.node.common.config.ConfigClientService;
import com.alibaba.otter.node.etl.common.db.dialect.DbDialect;
import com.alibaba.otter.node.etl.common.db.dialect.DbDialectFactory;
import com.alibaba.otter.node.etl.common.db.dialect.mysql.MysqlDialect;
import com.alibaba.otter.node.etl.common.db.utils.SqlUtils;
import com.alibaba.otter.node.etl.load.exception.LoadException;
import com.alibaba.otter.node.etl.load.loader.LoadStatsTracker;
import com.alibaba.otter.node.etl.load.loader.db.DbLoadData;
import com.alibaba.otter.node.etl.load.loader.db.DbLoadDumper;
import com.alibaba.otter.node.etl.load.loader.db.DbLoadMerger;
import com.alibaba.otter.node.etl.load.loader.db.context.DbLoadContext;
import com.alibaba.otter.node.etl.load.loader.interceptor.LoadInterceptor;
import com.alibaba.otter.node.etl.load.loader.weight.WeightBuckets;
import com.alibaba.otter.node.etl.load.loader.weight.WeightController;
import com.alibaba.otter.shared.common.model.config.ConfigHelper;
import com.alibaba.otter.shared.common.model.config.channel.Channel;
import com.alibaba.otter.shared.common.model.config.data.DataMedia;
import com.alibaba.otter.shared.common.model.config.data.DataMediaPair;
import com.alibaba.otter.shared.common.model.config.data.db.DbMediaSource;
import com.alibaba.otter.shared.common.model.config.pipeline.Pipeline;
import com.alibaba.otter.shared.common.utils.thread.NamedThreadFactory;
import com.alibaba.otter.shared.etl.model.EventColumn;
import com.alibaba.otter.shared.etl.model.EventData;
import com.alibaba.otter.shared.etl.model.EventType;
import com.alibaba.otter.shared.etl.model.Identity;
import com.alibaba.otter.shared.etl.model.RowBatch;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Types;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.ddlutils.model.Column;
import org.apache.ddlutils.model.Table;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.dao.DataAccessException;
import org.springframework.dao.DataIntegrityViolationException;
import org.springframework.dao.DeadlockLoserDataAccessException;
import org.springframework.jdbc.core.BatchPreparedStatementSetter;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.PreparedStatementSetter;
import org.springframework.jdbc.core.StatementCallback;
import org.springframework.jdbc.core.StatementCreatorUtils;
import org.springframework.jdbc.support.lob.LobCreator;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.TransactionCallback;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;

public class DbLoadAction
implements InitializingBean,
DisposableBean {
    private static final Logger logger = LoggerFactory.getLogger(DbLoadAction.class);
    private static final String WORKER_NAME = "DbLoadAction";
    private static final String WORKER_NAME_FORMAT = "pipelineId = %s , pipelineName = %s , DbLoadAction";
    private static final int DEFAULT_POOL_SIZE = 5;
    private int poolSize = 5;
    private int retry = 3;
    private int retryWait = 3000;
    private LoadInterceptor interceptor;
    private ExecutorService executor;
    private DbDialectFactory dbDialectFactory;
    private ConfigClientService configClientService;
    private int batchSize = 50;
    private boolean useBatch = true;
    private LoadStatsTracker loadStatsTracker;

    /**
     * Loads one row batch into the target database.
     *
     * <p>A batch consisting of DDL events is executed through {@code doDdl}. Otherwise the
     * events are grouped by push weight and each weight group is merged, filtered through the
     * interceptor and loaded in ascending list order, coordinating with {@code controller}
     * before and after every group.
     *
     * @param rowBatch the batch to load; must not be {@code null}
     * @param controller weight coordinator used to sequence the per-weight loads
     * @return the load context built for this batch (also returned when the batch is empty or
     *         the calling thread was interrupted)
     * @throws LoadException wrapping any non-interrupt exception raised while loading
     */
    public DbLoadContext load(RowBatch rowBatch, WeightController controller) {
        Assert.notNull((Object)rowBatch);
        DbLoadContext context = this.buildContext(rowBatch.getIdentity());
        try {
            context.setPrepareDatas(rowBatch.getDatas());
            List<EventData> events = context.getPrepareDatas();
            if (events == null || events.isEmpty()) {
                logger.info("##no eventdata for load, return");
                return context;
            }

            // The media source of the first event is recorded as the source of the whole batch.
            context.setDataMediaSource(ConfigHelper.findDataMedia((Pipeline)context.getPipeline(), (Long)((EventData)events.get(0)).getTableId()).getSource());
            this.interceptor.prepare(context);
            // Re-read the events: they are fetched from the context again after prepare().
            events = context.getPrepareDatas();

            if (this.isDdlDatas(events)) {
                this.doDdl(context, events);
            } else {
                WeightBuckets<EventData> buckets = this.buildWeightBuckets(context, events);
                List<Long> weights = buckets.weights();
                controller.start(weights);
                if (CollectionUtils.isEmpty(events)) {
                    logger.info("##no eventdata for load");
                }
                this.adjustPoolSize(context);
                this.adjustConfig(context);

                int position = 0;
                while (position < weights.size()) {
                    Long weight = weights.get(position);
                    // Block until the controller signals that this weight may proceed.
                    controller.await(weight.intValue());
                    logger.debug("##start load for weight:" + weight);

                    List<EventData> merged = DbLoadMerger.merge(buckets.getItems(weight));
                    DbLoadData loadData = new DbLoadData();
                    this.doBefore(merged, context, loadData);
                    this.doLoad(context, loadData);

                    // Tell the controller this weight is done.
                    controller.single(weight.intValue());
                    logger.debug("##end load for weight:" + weight);
                    ++position;
                }
            }
            this.interceptor.commit(context);
        }
        catch (InterruptedException e) {
            // Preserve the interrupt for callers; the context is still returned.
            Thread.currentThread().interrupt();
            this.interceptor.error(context);
        }
        catch (Exception e) {
            this.interceptor.error(context);
            throw new LoadException(e);
        }
        return context;
    }

    /**
     * Creates the load context for a batch, resolving its channel and pipeline through the
     * config client service.
     *
     * @param identity identity of the batch being loaded
     * @return a new context carrying the identity, channel and pipeline
     */
    private DbLoadContext buildContext(Identity identity) {
        DbLoadContext loadContext = new DbLoadContext();
        loadContext.setIdentity(identity);

        Long channelId = Long.valueOf(identity.getChannelId());
        Channel resolvedChannel = this.configClientService.findChannel(channelId);
        Long pipelineId = Long.valueOf(identity.getPipelineId());
        Pipeline resolvedPipeline = this.configClientService.findPipeline(pipelineId);

        loadContext.setChannel(resolvedChannel);
        loadContext.setPipeline(resolvedPipeline);
        return loadContext;
    }

    /**
     * Tells whether the batch consists of DDL events.
     *
     * <p>DDL and DML events must never share a batch. The mix is rejected regardless of the
     * order in which the two kinds appear (previously a DML event that preceded the first DDL
     * event went unnoticed and the batch was reported as DDL).
     *
     * @param eventDatas events of the batch
     * @return {@code true} when every event is DDL, {@code false} when none is (including an
     *         empty list)
     * @throws LoadException if the batch contains both DDL and non-DDL events
     */
    private boolean isDdlDatas(List<EventData> eventDatas) {
        boolean sawDdl = false;
        boolean sawDml = false;
        for (EventData eventData : eventDatas) {
            if (eventData.getEventType().isDdl()) {
                sawDdl = true;
            } else {
                sawDml = true;
            }
            if (sawDdl && sawDml) {
                throw new LoadException("ddl/dml can't be in one batch, it's may be a bug , pls submit issues.", DbLoadDumper.dumpEventDatas(eventDatas));
            }
        }
        return sawDdl;
    }

    /**
     * Groups the events by the push weight of the data-media pair each one belongs to.
     *
     * @param context load context supplying the pipeline configuration
     * @param datas events to group
     * @return buckets keyed by push weight
     */
    private WeightBuckets<EventData> buildWeightBuckets(DbLoadContext context, List<EventData> datas) {
        WeightBuckets<EventData> byWeight = new WeightBuckets<EventData>();
        for (EventData event : datas) {
            // The weight comes from the pair configuration, looked up per event.
            DataMediaPair mediaPair = ConfigHelper.findDataMediaPair((Pipeline)context.getPipeline(), (Long)event.getPairId());
            byWeight.addItem(mediaPair.getPushWeight(), event);
        }
        return byWeight;
    }

    /**
     * Runs the interceptor's {@code before} hook on every event and merges the events it does
     * not filter out into {@code loadData}.
     *
     * @param items events of one weight group
     * @param context load context passed to the interceptor
     * @param loadData accumulator receiving the events that are kept
     */
    private void doBefore(List<EventData> items, DbLoadContext context, DbLoadData loadData) {
        for (EventData event : items) {
            // before() returning true means "filter this event out".
            if (!this.interceptor.before(context, event)) {
                loadData.merge(event);
            }
        }
    }

    /**
     * Loads the prepared data in two passes: first all deletes, then all inserts and updates.
     *
     * <p>With batching enabled the rows of each table are grouped by {@code split}; otherwise
     * every row becomes its own single-element group. Each pass is handed either to
     * {@code doDryRun} or to {@code doTwoPhase}, depending on the pipeline's dry-run flag.
     *
     * @param context load context
     * @param loadData per-table delete/insert/update rows
     */
    private void doLoad(DbLoadContext context, DbLoadData loadData) {
        List<List<EventData>> pending = new ArrayList<List<EventData>>();

        // Pass 1: deletes.
        for (DbLoadData.TableLoadData table : loadData.getTables()) {
            if (!this.useBatch) {
                for (EventData row : table.getDeleteDatas()) {
                    pending.add(Arrays.asList(row));
                }
            } else {
                pending.addAll(this.split(table.getDeleteDatas()));
            }
        }
        if (!context.getPipeline().getParameters().isDryRun().booleanValue()) {
            this.doTwoPhase(context, pending, true);
        } else {
            this.doDryRun(context, pending, true);
        }
        pending.clear();

        // Pass 2: inserts followed by updates, table by table.
        for (DbLoadData.TableLoadData table : loadData.getTables()) {
            if (!this.useBatch) {
                for (EventData row : table.getInsertDatas()) {
                    pending.add(Arrays.asList(row));
                }
                for (EventData row : table.getUpadateDatas()) {
                    pending.add(Arrays.asList(row));
                }
            } else {
                pending.addAll(this.split(table.getInsertDatas()));
                pending.addAll(this.split(table.getUpadateDatas()));
            }
        }
        if (!context.getPipeline().getParameters().isDryRun().booleanValue()) {
            this.doTwoPhase(context, pending, true);
        } else {
            this.doDryRun(context, pending, true);
        }
        pending.clear();
    }

    /**
     * Partitions the rows into groups that can be sent as one JDBC batch.
     *
     * <p>Rows are scanned in order; each not-yet-assigned row opens a new group, which then
     * collects later unassigned rows that {@code canBatch} accepts against the opening row,
     * until the group holds {@code batchSize} rows.
     *
     * @param datas rows to partition; may be {@code null} or empty
     * @return the groups in the order they were opened; empty when there are no rows
     */
    private List<List<EventData>> split(List<EventData> datas) {
        List<List<EventData>> groups = new ArrayList<List<EventData>>();
        if (datas == null || datas.isEmpty()) {
            return groups;
        }

        // assigned[k] is true once row k has been placed into some group.
        boolean[] assigned = new boolean[datas.size()];
        for (int head = 0; head < assigned.length; ++head) {
            if (assigned[head]) {
                continue;
            }
            List<EventData> group = new ArrayList<EventData>();
            assigned[head] = true;
            group.add(datas.get(head));

            int candidate = head + 1;
            while (candidate < assigned.length && group.size() < this.batchSize) {
                if (!assigned[candidate] && this.canBatch(datas.get(head), datas.get(candidate))) {
                    group.add(datas.get(candidate));
                    assigned[candidate] = true;
                }
                ++candidate;
            }
            groups.add(group);
        }
        return groups;
    }

    /**
     * Decides whether two rows may share one JDBC batch, i.e. whether they carry the same SQL
     * text.
     *
     * <p>The comparison is by value: reference equality ({@code ==}) on strings is only
     * correct when every producer interns its SQL, which this class cannot guarantee.
     * {@code String.equals} still short-circuits on identical references, so the common case
     * stays cheap.
     *
     * @param source the row that opened the batch
     * @param target the candidate row
     * @return {@code true} when both rows have equal SQL (two {@code null}s count as equal)
     */
    private boolean canBatch(EventData source, EventData target) {
        return StringUtils.equals(source.getSql(), target.getSql());
    }

    /**
     * Dry-run counterpart of the real load: no SQL is executed, the rows are only counted in
     * the statistics and recorded as processed.
     *
     * @param context load context receiving the processed rows
     * @param totalRows row groups that would have been loaded
     * @param canBatch unused in dry-run mode
     */
    private void doDryRun(DbLoadContext context, List<List<EventData>> totalRows, boolean canBatch) {
        for (List<EventData> group : totalRows) {
            if (!CollectionUtils.isEmpty(group)) {
                for (EventData event : group) {
                    this.processStat(event, context);
                }
                context.getProcessedDatas().addAll(group);
            }
        }
    }

    /**
     * Executes every DDL event of the batch against the dialect resolved for its table.
     *
     * <p>For MySQL targets with a non-empty DDL schema name, a {@code use `schema`} statement
     * is issued first so the DDL runs in that schema. An event is recorded as processed or
     * failed according to the combined return values of {@link Statement#execute(String)}.
     *
     * <p>NOTE(review): per the JDBC contract {@code Statement.execute} returns {@code false}
     * when the statement yields an update count or no result, which is the usual outcome for
     * DDL — so successfully executed DDL appears to end up in the failed list. Behaviour is
     * left unchanged here; confirm what consumers of the failed list expect before altering it.
     *
     * @param context load context receiving processed/failed events
     * @param eventDatas DDL events to execute
     * @throws LoadException if a DDL fails and the pipeline is not configured to skip DDL
     *         exceptions
     */
    private void doDdl(DbLoadContext context, List<EventData> eventDatas) {
        for (final EventData data : eventDatas) {
            DataMedia dataMedia = ConfigHelper.findDataMedia((Pipeline)context.getPipeline(), (Long)data.getTableId());
            final DbDialect dbDialect = this.dbDialectFactory.getDbDialect(context.getIdentity().getPipelineId(), (DbMediaSource)dataMedia.getSource());
            Boolean skipDdlException = context.getPipeline().getParameters().getSkipDdlException();
            try {
                Boolean result = (Boolean)dbDialect.getJdbcTemplate().execute((StatementCallback)new StatementCallback<Boolean>(){

                    public Boolean doInStatement(Statement stmt) throws SQLException, DataAccessException {
                        Boolean result = true;
                        if (dbDialect instanceof MysqlDialect && StringUtils.isNotEmpty((String)data.getDdlSchemaName())) {
                            // Back-quote the schema so names that are SQL keywords still work.
                            result = result & stmt.execute("use `" + data.getDdlSchemaName() + "`");
                        }
                        result = result & stmt.execute(data.getSql());
                        return result;
                    }
                });
                if (result.booleanValue()) {
                    context.getProcessedDatas().add(data);
                    continue;
                }
                context.getFailedDatas().add(data);
            }
            catch (Throwable e) {
                // The flag is a nullable Boolean; treat null as "do not skip" so a missing
                // setting cannot turn the real DDL failure into a NullPointerException.
                if (skipDdlException != null && skipDdlException.booleanValue()) {
                    logger.warn("skip exception for ddl : {} , caused by {}", (Object)data, (Object)ExceptionUtils.getFullStackTrace((Throwable)e));
                    continue;
                }
                throw new LoadException(e);
            }
        }
    }

    /*
     * WARNING - void declaration
     */
    private void doTwoPhase(DbLoadContext context, List<List<EventData>> totalRows, boolean canBatch) {
        ArrayList<Future<Exception>> results = new ArrayList<Future<Exception>>();
        for (List<EventData> rows : totalRows) {
            if (CollectionUtils.isEmpty(rows)) continue;
            results.add(this.executor.submit(new DbLoadWorker(context, rows, canBatch)));
        }
        boolean partFailed = false;
        for (int i = 0; i < results.size(); ++i) {
            void var8_11;
            Future result = (Future)results.get(i);
            Object var8_12 = null;
            try {
                Exception exception = (Exception)result.get();
                for (EventData data : totalRows.get(i)) {
                    this.interceptor.after(context, data);
                }
            }
            catch (Exception e) {
                Exception exception = e;
            }
            if (var8_11 == null) continue;
            logger.warn("##load phase one failed!", (Throwable)var8_11);
            partFailed = true;
        }
        if (partFailed) {
            ArrayList<EventData> retryEventDatas = new ArrayList<EventData>();
            for (List list : totalRows) {
                retryEventDatas.addAll(list);
            }
            context.getFailedDatas().clear();
            Boolean skipLoadException = context.getPipeline().getParameters().getSkipLoadException();
            if (skipLoadException != null && skipLoadException.booleanValue()) {
                for (EventData retryEventData : retryEventDatas) {
                    DbLoadWorker worker = new DbLoadWorker(context, Arrays.asList(retryEventData), false);
                    try {
                        Exception ex = worker.call();
                        if (ex == null) continue;
                        logger.warn("skip exception for data : {} , caused by {}", (Object)retryEventData, (Object)ExceptionUtils.getFullStackTrace((Throwable)ex));
                    }
                    catch (Exception ex) {
                        logger.warn("skip exception for data : {} , caused by {}", (Object)retryEventData, (Object)ExceptionUtils.getFullStackTrace((Throwable)ex));
                    }
                }
            } else {
                DbLoadWorker dbLoadWorker = new DbLoadWorker(context, retryEventDatas, false);
                try {
                    Exception ex = dbLoadWorker.call();
                    if (ex != null) {
                        throw ex;
                    }
                }
                catch (Exception ex) {
                    logger.error("##load phase two failed!", (Throwable)ex);
                    throw new LoadException(ex);
                }
            }
            for (EventData data : retryEventDatas) {
                this.interceptor.after(context, data);
            }
        }
    }

    /**
     * Resizes the worker pool when the pipeline's configured load pool size differs from the
     * current one.
     *
     * <p>{@link ThreadPoolExecutor} requires {@code corePoolSize <= maximumPoolSize} at all
     * times and (on JDK 9+) rejects a setter call that would violate it, so the two bounds are
     * updated in the order that keeps the invariant: maximum first when growing, core first
     * when shrinking.
     *
     * @param context load context supplying the pipeline parameters
     */
    private void adjustPoolSize(DbLoadContext context) {
        Pipeline pipeline = context.getPipeline();
        int newPoolSize = pipeline.getParameters().getLoadPoolSize();
        if (newPoolSize == this.poolSize) {
            return;
        }
        this.poolSize = newPoolSize;
        if (this.executor instanceof ThreadPoolExecutor) {
            ThreadPoolExecutor pool = (ThreadPoolExecutor)this.executor;
            if (newPoolSize > pool.getMaximumPoolSize()) {
                // Growing: raise the ceiling before the core size.
                pool.setMaximumPoolSize(newPoolSize);
                pool.setCorePoolSize(newPoolSize);
            } else {
                // Shrinking (or equal ceiling): lower the core size before the ceiling.
                pool.setCorePoolSize(newPoolSize);
                pool.setMaximumPoolSize(newPoolSize);
            }
        }
    }

    /**
     * Refreshes the batching switch from the pipeline parameters of the current load.
     *
     * @param context load context supplying the pipeline parameters
     */
    private void adjustConfig(DbLoadContext context) {
        this.useBatch = context.getPipeline().getParameters().isUseBatch();
    }

    /**
     * Creates the worker pool: a fixed-size {@link ThreadPoolExecutor} of {@code poolSize}
     * threads with a bounded queue of {@code poolSize * 4} entries. When the queue is full the
     * submitting thread runs the task itself ({@link ThreadPoolExecutor.CallerRunsPolicy}).
     */
    public void afterPropertiesSet() throws Exception {
        int workers = this.poolSize;
        ArrayBlockingQueue<Runnable> taskQueue = new ArrayBlockingQueue<Runnable>(workers * 4);
        ThreadFactory threadFactory = (ThreadFactory)new NamedThreadFactory(WORKER_NAME);
        this.executor = new ThreadPoolExecutor(workers, workers, 0L, TimeUnit.MILLISECONDS, taskQueue, threadFactory, new ThreadPoolExecutor.CallerRunsPolicy());
    }

    /**
     * Stops the worker pool immediately via {@link ExecutorService#shutdownNow()}.
     */
    public void destroy() throws Exception {
        ExecutorService workerPool = this.executor;
        workerPool.shutdownNow();
    }

    /**
     * Records one loaded row in the statistics of its data-media pair: bumps the counter
     * matching the event type (insert, update or delete), the row count, and adds the row's
     * size to the accumulated row size.
     *
     * @param data the row that was loaded
     * @param context load context identifying the batch
     */
    private void processStat(EventData data, DbLoadContext context) {
        LoadStatsTracker.LoadCounter pairCounter = this.loadStatsTracker.getStat(context.getIdentity()).getStat(data.getPairId());
        EventType eventType = data.getEventType();

        // Event types other than insert/update/delete only contribute to the row totals.
        if (eventType.isInsert()) {
            pairCounter.getInsertCount().incrementAndGet();
        } else if (eventType.isUpdate()) {
            pairCounter.getUpdateCount().incrementAndGet();
        } else if (eventType.isDelete()) {
            pairCounter.getDeleteCount().incrementAndGet();
        }

        pairCounter.getRowCount().incrementAndGet();
        pairCounter.getRowSize().addAndGet(this.calculateSize(data));
    }

    /**
     * Returns the size used for row-size statistics, which is simply the size reported by the
     * event itself.
     *
     * @param data the row to measure
     * @return {@code data.getSize()}
     */
    private long calculateSize(EventData data) {
        return data.getSize();
    }

    /**
     * Sets the worker pool size used when the executor is created in
     * {@code afterPropertiesSet}; it is later overwritten by the pipeline's load pool size.
     */
    public void setPoolSize(int poolSize) {
        this.poolSize = poolSize;
    }

    /**
     * Sets the retry limit for a chunk that fails with a deadlock: the load is abandoned once
     * the number of failed attempts reaches this value.
     */
    public void setRetry(int retry) {
        this.retry = retry;
    }

    /**
     * Sets the base wait between deadlock retries, passed to {@code Thread.sleep} (so
     * milliseconds); the actual wait is this value multiplied by the retry count.
     */
    public void setRetryWait(int retryWait) {
        this.retryWait = retryWait;
    }

    /** Sets the interceptor notified at each stage of the load. */
    public void setInterceptor(LoadInterceptor interceptor) {
        this.interceptor = interceptor;
    }

    /** Sets the factory used to resolve the database dialect for a pipeline and media source. */
    public void setDbDialectFactory(DbDialectFactory dbDialectFactory) {
        this.dbDialectFactory = dbDialectFactory;
    }

    /** Sets the service used to look up channel and pipeline configuration. */
    public void setConfigClientService(ConfigClientService configClientService) {
        this.configClientService = configClientService;
    }

    /** Sets the tracker that receives per-pair load statistics. */
    public void setLoadStatsTracker(LoadStatsTracker loadStatsTracker) {
        this.loadStatsTracker = loadStatsTracker;
    }

    /**
     * Sets whether rows are loaded in JDBC batches; overwritten by the pipeline parameters on
     * each load.
     */
    public void setUseBatch(boolean useBatch) {
        this.useBatch = useBatch;
    }

    class DbLoadWorker
    implements Callable<Exception> {
        private DbLoadContext context;
        private DbDialect dbDialect;
        private List<EventData> datas;
        private boolean canBatch;
        private List<EventData> allFailedDatas = new ArrayList<EventData>();
        private List<EventData> allProcesedDatas = new ArrayList<EventData>();
        private List<EventData> processedDatas = new ArrayList<EventData>();
        private List<EventData> failedDatas = new ArrayList<EventData>();

        /**
         * Creates a worker for one group of rows. The database dialect is resolved from the
         * table of the first row, so {@code datas} must not be empty.
         *
         * @param context load context shared with the enclosing action
         * @param datas rows this worker loads
         * @param canBatch whether the rows may be sent as JDBC batches
         */
        public DbLoadWorker(DbLoadContext context, List<EventData> datas, boolean canBatch) {
            this.context = context;
            this.datas = datas;
            this.canBatch = canBatch;

            EventData firstRow = datas.get(0);
            DataMedia firstMedia = ConfigHelper.findDataMedia((Pipeline)context.getPipeline(), (Long)firstRow.getTableId());
            DbMediaSource mediaSource = (DbMediaSource)firstMedia.getSource();
            this.dbDialect = DbLoadAction.this.dbDialectFactory.getDbDialect(context.getIdentity().getPipelineId(), mediaSource);
        }

        /**
         * Runs the load while the executing thread carries a name identifying the pipeline.
         *
         * <p>The thread's previous name is restored afterwards. This method does not only run
         * on pool threads: phase two invokes it directly on the loading thread and the pool's
         * caller-runs policy can do the same, so overwriting the name with a fixed value would
         * permanently rename a thread this class does not own.
         *
         * @return whatever {@code doCall} returns ({@code null} on success)
         * @throws Exception any exception thrown by {@code doCall}
         */
        @Override
        public Exception call() throws Exception {
            Thread current = Thread.currentThread();
            String previousName = current.getName();
            try {
                current.setName(String.format(DbLoadAction.WORKER_NAME_FORMAT, this.context.getPipeline().getId(), this.context.getPipeline().getName()));
                return this.doCall();
            }
            finally {
                current.setName(previousName);
            }
        }

        /**
         * Loads this worker's rows chunk by chunk, each chunk in its own transaction.
         *
         * <p>With batching enabled (both on the action and for this worker) a chunk holds up
         * to {@code batchSize} rows executed through one {@code batchUpdate}; otherwise a
         * chunk is a single row executed through {@code update}. A chunk that fails with a
         * {@link DeadlockLoserDataAccessException} is retried after sleeping
         * {@code retryCount * retryWait} (at least {@code retryWait}) until {@code retry}
         * attempts have failed; any other failure aborts immediately. Before any exception
         * leaves this method, the rows not yet handled are recorded on the context as failed.
         *
         * @return always {@code null} when every chunk succeeded
         * @throws LoadException if a chunk fails, retries are exhausted, or the retry wait is
         *         interrupted (the thread's interrupt flag is set again in that case)
         */
        private Exception doCall() {
            LoadException error = null;
            ExecuteResult exeResult = null;
            int index = 0;
            block7: while (index < this.datas.size()) {
                // Carve the next chunk off the remaining rows and advance the cursor.
                final ArrayList<EventData> splitDatas = new ArrayList<EventData>();
                if (DbLoadAction.this.useBatch && this.canBatch) {
                    int end = index + DbLoadAction.this.batchSize > this.datas.size() ? this.datas.size() : index + DbLoadAction.this.batchSize;
                    splitDatas.addAll(this.datas.subList(index, end));
                    index = end;
                } else {
                    splitDatas.add(this.datas.get(index));
                    ++index;
                }
                int retryCount = 0;
                while (true) {
                    try {
                        if (!CollectionUtils.isEmpty(this.failedDatas)) {
                            // A previous attempt left rows behind: run exactly those again.
                            splitDatas.clear();
                            splitDatas.addAll(this.failedDatas);
                        } else {
                            // Pessimistically mark the chunk failed until the transaction starts.
                            this.failedDatas.addAll(splitDatas);
                        }
                        final LobCreator lobCreator = this.dbDialect.getLobHandler().getLobCreator();
                        if (DbLoadAction.this.useBatch && this.canBatch) {
                            // All rows of a chunk are executed with the SQL of its first row.
                            final String sql = ((EventData)splitDatas.get(0)).getSql();
                            int[] affects = new int[splitDatas.size()];
                            affects = (int[])this.dbDialect.getTransactionTemplate().execute(new TransactionCallback(){

                                // Runs the whole chunk as one JDBC batch; the LOB creator is
                                // always closed, whether the batch succeeds or throws.
                                public Object doInTransaction(TransactionStatus status) {
                                    try {
                                        DbLoadWorker.this.failedDatas.clear();
                                        DbLoadWorker.this.processedDatas.clear();
                                        DbLoadAction.this.interceptor.transactionBegin(DbLoadWorker.this.context, splitDatas, DbLoadWorker.this.dbDialect);
                                        JdbcTemplate template = DbLoadWorker.this.dbDialect.getJdbcTemplate();
                                        int[] affects = template.batchUpdate(sql, new BatchPreparedStatementSetter(){

                                            public void setValues(PreparedStatement ps, int idx) throws SQLException {
                                                DbLoadWorker.this.doPreparedStatement(ps, DbLoadWorker.this.dbDialect, lobCreator, (EventData)splitDatas.get(idx));
                                            }

                                            public int getBatchSize() {
                                                return splitDatas.size();
                                            }
                                        });
                                        DbLoadAction.this.interceptor.transactionEnd(DbLoadWorker.this.context, splitDatas, DbLoadWorker.this.dbDialect);
                                        int[] nArray = affects;
                                        return nArray;
                                    }
                                    finally {
                                        lobCreator.close();
                                    }
                                }
                            });
                            // Classify every row by its reported update count.
                            for (int i = 0; i < splitDatas.size(); ++i) {
                                this.processStat((EventData)splitDatas.get(i), affects[i], true);
                            }
                        } else {
                            final EventData data = (EventData)splitDatas.get(0);
                            int affect = 0;
                            affect = (Integer)this.dbDialect.getTransactionTemplate().execute(new TransactionCallback(){

                                // Runs the single row in its own transaction; the LOB creator
                                // is always closed, whether the update succeeds or throws.
                                public Object doInTransaction(TransactionStatus status) {
                                    try {
                                        DbLoadWorker.this.failedDatas.clear();
                                        DbLoadWorker.this.processedDatas.clear();
                                        DbLoadAction.this.interceptor.transactionBegin(DbLoadWorker.this.context, Arrays.asList(data), DbLoadWorker.this.dbDialect);
                                        JdbcTemplate template = DbLoadWorker.this.dbDialect.getJdbcTemplate();
                                        int affect = template.update(data.getSql(), new PreparedStatementSetter(){

                                            public void setValues(PreparedStatement ps) throws SQLException {
                                                DbLoadWorker.this.doPreparedStatement(ps, DbLoadWorker.this.dbDialect, lobCreator, data);
                                            }
                                        });
                                        DbLoadAction.this.interceptor.transactionEnd(DbLoadWorker.this.context, Arrays.asList(data), DbLoadWorker.this.dbDialect);
                                        Integer n = affect;
                                        return n;
                                    }
                                    finally {
                                        lobCreator.close();
                                    }
                                }
                            });
                            this.processStat(data, affect, false);
                        }
                        error = null;
                        exeResult = ExecuteResult.SUCCESS;
                    }
                    catch (DeadlockLoserDataAccessException ex) {
                        // Deadlocks are the only failures worth retrying.
                        error = new LoadException(ExceptionUtils.getFullStackTrace((Throwable)ex), DbLoadDumper.dumpEventDatas(splitDatas));
                        exeResult = ExecuteResult.RETRY;
                    }
                    catch (DataIntegrityViolationException ex) {
                        error = new LoadException(ExceptionUtils.getFullStackTrace((Throwable)ex), DbLoadDumper.dumpEventDatas(splitDatas));
                        exeResult = ExecuteResult.ERROR;
                    }
                    catch (RuntimeException ex) {
                        error = new LoadException(ExceptionUtils.getFullStackTrace((Throwable)ex), DbLoadDumper.dumpEventDatas(splitDatas));
                        exeResult = ExecuteResult.ERROR;
                    }
                    catch (Throwable ex) {
                        error = new LoadException(ExceptionUtils.getFullStackTrace((Throwable)ex), DbLoadDumper.dumpEventDatas(splitDatas));
                        exeResult = ExecuteResult.ERROR;
                    }
                    if (ExecuteResult.SUCCESS == exeResult) {
                        // Fold this chunk's outcome into the totals and move to the next chunk.
                        this.allFailedDatas.addAll(this.failedDatas);
                        this.allProcesedDatas.addAll(this.processedDatas);
                        this.failedDatas.clear();
                        this.processedDatas.clear();
                        continue block7;
                    }
                    // ERROR: leave the retry loop and fail below.
                    if (ExecuteResult.RETRY != exeResult) break;
                    // RETRY: the whole chunk counts as failed and is attempted again.
                    this.processedDatas.clear();
                    this.failedDatas.clear();
                    this.failedDatas.addAll(splitDatas);
                    if (++retryCount >= DbLoadAction.this.retry) {
                        this.processFailedDatas(index);
                        throw new LoadException(String.format("execute [%s] retry %s times failed", this.context.getIdentity().toString(), retryCount), (Throwable)((Object)error));
                    }
                    try {
                        // Linear back-off, never shorter than one retryWait.
                        int wait = retryCount * DbLoadAction.this.retryWait;
                        wait = wait < DbLoadAction.this.retryWait ? DbLoadAction.this.retryWait : wait;
                        Thread.sleep(wait);
                    }
                    catch (InterruptedException ex) {
                        // Restore the interrupt flag (sleep cleared it) so callers further up
                        // can still observe the interruption.
                        Thread.currentThread().interrupt();
                        this.processFailedDatas(index);
                        throw new LoadException(ex);
                    }
                }
                // Non-retryable failure: record the chunk and all remaining rows, then abort.
                this.processedDatas.clear();
                this.failedDatas.clear();
                this.failedDatas.addAll(splitDatas);
                this.processFailedDatas(index);
                throw error;
            }
            // Every chunk succeeded: publish the accumulated results on the context.
            this.context.getFailedDatas().addAll(this.allFailedDatas);
            this.context.getProcessedDatas().addAll(this.allProcesedDatas);
            return null;
        }

        /**
         * Binds the values of one row to the placeholders of its prepared statement.
         *
         * <p>The bind order depends on the event type: inserts bind columns then keys,
         * deletes bind keys, updates bind the updated columns, then the keys (the updated
         * keys on DRDS when old keys exist), then the old keys if any. Values are converted
         * from their string form according to the column's JDBC type; date/time values on
         * MySQL are passed through unconverted.
         *
         * @param ps statement to bind
         * @param dbDialect dialect of the target database
         * @param lobCreator creator used for CLOB/BLOB parameters
         * @param data the row whose values are bound
         * @throws SQLException if setting a parameter fails
         * @throws LoadException if a column of the row does not exist in the target table,
         *         even after reloading the table metadata
         */
        private void doPreparedStatement(PreparedStatement ps, DbDialect dbDialect, LobCreator lobCreator, EventData data) throws SQLException {
            EventType type = data.getEventType();
            List<EventColumn> columns = new ArrayList<EventColumn>();
            if (type.isInsert()) {
                columns.addAll(data.getColumns());
                columns.addAll(data.getKeys());
            } else if (type.isDelete()) {
                columns.addAll(data.getKeys());
            } else if (type.isUpdate()) {
                boolean existOldKeys = !CollectionUtils.isEmpty((Collection)data.getOldKeys());
                columns.addAll(data.getUpdatedColumns());
                if (existOldKeys && dbDialect.isDRDS()) {
                    columns.addAll(data.getUpdatedKeys());
                } else {
                    columns.addAll(data.getKeys());
                }
                if (existOldKeys) {
                    columns.addAll(data.getOldKeys());
                }
            }

            Table table = dbDialect.findTable(data.getSchemaName(), data.getTableName());
            HashMap<String, Boolean> isRequiredMap = this.buildRequiredMap(table);
            boolean mysql = dbDialect instanceof MysqlDialect;

            for (int i = 0; i < columns.size(); ++i) {
                int paramIndex = i + 1;
                EventColumn column = columns.get(i);
                int sqlType = column.getColumnType();
                String columnKey = StringUtils.lowerCase((String)column.getColumnName());

                Boolean isRequired = isRequiredMap.get(columnKey);
                if (isRequired == null) {
                    // Unknown column: look the table up again with the extra 'false' argument
                    // (presumably bypassing the dialect's metadata cache — confirm) and retry.
                    table = dbDialect.findTable(data.getSchemaName(), data.getTableName(), false);
                    isRequiredMap = this.buildRequiredMap(table);
                    isRequired = isRequiredMap.get(columnKey);
                    if (isRequired == null) {
                        throw new LoadException(String.format("column name %s is not found in Table[%s]", column.getColumnName(), table.toString()));
                    }
                }

                boolean temporal = sqlType == Types.TIME || sqlType == Types.TIMESTAMP || sqlType == Types.DATE;
                Object param;
                if (mysql && temporal) {
                    // MySQL date/time values are handed to the driver as-is.
                    param = column.getColumnValue();
                } else {
                    param = SqlUtils.stringToSqlValue(column.getColumnValue(), sqlType, isRequired, dbDialect.isEmptyStringNulled());
                }

                try {
                    switch (sqlType) {
                        case Types.CLOB: {
                            lobCreator.setClobAsString(ps, paramIndex, (String)param);
                            break;
                        }
                        case Types.BLOB: {
                            lobCreator.setBlobAsBytes(ps, paramIndex, (byte[])param);
                            break;
                        }
                        case Types.DATE:
                        case Types.TIME:
                        case Types.TIMESTAMP: {
                            if (mysql) {
                                ps.setObject(paramIndex, param);
                            } else {
                                StatementCreatorUtils.setParameterValue((PreparedStatement)ps, (int)paramIndex, (int)sqlType, null, (Object)param);
                            }
                            break;
                        }
                        case Types.BIT: {
                            // MySQL BIT values are bound as DECIMAL instead of BIT.
                            int bindType = mysql ? Types.DECIMAL : sqlType;
                            StatementCreatorUtils.setParameterValue((PreparedStatement)ps, (int)paramIndex, (int)bindType, null, (Object)param);
                            break;
                        }
                        default: {
                            StatementCreatorUtils.setParameterValue((PreparedStatement)ps, (int)paramIndex, (int)sqlType, null, (Object)param);
                            break;
                        }
                    }
                }
                catch (SQLException ex) {
                    logger.error("## SetParam error , [pairId={}, sqltype={}, value={}]", new Object[]{data.getPairId(), sqlType, param});
                    throw ex;
                }
            }
        }

        /** Maps each lower-cased column name of the table to whether the column is required. */
        private HashMap<String, Boolean> buildRequiredMap(Table table) {
            HashMap<String, Boolean> requiredByName = new HashMap<String, Boolean>();
            for (Column tableColumn : table.getColumns()) {
                requiredByName.put(StringUtils.lowerCase((String)tableColumn.getName()), tableColumn.isRequired());
            }
            return requiredByName;
        }

        /**
         * Classifies one executed row by its update count. A count below one means the row
         * failed, except in batch mode where {@link Statement#SUCCESS_NO_INFO} counts as
         * success. Successful rows are also added to the load statistics.
         *
         * @param data the executed row
         * @param affect update count reported for the row
         * @param batch whether the row was executed as part of a JDBC batch
         */
        private void processStat(EventData data, int affect, boolean batch) {
            boolean noRowAffected = affect < 1;
            boolean batchSuccessNoInfo = batch && affect == Statement.SUCCESS_NO_INFO;
            if (noRowAffected && !batchSuccessNoInfo) {
                this.failedDatas.add(data);
                return;
            }
            this.processedDatas.add(data);
            DbLoadAction.this.processStat(data, this.context);
        }

        /**
         * Publishes the worker's state to the context when the load is aborted: everything
         * failed so far plus every row from {@code index} onwards is reported as failed, and
         * everything processed so far is reported as processed.
         *
         * @param index position of the first row that was never attempted
         */
        private void processFailedDatas(int index) {
            this.allFailedDatas.addAll(this.failedDatas);
            this.context.getFailedDatas().addAll(this.allFailedDatas);
            // Rows that were never attempted are failures as well.
            for (int remaining = index; remaining < this.datas.size(); ++remaining) {
                this.context.getFailedDatas().add(this.datas.get(remaining));
            }

            this.allProcesedDatas.addAll(this.processedDatas);
            this.context.getProcessedDatas().addAll(this.allProcesedDatas);
        }
    }

    /** Outcome of one chunk execution attempt inside {@code DbLoadWorker.doCall}. */
    static enum ExecuteResult {
        /** The chunk's transaction completed without an exception. */
        SUCCESS,
        /** The chunk failed with a non-retryable exception. */
        ERROR,
        /** The chunk failed with a deadlock and may be attempted again. */
        RETRY;

    }
}

