/*
 * Decompiled with CFR 0.152.
 */
package tech.powerjob.worker;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.DeadLetter;
import akka.actor.Props;
import akka.routing.RoundRobinPool;
import akka.routing.RouterConfig;
import com.google.common.base.Stopwatch;
import com.google.common.collect.Maps;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigMergeable;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import tech.powerjob.common.exception.PowerJobException;
import tech.powerjob.common.response.ResultDTO;
import tech.powerjob.common.serialize.JsonUtils;
import tech.powerjob.common.utils.CommonUtils;
import tech.powerjob.common.utils.HttpUtils;
import tech.powerjob.common.utils.NetUtils;
import tech.powerjob.worker.actors.ProcessorTrackerActor;
import tech.powerjob.worker.actors.TaskTrackerActor;
import tech.powerjob.worker.actors.TroubleshootingActor;
import tech.powerjob.worker.actors.WorkerActor;
import tech.powerjob.worker.background.OmsLogHandler;
import tech.powerjob.worker.background.ServerDiscoveryService;
import tech.powerjob.worker.background.WorkerHealthReporter;
import tech.powerjob.worker.common.PowerBannerPrinter;
import tech.powerjob.worker.common.PowerJobWorkerConfig;
import tech.powerjob.worker.common.WorkerRuntime;
import tech.powerjob.worker.common.utils.SpringUtils;
import tech.powerjob.worker.core.executor.ExecutorManager;
import tech.powerjob.worker.persistence.TaskPersistenceService;

