/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.time.Duration;
import java.util.List;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.internals.ApiUtils;
import org.apache.kafka.streams.processor.Cancellable;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.Punctuator;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.To;
import org.apache.kafka.streams.processor.internals.AbstractProcessorContext;
import org.apache.kafka.streams.processor.internals.ProcessorNode;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.streams.processor.internals.RecordCollector;
import org.apache.kafka.streams.processor.internals.StreamTask;
import org.apache.kafka.streams.processor.internals.TimestampSupplier;
import org.apache.kafka.streams.processor.internals.ToInternal;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.internals.ThreadCache;

/**
 * Processor context bound to a single {@link StreamTask}.
 *
 * <p>It resolves state stores on behalf of the node that is currently processing,
 * forwards key/value pairs to that node's children, and delegates commit requests
 * and punctuation scheduling to the owning task. It also exposes the task's
 * {@link RecordCollector} through {@link RecordCollector.Supplier}.
 */
public class ProcessorContextImpl extends AbstractProcessorContext implements RecordCollector.Supplier {
    /** Routing target used by {@link #forward(Object, Object)}: the result of {@code To.all()}. */
    private static final To SEND_TO_ALL = To.all();

    private final StreamTask task;
    private final RecordCollector collector;
    /** Reusable holder for the routing options of the {@code forward} call in progress. */
    private final ToInternal toInternal = new ToInternal();
    /** Source of {@link #streamTime()}; not set by the constructor, see {@link #setStreamTimeSupplier}. */
    private TimestampSupplier streamTimeSupplier;

    ProcessorContextImpl(TaskId id, StreamTask task, StreamsConfig config, RecordCollector collector, ProcessorStateManager stateMgr, StreamsMetricsImpl metrics, ThreadCache cache) {
        super(id, config, metrics, stateMgr, cache);
        this.task = task;
        this.collector = collector;
    }

    /**
     * Returns the inherited state manager narrowed to {@link ProcessorStateManager}.
     *
     * @return the state manager, cast to {@code ProcessorStateManager}
     */
    public ProcessorStateManager getStateMgr() {
        return (ProcessorStateManager) this.stateManager;
    }

    /**
     * @return the record collector handed to the constructor
     */
    @Override
    public RecordCollector recordCollector() {
        return this.collector;
    }

    /**
     * Looks up a state store by name for the node that is currently processing.
     *
     * <p>A global store of that name, if the state manager has one, is returned
     * without checking the node's connections. Any other store is only returned
     * when its name is listed in the current node's {@code stateStores}.
     *
     * @param name the store name
     * @return the global store, or otherwise whatever {@code stateManager.getStore(name)} returns
     * @throws StreamsException if there is no current node, or the store is neither
     *         global nor connected to the current node
     */
    @Override
    public StateStore getStateStore(String name) {
        // Read the current node once; the original re-queried it for every check.
        final ProcessorNode<?, ?> node = this.currentNode();
        if (node == null) {
            throw new StreamsException("Accessing from an unknown node");
        }
        final StateStore global = this.stateManager.getGlobalStore(name);
        if (global != null) {
            return global;
        }
        if (!node.stateStores.contains(name)) {
            throw new StreamsException("Processor " + node.name() + " has no access to StateStore " + name + " as the store is not connected to the processor. If you add stores manually via '.addStateStore()' " + "make sure to connect the added store to the processor by providing the processor name to " + "'.addStateStore()' or connect them via '.connectProcessorAndStateStores()'. " + "DSL users need to provide the store name to '.process()', '.transform()', or '.transformValues()' " + "to connect the store to the corresponding operator. If you do not add stores manually, " + "please file a bug report at https://issues.apache.org/jira/projects/KAFKA.");
        }
        return this.stateManager.getStore(name);
    }

    /**
     * Forwards the pair using the {@code To.all()} routing target.
     *
     * @param key   the key to forward
     * @param value the value to forward
     */
    @Override
    public <K, V> void forward(K key, V value) {
        this.forward(key, value, SEND_TO_ALL);
    }

    /**
     * Forwards the pair to the child at the given position in the current node's child list.
     *
     * @param key        the key to forward
     * @param value      the value to forward
     * @param childIndex index into the current node's {@code children()} list
     */
    @Override
    public <K, V> void forward(K key, V value, int childIndex) {
        // Bind to a wildcard-typed local so children() keeps its element type;
        // chained through a raw receiver, get() would yield Object and name() would not resolve.
        final ProcessorNode<?, ?> node = this.currentNode();
        final String childName = node.children().get(childIndex).name();
        this.forward(key, value, To.child(childName));
    }

