package com.alibaba.otter.node.etl.select;

import com.alibaba.otter.canal.common.CanalException;
import com.alibaba.otter.node.common.statistics.StatisticsClientService;
import com.alibaba.otter.node.etl.OtterConstants;
import com.alibaba.otter.node.etl.common.jmx.StageAggregation;
import com.alibaba.otter.node.etl.common.task.GlobalTask;
import com.alibaba.otter.node.etl.extract.SetlFuture;
import com.alibaba.otter.node.etl.select.exceptions.SelectException;
import com.alibaba.otter.node.etl.select.selector.Message;
import com.alibaba.otter.node.etl.select.selector.OtterSelector;
import com.alibaba.otter.node.etl.select.selector.OtterSelectorFactory;
import com.alibaba.otter.shared.arbitrate.model.EtlEventData;
import com.alibaba.otter.shared.arbitrate.model.TerminEventData;
import com.alibaba.otter.shared.common.model.config.channel.Channel;
import com.alibaba.otter.shared.common.model.config.enums.StageType;
import com.alibaba.otter.shared.common.model.statistics.delay.DelayCount;
import com.alibaba.otter.shared.common.utils.lock.BooleanMutex;
import com.alibaba.otter.shared.etl.model.DbBatch;
import com.alibaba.otter.shared.etl.model.EventData;
import com.alibaba.otter.shared.etl.model.Identity;
import com.alibaba.otter.shared.etl.model.RowBatch;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.LockSupport;
import org.slf4j.MDC;
import org.springframework.util.CollectionUtils;

/* loaded from: input_file:com/alibaba/otter/node/etl/select/SelectTask.class */
public class SelectTask extends GlobalTask {
    // True once startup() has started the selector and submitted both worker tasks; reset by stopup(false).
    private volatile boolean isStart;
    // Client used to publish delay statistics (see sendDelayStat / sendDelayReset).
    private StatisticsClientService statisticsClientService;
    // Factory from which startup() obtains the selector for this pipeline.
    private OtterSelectorFactory otterSelectorFactory;
    // Selector obtained in startup(); source of messages and target of ack/rollback.
    private OtterSelector<Message> otterSelector;
    // Two-thread pool created in startup() that runs the ProcessSelect and ProcessTermin loops.
    private ExecutorService executor;
    // Bounded queue (capacity 50, set in the constructor) of batches waiting for termin handling.
    private BlockingQueue<BatchTermin> batchBuffer;
    // When true, the worker loops call checkContinueWork(); initialised to false and not changed in this file.
    private boolean needCheck;
    // Gate for the select loop: set true by ack()/the termin loop, set false by notifyRollback().
    private BooleanMutex canStartSelector;
    // Rollback version counter; incremented by notifyRollback() and snapshotted by processSelect().
    private AtomicInteger rversion;
    // Timestamp (ms) of the last delay reset sent by sendDelayReset(), used to throttle it.
    private long lastResetTime;

    /* loaded from: input_file:com/alibaba/otter/node/etl/select/SelectTask$BatchTermin.class */
    /**
     * Entry of the termin buffer: identifies a selected batch (and, optionally, the
     * process that carries it) together with a flag telling the termin loop whether
     * it has to wait for a termin event before acknowledging the batch.
     *
     * <p>Both ids default to {@code -1} and {@code needWait} defaults to {@code true}.
     */
    public static class BatchTermin {
        private Long batchId = Long.valueOf(-1L);
        private Long processId = Long.valueOf(-1L);
        private boolean needWait = true;

        /** Creates an entry for a batch bound to a process; the entry needs to wait for a termin. */
        public BatchTermin(Long l, Long l2) {
            this(l, l2, true);
        }

        /** Creates an entry without a process (processId is {@code -1}) and an explicit wait flag. */
        public BatchTermin(Long l, boolean z) {
            this(l, Long.valueOf(-1L), z);
        }

        /**
         * Creates a fully specified entry.
         *
         * @param l  batch id
         * @param l2 process id
         * @param z  whether a termin event has to be awaited for this batch
         */
        public BatchTermin(Long l, Long l2, boolean z) {
            batchId = l;
            processId = l2;
            needWait = z;
        }