public class PowerJobWorker
implements ApplicationContextAware,
InitializingBean,
DisposableBean {
    private static final Logger log = LoggerFactory.getLogger(PowerJobWorker.class);
    private final WorkerRuntime workerRuntime = new WorkerRuntime();
    private final AtomicBoolean initialized = new AtomicBoolean();

    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        SpringUtils.inject(applicationContext);
    }

    public void afterPropertiesSet() throws Exception {
        this.init();
    }

    public void init() throws Exception {
        if (!this.initialized.compareAndSet(false, true)) {
            log.warn("[PowerJobWorker] please do not repeat the initialization");
            return;
        }
        Stopwatch stopwatch = Stopwatch.createStarted();
        log.info("[PowerJobWorker] start to initialize PowerJobWorker...");
        PowerJobWorkerConfig config = this.workerRuntime.getWorkerConfig();
        log.info("[PowerJobWorker] worker config: {}", (Object)JsonUtils.toJSONString((Object)config));
        CommonUtils.requireNonNull((Object)config, (String)"can't find OhMyConfig, please set OhMyConfig first");
        try {
            PowerBannerPrinter.print();
            if (!config.isEnableTestMode()) {
                this.assertAppName();
            } else {
                log.warn("[PowerJobWorker] using TestMode now, it's dangerous if this is production env.");
            }
            String workerAddress = NetUtils.getLocalHost() + ":" + config.getPort();
            this.workerRuntime.setWorkerAddress(workerAddress);
            ExecutorManager executorManager = new ExecutorManager(this.workerRuntime.getWorkerConfig());
            this.workerRuntime.setExecutorManager(executorManager);
            HashMap overrideConfig = Maps.newHashMap();
            overrideConfig.put("akka.remote.artery.canonical.hostname", NetUtils.getLocalHost());
            overrideConfig.put("akka.remote.artery.canonical.port", config.getPort());
            Config akkaBasicConfig = ConfigFactory.load((String)"oms-worker.akka.conf");
            Config akkaFinalConfig = ConfigFactory.parseMap((Map)overrideConfig).withFallback((ConfigMergeable)akkaBasicConfig);
            int cores = Runtime.getRuntime().availableProcessors();
            ActorSystem actorSystem = ActorSystem.create((String)"oms", (Config)akkaFinalConfig);
            this.workerRuntime.setActorSystem(actorSystem);
            ServerDiscoveryService serverDiscoveryService = new ServerDiscoveryService(this.workerRuntime.getAppId(), this.workerRuntime.getWorkerConfig());
            serverDiscoveryService.start(this.workerRuntime.getExecutorManager().getCoreExecutor());
            this.workerRuntime.setServerDiscoveryService(serverDiscoveryService);
            ActorRef taskTrackerActorRef = actorSystem.actorOf(TaskTrackerActor.props(this.workerRuntime).withDispatcher("akka.task-tracker-dispatcher").withRouter((RouterConfig)new RoundRobinPool(cores * 2)), "task_tracker");
            actorSystem.actorOf(ProcessorTrackerActor.props(this.workerRuntime).withDispatcher("akka.processor-tracker-dispatcher").withRouter((RouterConfig)new RoundRobinPool(cores)), "processor_tracker");
            actorSystem.actorOf(WorkerActor.props(taskTrackerActorRef).withDispatcher("akka.worker-common-dispatcher").withRouter((RouterConfig)new RoundRobinPool(cores)), "worker");
            ActorRef troubleshootingActor = actorSystem.actorOf(Props.create(TroubleshootingActor.class, (Object[])new Object[0]), "troubleshooting");
            actorSystem.eventStream().subscribe(troubleshootingActor, DeadLetter.class);
            log.info("[PowerJobWorker] akka-remote listening address: {}", (Object)workerAddress);
            log.info("[PowerJobWorker] akka ActorSystem({}) initialized successfully.", (Object)actorSystem);
            OmsLogHandler omsLogHandler = new OmsLogHandler(workerAddress, actorSystem, serverDiscoveryService);
            this.workerRuntime.setOmsLogHandler(omsLogHandler);
            TaskPersistenceService taskPersistenceService = new TaskPersistenceService(this.workerRuntime.getWorkerConfig().getStoreStrategy());
            taskPersistenceService.init();
            this.workerRuntime.setTaskPersistenceService(taskPersistenceService);
            log.info("[PowerJobWorker] local storage initialized successfully.");
            this.workerRuntime.getExecutorManager().getCoreExecutor().scheduleAtFixedRate(new WorkerHealthReporter(this.workerRuntime), 0L, config.getHealthReportInterval().intValue(), TimeUnit.SECONDS);
            this.workerRuntime.getExecutorManager().getCoreExecutor().scheduleWithFixedDelay(omsLogHandler.logSubmitter, 0L, 5L, TimeUnit.SECONDS);
            log.info("[PowerJobWorker] PowerJobWorker initialized successfully, using time: {}, congratulations!", (Object)stopwatch);
        }
        catch (Exception e) {
            log.error("[PowerJobWorker] initialize PowerJobWorker failed, using {}.", (Object)stopwatch, (Object)e);
            throw e;
        }
    }

    public void setConfig(PowerJobWorkerConfig config) {
        this.workerRuntime.setWorkerConfig(config);
    }

    private void assertAppName() {
        PowerJobWorkerConfig config = this.workerRuntime.getWorkerConfig();
        String appName = config.getAppName();
        Objects.requireNonNull(appName, "appName can't be empty!");
        String url = "http://%s/server/assert?appName=%s";
        for (String server : config.getServerAddress()) {
            String realUrl = String.format(url, server, appName);
            try {
                String resultDTOStr = (String)CommonUtils.executeWithRetry0(() -> HttpUtils.get((String)realUrl));
                ResultDTO resultDTO = (ResultDTO)JsonUtils.parseObject((String)resultDTOStr, ResultDTO.class);
                if (resultDTO.isSuccess()) {
                    Long appId = Long.valueOf(resultDTO.getData().toString());
                    log.info("[PowerJobWorker] assert appName({}) succeed, the appId for this application is {}.", (Object)appName, (Object)appId);
                    this.workerRuntime.setAppId(appId);
                    return;
                }
                log.error("[PowerJobWorker] assert appName failed, this appName is invalid, please register the appName {} first.", (Object)appName);
                throw new PowerJobException(resultDTO.getMessage());
            }
            catch (PowerJobException oe) {
                throw oe;
            }
            catch (Exception ignore) {
                log.warn("[PowerJobWorker] assert appName by url({}) failed, please check the server address.", (Object)realUrl);
            }
        }
        log.error("[PowerJobWorker] no available server in {}.", config.getServerAddress());
        throw new PowerJobException("no server available!");
    }

    public void destroy() throws Exception {
        this.workerRuntime.getExecutorManager().shutdown();
        this.workerRuntime.getActorSystem().terminate();
    }
}

