package com.alibaba.otter.node.etl.extract.extractor;

import com.alibaba.otter.node.etl.OtterConstants;
import com.alibaba.otter.node.etl.common.datasource.DataSourceService;
import com.alibaba.otter.node.etl.extract.exceptions.ExtractException;
import com.alibaba.otter.shared.common.model.config.ConfigHelper;
import com.alibaba.otter.shared.common.model.config.data.DataMediaPair;
import com.alibaba.otter.shared.common.model.config.pipeline.Pipeline;
import com.alibaba.otter.shared.common.utils.extension.ExtensionFactory;
import com.alibaba.otter.shared.common.utils.thread.ExecutorTemplate;
import com.alibaba.otter.shared.common.utils.thread.ExecutorTemplateGetter;
import com.alibaba.otter.shared.etl.extend.processor.EventProcessor;
import com.alibaba.otter.shared.etl.extend.processor.support.DataSourceFetcher;
import com.alibaba.otter.shared.etl.extend.processor.support.DataSourceFetcherAware;
import com.alibaba.otter.shared.etl.model.DbBatch;
import com.alibaba.otter.shared.etl.model.EventData;
import com.alibaba.otter.shared.etl.model.RowBatch;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import javax.sql.DataSource;
import org.slf4j.MDC;
import org.springframework.util.CollectionUtils;

/* loaded from: input_file:com/alibaba/otter/node/etl/extract/extractor/ProcessorExtractor.class */
public class ProcessorExtractor extends AbstractExtractor<DbBatch> {
    private ExtensionFactory extensionFactory;
    private DataSourceService dataSourceService;
    private ExecutorTemplateGetter executorTemplateGetter;

    @Override // com.alibaba.otter.node.etl.extract.extractor.OtterExtractor
    public void extract(DbBatch dbBatch) throws ExtractException {
        try {
            RowBatch rowBatch = dbBatch.getRowBatch();
            final Pipeline pipeline = getPipeline(Long.valueOf(rowBatch.getIdentity().getPipelineId()));
            List<EventData> datas = rowBatch.getDatas();
            final Set synchronizedSet = Collections.synchronizedSet(new HashSet());
            ExecutorTemplate executorTemplate = this.executorTemplateGetter.get();
            executorTemplate.start();
            executorTemplate.adjustPoolSize(pipeline.getParameters().getExtractPoolSize().intValue());
            for (final EventData eventData : datas) {
                List findDataMediaPairByMediaId = ConfigHelper.findDataMediaPairByMediaId(pipeline, Long.valueOf(eventData.getTableId()));
                if (findDataMediaPairByMediaId == null) {
                    throw new ExtractException("ERROR ## the dataMediaId = " + eventData.getTableId() + " dataMediaPair is null,please check");
                }
                Iterator it = findDataMediaPairByMediaId.iterator();
                while (true) {
                    if (it.hasNext()) {
                        DataMediaPair dataMediaPair = (DataMediaPair) it.next();
                        if (dataMediaPair.isExistFilter()) {
                            final DataSourceFetcherAware dataSourceFetcherAware = (EventProcessor) this.extensionFactory.getExtension(EventProcessor.class, dataMediaPair.getFilterData());
                            if (dataSourceFetcherAware instanceof DataSourceFetcherAware) {
                                dataSourceFetcherAware.setDataSourceFetcher(new DataSourceFetcher() { // from class: com.alibaba.otter.node.etl.extract.extractor.ProcessorExtractor.1
                                    public DataSource fetch(Long l) {
                                        return (DataSource) ProcessorExtractor.this.dataSourceService.getDataSource(pipeline.getId().longValue(), ConfigHelper.findDataMedia(pipeline, l).getSource());
                                    }
                                });
                                executorTemplate.submit(new Runnable() { // from class: com.alibaba.otter.node.etl.extract.extractor.ProcessorExtractor.2
                                    @Override // java.lang.Runnable
                                    public void run() {
                                        MDC.put(OtterConstants.splitPipelineLogFileKey, String.valueOf(pipeline.getId()));
                                        if (dataSourceFetcherAware.process(eventData)) {
                                            return;
                                        }
                                        synchronizedSet.add(eventData);
                                    }
                                });
                            } else if (!dataSourceFetcherAware.process(eventData)) {
                                synchronizedSet.add(eventData);
                                break;
                            }
                        }
                    }
                }
            }
            executorTemplate.waitForResult();
            if (!CollectionUtils.isEmpty(synchronizedSet)) {
                datas.removeAll(synchronizedSet);
            }
            if (executorTemplate != null) {
                this.executorTemplateGetter.release(executorTemplate);
            }
        } catch (Throwable th) {
            if (0 != 0) {
                this.executorTemplateGetter.release((ExecutorTemplate) null);
            }
            throw th;
        }
    }

    public void setExtensionFactory(ExtensionFactory extensionFactory) {
        this.extensionFactory = extensionFactory;
    }

    public void setDataSourceService(DataSourceService dataSourceService) {
        this.dataSourceService = dataSourceService;
    }

    public void setExecutorTemplateGetter(ExecutorTemplateGetter executorTemplateGetter) {
        this.executorTemplateGetter = executorTemplateGetter;
    }
}