        public Long getBatchId() {
            return batchId;
        }

        public void setBatchId(Long l) {
            batchId = l;
        }

        public Long getProcessId() {
            return processId;
        }

        public void setProcessId(Long l) {
            processId = l;
        }

        public boolean isNeedWait() {
            return needWait;
        }

        public void setNeedWait(boolean z) {
            needWait = z;
        }

        @Override
        public String toString() {
            StringBuilder text = new StringBuilder();
            text.append("BatchTermin [batchId=").append(batchId);
            text.append(", needWait=").append(needWait);
            text.append(", processId=").append(processId);
            text.append("]");
            return text.toString();
        }
    }

    /**
     * Creates the select task for one pipeline.
     *
     * <p>The task starts in the "not started" state with an empty termin buffer
     * bounded to 50 entries, a closed selector gate, a rollback version of zero and
     * the delay-reset timestamp set to the construction time.
     *
     * @param l pipeline id, handed to the {@code GlobalTask} constructor
     */
    public SelectTask(Long l) {
        super(l);
        isStart = false;
        needCheck = false;
        lastResetTime = System.currentTimeMillis();
        rversion = new AtomicInteger(0);
        canStartSelector = new BooleanMutex(false);
        batchBuffer = new LinkedBlockingQueue<BatchTermin>(50);
    }

    /**
     * Supervisor loop of the select stage.
     *
     * <p>While the task is running it either tries to start the selector
     * ({@link #startup()}) or, once started, checks every five seconds that the
     * mainstem is still held for this pipeline and stops the selector
     * when it is not. An interruption ends the loop; any other failure is reported
     * through {@code sendRollbackTermin} and retried after a ten second pause.
     *
     * <p>The mainstem is released exactly once, when the loop is left. Releasing
     * it inside the loop would give the mainstem up on every pass, including right
     * after a successful start-up.
     */
    @Override // java.lang.Thread, java.lang.Runnable
    public void run() {
        MDC.put(OtterConstants.splitPipelineLogFileKey, String.valueOf(this.pipelineId));
        try {
            while (this.running) {
                try {
                    if (this.isStart) {
                        if (!this.arbitrateEventService.mainStemEvent().check(this.pipelineId)) {
                            stopup(false);
                        }
                        // Re-check the mainstem every 5 seconds.
                        LockSupport.parkNanos(5000000000L);
                    } else {
                        startup();
                    }
                } catch (Throwable th) {
                    if (isInterrupt(th)) {
                        this.logger.info("INFO ## select is interrupt", th);
                        return;
                    }
                    this.logger.warn("WARN ## select is failed.", th);
                    sendRollbackTermin(this.pipelineId.longValue(), th);
                    try {
                        // Back off for 10 seconds before the next attempt.
                        Thread.sleep(10000L);
                    } catch (InterruptedException ignored) {
                        // Deliberately not re-interrupting: with the flag set, parkNanos above
                        // would return immediately and the loop would spin. The loop condition
                        // decides whether to go on.
                    }
                }
            }
        } finally {
            this.arbitrateEventService.mainStemEvent().release(this.pipelineId);
        }
    }

    /**
     * Waits for the mainstem of this pipeline and then brings the select stage up:
     * creates a fresh two-thread pool, obtains and starts the selector, closes the
     * selector gate and submits the termin and select loops.
     *
     * <p>Any failure (including an interruption) is logged, the mainstem is
     * released and the method returns normally with {@code isStart} left unchanged.
     */
    private void startup() throws InterruptedException {
        try {
            arbitrateEventService.mainStemEvent().await(pipelineId);
            // A new pool per start-up: the previous one is shut down by stopup()/shutdown().
            executor = Executors.newFixedThreadPool(2);
            otterSelector = otterSelectorFactory.getSelector(pipelineId);
            otterSelector.start();
            // The termin loop opens the gate once it has cleaned up.
            canStartSelector.set(false);
            startProcessTermin();
            startProcessSelect();
            isStart = true;
        } catch (Throwable failure) {
            final boolean interrupted = isInterrupt(failure);
            if (!interrupted) {
                logger.warn("WARN ## this node is crashed.", failure);
            } else {
                logger.info("INFO ## this node is interrupt", failure);
            }
            arbitrateEventService.mainStemEvent().release(pipelineId);
        }
    }

