/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.otter.node.etl.select.selector.canal;

import com.alibaba.otter.canal.sink.AbstractCanalEventDownStreamHandler;
import com.alibaba.otter.canal.store.model.Event;
import com.alibaba.otter.shared.arbitrate.ArbitrateEventService;
import com.alibaba.otter.shared.arbitrate.model.MainStemEventData;
import com.alibaba.otter.shared.arbitrate.model.TerminEventData;
import com.alibaba.otter.shared.common.utils.thread.NamedThreadFactory;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

public class OtterDownStreamHandler
extends AbstractCanalEventDownStreamHandler<List<Event>> {
    private static final Logger logger = LoggerFactory.getLogger(OtterDownStreamHandler.class);
    private static final String DETECTING_FAILED_MESSAGE = "pid:%s canal elapsed %s seconds no data";
    private Long pipelineId;
    private ArbitrateEventService arbitrateEventService;
    private ScheduledExecutorService scheduler = null;
    private ScheduledFuture future = null;
    private AtomicBoolean working = new AtomicBoolean(false);
    private Integer detectingIntervalInSeconds;
    private volatile Long lastEventExecuteTime = 0L;
    private int detectingThresoldCount = 10;
    private int detectingExpCount = 1;
    private AtomicLong detectingFailedCount = new AtomicLong(0L);
    private AtomicLong detectingSuccessedCount = new AtomicLong(0L);
    private ReentrantLock lock = new ReentrantLock();

    public void stop() {
        try {
            this.lock.lock();
            super.stop();
            if (this.working.compareAndSet(true, false)) {
                this.stopDetecting();
            }
        }
        finally {
            this.lock.unlock();
        }
    }

    public List<Event> before(List<Event> events) {
        this.lastEventExecuteTime = System.currentTimeMillis();
        if (super.isStart() && this.working.compareAndSet(false, true)) {
            try {
                this.lock.lock();
                if (super.isStart()) {
                    this.startDetecting();
                } else {
                    this.working.set(false);
                }
            }
            catch (Exception e) {
                e.printStackTrace();
            }
            finally {
                this.lock.unlock();
            }
        }
        return (List)super.before(events);
    }

    public List<Event> retry(List<Event> events) {
        this.lastEventExecuteTime = System.currentTimeMillis();
        return (List)super.retry(events);
    }

    public List<Event> after(List<Event> events) {
        return (List)super.after(events);
    }

    private void startDetecting() {
        MainStemEventData mainStemData = new MainStemEventData();
        mainStemData.setPipelineId(this.pipelineId);
        mainStemData.setStatus(MainStemEventData.Status.OVERTAKE);
        this.arbitrateEventService.mainStemEvent().single(mainStemData);
        String schedulerName = String.format("pipelineId = %s , CanalDetecting", String.valueOf(this.pipelineId));
        this.scheduler = Executors.newScheduledThreadPool(1, (ThreadFactory)new NamedThreadFactory(schedulerName));
        this.future = this.scheduler.scheduleAtFixedRate(new Runnable(){

            @Override
            public void run() {
                try {
                    MDC.put((String)"otter", (String)String.valueOf(OtterDownStreamHandler.this.pipelineId));
                    if (OtterDownStreamHandler.this.isDelayed(System.currentTimeMillis(), OtterDownStreamHandler.this.lastEventExecuteTime)) {
                        OtterDownStreamHandler.this.notifyFailed();
                    } else {
                        OtterDownStreamHandler.this.notifySuccessed();
                    }
                }
                catch (Exception e) {
                    logger.error("heartbeat check failed!", (Throwable)e);
                }
                finally {
                    MDC.remove((String)"otter");
                }
            }
        }, this.detectingIntervalInSeconds.intValue(), this.detectingIntervalInSeconds.intValue(), TimeUnit.SECONDS);
    }

    private void stopDetecting() {
        ((ScheduledThreadPoolExecutor)this.scheduler).remove((Runnable)((Object)this.future));
        this.scheduler.shutdownNow();
    }

    private void notifyFailed() {
        this.detectingSuccessedCount.set(0L);
        long failedCount = this.detectingFailedCount.incrementAndGet();
        if (failedCount == 1L) {
            this.detectingExpCount = 1;
            this.notifyMainstemStatus(MainStemEventData.Status.TAKEING);
        }
        if (failedCount >= (long)(this.detectingThresoldCount * this.detectingExpCount * this.detectingExpCount)) {
            this.notifyMainstemStatus(MainStemEventData.Status.TAKEING);
            ++this.detectingExpCount;
            TerminEventData errorEventData = new TerminEventData();
            errorEventData.setPipelineId(this.pipelineId);
            errorEventData.setType(TerminEventData.TerminType.WARNING);
            errorEventData.setCode("mainstem");
            errorEventData.setDesc(String.format(DETECTING_FAILED_MESSAGE, this.pipelineId, String.valueOf((long)this.detectingIntervalInSeconds.intValue() * failedCount)));
            this.arbitrateEventService.terminEvent().single(errorEventData);
        }
    }

    private void notifySuccessed() {
        this.detectingFailedCount.set(0L);
        long successedCount = this.detectingSuccessedCount.incrementAndGet();
        if (successedCount == 1L) {
            this.detectingExpCount = 1;
            this.notifyMainstemStatus(MainStemEventData.Status.OVERTAKE);
        }
        if (successedCount >= (long)(this.detectingThresoldCount * this.detectingExpCount * this.detectingExpCount)) {
            ++this.detectingExpCount;
            this.notifyMainstemStatus(MainStemEventData.Status.OVERTAKE);
        }
    }

    private void notifyMainstemStatus(MainStemEventData.Status status) {
        MainStemEventData mainStemData = new MainStemEventData();
        mainStemData.setPipelineId(this.pipelineId);
        mainStemData.setStatus(status);
        this.arbitrateEventService.mainStemEvent().single(mainStemData);
    }

    private boolean isDelayed(Long detectingExecuteTime, Long lastExecuteTime) {
        long delayTime = detectingExecuteTime - lastExecuteTime;
        return delayTime > (long)(this.detectingIntervalInSeconds * 2 * 1000);
    }

    public void setPipelineId(Long pipelineId) {
        this.pipelineId = pipelineId;
    }

    public void setDetectingIntervalInSeconds(Integer detectingIntervalInSeconds) {
        this.detectingIntervalInSeconds = detectingIntervalInSeconds;
    }

    public void setArbitrateEventService(ArbitrateEventService arbitrateEventService) {
        this.arbitrateEventService = arbitrateEventService;
    }

    public void setDetectingThresoldCount(int detectingThresoldCount) {
        this.detectingThresoldCount = detectingThresoldCount;
    }
}

