package com.alibaba.otter.node.etl.select.selector.canal;

import com.alibaba.otter.canal.sink.AbstractCanalEventDownStreamHandler;
import com.alibaba.otter.canal.store.model.Event;
import com.alibaba.otter.node.etl.OtterConstants;
import com.alibaba.otter.shared.arbitrate.ArbitrateEventService;
import com.alibaba.otter.shared.arbitrate.model.MainStemEventData;
import com.alibaba.otter.shared.arbitrate.model.TerminEventData;
import com.alibaba.otter.shared.common.utils.thread.NamedThreadFactory;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

/* loaded from: input_file:com/alibaba/otter/node/etl/select/selector/canal/OtterDownStreamHandler.class */
public class OtterDownStreamHandler extends AbstractCanalEventDownStreamHandler<List<Event>> {
    private static final Logger logger = LoggerFactory.getLogger(OtterDownStreamHandler.class);
    private static final String DETECTING_FAILED_MESSAGE = "pid:%s canal elapsed %s seconds no data";
    private Long pipelineId;
    private ArbitrateEventService arbitrateEventService;
    private Integer detectingIntervalInSeconds;
    private ScheduledExecutorService scheduler = null;
    private ScheduledFuture future = null;
    private AtomicBoolean working = new AtomicBoolean(false);
    private volatile Long lastEventExecuteTime = 0L;
    private int detectingThresoldCount = 10;
    private int detectingExpCount = 1;
    private AtomicLong detectingFailedCount = new AtomicLong(0);
    private AtomicLong detectingSuccessedCount = new AtomicLong(0);
    private ReentrantLock lock = new ReentrantLock();

    public void stop() {
        try {
            this.lock.lock();
            super.stop();
            if (this.working.compareAndSet(true, false)) {
                stopDetecting();
            }
        } finally {
            this.lock.unlock();
        }
    }

    public List<Event> before(List<Event> list) {
        this.lastEventExecuteTime = Long.valueOf(System.currentTimeMillis());
        if (super.isStart()) {
            try {
                if (this.working.compareAndSet(false, true)) {
                    this.lock.lock();
                    if (super.isStart()) {
                        startDetecting();
                    } else {
                        this.working.set(false);
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                this.lock.unlock();
            }
        }
        return (List) super.before(list);
    }

    public List<Event> retry(List<Event> list) {
        this.lastEventExecuteTime = Long.valueOf(System.currentTimeMillis());
        return (List) super.retry(list);
    }

    public List<Event> after(List<Event> list) {
        return (List) super.after(list);
    }

    private void startDetecting() {
        MainStemEventData mainStemEventData = new MainStemEventData();
        mainStemEventData.setPipelineId(this.pipelineId);
        mainStemEventData.setStatus(MainStemEventData.Status.OVERTAKE);
        this.arbitrateEventService.mainStemEvent().single(mainStemEventData);
        this.scheduler = Executors.newScheduledThreadPool(1, new NamedThreadFactory(String.format("pipelineId = %s , CanalDetecting", String.valueOf(this.pipelineId))));
        this.future = this.scheduler.scheduleAtFixedRate(new Runnable() { // from class: com.alibaba.otter.node.etl.select.selector.canal.OtterDownStreamHandler.1
            @Override // java.lang.Runnable
            public void run() {
                try {
                    try {
                        MDC.put(OtterConstants.splitPipelineLogFileKey, String.valueOf(OtterDownStreamHandler.this.pipelineId));
                        if (OtterDownStreamHandler.this.isDelayed(Long.valueOf(System.currentTimeMillis()), OtterDownStreamHandler.this.lastEventExecuteTime)) {
                            OtterDownStreamHandler.this.notifyFailed();
                        } else {
                            OtterDownStreamHandler.this.notifySuccessed();
                        }
                        MDC.remove(OtterConstants.splitPipelineLogFileKey);
                    } catch (Exception e) {
                        OtterDownStreamHandler.logger.error("heartbeat check failed!", e);
                        MDC.remove(OtterConstants.splitPipelineLogFileKey);
                    }
                } catch (Throwable th) {
                    MDC.remove(OtterConstants.splitPipelineLogFileKey);
                    throw th;
                }
            }
        }, this.detectingIntervalInSeconds.intValue(), this.detectingIntervalInSeconds.intValue(), TimeUnit.SECONDS);
    }

    private void stopDetecting() {
        ((ScheduledThreadPoolExecutor) this.scheduler).remove((Runnable) this.future);
        this.scheduler.shutdownNow();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void notifyFailed() {
        this.detectingSuccessedCount.set(0L);
        long incrementAndGet = this.detectingFailedCount.incrementAndGet();
        if (incrementAndGet == 1) {
            this.detectingExpCount = 1;
            notifyMainstemStatus(MainStemEventData.Status.TAKEING);
        }
        if (incrementAndGet >= this.detectingThresoldCount * this.detectingExpCount * this.detectingExpCount) {
            notifyMainstemStatus(MainStemEventData.Status.TAKEING);
            this.detectingExpCount++;
            TerminEventData terminEventData = new TerminEventData();
            terminEventData.setPipelineId(this.pipelineId);
            terminEventData.setType(TerminEventData.TerminType.WARNING);
            terminEventData.setCode("mainstem");
            terminEventData.setDesc(String.format(DETECTING_FAILED_MESSAGE, this.pipelineId, String.valueOf(this.detectingIntervalInSeconds.intValue() * incrementAndGet)));
            this.arbitrateEventService.terminEvent().single(terminEventData);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void notifySuccessed() {
        this.detectingFailedCount.set(0L);
        long incrementAndGet = this.detectingSuccessedCount.incrementAndGet();
        if (incrementAndGet == 1) {
            this.detectingExpCount = 1;
            notifyMainstemStatus(MainStemEventData.Status.OVERTAKE);
        }
        if (incrementAndGet >= this.detectingThresoldCount * this.detectingExpCount * this.detectingExpCount) {
            this.detectingExpCount++;
            notifyMainstemStatus(MainStemEventData.Status.OVERTAKE);
        }
    }

    private void notifyMainstemStatus(MainStemEventData.Status status) {
        MainStemEventData mainStemEventData = new MainStemEventData();
        mainStemEventData.setPipelineId(this.pipelineId);
        mainStemEventData.setStatus(status);
        this.arbitrateEventService.mainStemEvent().single(mainStemEventData);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public boolean isDelayed(Long l, Long l2) {
        return l.longValue() - l2.longValue() > ((long) ((this.detectingIntervalInSeconds.intValue() * 2) * 1000));
    }

    public void setPipelineId(Long l) {
        this.pipelineId = l;
    }

    public void setDetectingIntervalInSeconds(Integer num) {
        this.detectingIntervalInSeconds = num;
    }

    public void setArbitrateEventService(ArbitrateEventService arbitrateEventService) {
        this.arbitrateEventService = arbitrateEventService;
    }

    public void setDetectingThresoldCount(int i) {
        this.detectingThresoldCount = i;
    }
}