    /**
     * Forwards the pair to the child with the given name.
     *
     * @param key       the key to forward
     * @param value     the value to forward
     * @param childName name of the target child node
     */
    @Override
    public <K, V> void forward(K key, V value, String childName) {
        this.forward(key, value, To.child(childName));
    }

    /**
     * Forwards the pair according to the routing options in {@code to}.
     *
     * <p>If {@code to} names a child, only that child processes the pair; otherwise
     * every child of the current node does, in list order. The current node is
     * switched to each receiving child while it processes and is reset to the
     * forwarding node afterwards, even if a child throws.
     *
     * <p>If {@code to} carries a timestamp it is written into the record context
     * before forwarding. NOTE(review): the previous timestamp is not restored when
     * this method returns, so later reads of the record context see the override;
     * confirm that this is intended.
     *
     * @param key   the key to forward
     * @param value the value to forward
     * @param to    routing options (target child and optional timestamp)
     * @throws StreamsException if {@code to} names a child that the current node does not have
     */
    @Override
    public <K, V> void forward(K key, V value, To to) {
        this.toInternal.update(to);
        if (this.toInternal.hasTimestamp()) {
            this.recordContext.setTimestamp(this.toInternal.timestamp());
        }
        final ProcessorNode<?, ?> previousNode = this.currentNode();
        try {
            // Read the target now: toInternal is shared, and forward calls made
            // by a child while it processes will overwrite it.
            final String sendTo = this.toInternal.child();
            if (sendTo == null) {
                // No explicit target: hand the pair to every child.
                for (final ProcessorNode<?, ?> child : previousNode.children()) {
                    this.forward(child, key, value);
                }
            } else {
                final ProcessorNode<?, ?> child = previousNode.getChild(sendTo);
                if (child == null) {
                    throw new StreamsException("Unknown downstream node: " + sendTo + " either does not exist or is not" + " connected to this processor.");
                }
                this.forward(child, key, value);
            }
        } finally {
            this.setCurrentNode(previousNode);
        }
    }

    /**
     * Makes {@code child} the current node and lets it process the pair.
     * The caller is responsible for resetting the current node afterwards.
     */
    @SuppressWarnings({"unchecked", "rawtypes"}) // the child's key/value types are not known statically
    private <K, V> void forward(ProcessorNode child, K key, V value) {
        this.setCurrentNode(child);
        child.process(key, value);
    }

    /**
     * Asks the owning task to commit by calling {@code requestCommit()} on it.
     */
    @Override
    public void commit() {
        this.task.requestCommit();
    }

    /**
     * Schedules a punctuation by delegating to the owning task.
     *
     * @param interval punctuation interval; presumably milliseconds, as the
     *                 {@link Duration} overload passes {@code toMillis()} here
     * @param type     the punctuation type
     * @param callback the punctuator to invoke
     * @return the handle returned by the task
     * @deprecated the {@link #schedule(Duration, PunctuationType, Punctuator)} overload
     *             validates its interval and then delegates here
     */
    @Override
    @Deprecated
    public Cancellable schedule(long interval, PunctuationType type, Punctuator callback) {
        return this.task.schedule(interval, type, callback);
    }

    /**
     * Schedules a punctuation with a {@link Duration} interval.
     *
     * <p>The interval is first checked by {@code ApiUtils.validateMillisecondDuration},
     * then converted with {@code toMillis()} and passed to the {@code long} overload.
     *
     * @param interval the punctuation interval
     * @param type     the punctuation type
     * @param callback the punctuator to invoke
     * @return the handle returned by the task
     * @throws IllegalArgumentException presumably raised by the validation step for an
     *         unusable interval (confirm against {@code ApiUtils})
     */
    @Override
    public Cancellable schedule(Duration interval, PunctuationType type, Punctuator callback) throws IllegalArgumentException {
        ApiUtils.validateMillisecondDuration(interval, "interval");
        return this.schedule(interval.toMillis(), type, callback);
    }

    /**
     * Installs the supplier that backs {@link #streamTime()}.
     *
     * @param streamTimeSupplier the supplier to read stream time from
     */
    void setStreamTimeSupplier(TimestampSupplier streamTimeSupplier) {
        this.streamTimeSupplier = streamTimeSupplier;
    }

    /**
     * Returns the value currently provided by the installed stream-time supplier.
     *
     * @return the supplier's current value
     * @throws NullPointerException if no supplier has been installed via
     *         {@link #setStreamTimeSupplier}
     */
    @Override
    public long streamTime() {
        return this.streamTimeSupplier.get();
    }
}

