/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.otter.node.etl.load;

import com.alibaba.otter.node.etl.common.jmx.StageAggregation;
import com.alibaba.otter.node.etl.common.task.GlobalTask;
import com.alibaba.otter.node.etl.extract.SetlFuture;
import com.alibaba.otter.node.etl.load.loader.LoadContext;
import com.alibaba.otter.node.etl.load.loader.OtterLoaderFactory;
import com.alibaba.otter.node.etl.load.loader.db.context.DbLoadContext;
import com.alibaba.otter.node.etl.load.loader.interceptor.LoadInterceptor;
import com.alibaba.otter.shared.arbitrate.model.EtlEventData;
import com.alibaba.otter.shared.common.model.config.enums.StageType;
import com.alibaba.otter.shared.etl.model.DbBatch;
import java.util.List;
import org.slf4j.MDC;

public class LoadTask
extends GlobalTask {
    private OtterLoaderFactory otterLoaderFactory;
    private LoadInterceptor dbLoadInterceptor;

    public LoadTask(Long pipelineId) {
        super(pipelineId);
    }

    @Override
    public void run() {
        MDC.put((String)"otter", (String)String.valueOf(this.pipelineId));
        while (this.running) {
            try {
                final EtlEventData etlEventData = this.arbitrateEventService.loadEvent().await(this.pipelineId);
                Runnable task = new Runnable(){

                    /*
                     * WARNING - Removed try catching itself - possible behaviour change.
                     */
                    @Override
                    public void run() {
                        boolean profiling = LoadTask.this.isProfiling();
                        Long profilingStartTime = null;
                        if (profiling) {
                            profilingStartTime = System.currentTimeMillis();
                        }
                        MDC.put((String)"otter", (String)String.valueOf(LoadTask.this.pipelineId));
                        String currentName = Thread.currentThread().getName();
                        Thread.currentThread().setName(LoadTask.this.createTaskName(LoadTask.this.pipelineId, "LoadWorker"));
                        List<LoadContext> processedContexts = null;
                        try {
                            List keys = (List)etlEventData.getDesc();
                            DbBatch dbBatch = LoadTask.this.rowDataPipeDelegate.get(keys);
                            if (dbBatch == null) {
                                LoadTask.this.processMissData(LoadTask.this.pipelineId, "load miss data with keys:" + keys.toString());
                                return;
                            }
                            LoadTask.this.otterLoaderFactory.setStartTime(dbBatch.getRowBatch().getIdentity(), etlEventData.getStartTime());
                            processedContexts = LoadTask.this.otterLoaderFactory.load(dbBatch);
                            if (profiling) {
                                Long profilingEndTime = System.currentTimeMillis();
                                LoadTask.this.stageAggregationCollector.push(LoadTask.this.pipelineId, StageType.LOAD, new StageAggregation.AggregationItem(profilingStartTime, profilingEndTime));
                            }
                            LoadTask.this.arbitrateEventService.loadEvent().single(etlEventData);
                        }
                        catch (Throwable e) {
                            if (!LoadTask.this.isInterrupt(e)) {
                                LoadTask.this.logger.error(String.format("[%s] loadWork executor is error! data:%s", LoadTask.this.pipelineId, etlEventData), e);
                            } else {
                                LoadTask.this.logger.info(String.format("[%s] loadWork executor is interrrupt! data:%s", LoadTask.this.pipelineId, etlEventData), e);
                            }
                            if (processedContexts != null) {
                                for (LoadContext context : processedContexts) {
                                    try {
                                        if (!(context instanceof DbLoadContext)) continue;
                                        LoadTask.this.dbLoadInterceptor.error((DbLoadContext)context);
                                    }
                                    catch (Throwable ie) {
                                        LoadTask.this.logger.error(String.format("[%s] interceptor process error failed!", LoadTask.this.pipelineId), ie);
                                    }
                                }
                            }
                            if (!LoadTask.this.isInterrupt(e)) {
                                LoadTask.this.sendRollbackTermin(LoadTask.this.pipelineId, e);
                            }
                        }
                        finally {
                            Thread.currentThread().setName(currentName);
                            MDC.remove((String)"otter");
                        }
                    }
                };
                SetlFuture extractFuture = new SetlFuture(StageType.LOAD, etlEventData.getProcessId(), this.pendingFuture, task);
                this.executorService.execute(extractFuture);
            }
            catch (Throwable e) {
                if (this.isInterrupt(e)) {
                    this.logger.info(String.format("[%s] loadTask is interrupted!", this.pipelineId), e);
                    return;
                }
                this.logger.error(String.format("[%s] loadTask is error!", this.pipelineId), e);
                this.sendRollbackTermin((long)this.pipelineId, e);
            }
        }
    }

    public void setOtterLoaderFactory(OtterLoaderFactory otterLoaderFactory) {
        this.otterLoaderFactory = otterLoaderFactory;
    }

    public void setDbLoadInterceptor(LoadInterceptor dbLoadInterceptor) {
        this.dbLoadInterceptor = dbLoadInterceptor;
    }
}

