/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.xd.dirt.plugins;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.utils.ThreadUtils;
import org.springframework.integration.channel.ChannelInterceptorAware;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.interceptor.WireTap;
import org.springframework.integration.support.DefaultMessageBuilderFactory;
import org.springframework.integration.support.MessageBuilderFactory;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.ChannelInterceptor;
import org.springframework.messaging.support.ChannelInterceptorAdapter;
import org.springframework.util.Assert;
import org.springframework.xd.dirt.integration.bus.BusUtils;
import org.springframework.xd.dirt.integration.bus.MessageBus;
import org.springframework.xd.dirt.plugins.AbstractPlugin;
import org.springframework.xd.dirt.zookeeper.Paths;
import org.springframework.xd.dirt.zookeeper.ZooKeeperConnection;
import org.springframework.xd.dirt.zookeeper.ZooKeeperConnectionListener;
import org.springframework.xd.dirt.zookeeper.ZooKeeperUtils;
import org.springframework.xd.module.ModuleType;
import org.springframework.xd.module.core.Module;

public abstract class AbstractMessageBusBinderPlugin
extends AbstractPlugin {
    protected static final String MODULE_INPUT_CHANNEL = "input";
    protected static final String MODULE_OUTPUT_CHANNEL = "output";
    protected static final String JOB_CHANNEL_PREFIX = "job:";
    protected final MessageBus messageBus;
    private volatile PathChildrenCache taps;
    private final TapListener tapListener = new TapListener();
    private final Map<String, MessageChannel> tappableChannels = new HashMap<String, MessageChannel>();

    public AbstractMessageBusBinderPlugin(MessageBus messageBus) {
        this(messageBus, null);
    }

    public AbstractMessageBusBinderPlugin(MessageBus messageBus, ZooKeeperConnection zkConnection) {
        Assert.notNull((Object)messageBus, (String)"MessageBus must not be null.");
        this.messageBus = messageBus;
        if (zkConnection != null) {
            if (zkConnection.isConnected()) {
                this.startTapListener(zkConnection.getClient());
            }
            zkConnection.addListener(new TapLifecycleConnectionListener());
        }
    }

    private void startTapListener(CuratorFramework client) {
        String tapPath = Paths.build("taps");
        Paths.ensurePath(client, tapPath);
        this.taps = new PathChildrenCache(client, tapPath, true, ThreadUtils.newThreadFactory((String)"TapsPathChildrenCache"));
        this.taps.getListenable().addListener((Object)this.tapListener);
        try {
            this.taps.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
        }
        catch (Exception e) {
            throw ZooKeeperUtils.wrapThrowable(e, "failed to start TapListener");
        }
    }

    protected final void bindConsumerAndProducers(Module module) {
        MessageChannel inputChannel;
        MessageChannel outputChannel;
        boolean trackHistory = module.getDeploymentProperties() != null ? module.getDeploymentProperties().getTrackHistory() : false;
        Properties[] properties = this.extractConsumerProducerProperties(module);
        Map<String, Object> historyProperties = null;
        if (trackHistory) {
            historyProperties = this.extractHistoryProperties(module);
            this.addHistoryTag(module, historyProperties);
        }
        if ((outputChannel = (MessageChannel)module.getComponent(MODULE_OUTPUT_CHANNEL, MessageChannel.class)) != null) {
            this.bindMessageProducer(outputChannel, this.getOutputChannelName(module), properties[1]);
            String tapChannelName = this.buildTapChannelName(module);
            this.tappableChannels.put(tapChannelName, outputChannel);
            if (this.isTapActive(tapChannelName)) {
                this.createAndBindTapChannel(tapChannelName, outputChannel);
            }
            if (trackHistory) {
                this.track(module, outputChannel, historyProperties);
            }
        }
        if ((inputChannel = (MessageChannel)module.getComponent(MODULE_INPUT_CHANNEL, MessageChannel.class)) != null) {
            this.bindMessageConsumer(inputChannel, this.getInputChannelName(module), module.getDescriptor().getGroup(), properties[0]);
            if (trackHistory && module.getType().equals((Object)ModuleType.sink)) {
                this.track(module, inputChannel, historyProperties);
            }
        }
    }

    private void addHistoryTag(Module module, Map<String, Object> historyProperties) {
        String historyTag = module.getDescriptor().getModuleLabel();
        if (module.getDescriptor().getSinkChannelName() != null) {
            historyTag = historyTag + ">" + module.getDescriptor().getSinkChannelName();
        }
        if (module.getDescriptor().getSourceChannelName() != null) {
            historyTag = module.getDescriptor().getSourceChannelName() + ">" + historyTag;
        }
        historyProperties.put("module", historyTag);
    }

    private void track(Module module, MessageChannel channel, final Map<String, Object> historyProps) {
        DefaultMessageBuilderFactory messageBuilderFactory;
        Object object = messageBuilderFactory = module.getComponent("messageBuilderFactory", MessageBuilderFactory.class) == null ? new DefaultMessageBuilderFactory() : (MessageBuilderFactory)module.getComponent("messageBuilderFactory", MessageBuilderFactory.class);
        if (channel instanceof ChannelInterceptorAware) {
            ((ChannelInterceptorAware)channel).addInterceptor((ChannelInterceptor)new ChannelInterceptorAdapter((MessageBuilderFactory)messageBuilderFactory){
                final /* synthetic */ MessageBuilderFactory val$messageBuilderFactory;
                {
                    this.val$messageBuilderFactory = messageBuilderFactory;
                }

                public Message<?> preSend(Message<?> message, MessageChannel channel) {
                    ArrayList history = (ArrayList)message.getHeaders().get((Object)"xdHistory");
                    history = history == null ? new ArrayList(1) : new ArrayList(history);
                    LinkedHashMap<String, Object> map = new LinkedHashMap<String, Object>();
                    map.putAll(historyProps);
                    map.put("thread", Thread.currentThread().getName());
                    history.add(map);
                    Message out = this.val$messageBuilderFactory.fromMessage(message).setHeader("xdHistory", history).build();
                    map.put("timestamp", out.getHeaders().getTimestamp());
                    return out;
                }
            });
        }
    }

    protected final Properties[] extractConsumerProducerProperties(Module module) {
        Properties consumerProperties = new Properties();
        Properties producerProperties = new Properties();
        String consumerKeyPrefix = "consumer.";
        String producerKeyPrefix = "producer.";
        if (module.getDeploymentProperties() != null) {
            for (Map.Entry entry : module.getDeploymentProperties().entrySet()) {
                if (((String)entry.getKey()).startsWith(consumerKeyPrefix)) {
                    consumerProperties.put(((String)entry.getKey()).substring(consumerKeyPrefix.length()), entry.getValue());
                    continue;
                }
                if (!((String)entry.getKey()).startsWith(producerKeyPrefix)) continue;
                producerProperties.put(((String)entry.getKey()).substring(producerKeyPrefix.length()), entry.getValue());
            }
        }
        return new Properties[]{consumerProperties, producerProperties};
    }

    protected final Map<String, Object> extractHistoryProperties(Module module) {
        LinkedHashMap<String, Object> properties = new LinkedHashMap<String, Object>();
        if (module.getProperties() != null) {
            for (Map.Entry<Object, Object> entry : module.getProperties().entrySet()) {
                if (!(entry.getKey() instanceof String)) continue;
                String key = (String)entry.getKey();
                if (key.startsWith("xd.container.")) {
                    if ((key = key.substring("xd.container.".length())).equals("id")) {
                        key = "container.id";
                    }
                    properties.put(key, entry.getValue());
                    continue;
                }
                if (!key.equals("xd.stream.name")) continue;
                properties.put(key.substring(3), entry.getValue());
            }
        }
        return properties;
    }

    @Override
    public void beforeShutdown(Module module) {
        this.unbindConsumer(module);
    }

    @Override
    public void removeModule(Module module) {
        super.removeModule(module);
        this.unbindProducers(module);
    }

    protected abstract String getInputChannelName(Module var1);

    protected abstract String getOutputChannelName(Module var1);

    protected abstract String buildTapChannelName(Module var1);

    private void bindMessageConsumer(MessageChannel inputChannel, String inputChannelName, String group, Properties consumerProperties) {
        if (BusUtils.isChannelPubSub((String)inputChannelName)) {
            String channelToBind = inputChannelName;
            if (this.messageBus.isCapable(MessageBus.Capability.DURABLE_PUBSUB)) {
                channelToBind = BusUtils.addGroupToPubSub((String)group, (String)inputChannelName);
            }
            this.messageBus.bindPubSubConsumer(channelToBind, inputChannel, consumerProperties);
        } else {
            this.messageBus.bindConsumer(inputChannelName, inputChannel, consumerProperties);
        }
    }

    private void bindMessageProducer(MessageChannel outputChannel, String outputChannelName, Properties producerProperties) {
        if (BusUtils.isChannelPubSub((String)outputChannelName)) {
            this.messageBus.bindPubSubProducer(outputChannelName, outputChannel, producerProperties);
        } else {
            this.messageBus.bindProducer(outputChannelName, outputChannel, producerProperties);
        }
    }

    private void createAndBindTapChannel(String tapChannelName, MessageChannel outputChannel) {
        this.logger.info("creating and binding tap channel for {}", (Object)tapChannelName);
        if (outputChannel instanceof ChannelInterceptorAware) {
            DirectChannel tapChannel = new DirectChannel();
            tapChannel.setBeanName(tapChannelName + ".tap.bridge");
            this.messageBus.bindPubSubProducer(tapChannelName, (MessageChannel)tapChannel, null);
            this.tapOutputChannel((MessageChannel)tapChannel, (ChannelInterceptorAware)outputChannel);
        } else if (this.logger.isDebugEnabled()) {
            this.logger.debug("output channel is not interceptor aware. Tap will not be created.");
        }
    }

    private MessageChannel tapOutputChannel(MessageChannel tapChannel, ChannelInterceptorAware outputChannel) {
        outputChannel.addInterceptor((ChannelInterceptor)new WireTap(tapChannel));
        return tapChannel;
    }

    protected void unbindConsumer(Module module) {
        MessageChannel inputChannel = (MessageChannel)module.getComponent(MODULE_INPUT_CHANNEL, MessageChannel.class);
        if (inputChannel != null) {
            String channelToUnbind = this.getInputChannelName(module);
            if (this.messageBus.isCapable(MessageBus.Capability.DURABLE_PUBSUB)) {
                channelToUnbind = BusUtils.addGroupToPubSub((String)module.getDescriptor().getGroup(), (String)channelToUnbind);
            }
            this.messageBus.unbindConsumer(channelToUnbind, inputChannel);
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("Unbound consumer for " + module.toString());
            }
        }
    }

    protected void unbindProducers(Module module) {
        MessageChannel outputChannel = (MessageChannel)module.getComponent(MODULE_OUTPUT_CHANNEL, MessageChannel.class);
        if (outputChannel != null) {
            this.messageBus.unbindProducer(this.getOutputChannelName(module), outputChannel);
            String tapChannelName = this.buildTapChannelName(module);
            this.unbindTapChannel(tapChannelName);
            this.tappableChannels.remove(tapChannelName);
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("Unbound producer(s) for " + module.toString());
            }
        }
    }

    private void unbindTapChannel(String tapChannelName) {
        MessageChannel tappedChannel = this.tappableChannels.get(tapChannelName);
        if (tappedChannel instanceof ChannelInterceptorAware) {
            ChannelInterceptorAware interceptorAware = (ChannelInterceptorAware)tappedChannel;
            ArrayList<ChannelInterceptor> interceptors = new ArrayList<ChannelInterceptor>();
            for (ChannelInterceptor interceptor : interceptorAware.getChannelInterceptors()) {
                if (interceptor instanceof WireTap) {
                    ((WireTap)interceptor).stop();
                    continue;
                }
                interceptors.add(interceptor);
            }
            interceptorAware.setInterceptors(interceptors);
            this.messageBus.unbindProducers(tapChannelName);
        }
    }

    @Override
    public int getOrder() {
        return 0;
    }

    private void onTapAdded(ChildData data) {
        String tapChannelName = this.buildTapChannelNameFromPath(data.getPath());
        MessageChannel outputChannel = this.tappableChannels.get(tapChannelName);
        if (outputChannel != null) {
            this.createAndBindTapChannel(tapChannelName, outputChannel);
        }
    }

    private void onTapRemoved(ChildData data) {
        this.unbindTapChannel(this.buildTapChannelNameFromPath(data.getPath()));
    }

    private boolean isTapActive(String tapChannelName) {
        Assert.state((this.taps != null ? 1 : 0) != 0, (String)"tap cache not started");
        List currentTaps = this.taps.getCurrentData();
        for (ChildData data : currentTaps) {
            if (!this.buildTapChannelNameFromPath(data.getPath()).equals(tapChannelName)) continue;
            return true;
        }
        return false;
    }

    private String buildTapChannelNameFromPath(String path) {
        return "tap:" + Paths.stripPath(path);
    }

    class TapLifecycleConnectionListener
    implements ZooKeeperConnectionListener {
        TapLifecycleConnectionListener() {
        }

        @Override
        public void onDisconnect(CuratorFramework client) {
            AbstractMessageBusBinderPlugin.this.taps.getListenable().removeListener((Object)AbstractMessageBusBinderPlugin.this.tapListener);
            try {
                AbstractMessageBusBinderPlugin.this.taps.close();
            }
            catch (Exception e) {
                throw ZooKeeperUtils.wrapThrowable(e);
            }
        }

        @Override
        public void onSuspend(CuratorFramework client) {
        }

        @Override
        public void onConnect(CuratorFramework client) {
            AbstractMessageBusBinderPlugin.this.startTapListener(client);
        }

        @Override
        public void onResume(CuratorFramework client) {
        }
    }

    class TapListener
    implements PathChildrenCacheListener {
        TapListener() {
        }

        public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
            ZooKeeperUtils.logCacheEvent(AbstractMessageBusBinderPlugin.this.logger, event);
            switch (event.getType()) {
                case INITIALIZED: {
                    break;
                }
                case CHILD_ADDED: {
                    AbstractMessageBusBinderPlugin.this.onTapAdded(event.getData());
                    break;
                }
                case CHILD_REMOVED: {
                    AbstractMessageBusBinderPlugin.this.onTapRemoved(event.getData());
                    break;
                }
            }
        }
    }
}

