package org.apache.storm.streams.processors;

import java.util.Iterator;
import java.util.Set;
import org.apache.storm.shade.com.google.common.collect.Multimap;
import org.apache.storm.streams.ProcessorNode;
import org.apache.storm.streams.WindowNode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/storm/streams/processors/ForwardingProcessorContext.class */
/**
 * A {@link ProcessorContext} that passes forwarded values on to the processors of the
 * {@link ProcessorNode}s that the supplied multimap associates with each stream key.
 *
 * <p>A value equal to {@link WindowNode#PUNCTUATION} is not executed; instead the target
 * processors are punctuated.
 */
public class ForwardingProcessorContext implements ProcessorContext {
    private static final Logger LOG = LoggerFactory.getLogger(ForwardingProcessorContext.class);

    /** Node this context belongs to; consulted for the windowing queries. */
    private final ProcessorNode processorNode;
    /** Stream key to the nodes whose processors receive values for that stream. */
    private final Multimap<String, ProcessorNode> streamToChildren;
    /** Key set taken from {@link #streamToChildren} at construction time. */
    private final Set<String> streams;

    /**
     * Creates a context for the given node.
     *
     * @param processorNode the node whose windowing information this context reports
     * @param multimap      mapping from stream key to the nodes that receive values on that stream
     */
    public ForwardingProcessorContext(ProcessorNode processorNode, Multimap<String, ProcessorNode> multimap) {
        this.processorNode = processorNode;
        this.streamToChildren = multimap;
        this.streams = multimap.keySet();
    }

    /**
     * Forwards the value on every known stream. If the value equals
     * {@link WindowNode#PUNCTUATION}, the targets of every stream are punctuated instead.
     *
     * @param t the value (or punctuation marker) to forward
     */
    @Override
    public <T> void forward(T t) {
        // Decide once, before touching any stream, whether this is a punctuation marker.
        final boolean punctuation = WindowNode.PUNCTUATION.equals(t);
        for (String stream : streams) {
            if (punctuation) {
                punctuateTargets(stream);
            } else {
                executeTargets(t, stream);
            }
        }
    }

    /**
     * Forwards the value on a single stream. If the value equals
     * {@link WindowNode#PUNCTUATION}, the targets of that stream are punctuated instead.
     *
     * @param t   the value (or punctuation marker) to forward
     * @param str the stream key to forward on
     */
    @Override
    public <T> void forward(T t, String str) {
        if (!WindowNode.PUNCTUATION.equals(t)) {
            executeTargets(t, str);
            return;
        }
        punctuateTargets(str);
    }

    /**
     * Reports whether the underlying node is windowed.
     *
     * @return the value of {@link ProcessorNode#isWindowed()} for this context's node
     */
    @Override
    public boolean isWindowed() {
        return processorNode.isWindowed();
    }

    /**
     * Returns the windowed parent streams of the underlying node.
     *
     * @return the value of {@link ProcessorNode#getWindowedParentStreams()} for this context's node
     */
    @Override
    public Set<String> getWindowedParentStreams() {
        return processorNode.getWindowedParentStreams();
    }

    /** Punctuates the processor of every node mapped to {@code stream}. */
    private void punctuateTargets(String stream) {
        for (ProcessorNode target : streamToChildren.get(stream)) {
            LOG.debug("Punctuating processor: {}", target);
            target.getProcessor().punctuate(stream);
        }
    }

    /** Executes {@code input} on the processor of every node mapped to {@code stream}. */
    private <T> void executeTargets(T input, String stream) {
        for (ProcessorNode target : streamToChildren.get(stream)) {
            LOG.debug("Forward input: {} to processor node: {}", input, target);
            target.getProcessor().execute(input, stream);
        }
    }
}
