package com.alibaba.otter.node.etl.extract;

import com.alibaba.otter.node.etl.OtterConstants;
import com.alibaba.otter.node.etl.common.jmx.StageAggregation;
import com.alibaba.otter.node.etl.common.pipe.PipeKey;
import com.alibaba.otter.node.etl.common.task.GlobalTask;
import com.alibaba.otter.node.etl.conflict.FileBatchConflictDetectService;
import com.alibaba.otter.node.etl.extract.extractor.OtterExtractorFactory;
import com.alibaba.otter.shared.arbitrate.model.EtlEventData;
import com.alibaba.otter.shared.common.model.config.enums.StageType;
import com.alibaba.otter.shared.etl.model.DbBatch;
import java.util.List;
import org.slf4j.MDC;
import org.springframework.util.CollectionUtils;

/* loaded from: input_file:com/alibaba/otter/node/etl/extract/ExtractTask.class */
public class ExtractTask extends GlobalTask {
    private OtterExtractorFactory otterExtractorFactory;
    private FileBatchConflictDetectService fileBatchConflictDetectService;

    public ExtractTask(Long l) {
        super(l);
    }

    @Override // java.lang.Thread, java.lang.Runnable
    public void run() {
        MDC.put(OtterConstants.splitPipelineLogFileKey, String.valueOf(this.pipelineId));
        while (this.running) {
            try {
                final EtlEventData await = this.arbitrateEventService.extractEvent().await(this.pipelineId);
                this.executorService.execute(new SetlFuture(StageType.EXTRACT, await.getProcessId(), this.pendingFuture, new Runnable() { // from class: com.alibaba.otter.node.etl.extract.ExtractTask.1
                    @Override // java.lang.Runnable
                    public void run() {
                        boolean isProfiling = ExtractTask.this.isProfiling();
                        Long l = null;
                        if (isProfiling) {
                            l = Long.valueOf(System.currentTimeMillis());
                        }
                        MDC.put(OtterConstants.splitPipelineLogFileKey, String.valueOf(ExtractTask.this.pipelineId));
                        String name = Thread.currentThread().getName();
                        Thread.currentThread().setName(ExtractTask.this.createTaskName(ExtractTask.this.pipelineId.longValue(), "ExtractWorker"));
                        try {
                            try {
                                ExtractTask.this.pipeline = ExtractTask.this.configClientService.findPipeline(ExtractTask.this.pipelineId);
                                List<PipeKey> list = (List) await.getDesc();
                                long longValue = await.getNextNid().longValue();
                                DbBatch dbBatch = ExtractTask.this.rowDataPipeDelegate.get(list);
                                if (dbBatch == null) {
                                    ExtractTask.this.processMissData(ExtractTask.this.pipelineId.longValue(), "extract miss data with keys:" + list.toString());
                                    Thread.currentThread().setName(name);
                                    MDC.remove(OtterConstants.splitPipelineLogFileKey);
                                    return;
                                }
                                ExtractTask.this.otterExtractorFactory.extract(dbBatch);
                                if (dbBatch.getFileBatch() != null && !CollectionUtils.isEmpty(dbBatch.getFileBatch().getFiles()) && ExtractTask.this.pipeline.getParameters().getFileDetect().booleanValue()) {
                                    dbBatch.setFileBatch(ExtractTask.this.fileBatchConflictDetectService.detect(dbBatch.getFileBatch(), Long.valueOf(longValue)));
                                }
                                await.setDesc(ExtractTask.this.rowDataPipeDelegate.put(dbBatch, Long.valueOf(longValue)));
                                if (isProfiling) {
                                    ExtractTask.this.stageAggregationCollector.push(ExtractTask.this.pipelineId, StageType.EXTRACT, new StageAggregation.AggregationItem(l, Long.valueOf(System.currentTimeMillis())));
                                }
                                ExtractTask.this.arbitrateEventService.extractEvent().single(await);
                                Thread.currentThread().setName(name);
                                MDC.remove(OtterConstants.splitPipelineLogFileKey);
                            } catch (Throwable th) {
                                if (ExtractTask.this.isInterrupt(th)) {
                                    ExtractTask.this.logger.info(String.format("[%d] extractwork executor is interrrupt! data:%s", ExtractTask.this.pipelineId, await), th);
                                } else {
                                    ExtractTask.this.logger.error(String.format("[%d] extractwork executor is error! data:%s", ExtractTask.this.pipelineId, await), th);
                                    ExtractTask.this.sendRollbackTermin(ExtractTask.this.pipelineId.longValue(), th);
                                }
                                Thread.currentThread().setName(name);
                                MDC.remove(OtterConstants.splitPipelineLogFileKey);
                            }
                        } catch (Throwable th2) {
                            Thread.currentThread().setName(name);
                            MDC.remove(OtterConstants.splitPipelineLogFileKey);
                            throw th2;
                        }
                    }
                }));
            } catch (Throwable th) {
                if (isInterrupt(th)) {
                    this.logger.info(String.format("[%s] extractTask is interrupted!", this.pipelineId), th);
                    return;
                } else {
                    this.logger.error(String.format("[%s] extractTask is error!", this.pipelineId), th);
                    sendRollbackTermin(this.pipelineId.longValue(), th);
                }
            }
        }
    }

    public void setOtterExtractorFactory(OtterExtractorFactory otterExtractorFactory) {
        this.otterExtractorFactory = otterExtractorFactory;
    }

    public void setFileBatchConflictDetectService(FileBatchConflictDetectService fileBatchConflictDetectService) {
        this.fileBatchConflictDetectService = fileBatchConflictDetectService;
    }
}
