/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.time.Duration;
import java.util.List;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.internals.ApiUtils;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.Cancellable;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.Punctuator;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.To;
import org.apache.kafka.streams.processor.internals.AbstractProcessorContext;
import org.apache.kafka.streams.processor.internals.ProcessorNode;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.streams.processor.internals.RecordCollector;
import org.apache.kafka.streams.processor.internals.StreamTask;
import org.apache.kafka.streams.processor.internals.ToInternal;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.SessionStore;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;
import org.apache.kafka.streams.state.internals.ThreadCache;
import org.apache.kafka.streams.state.internals.WrappedStateStore;

public class ProcessorContextImpl
extends AbstractProcessorContext
implements RecordCollector.Supplier {
    // Task this context delegates commit requests and punctuation scheduling to.
    private final StreamTask task;
    // Collector returned by recordCollector().
    private final RecordCollector collector;
    // Reused holder that forward(key, value, To) refreshes on every call to read the
    // requested child name and optional timestamp.
    private final ToInternal toInternal = new ToInternal();
    // Routing target passed by forward(key, value); built once from To.all().
    private static final To SEND_TO_ALL = To.all();

    /**
     * Creates a processor context bound to the given task.
     *
     * @param id        task id, passed to the base context
     * @param task      task that receives commit requests and punctuation schedules
     * @param config    streams configuration, passed to the base context
     * @param collector record collector exposed through {@link #recordCollector()}
     * @param stateMgr  state manager, passed to the base context
     * @param metrics   metrics implementation, passed to the base context
     * @param cache     thread cache, passed to the base context
     */
    ProcessorContextImpl(TaskId id, StreamTask task, StreamsConfig config, RecordCollector collector, ProcessorStateManager stateMgr, StreamsMetricsImpl metrics, ThreadCache cache) {
        super(id, config, metrics, stateMgr, cache);
        this.collector = collector;
        this.task = task;
    }

    /**
     * Returns the inherited state manager narrowed to {@link ProcessorStateManager}.
     *
     * @return the state manager of this context
     * @throws ClassCastException if the inherited manager is not a {@code ProcessorStateManager}
     */
    public ProcessorStateManager getStateMgr() {
        final ProcessorStateManager manager = (ProcessorStateManager) this.stateManager;
        return manager;
    }

    /**
     * Supplies the record collector this context was constructed with.
     *
     * @return the collector passed to the constructor
     */
    @Override
    public RecordCollector recordCollector() {
        return collector;
    }

    /**
     * Resolves a state store by name for the processor node that is currently set on this context.
     *
     * <p>Global stores are looked up first and are returned behind a read-only decorator when they
     * are key-value, window or session stores. Any other store must be listed in the current node's
     * {@code stateStores}; it is returned behind the matching read-write decorator. Stores of
     * another type are returned undecorated.
     *
     * @param name name of the requested store
     * @return the (possibly decorated) store
     * @throws StreamsException if no current node is set, or if the store is neither global nor
     *                          connected to the current node
     */
    @Override
    public StateStore getStateStore(String name) {
        if (this.currentNode() == null) {
            throw new StreamsException("Accessing from an unknown node");
        }

        // Global stores take precedence and are never writable from a processor.
        final StateStore globalStore = this.stateManager.getGlobalStore(name);
        if (globalStore != null) {
            final StateStore readOnlyView;
            if (globalStore instanceof KeyValueStore) {
                readOnlyView = new KeyValueStoreReadOnlyDecorator((KeyValueStore) globalStore);
            } else if (globalStore instanceof WindowStore) {
                readOnlyView = new WindowStoreReadOnlyDecorator((WindowStore) globalStore);
            } else if (globalStore instanceof SessionStore) {
                readOnlyView = new SessionStoreReadOnlyDecorator((SessionStore) globalStore);
            } else {
                readOnlyView = globalStore;
            }
            return readOnlyView;
        }

        // Local stores are only visible to processors they were explicitly connected to.
        final ProcessorNode<?, ?> node = this.currentNode();
        if (!node.stateStores.contains(name)) {
            throw new StreamsException("Processor " + node.name() + " has no access to StateStore " + name
                + " as the store is not connected to the processor. "
                + "If you add stores manually via '.addStateStore()' make sure to connect the added store to the processor "
                + "by providing the processor name to '.addStateStore()' or connect them via '.connectProcessorAndStateStores()'. "
                + "DSL users need to provide the store name to '.process()', '.transform()', or '.transformValues()' "
                + "to connect the store to the corresponding operator. "
                + "If you do not add stores manually, please file a bug report at https://issues.apache.org/jira/projects/KAFKA.");
        }

        final StateStore localStore = this.stateManager.getStore(name);
        final StateStore readWriteView;
        if (localStore instanceof KeyValueStore) {
            readWriteView = new KeyValueStoreReadWriteDecorator((KeyValueStore) localStore);
        } else if (localStore instanceof WindowStore) {
            readWriteView = new WindowStoreReadWriteDecorator((WindowStore) localStore);
        } else if (localStore instanceof SessionStore) {
            readWriteView = new SessionStoreReadWriteDecorator((SessionStore) localStore);
        } else {
            readWriteView = localStore;
        }
        return readWriteView;
    }

    /**
     * Forwards a key/value pair using the shared {@code SEND_TO_ALL} routing target.
     *
     * @param key   record key
     * @param value record value
     */
    @Override
    public <K, V> void forward(K key, V value) {
        final To everyChild = SEND_TO_ALL;
        this.forward(key, value, everyChild);
    }

    /**
     * Forwards a key/value pair to the child found at the given position in the current node's
     * child list.
     *
     * @param key        record key
     * @param value      record value
     * @param childIndex position of the target child in {@code currentNode().children()}
     */
    @Override
    public <K, V> void forward(K key, V value, int childIndex) {
        final List<ProcessorNode<?, ?>> children = this.currentNode().children();
        final String targetName = children.get(childIndex).name();
        this.forward(key, value, To.child(targetName));
    }

    /**
     * Forwards a key/value pair to the child processor with the given name.
     *
     * @param key       record key
     * @param value     record value
     * @param childName name of the target child
     */
    @Override
    public <K, V> void forward(K key, V value, String childName) {
        final To namedChild = To.child(childName);
        this.forward(key, value, namedChild);
    }

    /**
     * Forwards a key/value pair according to the given routing instruction.
     *
     * <p>If {@code to} carries a timestamp, it replaces the record context timestamp for the
     * duration of the call. If it names a child, only that child is processed; otherwise every
     * child of the current node is processed in list order. The record timestamp and the current
     * node are restored before returning, including when a child throws.
     *
     * @param key   record key
     * @param value record value
     * @param to    routing instruction (target child and optional timestamp)
     * @throws StreamsException if {@code to} names a child the current node does not have
     */
    @Override
    public <K, V> void forward(K key, V value, To to) {
        final ProcessorNode callerNode = this.currentNode();
        final long callerTimestamp = this.recordContext.timestamp();
        try {
            this.toInternal.update(to);
            if (this.toInternal.hasTimestamp()) {
                this.recordContext.setTimestamp(this.toInternal.timestamp());
            }

            final String targetName = this.toInternal.child();
            if (targetName != null) {
                // Single, explicitly named downstream node.
                final ProcessorNode target = this.currentNode().getChild(targetName);
                if (target == null) {
                    throw new StreamsException("Unknown downstream node: " + targetName + " either does not exist or is not connected to this processor.");
                }
                this.forward(target, key, value);
                return;
            }

            // No target named: fan out. The list is read once up front because each
            // per-child forward changes the current node.
            final List<ProcessorNode<?, ?>> downstream = this.currentNode().children();
            for (ProcessorNode<?, ?> next : downstream) {
                this.forward(next, key, value);
            }
        } finally {
            this.recordContext.setTimestamp(callerTimestamp);
            this.setCurrentNode(callerNode);
        }
    }

    /**
     * Makes {@code target} the current node and hands it the record. The previous node is not
     * restored here.
     *
     * @param target node that should process the record
     * @param key    record key
     * @param value  record value
     */
    private <K, V> void forward(ProcessorNode target, K key, V value) {
        setCurrentNode(target);
        target.process(key, value);
    }

    /**
     * Asks the task behind this context to commit by calling its {@code requestCommit()}.
     */
    @Override
    public void commit() {
        task.requestCommit();
    }

    /**
     * Registers a punctuator on the task using a raw interval.
     *
     * @param interval punctuation interval; must be at least 1
     * @param type     punctuation type
     * @param callback punctuator to invoke
     * @return handle returned by the task's {@code schedule}
     * @throws IllegalArgumentException if {@code interval} is smaller than 1
     */
    @Override
    @Deprecated
    public Cancellable schedule(long interval, PunctuationType type, Punctuator callback) {
        final boolean intervalSupported = interval >= 1L;
        if (!intervalSupported) {
            throw new IllegalArgumentException("The minimum supported scheduling interval is 1 millisecond.");
        }
        return this.task.schedule(interval, type, callback);
    }

    /**
     * Registers a punctuator using a {@link Duration} interval. The duration is validated and
     * converted by {@code ApiUtils.validateMillisecondDuration} and then passed to the
     * {@code long}-based overload.
     *
     * @param interval punctuation interval
     * @param type     punctuation type
     * @param callback punctuator to invoke
     * @return handle returned by the {@code long}-based overload
     * @throws IllegalArgumentException declared by this method; see the {@code long}-based overload
     *                                  for the minimum-interval check
     */
    @Override
    public Cancellable schedule(Duration interval, PunctuationType type, Punctuator callback) throws IllegalArgumentException {
        final String failurePrefix = ApiUtils.prepareMillisCheckFailMsgPrefix(interval, "interval");
        final long intervalMs = ApiUtils.validateMillisecondDuration(interval, failurePrefix);
        return this.schedule(intervalMs, type, callback);
    }

    /**
     * Session-store wrapper that passes every query and mutation straight through to the wrapped
     * store. Lifecycle handling is inherited from {@code StateStoreReadWriteDecorator}.
     *
     * @param <K>   key type
     * @param <AGG> aggregate type
     */
    private static class SessionStoreReadWriteDecorator<K, AGG>
    extends StateStoreReadWriteDecorator<SessionStore<K, AGG>>
    implements SessionStore<K, AGG> {
        private SessionStoreReadWriteDecorator(SessionStore<K, AGG> inner) {
            // Use the one-argument base constructor; the two-argument form was a
            // decompiler-emitted synthetic accessor and is not valid source.
            super(inner);
        }

        // All calls below go through the typed wrapped() accessor (no raw cast) so that
        // return types stay parameterized.

        @Override
        public KeyValueIterator<Windowed<K>, AGG> findSessions(K key, long earliestSessionEndTime, long latestSessionStartTime) {
            return wrapped().findSessions(key, earliestSessionEndTime, latestSessionStartTime);
        }

        @Override
        public KeyValueIterator<Windowed<K>, AGG> findSessions(K keyFrom, K keyTo, long earliestSessionEndTime, long latestSessionStartTime) {
            return wrapped().findSessions(keyFrom, keyTo, earliestSessionEndTime, latestSessionStartTime);
        }

        @Override
        public void remove(Windowed<K> sessionKey) {
            wrapped().remove(sessionKey);
        }

        @Override
        public void put(Windowed<K> sessionKey, AGG aggregate) {
            wrapped().put(sessionKey, aggregate);
        }

        @Override
        public AGG fetchSession(K key, long startTime, long endTime) {
            return wrapped().fetchSession(key, startTime, endTime);
        }

        @Override
        public KeyValueIterator<Windowed<K>, AGG> fetch(K key) {
            return wrapped().fetch(key);
        }

        @Override
        public KeyValueIterator<Windowed<K>, AGG> fetch(K from, K to) {
            return wrapped().fetch(from, to);
        }
    }

    /**
     * Window-store wrapper that passes every query and mutation straight through to the wrapped
     * store. Lifecycle handling is inherited from {@code StateStoreReadWriteDecorator}.
     *
     * @param <K> key type
     * @param <V> value type
     */
    private static class WindowStoreReadWriteDecorator<K, V>
    extends StateStoreReadWriteDecorator<WindowStore<K, V>>
    implements WindowStore<K, V> {
        private WindowStoreReadWriteDecorator(WindowStore<K, V> inner) {
            // Use the one-argument base constructor; the two-argument form was a
            // decompiler-emitted synthetic accessor and is not valid source.
            super(inner);
        }

        // All calls below go through the typed wrapped() accessor (no raw cast) so that
        // return types stay parameterized.

        @Override
        public void put(K key, V value) {
            wrapped().put(key, value);
        }

        @Override
        public void put(K key, V value, long windowStartTimestamp) {
            wrapped().put(key, value, windowStartTimestamp);
        }

        @Override
        public V fetch(K key, long time) {
            return wrapped().fetch(key, time);
        }

        @Override
        @Deprecated
        public WindowStoreIterator<V> fetch(K key, long timeFrom, long timeTo) {
            return wrapped().fetch(key, timeFrom, timeTo);
        }

        @Override
        @Deprecated
        public KeyValueIterator<Windowed<K>, V> fetch(K from, K to, long timeFrom, long timeTo) {
            return wrapped().fetch(from, to, timeFrom, timeTo);
        }

        @Override
        public KeyValueIterator<Windowed<K>, V> all() {
            return wrapped().all();
        }

        @Override
        @Deprecated
        public KeyValueIterator<Windowed<K>, V> fetchAll(long timeFrom, long timeTo) {
            return wrapped().fetchAll(timeFrom, timeTo);
        }
    }

    /**
     * Key-value-store wrapper that passes every query and mutation straight through to the wrapped
     * store. Lifecycle handling is inherited from {@code StateStoreReadWriteDecorator}.
     *
     * @param <K> key type
     * @param <V> value type
     */
    private static class KeyValueStoreReadWriteDecorator<K, V>
    extends StateStoreReadWriteDecorator<KeyValueStore<K, V>>
    implements KeyValueStore<K, V> {
        private KeyValueStoreReadWriteDecorator(KeyValueStore<K, V> inner) {
            // Use the one-argument base constructor; the two-argument form was a
            // decompiler-emitted synthetic accessor and is not valid source.
            super(inner);
        }

        // All calls below go through the typed wrapped() accessor (no raw cast) so that
        // return types stay parameterized.

        @Override
        public V get(K key) {
            return wrapped().get(key);
        }

        @Override
        public KeyValueIterator<K, V> range(K from, K to) {
            return wrapped().range(from, to);
        }

        @Override
        public KeyValueIterator<K, V> all() {
            return wrapped().all();
        }

        @Override
        public long approximateNumEntries() {
            return wrapped().approximateNumEntries();
        }

        @Override
        public void put(K key, V value) {
            wrapped().put(key, value);
        }

        @Override
        public V putIfAbsent(K key, V value) {
            return wrapped().putIfAbsent(key, value);
        }

        @Override
        public void putAll(List<KeyValue<K, V>> entries) {
            wrapped().putAll(entries);
        }

        @Override
        public V delete(K key) {
            return wrapped().delete(key);
        }
    }

    private static abstract class StateStoreReadWriteDecorator<T extends StateStore>
    extends WrappedStateStore<T> {
        static final String ERROR_MESSAGE = "This method may only be called by Kafka Streams";

        private StateStoreReadWriteDecorator(T inner) {
            super(inner);
        }

        @Override
        public void init(ProcessorContext context, StateStore root) {
            throw new UnsupportedOperationException(ERROR_MESSAGE);
        }

        @Override
        public void close() {
            throw new UnsupportedOperationException(ERROR_MESSAGE);
        }

        /* synthetic */ StateStoreReadWriteDecorator(StateStore x0, 1 x1) {
            this(x0);
        }
    }

    /**
     * Session-store wrapper that passes queries through to the wrapped store and rejects
     * {@code put} and {@code remove} with {@link UnsupportedOperationException}.
     *
     * @param <K>   key type
     * @param <AGG> aggregate type
     */
    private static class SessionStoreReadOnlyDecorator<K, AGG>
    extends StateStoreReadOnlyDecorator<SessionStore<K, AGG>>
    implements SessionStore<K, AGG> {
        private SessionStoreReadOnlyDecorator(SessionStore<K, AGG> inner) {
            // Use the one-argument base constructor; the two-argument form was a
            // decompiler-emitted synthetic accessor and is not valid source.
            super(inner);
        }

        // Reads go through the typed wrapped() accessor (no raw cast) so that return
        // types stay parameterized.

        @Override
        public KeyValueIterator<Windowed<K>, AGG> findSessions(K key, long earliestSessionEndTime, long latestSessionStartTime) {
            return wrapped().findSessions(key, earliestSessionEndTime, latestSessionStartTime);
        }

        @Override
        public KeyValueIterator<Windowed<K>, AGG> findSessions(K keyFrom, K keyTo, long earliestSessionEndTime, long latestSessionStartTime) {
            return wrapped().findSessions(keyFrom, keyTo, earliestSessionEndTime, latestSessionStartTime);
        }

        // Writes are rejected with the message shared by the read-only base class.

        @Override
        public void remove(Windowed<K> sessionKey) {
            throw new UnsupportedOperationException(ERROR_MESSAGE);
        }

        @Override
        public void put(Windowed<K> sessionKey, AGG aggregate) {
            throw new UnsupportedOperationException(ERROR_MESSAGE);
        }

        @Override
        public AGG fetchSession(K key, long startTime, long endTime) {
            return wrapped().fetchSession(key, startTime, endTime);
        }

        @Override
        public KeyValueIterator<Windowed<K>, AGG> fetch(K key) {
            return wrapped().fetch(key);
        }

        @Override
        public KeyValueIterator<Windowed<K>, AGG> fetch(K from, K to) {
            return wrapped().fetch(from, to);
        }
    }

    /**
     * Window-store wrapper that passes queries through to the wrapped store and rejects both
     * {@code put} overloads with {@link UnsupportedOperationException}.
     *
     * @param <K> key type
     * @param <V> value type
     */
    private static class WindowStoreReadOnlyDecorator<K, V>
    extends StateStoreReadOnlyDecorator<WindowStore<K, V>>
    implements WindowStore<K, V> {
        private WindowStoreReadOnlyDecorator(WindowStore<K, V> inner) {
            // Use the one-argument base constructor; the two-argument form was a
            // decompiler-emitted synthetic accessor and is not valid source.
            super(inner);
        }

        // Writes are rejected with the message shared by the read-only base class.

        @Override
        public void put(K key, V value) {
            throw new UnsupportedOperationException(ERROR_MESSAGE);
        }

        @Override
        public void put(K key, V value, long windowStartTimestamp) {
            throw new UnsupportedOperationException(ERROR_MESSAGE);
        }

        // Reads go through the typed wrapped() accessor (no raw cast) so that return
        // types stay parameterized.

        @Override
        public V fetch(K key, long time) {
            return wrapped().fetch(key, time);
        }

        @Override
        @Deprecated
        public WindowStoreIterator<V> fetch(K key, long timeFrom, long timeTo) {
            return wrapped().fetch(key, timeFrom, timeTo);
        }

        @Override
        @Deprecated
        public KeyValueIterator<Windowed<K>, V> fetch(K from, K to, long timeFrom, long timeTo) {
            return wrapped().fetch(from, to, timeFrom, timeTo);
        }

        @Override
        public KeyValueIterator<Windowed<K>, V> all() {
            return wrapped().all();
        }

        @Override
        @Deprecated
        public KeyValueIterator<Windowed<K>, V> fetchAll(long timeFrom, long timeTo) {
            return wrapped().fetchAll(timeFrom, timeTo);
        }
    }

    /**
     * Key-value-store wrapper that passes queries through to the wrapped store and rejects
     * {@code put}, {@code putIfAbsent}, {@code putAll} and {@code delete} with
     * {@link UnsupportedOperationException}.
     *
     * @param <K> key type
     * @param <V> value type
     */
    private static class KeyValueStoreReadOnlyDecorator<K, V>
    extends StateStoreReadOnlyDecorator<KeyValueStore<K, V>>
    implements KeyValueStore<K, V> {
        private KeyValueStoreReadOnlyDecorator(KeyValueStore<K, V> inner) {
            // Use the one-argument base constructor; the two-argument form was a
            // decompiler-emitted synthetic accessor and is not valid source.
            super(inner);
        }

        // Reads go through the typed wrapped() accessor (no raw cast) so that return
        // types stay parameterized.

        @Override
        public V get(K key) {
            return wrapped().get(key);
        }

        @Override
        public KeyValueIterator<K, V> range(K from, K to) {
            return wrapped().range(from, to);
        }

        @Override
        public KeyValueIterator<K, V> all() {
            return wrapped().all();
        }

        @Override
        public long approximateNumEntries() {
            return wrapped().approximateNumEntries();
        }

        // Writes are rejected with the message shared by the read-only base class.

        @Override
        public void put(K key, V value) {
            throw new UnsupportedOperationException(ERROR_MESSAGE);
        }

        @Override
        public V putIfAbsent(K key, V value) {
            throw new UnsupportedOperationException(ERROR_MESSAGE);
        }

        @Override
        public void putAll(List<KeyValue<K, V>> entries) {
            throw new UnsupportedOperationException(ERROR_MESSAGE);
        }

        @Override
        public V delete(K key) {
            throw new UnsupportedOperationException(ERROR_MESSAGE);
        }
    }

    private static abstract class StateStoreReadOnlyDecorator<T extends StateStore>
    extends WrappedStateStore<T> {
        static final String ERROR_MESSAGE = "Global store is read only";

        private StateStoreReadOnlyDecorator(T inner) {
            super(inner);
        }

        @Override
        public void flush() {
            throw new UnsupportedOperationException(ERROR_MESSAGE);
        }

        @Override
        public void init(ProcessorContext context, StateStore root) {
            throw new UnsupportedOperationException(ERROR_MESSAGE);
        }

        @Override
        public void close() {
            throw new UnsupportedOperationException(ERROR_MESSAGE);
        }

        /* synthetic */ StateStoreReadOnlyDecorator(StateStore x0, 1 x1) {
            this(x0);
        }
    }
}

