/*
 * Decompiled with CFR 0.152.
 */
package org.apache.inlong.dataproxy.node;

import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.google.common.eventbus.EventBus;
import com.google.common.eventbus.Subscribe;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.instrumentation.MonitorService;
import org.apache.flume.instrumentation.MonitoringType;
import org.apache.flume.lifecycle.LifecycleAware;
import org.apache.flume.lifecycle.LifecycleState;
import org.apache.flume.lifecycle.LifecycleSupervisor;
import org.apache.flume.node.MaterializedConfiguration;
import org.apache.flume.node.PollingPropertiesFileConfigurationProvider;
import org.apache.flume.node.PollingZooKeeperConfigurationProvider;
import org.apache.flume.node.PropertiesFileConfigurationProvider;
import org.apache.flume.node.StaticZooKeeperConfigurationProvider;
import org.apache.flume.util.SSLUtil;
import org.apache.inlong.common.metric.MetricObserver;
import org.apache.inlong.dataproxy.config.CommonConfigHolder;
import org.apache.inlong.dataproxy.heartbeat.HeartbeatManager;
import org.apache.inlong.dataproxy.metrics.audit.AuditUtils;
import org.apache.inlong.sdk.commons.admin.AdminTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Application {
    private static final Logger LOGGER = LoggerFactory.getLogger(Application.class);
    private static final String CONF_MONITOR_CLASS = "flume.monitoring.type";
    private static final String CONF_MONITOR_PREFIX = "flume.monitoring.";
    private final List<LifecycleAware> components;
    private final LifecycleSupervisor supervisor;
    private final ReentrantLock lifecycleLock = new ReentrantLock();
    private MaterializedConfiguration materializedConfiguration;
    private MonitorService monitorServer;
    private AdminTask adminTask;

    /**
     * Creates an application with no externally supplied lifecycle components.
     *
     * <p>Delegates to {@link #Application(List)} with a new, empty list.
     */
    public Application() {
        this(new ArrayList<LifecycleAware>(0));
    }

    /**
     * Creates an application that will supervise the given components once {@link #start()} is called.
     *
     * <p>The list reference is stored as-is (no copy is made) and a fresh {@link LifecycleSupervisor}
     * is created for this instance.
     *
     * @param components lifecycle-aware components to hand to the supervisor on start
     */
    public Application(List<LifecycleAware> components) {
        this.supervisor = new LifecycleSupervisor();
        this.components = components;
    }

    /**
     * Command-line entry point.
     *
     * <p>Parses the agent options, builds an {@link Application} from either a ZooKeeper-backed
     * configuration provider (when {@code -z} is given) or a properties-file-backed one (option
     * {@code -f}), initialises metrics and audit, registers a JVM shutdown hook and starts the
     * application.
     *
     * <p>Unless {@code --no-reload-conf} is passed, a polling provider is used and the application is
     * registered on an {@link EventBus} so that {@link #handleConfigurationEvent} receives
     * configuration updates; otherwise the configuration is loaded once and applied directly.
     *
     * <p>Every exception raised along the way (including option parsing errors) is logged and the
     * method returns; nothing is rethrown.
     *
     * @param args raw command-line arguments
     */
    public static void main(String[] args) {
        try {
            SSLUtil.initGlobalSSLParameters();
            Options options = buildCommandLineOptions();
            CommandLine commandLine = new GnuParser().parse(options, args);
            if (commandLine.hasOption('h')) {
                new HelpFormatter().printHelp("flume-ng agent", options, true);
                return;
            }
            String agentName = commandLine.getOptionValue('n');
            boolean reload = !commandLine.hasOption("no-reload-conf");
            boolean isZkConfigured = commandLine.hasOption('z') || commandLine.hasOption("zkConnString");
            final Application application;
            if (isZkConfigured) {
                String zkConnectionStr = commandLine.getOptionValue('z');
                String baseZkPath = commandLine.getOptionValue('p');
                if (reload) {
                    EventBus eventBus = new EventBus(agentName + "-event-bus");
                    List<LifecycleAware> components = Lists.newArrayList();
                    components.add(new PollingZooKeeperConfigurationProvider(agentName, zkConnectionStr, baseZkPath, eventBus));
                    application = new Application(components);
                    eventBus.register(application);
                } else {
                    StaticZooKeeperConfigurationProvider zProvider = new StaticZooKeeperConfigurationProvider(agentName, zkConnectionStr, baseZkPath);
                    application = new Application();
                    application.handleConfigurationEvent(zProvider.getConfiguration());
                }
            } else {
                String confFilePath = commandLine.getOptionValue('f');
                if (confFilePath == null) {
                    // Without this check new File(null) fails with an uninformative NullPointerException.
                    throw new ParseException("either a config file (-f) or a ZooKeeper connection (-z) must be specified");
                }
                File configurationFile = new File(confFilePath);
                if (!configurationFile.exists() && System.getProperty("flume.called.from.service") == null) {
                    String path = configurationFile.getPath();
                    try {
                        path = configurationFile.getCanonicalPath();
                    } catch (IOException ex) {
                        // Fall back to the non-canonical path in the error message below.
                        LOGGER.error("failed to read canonical path for file: " + path, ex);
                    }
                    throw new ParseException("configuration file does not exist: " + path);
                }
                if (reload) {
                    EventBus eventBus = new EventBus(agentName + "-event-bus");
                    List<LifecycleAware> components = Lists.newArrayList();
                    components.add(new PollingPropertiesFileConfigurationProvider(agentName, configurationFile, eventBus, 30));
                    application = new Application(components);
                    eventBus.register(application);
                } else {
                    PropertiesFileConfigurationProvider configurationProvider = new PropertiesFileConfigurationProvider(agentName, configurationFile);
                    application = new Application();
                    application.handleConfigurationEvent(configurationProvider.getConfiguration());
                }
            }
            MetricObserver.init(CommonConfigHolder.getInstance().getProperties());
            AuditUtils.initAudit();
            Runtime.getRuntime().addShutdownHook(new Thread("data-proxy-shutdown-hook") {

                @Override
                public void run() {
                    // Send audit data first, then tear the application down.
                    AuditUtils.send();
                    application.stop();
                }
            });
            application.start();
            // NOTE(review): the reason for this pause after start() is not evident from this file.
            Thread.sleep(5000L);
        } catch (Exception e) {
            LOGGER.error("fatal error occurred while running data-proxy: ", e);
        }
    }

    /**
     * Builds the set of command-line options understood by {@link #main(String[])}.
     *
     * @return the option definitions; only {@code -n/--name} is marked as required
     */
    private static Options buildCommandLineOptions() {
        Options options = new Options();
        Option option = new Option("n", "name", true, "the name of this agent");
        option.setRequired(true);
        options.addOption(option);
        option = new Option("f", "conf-file", true, "specify a config file (required if -z missing)");
        option.setRequired(false);
        options.addOption(option);
        option = new Option(null, "no-reload-conf", false, "do not reload config file if changed");
        options.addOption(option);
        option = new Option("z", "zkConnString", true, "specify the ZooKeeper connection to use (required if -f missing)");
        option.setRequired(false);
        options.addOption(option);
        option = new Option("p", "zkBasePath", true, "specify the base path in ZooKeeper for agent configs");
        option.setRequired(false);
        options.addOption(option);
        option = new Option("h", "help", false, "display help text");
        options.addOption(option);
        option = new Option(null, "load-conf-from-manager", false, "load configuration data from manager");
        option.setRequired(false);
        options.addOption(option);
        return options;
    }

    /**
     * Starts the application while holding the lifecycle lock.
     *
     * <p>Each component passed at construction time is handed to the supervisor with an
     * always-restart policy and a desired state of {@link LifecycleState#START}. Afterwards the admin
     * task is created from the common configuration properties and started, and a new
     * {@link HeartbeatManager} is started.
     */
    public void start() {
        this.lifecycleLock.lock();
        try {
            for (LifecycleAware lifecycleAware : this.components) {
                LifecycleSupervisor.SupervisorPolicy restartPolicy = new LifecycleSupervisor.SupervisorPolicy.AlwaysRestartPolicy();
                this.supervisor.supervise(lifecycleAware, restartPolicy, LifecycleState.START);
            }
            Context adminContext = new Context(CommonConfigHolder.getInstance().getProperties());
            this.adminTask = new AdminTask(adminContext);
            this.adminTask.start();
            // The heartbeat manager is not retained in a field; it is only started here.
            new HeartbeatManager().start();
        } finally {
            this.lifecycleLock.unlock();
        }
    }

    /**
     * Applies a new configuration: stops everything belonging to the current configuration and then
     * starts the components of {@code conf}, all under the lifecycle lock.
     *
     * <p>Annotated with {@link Subscribe} so that it is invoked for {@link MaterializedConfiguration}
     * events posted on an {@link EventBus} this instance is registered with; it is also called
     * directly for one-shot (non-reloading) configurations.
     *
     * <p>If the calling thread is interrupted while waiting for the lock, the event is dropped, the
     * interruption is logged and the thread's interrupt status is restored so callers can observe it.
     *
     * @param conf the configuration whose channels, sinks and sources should be started
     */
    @Subscribe
    public void handleConfigurationEvent(MaterializedConfiguration conf) {
        try {
            this.lifecycleLock.lockInterruptibly();
        } catch (InterruptedException e) {
            LOGGER.info("interrupted while handle the configuration event");
            // Do not swallow the interruption: restore the flag for code further up the stack.
            Thread.currentThread().interrupt();
            return;
        }
        try {
            this.stopAllComponents();
            this.startAllComponents(conf);
        } finally {
            this.lifecycleLock.unlock();
        }
    }

    /**
     * Shuts the application down while holding the lifecycle lock.
     *
     * <p>Stops all components of the current configuration, then the supervisor, the monitor server
     * (if one was created) and the admin task (if one was created). The lock is released even when
     * any of these steps throws.
     */
    public void stop() {
        this.lifecycleLock.lock();
        try {
            // Must be inside the try: an exception here would otherwise leave the lock held forever.
            this.stopAllComponents();
            this.supervisor.stop();
            // stopAllComponents() already stops a non-null monitor server; this second call is
            // kept from the original shutdown sequence.
            if (this.monitorServer != null) {
                this.monitorServer.stop();
            }
            if (this.adminTask != null) {
                this.adminTask.stop();
            }
        } finally {
            this.lifecycleLock.unlock();
        }
    }

    /**
     * Removes every component of the current configuration from supervision and stops the monitor
     * server.
     *
     * <p>Components are processed in the order sources, sinks, channels. A failure on one component
     * is logged and does not prevent the remaining ones from being processed. Does nothing for the
     * component part when no configuration has been applied yet.
     */
    private void stopAllComponents() {
        LOGGER.info("shutting down configuration: {}", this.materializedConfiguration);
        if (this.materializedConfiguration != null) {
            this.unsuperviseAll("source", this.materializedConfiguration.getSourceRunners());
            this.unsuperviseAll("sink", this.materializedConfiguration.getSinkRunners());
            this.unsuperviseAll("channel", this.materializedConfiguration.getChannels());
        }
        LOGGER.info("shutting down monitor server: {}", this.monitorServer);
        if (this.monitorServer != null) {
            this.monitorServer.stop();
        }
    }

    /**
     * Removes each component in {@code componentsByName} from supervision, logging (and otherwise
     * ignoring) per-component failures.
     *
     * @param kind human-readable component kind used in log messages, e.g. {@code "source"}
     * @param componentsByName components keyed by their configured name
     */
    private void unsuperviseAll(String kind, Map<String, ? extends LifecycleAware> componentsByName) {
        for (Map.Entry<String, ? extends LifecycleAware> entry : componentsByName.entrySet()) {
            try {
                LOGGER.info("stopping {} {}", kind, entry.getKey());
                this.supervisor.unsupervise(entry.getValue());
            } catch (Exception e) {
                LOGGER.error("error while stopping " + kind + " " + entry.getValue(), e);
            }
        }
    }

    /**
     * Records {@code materializedConfiguration} as the current configuration and puts its components
     * under supervision with an always-restart policy.
     *
     * <p>Start order is channels, then sinks, then sources. Before sinks and sources are started the
     * method polls every 500 ms until each channel has either reached {@link LifecycleState#START}
     * or is reported by the supervisor as being in an error state. A failure while handing a single
     * component to the supervisor is logged and does not stop the remaining ones. Monitoring is
     * loaded last.
     *
     * @param materializedConfiguration the configuration to start
     * @throws RuntimeException wrapping the {@link InterruptedException} if the thread is interrupted
     *         while waiting for a channel; the thread's interrupt status is restored first
     */
    private void startAllComponents(MaterializedConfiguration materializedConfiguration) {
        LOGGER.info("Starting new configuration:{}", materializedConfiguration);
        this.materializedConfiguration = materializedConfiguration;
        for (Map.Entry<String, ? extends LifecycleAware> entry : materializedConfiguration.getChannels().entrySet()) {
            try {
                LOGGER.info("starting channel " + entry.getKey());
                this.supervisor.supervise(entry.getValue(), new LifecycleSupervisor.SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
            } catch (Exception e) {
                LOGGER.error("error while starting channel " + entry.getValue(), e);
            }
        }
        // Sinks and sources are only started once every channel is up (or known to be broken).
        for (Channel ch : materializedConfiguration.getChannels().values()) {
            while (ch.getLifecycleState() != LifecycleState.START && !this.supervisor.isComponentInErrorState(ch)) {
                try {
                    LOGGER.info("sleeping for 500 ms to wait for channel: {} to start", ch.getName());
                    Thread.sleep(500L);
                } catch (InterruptedException e) {
                    LOGGER.error("interrupted while waiting for channel to start: ", e);
                    // Restore the interrupt status before converting to an unchecked exception.
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(e);
                }
            }
        }
        for (Map.Entry<String, ? extends LifecycleAware> entry : materializedConfiguration.getSinkRunners().entrySet()) {
            try {
                LOGGER.info("starting sink " + entry.getKey());
                this.supervisor.supervise(entry.getValue(), new LifecycleSupervisor.SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
            } catch (Exception e) {
                LOGGER.error("error while starting sink: " + entry.getValue(), e);
            }
        }
        for (Map.Entry<String, ? extends LifecycleAware> entry : materializedConfiguration.getSourceRunners().entrySet()) {
            try {
                LOGGER.info("starting source " + entry.getKey());
                this.supervisor.supervise(entry.getValue(), new LifecycleSupervisor.SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
            } catch (Exception e) {
                LOGGER.error("error while starting source: " + entry.getValue(), e);
            }
        }
        this.loadMonitoring();
    }

    /**
     * Creates, configures and starts the monitor service selected through JVM system properties.
     *
     * <p>Nothing happens unless the system property {@code flume.monitoring.type} is set. Its value is
     * first resolved as a {@link MonitoringType} constant (upper-cased); if that fails it is treated
     * as a class name. Every system property starting with {@code flume.monitoring.} is passed to the
     * service's context with that prefix removed. Any failure is logged as a warning and otherwise
     * ignored.
     */
    private void loadMonitoring() {
        Properties sysProps = System.getProperties();
        Set<String> propNames = sysProps.stringPropertyNames();
        if (!propNames.contains(CONF_MONITOR_CLASS)) {
            return;
        }
        try {
            String monitorType = sysProps.getProperty(CONF_MONITOR_CLASS);
            Class<?> monitorClass;
            try {
                MonitoringType builtIn = MonitoringType.valueOf(monitorType.toUpperCase(Locale.ENGLISH));
                monitorClass = builtIn.getMonitorClass();
            } catch (Exception e) {
                // Not a known MonitoringType constant: fall back to loading the value as a class name.
                monitorClass = Class.forName(monitorType);
            }
            this.monitorServer = (MonitorService) monitorClass.getDeclaredConstructor().newInstance();
            Context monitorContext = new Context();
            int prefixLength = CONF_MONITOR_PREFIX.length();
            for (String name : propNames) {
                if (name.startsWith(CONF_MONITOR_PREFIX)) {
                    monitorContext.put(name.substring(prefixLength), sysProps.getProperty(name));
                }
            }
            this.monitorServer.configure(monitorContext);
            this.monitorServer.start();
        } catch (Exception e) {
            LOGGER.warn("starting monitoring error, the monitoring might not be available: ", e);
        }
    }
}

