/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.kstream.internals;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.processor.api.ContextualFixedKeyProcessor;
import org.apache.kafka.streams.processor.api.FixedKeyProcessorContext;
import org.apache.kafka.streams.processor.api.FixedKeyProcessorSupplier;
import org.apache.kafka.streams.processor.api.FixedKeyRecord;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;

/**
 * Exercises {@code KStream#processValues} with a fixed-key processor that reads and
 * writes a key-value state store named {@code "store"}.
 *
 * <p>The same topology is run twice, differing only in how the store is registered:
 * either through the supplier's {@code stores()} method, or by calling
 * {@code StreamsBuilder#addStateStore} directly.
 */
public class KStreamNewProcessorApiTest {
    private static final String STORE_NAME = "store";
    private static final String INPUT_TOPIC = "input";
    private static final String OUTPUT_TOPIC = "output";
    private static final String VALUE_SUFFIX = "Updated";

    @Test
    @DisplayName("Should attach the state store using ConnectedStoreProvider")
    void shouldGetStateStoreWithConnectedStoreProvider() {
        runTest(false);
    }

    @Test
    @DisplayName("Should attach the state store StreamBuilder.addStateStore")
    void shouldGetStateStoreWithStreamBuilder() {
        runTest(true);
    }

    /**
     * Builds the topology, pipes three records through it, and verifies both the
     * forwarded values and the contents of the state store.
     *
     * @param shouldAddStoreDirectly when {@code true} the store builder is added to the
     *                               {@code StreamsBuilder} and the supplier is given
     *                               {@code null}; when {@code false} the store builder is
     *                               handed to the supplier and exposed via {@code stores()}
     */
    private void runTest(final boolean shouldAddStoreDirectly) {
        final StreamsBuilder builder = new StreamsBuilder();
        final StoreBuilder<KeyValueStore<String, String>> storeBuilder = Stores.keyValueStoreBuilder(
            Stores.inMemoryKeyValueStore(STORE_NAME),
            Serdes.String(),
            Serdes.String());

        if (shouldAddStoreDirectly) {
            builder.addStateStore(storeBuilder);
        }

        // The store name is passed to processValues in both modes, as in the original test.
        builder.stream(INPUT_TOPIC, Consumed.with(Serdes.String(), Serdes.String()))
            .processValues(new TransformerSupplier(shouldAddStoreDirectly ? null : storeBuilder), STORE_NAME)
            .to(OUTPUT_TOPIC, Produced.with(Serdes.String(), Serdes.String()));

        final List<KeyValue<String, String>> words = Arrays.asList(
            KeyValue.pair("a", "foo"),
            KeyValue.pair("b", "bar"),
            KeyValue.pair("c", "baz"));

        try (TopologyTestDriver testDriver = new TopologyTestDriver(builder.build())) {
            final TestInputTopic<String, String> testDriverInputTopic = testDriver.createInputTopic(
                INPUT_TOPIC,
                Serdes.String().serializer(),
                Serdes.String().serializer());
            words.forEach(word -> testDriverInputTopic.pipeInput(word.key, word.value));

            final List<String> expectedOutput = Arrays.asList("fooUpdated", "barUpdated", "bazUpdated");
            final List<String> actualOutput = testDriver
                .createOutputTopic(OUTPUT_TOPIC, Serdes.String().deserializer(), Serdes.String().deserializer())
                .readValuesToList();
            final KeyValueStore<String, String> stateStore = testDriver.getKeyValueStore(STORE_NAME);

            // Expected value first, actual second, so failure messages read correctly.
            Assertions.assertEquals(expectedOutput, actualOutput);
            Assertions.assertEquals("fooUpdated", stateStore.get("a"));
            Assertions.assertEquals("barUpdated", stateStore.get("b"));
            Assertions.assertEquals("bazUpdated", stateStore.get("c"));
        }
    }

    /**
     * Supplies a processor that appends {@code "Updated"} to every value, stores the
     * result under the record key (only if the key is not already present), and
     * forwards the updated record downstream.
     */
    private static class TransformerSupplier
        implements FixedKeyProcessorSupplier<String, String, String> {
        /** Store builder exposed through {@link #stores()}; may be {@code null}. */
        private final StoreBuilder<?> storeBuilder;

        /**
         * @param storeBuilder the store builder to expose via {@link #stores()}, or
         *                     {@code null} if the store is registered elsewhere
         */
        public TransformerSupplier(final StoreBuilder<?> storeBuilder) {
            this.storeBuilder = storeBuilder;
        }

        /**
         * Creates a new processor instance on every call.
         *
         * @return a processor bound to the state store named {@code "store"}
         */
        @Override
        public ContextualFixedKeyProcessor<String, String, String> get() {
            return new ContextualFixedKeyProcessor<String, String, String>() {
                private KeyValueStore<String, String> store;

                @Override
                public void init(final FixedKeyProcessorContext<String, String> context) {
                    // super.init keeps the context; it is read back through context() in process().
                    super.init(context);
                    final KeyValueStore<String, String> resolved = context.getStateStore(STORE_NAME);
                    this.store = Objects.requireNonNull(resolved, "State store can't be null");
                }

                @Override
                public void process(final FixedKeyRecord<String, String> record) {
                    final String updated = record.value() + VALUE_SUFFIX;
                    this.store.putIfAbsent(record.key(), updated);
                    context().forward(record.withValue(updated));
                }

                @Override
                public void close() {
                    // Nothing to release.
                }
            };
        }

        /**
         * @return a singleton set holding the store builder, or {@code null} when none
         *         was supplied (presumably read by the framework as "no stores to
         *         connect" - confirm against the ConnectedStoreProvider contract)
         */
        @Override
        public Set<StoreBuilder<?>> stores() {
            if (this.storeBuilder == null) {
                return null;
            }
            return Collections.singleton(this.storeBuilder);
        }
    }
}

