/*
 * Decompiled with CFR 0.152.
 */
package org.apache.dolphinscheduler.server.master.engine.system;

import java.util.List;
import java.util.stream.Collectors;
import lombok.Generated;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.time.StopWatch;
import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.server.master.engine.system.SystemEventBus;
import org.apache.dolphinscheduler.server.master.engine.system.event.AbstractSystemEvent;
import org.apache.dolphinscheduler.server.master.engine.system.event.ISystemEventHandler;
import org.apache.dolphinscheduler.server.master.failover.FailoverCoordinator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class SystemEventBusFireWorker
extends BaseDaemonThread
implements AutoCloseable {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(SystemEventBusFireWorker.class);
    @Autowired
    private SystemEventBus systemEventBus;
    @Autowired
    private FailoverCoordinator failoverCoordinator;
    @Autowired
    private List<ISystemEventHandler> systemEventHandlers;
    private static boolean flag = false;

    public SystemEventBusFireWorker() {
        super("SystemEventBusFireWorker");
    }

    public void start() {
        flag = true;
        super.start();
        log.info("SystemEventBusFireWorker started");
    }

    public void run() {
        while (flag) {
            AbstractSystemEvent systemEvent;
            try {
                systemEvent = this.systemEventBus.take();
            }
            catch (InterruptedException interruptedException) {
                Thread.currentThread().interrupt();
                log.warn("SystemEventBusFireWorker has been interrupted", (Throwable)interruptedException);
                break;
            }
            if (ServerLifeCycleManager.isStopped()) {
                log.info("SystemEventBusFireWorker has been stopped");
                break;
            }
            try {
                this.fireSystemEvent(systemEvent);
            }
            catch (Exception ex) {
                this.systemEventBus.publish(systemEvent);
                log.error("Fire SystemEvent: {} failed", (Object)systemEvent, (Object)ex);
                ThreadUtils.sleep((long)10000L);
            }
        }
    }

    private void fireSystemEvent(AbstractSystemEvent systemEvent) {
        StopWatch stopWatch = StopWatch.createStarted();
        List<ISystemEventHandler> matchedSystemEventHandlers = this.systemEventHandlers.stream().filter(systemEventHandler -> systemEventHandler.matchState() == systemEvent.getEventType()).collect(Collectors.toList());
        if (CollectionUtils.isEmpty(matchedSystemEventHandlers)) {
            log.error("No matched SystemEventHandler for SystemEvent: {}", (Object)systemEvent);
            return;
        }
        matchedSystemEventHandlers.forEach(systemEventHandler -> systemEventHandler.handle(systemEvent));
        stopWatch.stop();
        log.info("Fire SystemEvent: {} cost: {} ms", (Object)systemEvent, (Object)stopWatch.getTime());
    }

    @Override
    public void close() {
        flag = false;
        log.info("SystemEventBusFireWorker closed");
    }
}

