/*
 * Decompiled with CFR 0.152.
 */
package org.apache.inlong.sdk.commons.admin;

import com.google.common.eventbus.Subscribe;
import java.io.IOException;
import java.io.StringReader;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiConsumer;
import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.lifecycle.LifecycleAware;
import org.apache.flume.lifecycle.LifecycleState;
import org.apache.flume.lifecycle.LifecycleSupervisor;
import org.apache.flume.node.MaterializedConfiguration;
import org.apache.inlong.sdk.commons.admin.AdminHttpSource;
import org.apache.inlong.sdk.commons.admin.PropertiesConfigurationProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AdminTask {
    public static final Logger LOG = LoggerFactory.getLogger(AdminTask.class);
    public static final String KEY_HOST = "adminTask.host";
    public static final String KEY_PORT = "adminTask.port";
    public static final String KEY_HANDLER = "adminTask.handler";
    public static final String FLUME_ROOT = "admin";
    private Context context;
    private final LifecycleSupervisor supervisor;
    private MaterializedConfiguration materializedConfiguration;
    private final ReentrantLock lifecycleLock = new ReentrantLock();

    public AdminTask(Context context) {
        this.context = context;
        this.supervisor = new LifecycleSupervisor();
    }

    public void start() {
        try {
            Map<String, String> flumeConfiguration = this.generateFlumeConfiguration();
            if (flumeConfiguration == null) {
                return;
            }
            LOG.info("Start admin task,flumeConf:{}", flumeConfiguration);
            PropertiesConfigurationProvider configurationProvider = new PropertiesConfigurationProvider(FLUME_ROOT, flumeConfiguration);
            this.handleConfigurationEvent(configurationProvider.getConfiguration());
        }
        catch (Exception e) {
            LOG.error(e.getMessage(), (Throwable)e);
        }
    }

    private Map<String, String> generateFlumeConfiguration() throws IOException {
        String host = this.context.getString(KEY_HOST);
        if (host == null) {
            LOG.error("Can not start admin task, host is null.");
            return null;
        }
        String port = this.context.getString(KEY_PORT);
        if (port == null) {
            LOG.error("Can not start admin task, port is null.");
            return null;
        }
        String handlerType = this.context.getString(KEY_HANDLER);
        if (handlerType == null) {
            LOG.error("Can not start admin task, handlerType is null.");
            return null;
        }
        String flumeString = String.format("admin.sources=r1\nadmin.sinks=k1\nadmin.channels=c1\nadmin.sources.r1.type=" + AdminHttpSource.class.getName() + "\nadmin.sources.r1.bind=%s\nadmin.sources.r1.port=%s\nadmin.sources.r1.channels=c1\nadmin.sources.r1.handler=%s\nadmin.sinks.k1.type=logger\nadmin.sinks.k1.channel=c1\nadmin.channels.c1.type=memory\nadmin.channels.c1.capacity=1000\nadmin.channels.c1.transactionCapacity=100", host, port, handlerType);
        Properties props = new Properties();
        props.load(new StringReader(flumeString));
        HashMap<String, String> flumeMap = new HashMap<String, String>();
        props.forEach((BiConsumer<? super Object, ? super Object>)((BiConsumer<Object, Object>)(key, value) -> flumeMap.put(String.valueOf(key), String.valueOf(value))));
        Map subHandlerConfig = this.context.getSubProperties("adminTask.handler.");
        subHandlerConfig.forEach((key, value) -> flumeMap.put("admin.sources.r1.handler." + key, (String)value));
        return flumeMap;
    }

    @Subscribe
    public void handleConfigurationEvent(MaterializedConfiguration conf) {
        try {
            this.lifecycleLock.lockInterruptibly();
            this.stopAllComponents();
            this.startAllComponents(conf);
        }
        catch (InterruptedException e) {
            LOG.info("Interrupted while trying to handle configuration event");
            return;
        }
        finally {
            if (this.lifecycleLock.isHeldByCurrentThread()) {
                this.lifecycleLock.unlock();
            }
        }
    }

    public void stop() {
        this.lifecycleLock.lock();
        this.stopAllComponents();
        try {
            this.supervisor.stop();
        }
        finally {
            this.lifecycleLock.unlock();
        }
    }

    private void stopAllComponents() {
        if (this.materializedConfiguration != null) {
            LOG.info("Shutting down configuration: {}", (Object)this.materializedConfiguration);
            for (Map.Entry entry : this.materializedConfiguration.getSourceRunners().entrySet()) {
                try {
                    LOG.info("Stopping Source " + (String)entry.getKey());
                    this.supervisor.unsupervise((LifecycleAware)entry.getValue());
                }
                catch (Exception e) {
                    LOG.error("Error while stopping {}", entry.getValue(), (Object)e);
                }
            }
            for (Map.Entry entry : this.materializedConfiguration.getSinkRunners().entrySet()) {
                try {
                    LOG.info("Stopping Sink " + (String)entry.getKey());
                    this.supervisor.unsupervise((LifecycleAware)entry.getValue());
                }
                catch (Exception e) {
                    LOG.error("Error while stopping {}", entry.getValue(), (Object)e);
                }
            }
            for (Map.Entry entry : this.materializedConfiguration.getChannels().entrySet()) {
                try {
                    LOG.info("Stopping Channel " + (String)entry.getKey());
                    this.supervisor.unsupervise((LifecycleAware)entry.getValue());
                }
                catch (Exception e) {
                    LOG.error("Error while stopping {}", entry.getValue(), (Object)e);
                }
            }
        }
    }

    private void startAllComponents(MaterializedConfiguration materializedConfiguration) {
        LOG.info("Starting new configuration:{}", (Object)materializedConfiguration);
        this.materializedConfiguration = materializedConfiguration;
        for (Map.Entry entry : materializedConfiguration.getChannels().entrySet()) {
            try {
                LOG.info("Starting Channel " + (String)entry.getKey());
                this.supervisor.supervise((LifecycleAware)entry.getValue(), (LifecycleSupervisor.SupervisorPolicy)new LifecycleSupervisor.SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
            }
            catch (Exception e) {
                LOG.error("Error while starting {}", entry.getValue(), (Object)e);
            }
        }
        for (Channel ch : materializedConfiguration.getChannels().values()) {
            while (ch.getLifecycleState() != LifecycleState.START && !this.supervisor.isComponentInErrorState((LifecycleAware)ch)) {
                try {
                    LOG.info("Waiting for channel: " + ch.getName() + " to start. Sleeping for 500 ms");
                    Thread.sleep(500L);
                }
                catch (InterruptedException e) {
                    LOG.error("Interrupted while waiting for channel to start.", (Throwable)e);
                }
            }
        }
        for (Map.Entry entry : materializedConfiguration.getSinkRunners().entrySet()) {
            try {
                LOG.info("Starting Sink " + (String)entry.getKey());
                this.supervisor.supervise((LifecycleAware)entry.getValue(), (LifecycleSupervisor.SupervisorPolicy)new LifecycleSupervisor.SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
            }
            catch (Exception e) {
                LOG.error("Error while starting {}", entry.getValue(), (Object)e);
            }
        }
        for (Map.Entry entry : materializedConfiguration.getSourceRunners().entrySet()) {
            try {
                LOG.info("Starting Source " + (String)entry.getKey());
                this.supervisor.supervise((LifecycleAware)entry.getValue(), (LifecycleSupervisor.SupervisorPolicy)new LifecycleSupervisor.SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
            }
            catch (Exception e) {
                LOG.error("Error while starting {}", entry.getValue(), (Object)e);
            }
        }
    }

    public static void main(String[] args) {
        try {
            Context context = new Context();
            context.put(KEY_HOST, "127.0.0.1");
            context.put(KEY_PORT, "8080");
            context.put(KEY_HANDLER, "org.apache.inlong.dataproxy.admin.AdminJsonHandler");
            context.put("adminTask.handler.stopService.type", "org.apache.inlong.dataproxy.admin.ProxyServiceAdminEventHandler");
            AdminTask task = new AdminTask(context);
            task.start();
            Thread.sleep(10000L);
        }
        catch (Exception e) {
            e.printStackTrace();
        }
    }
}