    /**
     * Stops the worker pool and the selector if the task is currently started.
     *
     * <p>NOTE: when {@code z} is true the exception is thrown before
     * {@code isStart} is cleared, so the flag stays {@code true} in that case.
     *
     * @param z when true, an {@link InterruptedException} is thrown after the
     *          resources were stopped so that the calling worker terminates itself
     * @throws InterruptedException if {@code z} is true and the task was started
     */
    private synchronized void stopup(boolean z) throws InterruptedException {
        if (!isStart) {
            return;
        }
        if (executor != null) {
            executor.shutdownNow();
        }
        final boolean selectorRunning = otterSelector != null && otterSelector.isStart();
        if (selectorRunning) {
            otterSelector.stop();
        }
        if (z) {
            throw new InterruptedException();
        }
        isStart = false;
    }

    /**
     * Submits the select loop ({@link #processSelect()}) to the worker pool.
     *
     * <p>For the duration of the loop the worker thread carries the pipeline id in
     * the MDC and a task-specific thread name; both are restored/removed when the
     * loop ends, whether normally or with an exception.
     */
    private void startProcessSelect() {
        final Runnable selectLoop = new Runnable() { // from class: com.alibaba.otter.node.etl.select.SelectTask.1
            @Override // java.lang.Runnable
            public void run() {
                MDC.put(OtterConstants.splitPipelineLogFileKey, String.valueOf(SelectTask.this.pipelineId));
                final Thread worker = Thread.currentThread();
                final String previousName = worker.getName();
                worker.setName(SelectTask.this.createTaskName(SelectTask.this.pipelineId.longValue(), "ProcessSelect"));
                try {
                    SelectTask.this.processSelect();
                } finally {
                    worker.setName(previousName);
                    MDC.remove(OtterConstants.splitPipelineLogFileKey);
                }
            }
        };
        this.executor.submit(selectLoop);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Select loop, executed on the "ProcessSelect" worker submitted by
     * {@code startProcessSelect()}.
     *
     * <p>Each pass waits on the selector gate, fetches one message from the
     * selector and then does one of three things:
     * <ul>
     *   <li>gate closed in the meantime: rolls the message back;</li>
     *   <li>message without data: queues a no-wait {@code BatchTermin} so the termin
     *       loop can acknowledge it directly;</li>
     *   <li>otherwise: obtains an {@code EtlEventData} from the select event, queues a
     *       {@code BatchTermin} for it and hands a worker to {@code executorService}
     *       that packs the events into a {@code RowBatch}, stores it through
     *       {@code rowDataPipeDelegate} and signals the select event.</li>
     * </ul>
     *
     * <p>An interruption ends the loop; any other failure is logged and reported via
     * {@code sendRollbackTermin}, after which the loop continues.
     */
    public void processSelect() {
        while (this.running) {
            try {
                // Presumably blocks until the gate is opened (set to true) — see ack() and the termin loop.
                this.canStartSelector.get();
                if (this.needCheck) {
                    checkContinueWork();
                }
                this.arbitrateEventService.toolEvent().waitForPermit(this.pipelineId);
                Message<Message> selector = this.otterSelector.selector();
                // Snapshot of the rollback version; notifyRollback() increments it.
                int i = this.rversion.get();
                if (!this.canStartSelector.state()) {
                    // The gate was closed while selecting: discard what was just fetched.
                    rollback(selector.getId());
                } else if (CollectionUtils.isEmpty(selector.getDatas())) {
                    // Nothing to transfer: no process is needed, the termin loop handles it without waiting.
                    this.batchBuffer.put(new BatchTermin(selector.getId(), false));
                } else {
                    final EtlEventData await = this.arbitrateEventService.selectEvent().await(this.pipelineId);
                    if (this.rversion.get() != i) {
                        // A rollback was notified while waiting for the process: wait for the gate,
                        // pause 10 seconds and fetch a new message. The new message is used as-is
                        // below, without repeating the empty/gate checks.
                        this.logger.warn("rollback happend , should skip this data and get new message.");
                        this.canStartSelector.get();
                        Thread.sleep(10000L);
                        this.arbitrateEventService.toolEvent().waitForPermit(this.pipelineId);
                        selector = this.otterSelector.selector();
                    }
                    final Message<Message> message = selector;
                    // Register the batch for termin handling before the worker starts.
                    this.batchBuffer.put(new BatchTermin(message.getId(), await.getProcessId()));
                    this.executorService.execute(new SetlFuture(StageType.SELECT, await.getProcessId(), this.pendingFuture, new Runnable() { // from class: com.alibaba.otter.node.etl.select.SelectTask.2
                        @Override // java.lang.Runnable
                        public void run() {
                            // Start timestamp is only taken when profiling is enabled.
                            boolean isProfiling = SelectTask.this.isProfiling();
                            Long l = null;
                            if (isProfiling) {
                                l = Long.valueOf(System.currentTimeMillis());
                            }
                            MDC.put(OtterConstants.splitPipelineLogFileKey, String.valueOf(SelectTask.this.pipelineId));
                            String name = Thread.currentThread().getName();
                            Thread.currentThread().setName(SelectTask.this.createTaskName(SelectTask.this.pipelineId.longValue(), "SelectWorker"));
                            try {
                                try {
                                    SelectTask.this.pipeline = SelectTask.this.configClientService.findPipeline(SelectTask.this.pipelineId);
                                    List datas = message.getDatas();
                                    // First-time defaults to the process start time and is replaced by the
                                    // execute time of the first event when there is data.
                                    long longValue = await.getStartTime().longValue();
                                    if (!CollectionUtils.isEmpty(datas)) {
                                        longValue = ((EventData) datas.get(0)).getExecuteTime();
                                    }
                                    Channel findChannelByPipelineId = SelectTask.this.configClientService.findChannelByPipelineId(SelectTask.this.pipelineId);
                                    // Build the row batch, tagged with channel / pipeline / process ids.
                                    RowBatch rowBatch = new RowBatch();
                                    Identity identity = new Identity();
                                    identity.setChannelId(findChannelByPipelineId.getId().longValue());
                                    identity.setPipelineId(SelectTask.this.pipelineId.longValue());
                                    identity.setProcessId(await.getProcessId().longValue());
                                    rowBatch.setIdentity(identity);
                                    Iterator it = datas.iterator();
                                    while (it.hasNext()) {
                                        rowBatch.merge((EventData) it.next());
                                    }
                                    // Store the batch for the next node and record the result on the event data.
                                    await.setDesc(SelectTask.this.rowDataPipeDelegate.put(new DbBatch(rowBatch), Long.valueOf(await.getNextNid().longValue())));
                                    await.setNumber(Long.valueOf(datas.size()));
                                    await.setFirstTime(Long.valueOf(longValue));
                                    await.setBatchId(message.getId());
                                    if (isProfiling) {
                                        SelectTask.this.stageAggregationCollector.push(SelectTask.this.pipelineId, StageType.SELECT, new StageAggregation.AggregationItem(l, Long.valueOf(System.currentTimeMillis())));
                                    }
                                    // Signal that the select stage of this process is complete.
                                    SelectTask.this.arbitrateEventService.selectEvent().single(await);
                                    Thread.currentThread().setName(name);
                                    MDC.remove(OtterConstants.splitPipelineLogFileKey);
                                } catch (Throwable th) {
                                    // Worker failures never propagate: interruptions are only logged,
                                    // everything else additionally triggers a rollback termin.
                                    if (SelectTask.this.isInterrupt(th)) {
                                        SelectTask.this.logger.info(String.format("[%s] selectwork executor is interrrupt! data:%s", SelectTask.this.pipelineId, await), th);
                                    } else {
                                        SelectTask.this.logger.error(String.format("[%s] selectwork executor is error! data:%s", SelectTask.this.pipelineId, await), th);
                                        SelectTask.this.sendRollbackTermin(SelectTask.this.pipelineId.longValue(), th);
                                    }
                                    Thread.currentThread().setName(name);
                                    MDC.remove(OtterConstants.splitPipelineLogFileKey);
                                }
                            } catch (Throwable th2) {
                                // Reached only when the handler above itself throws: clean up and rethrow.
                                Thread.currentThread().setName(name);
                                MDC.remove(OtterConstants.splitPipelineLogFileKey);
                                throw th2;
                            }
                        }
                    }));
                }
            } catch (Throwable th) {
                if (isInterrupt(th)) {
                    // Interruption terminates the select loop.
                    this.logger.info(String.format("[%s] selectTask is interrrupt!", this.pipelineId), th);
                    return;
                } else {
                    // Any other failure: report a rollback and continue with the next pass.
                    this.logger.error(String.format("[%s] selectTask is error!", this.pipelineId), th);
                    sendRollbackTermin(this.pipelineId.longValue(), th);
                }
            }
        }
    }

    /**
     * Submits the termin loop to the worker pool.
     *
     * <p>The loop first exhausts pending termin events and clears the batch buffer,
     * then repeatedly takes the next {@code BatchTermin} from the buffer. Entries that
     * need to wait are resolved through {@code processTermin}; entries that do not
     * are acknowledged when the previous result was successful and rolled back
     * otherwise. Whenever the buffer is empty the selector is rolled back if the
     * gate is closed, the status is reset to successful and the gate is opened.
     *
     * <p>Failures notify a rollback and the whole cycle is retried after 30 seconds;
     * an interruption ends the task. The MDC entry and the thread name are always
     * restored when the task ends.
     */
    private void startProcessTermin() {
        final Runnable terminLoop = new Runnable() { // from class: com.alibaba.otter.node.etl.select.SelectTask.3
            @Override // java.lang.Runnable
            public void run() {
                MDC.put(OtterConstants.splitPipelineLogFileKey, String.valueOf(SelectTask.this.pipelineId));
                final Thread worker = Thread.currentThread();
                final String previousName = worker.getName();
                worker.setName(SelectTask.this.createTaskName(SelectTask.this.pipelineId.longValue(), "ProcessTermin"));
                try {
                    while (SelectTask.this.running) {
                        try {
                            // Result of the previous termin: true = ack, false = rollback.
                            boolean lastStatus = true;
                            // Start from a clean slate: drop stale termin events and buffered batches.
                            SelectTask.this.arbitrateEventService.terminEvent().exhaust(SelectTask.this.pipelineId);
                            SelectTask.this.batchBuffer.clear();
                            while (SelectTask.this.running) {
                                if (SelectTask.this.batchBuffer.isEmpty()) {
                                    // Everything handed out has been settled.
                                    if (!SelectTask.this.canStartSelector.state()) {
                                        SelectTask.this.otterSelector.rollback();
                                    }
                                    lastStatus = true;
                                    SelectTask.this.canStartSelector.set(true);
                                }
                                final BatchTermin current = SelectTask.this.batchBuffer.take();
                                SelectTask.this.logger.info("start process termin : {}", current.toString());
                                if (!current.isNeedWait()) {
                                    if (lastStatus) {
                                        SelectTask.this.ack(current.getBatchId());
                                        SelectTask.this.sendDelayReset(SelectTask.this.pipelineId.longValue());
                                    } else {
                                        SelectTask.this.rollback(current.getBatchId());
                                    }
                                } else {
                                    lastStatus = SelectTask.this.processTermin(lastStatus, current.getBatchId(), current.getProcessId());
                                }
                                SelectTask.this.logger.info("end process termin : {}  result : {}", current.toString(), Boolean.valueOf(lastStatus));
                            }
                        } catch (CanalException canalFailure) {
                            SelectTask.this.logger.info(String.format("[%s] ProcessTermin has an error! retry...", SelectTask.this.pipelineId), canalFailure);
                            SelectTask.this.notifyRollback();
                        } catch (SelectException selectFailure) {
                            SelectTask.this.logger.info(String.format("[%s] ProcessTermin has an error! retry...", SelectTask.this.pipelineId), selectFailure);
                            SelectTask.this.notifyRollback();
                        } catch (Throwable failure) {
                            if (SelectTask.this.isInterrupt(failure)) {
                                SelectTask.this.logger.info(String.format("[%s] ProcessTermin is interrupted!", SelectTask.this.pipelineId), failure);
                                return;
                            }
                            SelectTask.this.logger.error(String.format("[%s] ProcessTermin is error!", SelectTask.this.pipelineId), failure);
                            SelectTask.this.notifyRollback();
                            SelectTask.this.sendRollbackTermin(SelectTask.this.pipelineId.longValue(), failure);
                        }
                        // Pause 30 seconds before starting the next cycle.
                        try {
                            Thread.sleep(30000L);
                        } catch (InterruptedException e3) {
                        }
                    }
                } finally {
                    worker.setName(previousName);
                    MDC.remove(OtterConstants.splitPipelineLogFileKey);
                }
            }
        };
        this.executor.submit(terminLoop);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Waits for the termin event that belongs to the given batch/process and
     * settles the batch accordingly.
     *
     * <p>Termin events may arrive out of order, so up to 30 events are awaited,
     * with a one second pause after each event that does not match. An event
     * without a batch id is matched on the process id; an event with a batch id is
     * matched on the batch id. An id of {@code -1} on this side matches anything.
     * If no matching event is seen within the 30 attempts the last mismatch is
     * thrown instead of settling the batch with an unrelated event.
     *
     * <p>On a match the batch is acknowledged (and the delay statistic sent) when
     * the termin type is normal, rolled back otherwise, and the termin event is
     * acknowledged.
     *
     * @param z  result of the previous termin ({@code true} = ack, {@code false} = rollback)
     * @param l  batch id being settled
     * @param l2 process id being settled
     * @return {@code true} if the termin was normal (batch acknowledged), {@code false} if rolled back
     * @throws SelectException      if no matching termin arrived within the retry limit, or the
     *                              previous termin was a rollback while this one is normal
     * @throws InterruptedException if interrupted while waiting
     */
    public boolean processTermin(boolean z, Long l, Long l2) throws InterruptedException {
        SelectException mismatch = null;
        TerminEventData terminEventData = null;
        for (int attempt = 0; attempt < 30; attempt++) {
            terminEventData = this.arbitrateEventService.terminEvent().await(this.pipelineId);
            Long terminBatchId = terminEventData.getBatchId();
            Long terminProcessId = terminEventData.getProcessId();
            if (terminBatchId == null && l2.longValue() != -1 && !l2.equals(terminProcessId)) {
                // Termin without a batch id: it has to match on the process id.
                mismatch = new SelectException("unmatched processId, SelectTask batchId = " + l + " processId = " + l2 + " and Termin Event: " + terminEventData.toString());
                Thread.sleep(1000L);
            } else if (terminBatchId != null && l.longValue() != -1 && !l.equals(terminBatchId)) {
                mismatch = new SelectException("unmatched terminId, SelectTask batchId = " + l + " processId = " + l2 + " and Termin Event: " + terminEventData.toString());
                Thread.sleep(1000L);
            } else {
                // Matched: forget mismatches recorded by earlier attempts.
                mismatch = null;
                break;
            }
        }
        if (mismatch != null) {
            // Retry limit reached without a matching termin.
            throw mismatch;
        }
        if (this.needCheck) {
            checkContinueWork();
        }
        boolean isNormal = terminEventData.getType().isNormal();
        if (!z && isNormal) {
            // An ack following a rollback is rejected as inconsistent.
            throw new SelectException(String.format("last status is rollback , but now [batchId:%d , processId:%d] is ack", l, terminEventData.getProcessId()));
        }
        if (isNormal) {
            ack(l);
            sendDelayStat(this.pipelineId.longValue(), terminEventData.getEndTime(), terminEventData.getFirstTime());
        } else {
            rollback(l);
        }
        this.arbitrateEventService.terminEvent().ack(terminEventData);
        return isNormal;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Closes the selector gate, bumps the rollback version and rolls the selector back.
     *
     * @param l batch id the rollback was triggered for; not used, the selector is
     *          rolled back without an id
     */
    public void rollback(Long l) {
        notifyRollback();
        otterSelector.rollback();
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Opens the selector gate and acknowledges the batch on the selector.
     *
     * @param l id of the batch to acknowledge
     */
    public void ack(Long l) {
        canStartSelector.set(true);
        otterSelector.ack(l);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Signals a rollback to the select loop: closes the selector gate and
     * increments the rollback version that the select loop compares against its
     * snapshot.
     */
    public void notifyRollback() {
        canStartSelector.set(false);
        rversion.incrementAndGet();
    }

    /**
     * Verifies that the mainstem check still passes for this pipeline; if it does
     * not, logs a warning and stops the select stage via {@code stopup(true)}.
     *
     * @throws InterruptedException propagated from {@code stopup(true)}
     */
    private void checkContinueWork() throws InterruptedException {
        final boolean mainstemHere = arbitrateEventService.mainStemEvent().check(pipelineId);
        if (!mainstemHere) {
            logger.warn("mainstem is not run in this node");
            stopup(true);
        }
    }

    /**
     * Shuts the task down: delegates to the superclass, then stops the worker pool
     * (if one was created) and the selector (if it exists and is started).
     */
    @Override // com.alibaba.otter.node.etl.common.task.GlobalTask
    public void shutdown() {
        super.shutdown();
        if (executor != null) {
            executor.shutdownNow();
        }
        if (otterSelector != null && otterSelector.isStart()) {
            otterSelector.stop();
        }
    }

    /**
     * Sends a delay statistic with a count of zero for the pipeline. The delay time
     * is only filled in when both timestamps are available.
     *
     * @param j  pipeline id
     * @param l  end time of the termin, may be {@code null}
     * @param l2 first time of the termin, may be {@code null}
     */
    private void sendDelayStat(long j, Long l, Long l2) {
        final DelayCount stat = new DelayCount();
        stat.setPipelineId(Long.valueOf(j));
        stat.setNumber(0L);
        final boolean bothKnown = l2 != null && l != null;
        if (bothKnown) {
            final long elapsed = l.longValue() - l2.longValue();
            stat.setTime(Long.valueOf(elapsed));
        }
        statisticsClientService.sendResetDelayCount(stat);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Sends a delay reset for the pipeline, at most once per 60 seconds. The
     * reported time is the distance between now and the selector's last entry time.
     *
     * @param j pipeline id
     */
    public void sendDelayReset(long j) {
        final long now = System.currentTimeMillis();
        if (now - lastResetTime <= 60000) {
            // Throttled: the previous reset is less than a minute old.
            return;
        }
        lastResetTime = now;
        final DelayCount reset = new DelayCount();
        reset.setPipelineId(Long.valueOf(j));
        reset.setNumber(0L);
        final long sinceLastEntry = now - otterSelector.lastEntryTime().longValue();
        reset.setTime(Long.valueOf(sinceLastEntry));
        statisticsClientService.sendResetDelayCount(reset);
    }

    /**
     * Injects the factory from which {@code startup()} obtains the selector.
     *
     * @param otterSelectorFactory selector factory to use
     */
    public void setOtterSelectorFactory(OtterSelectorFactory otterSelectorFactory) {
        this.otterSelectorFactory = otterSelectorFactory;
    }

    /**
     * Injects the client used to send delay statistics.
     *
     * @param statisticsClientService statistics client to use
     */
    public void setStatisticsClientService(StatisticsClientService statisticsClientService) {
        this.statisticsClientService = statisticsClientService;
    }
}
