/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.kstream.internals;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Serialized;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
import org.apache.kafka.streams.kstream.internals.Change;
import org.apache.kafka.streams.kstream.internals.KTableImpl;
import org.apache.kafka.streams.kstream.internals.KTableTransformValues;
import org.apache.kafka.streams.kstream.internals.KTableValueGetter;
import org.apache.kafka.streams.kstream.internals.KTableValueGetterSupplier;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.internals.ForwardingDisabledProcessorContext;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.test.ConsumerRecordFactory;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockReducer;
import org.apache.kafka.test.SingletonNoOpValueTransformer;
import org.apache.kafka.test.TestUtils;
import org.easymock.EasyMock;
import org.easymock.EasyMockRunner;
import org.easymock.Mock;
import org.easymock.MockType;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;

@RunWith(value=EasyMockRunner.class)
public class KTableTransformValuesTest {
    private static final String QUERYABLE_NAME = "queryable-store";
    private static final String INPUT_TOPIC = "inputTopic";
    private static final String STORE_NAME = "someStore";
    private static final String OTHER_STORE_NAME = "otherStore";
    private static final Consumed<String, String> CONSUMED = Consumed.with((Serde)Serdes.String(), (Serde)Serdes.String());
    private final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory((Serializer)new StringSerializer(), (Serializer)new StringSerializer());
    private TopologyTestDriver driver;
    private MockProcessorSupplier<String, String> capture;
    private StreamsBuilder builder;
    @Mock(value=MockType.NICE)
    private KTableImpl<String, String, String> parent;
    @Mock(value=MockType.NICE)
    private InternalProcessorContext context;
    @Mock(value=MockType.NICE)
    private KTableValueGetterSupplier<String, String> parentGetterSupplier;
    @Mock(value=MockType.NICE)
    private KTableValueGetter<String, String> parentGetter;
    @Mock(value=MockType.NICE)
    private KeyValueStore<String, String> stateStore;
    @Mock(value=MockType.NICE)
    private ValueTransformerWithKeySupplier<String, String, String> mockSupplier;
    @Mock(value=MockType.NICE)
    private ValueTransformerWithKey<String, String, String> transformer;

    @After
    public void cleanup() {
        if (this.driver != null) {
            this.driver.close();
            this.driver = null;
        }
    }

    @Before
    public void setUp() {
        this.capture = new MockProcessorSupplier();
        this.builder = new StreamsBuilder();
    }

    @Test
    public void shouldThrowOnGetIfSupplierReturnsNull() {
        KTableTransformValues transformer = new KTableTransformValues(this.parent, (ValueTransformerWithKeySupplier)new NullSupplier(), QUERYABLE_NAME);
        try {
            transformer.get();
            Assert.fail((String)"NPE expected");
        }
        catch (NullPointerException nullPointerException) {
            // empty catch block
        }
    }

    @Test
    public void shouldThrowOnViewGetIfSupplierReturnsNull() {
        KTableValueGetterSupplier view = new KTableTransformValues(this.parent, (ValueTransformerWithKeySupplier)new NullSupplier(), null).view();
        try {
            view.get();
            Assert.fail((String)"NPE expected");
        }
        catch (NullPointerException nullPointerException) {
            // empty catch block
        }
    }

    @Test
    public void shouldInitializeTransformerWithForwardDisabledProcessorContext() {
        SingletonNoOpValueTransformer transformer = new SingletonNoOpValueTransformer();
        KTableTransformValues transformValues = new KTableTransformValues(this.parent, transformer, null);
        Processor processor = transformValues.get();
        processor.init((ProcessorContext)this.context);
        MatcherAssert.assertThat((Object)transformer.context, (Matcher)CoreMatchers.isA(ForwardingDisabledProcessorContext.class));
    }

    @Test
    public void shouldNotSendOldValuesByDefault() {
        KTableTransformValues transformValues = new KTableTransformValues(this.parent, (ValueTransformerWithKeySupplier)new ExclamationValueTransformerSupplier(new String[0]), null);
        Processor processor = transformValues.get();
        processor.init((ProcessorContext)this.context);
        this.context.forward((Object)"Key", (Object)new Change((Object)"Key->newValue!", null));
        EasyMock.expectLastCall();
        EasyMock.replay((Object[])new Object[]{this.context});
        processor.process((Object)"Key", (Object)new Change((Object)"newValue", (Object)"oldValue"));
        EasyMock.verify((Object[])new Object[]{this.context});
    }

    @Test
    public void shouldSendOldValuesIfConfigured() {
        KTableTransformValues transformValues = new KTableTransformValues(this.parent, (ValueTransformerWithKeySupplier)new ExclamationValueTransformerSupplier(new String[0]), null);
        transformValues.enableSendingOldValues();
        Processor processor = transformValues.get();
        processor.init((ProcessorContext)this.context);
        this.context.forward((Object)"Key", (Object)new Change((Object)"Key->newValue!", (Object)"Key->oldValue!"));
        EasyMock.expectLastCall();
        EasyMock.replay((Object[])new Object[]{this.context});
        processor.process((Object)"Key", (Object)new Change((Object)"newValue", (Object)"oldValue"));
        EasyMock.verify((Object[])new Object[]{this.context});
    }

    @Test
    public void shouldSetSendOldValuesOnParent() {
        this.parent.enableSendingOldValues();
        EasyMock.expectLastCall();
        EasyMock.replay((Object[])new Object[]{this.parent});
        new KTableTransformValues(this.parent, new SingletonNoOpValueTransformer(), QUERYABLE_NAME).enableSendingOldValues();
        EasyMock.verify((Object[])new Object[]{this.parent});
    }

    @Test
    public void shouldTransformOnGetIfNotMaterialized() {
        KTableTransformValues transformValues = new KTableTransformValues(this.parent, (ValueTransformerWithKeySupplier)new ExclamationValueTransformerSupplier(new String[0]), null);
        EasyMock.expect((Object)this.parent.valueGetterSupplier()).andReturn(this.parentGetterSupplier);
        EasyMock.expect((Object)this.parentGetterSupplier.get()).andReturn(this.parentGetter);
        EasyMock.expect((Object)this.parentGetter.get((Object)"Key")).andReturn((Object)"Value");
        EasyMock.replay((Object[])new Object[]{this.parent, this.parentGetterSupplier, this.parentGetter});
        KTableValueGetter getter = transformValues.view().get();
        getter.init((ProcessorContext)this.context);
        String result = (String)getter.get((Object)"Key");
        MatcherAssert.assertThat((Object)result, (Matcher)CoreMatchers.is((Object)"Key->Value!"));
    }

    @Test
    public void shouldGetFromStateStoreIfMaterialized() {
        KTableTransformValues transformValues = new KTableTransformValues(this.parent, (ValueTransformerWithKeySupplier)new ExclamationValueTransformerSupplier(new String[0]), QUERYABLE_NAME);
        EasyMock.expect((Object)this.context.getStateStore(QUERYABLE_NAME)).andReturn(this.stateStore);
        EasyMock.expect((Object)this.stateStore.get((Object)"Key")).andReturn((Object)"something");
        EasyMock.replay((Object[])new Object[]{this.context, this.stateStore});
        KTableValueGetter getter = transformValues.view().get();
        getter.init((ProcessorContext)this.context);
        String result = (String)getter.get((Object)"Key");
        MatcherAssert.assertThat((Object)result, (Matcher)CoreMatchers.is((Object)"something"));
    }

    @Test
    public void shouldGetStoreNamesFromParentIfNotMaterialized() {
        KTableTransformValues transformValues = new KTableTransformValues(this.parent, (ValueTransformerWithKeySupplier)new ExclamationValueTransformerSupplier(new String[0]), null);
        EasyMock.expect((Object)this.parent.valueGetterSupplier()).andReturn(this.parentGetterSupplier);
        EasyMock.expect((Object)this.parentGetterSupplier.storeNames()).andReturn((Object)new String[]{"store1", "store2"});
        EasyMock.replay((Object[])new Object[]{this.parent, this.parentGetterSupplier});
        String[] storeNames = transformValues.view().storeNames();
        MatcherAssert.assertThat((Object)storeNames, (Matcher)CoreMatchers.is((Object)new String[]{"store1", "store2"}));
    }

    @Test
    public void shouldGetQueryableStoreNameIfMaterialized() {
        KTableTransformValues transformValues = new KTableTransformValues(this.parent, (ValueTransformerWithKeySupplier)new ExclamationValueTransformerSupplier(new String[0]), QUERYABLE_NAME);
        String[] storeNames = transformValues.view().storeNames();
        MatcherAssert.assertThat((Object)storeNames, (Matcher)CoreMatchers.is((Object)new String[]{QUERYABLE_NAME}));
    }

    @Test
    public void shouldCloseTransformerOnProcessorClose() {
        KTableTransformValues transformValues = new KTableTransformValues(this.parent, this.mockSupplier, null);
        EasyMock.expect((Object)this.mockSupplier.get()).andReturn(this.transformer);
        this.transformer.close();
        EasyMock.expectLastCall();
        EasyMock.replay((Object[])new Object[]{this.mockSupplier, this.transformer});
        Processor processor = transformValues.get();
        processor.close();
        EasyMock.verify((Object[])new Object[]{this.transformer});
    }

    @Test
    public void shouldCloseTransformerOnGetterClose() {
        KTableTransformValues transformValues = new KTableTransformValues(this.parent, this.mockSupplier, null);
        EasyMock.expect((Object)this.mockSupplier.get()).andReturn(this.transformer);
        EasyMock.expect((Object)this.parentGetterSupplier.get()).andReturn(this.parentGetter);
        EasyMock.expect((Object)this.parent.valueGetterSupplier()).andReturn(this.parentGetterSupplier);
        this.transformer.close();
        EasyMock.expectLastCall();
        EasyMock.replay((Object[])new Object[]{this.mockSupplier, this.transformer, this.parent, this.parentGetterSupplier});
        KTableValueGetter getter = transformValues.view().get();
        getter.close();
        EasyMock.verify((Object[])new Object[]{this.transformer});
    }

    @Test
    public void shouldCloseParentGetterClose() {
        KTableTransformValues transformValues = new KTableTransformValues(this.parent, this.mockSupplier, null);
        EasyMock.expect((Object)this.parent.valueGetterSupplier()).andReturn(this.parentGetterSupplier);
        EasyMock.expect((Object)this.mockSupplier.get()).andReturn(this.transformer);
        EasyMock.expect((Object)this.parentGetterSupplier.get()).andReturn(this.parentGetter);
        this.parentGetter.close();
        EasyMock.expectLastCall();
        EasyMock.replay((Object[])new Object[]{this.mockSupplier, this.parent, this.parentGetterSupplier, this.parentGetter});
        KTableValueGetter getter = transformValues.view().get();
        getter.close();
        EasyMock.verify((Object[])new Object[]{this.parentGetter});
    }

    @Test
    public void shouldTransformValuesWithKey() {
        this.builder.addStateStore(KTableTransformValuesTest.storeBuilder(STORE_NAME)).addStateStore(KTableTransformValuesTest.storeBuilder(OTHER_STORE_NAME)).table(INPUT_TOPIC, CONSUMED).transformValues((ValueTransformerWithKeySupplier)new ExclamationValueTransformerSupplier(STORE_NAME, OTHER_STORE_NAME), new String[]{STORE_NAME, OTHER_STORE_NAME}).toStream().process(this.capture, new String[0]);
        this.driver = new TopologyTestDriver(this.builder.build(), KTableTransformValuesTest.props());
        this.driver.pipeInput(this.recordFactory.create(INPUT_TOPIC, (Object)"A", (Object)"a", 0L));
        this.driver.pipeInput(this.recordFactory.create(INPUT_TOPIC, (Object)"B", (Object)"b", 0L));
        this.driver.pipeInput(this.recordFactory.create(INPUT_TOPIC, (Object)"D", (Object)null, 0L));
        MatcherAssert.assertThat(this.output(), (Matcher)CoreMatchers.hasItems((Object[])new String[]{"A:A->a!", "B:B->b!", "D:D->null!"}));
        MatcherAssert.assertThat((String)"Store should not be materialized", (Object)this.driver.getKeyValueStore(QUERYABLE_NAME), (Matcher)CoreMatchers.is((Matcher)CoreMatchers.nullValue()));
    }

    @Test
    public void shouldTransformValuesWithKeyAndMaterialize() {
        this.builder.addStateStore(KTableTransformValuesTest.storeBuilder(STORE_NAME)).table(INPUT_TOPIC, CONSUMED).transformValues((ValueTransformerWithKeySupplier)new ExclamationValueTransformerSupplier(STORE_NAME, QUERYABLE_NAME), Materialized.as((String)QUERYABLE_NAME).withKeySerde(Serdes.String()).withValueSerde(Serdes.String()), new String[]{STORE_NAME}).toStream().process(this.capture, new String[0]);
        this.driver = new TopologyTestDriver(this.builder.build(), KTableTransformValuesTest.props());
        this.driver.pipeInput(this.recordFactory.create(INPUT_TOPIC, (Object)"A", (Object)"a", 0L));
        this.driver.pipeInput(this.recordFactory.create(INPUT_TOPIC, (Object)"B", (Object)"b", 0L));
        this.driver.pipeInput(this.recordFactory.create(INPUT_TOPIC, (Object)"C", (Object)null, 0L));
        MatcherAssert.assertThat(this.output(), (Matcher)CoreMatchers.hasItems((Object[])new String[]{"A:A->a!", "B:B->b!", "C:C->null!"}));
        KeyValueStore keyValueStore = this.driver.getKeyValueStore(QUERYABLE_NAME);
        MatcherAssert.assertThat((Object)keyValueStore.get((Object)"A"), (Matcher)CoreMatchers.is((Object)"A->a!"));
        MatcherAssert.assertThat((Object)keyValueStore.get((Object)"B"), (Matcher)CoreMatchers.is((Object)"B->b!"));
        MatcherAssert.assertThat((Object)keyValueStore.get((Object)"C"), (Matcher)CoreMatchers.is((Object)"C->null!"));
    }

    @Test
    public void shouldCalculateCorrectOldValuesIfMaterializedEvenIfStateful() {
        this.builder.table(INPUT_TOPIC, CONSUMED).transformValues((ValueTransformerWithKeySupplier)new StatefulTransformerSupplier(), Materialized.as((String)QUERYABLE_NAME).withKeySerde(Serdes.String()).withValueSerde(Serdes.Integer()), new String[0]).groupBy(KTableTransformValuesTest.toForceSendingOfOldValues(), Serialized.with((Serde)Serdes.String(), (Serde)Serdes.Integer())).reduce(MockReducer.INTEGER_ADDER, MockReducer.INTEGER_SUBTRACTOR).mapValues(KTableTransformValuesTest.mapBackToStrings()).toStream().process(this.capture, new String[0]);
        this.driver = new TopologyTestDriver(this.builder.build(), KTableTransformValuesTest.props());
        this.driver.pipeInput(this.recordFactory.create(INPUT_TOPIC, (Object)"A", (Object)"ignore", 0L));
        this.driver.pipeInput(this.recordFactory.create(INPUT_TOPIC, (Object)"A", (Object)"ignored", 0L));
        this.driver.pipeInput(this.recordFactory.create(INPUT_TOPIC, (Object)"A", (Object)"ignored", 0L));
        MatcherAssert.assertThat(this.output(), (Matcher)CoreMatchers.hasItems((Object[])new String[]{"A:1", "A:0", "A:2", "A:0", "A:3"}));
        KeyValueStore keyValueStore = this.driver.getKeyValueStore(QUERYABLE_NAME);
        MatcherAssert.assertThat((Object)keyValueStore.get((Object)"A"), (Matcher)CoreMatchers.is((Object)3));
    }

    @Test
    public void shouldCalculateCorrectOldValuesIfNotStatefulEvenIfNotMaterialized() {
        this.builder.table(INPUT_TOPIC, CONSUMED).transformValues((ValueTransformerWithKeySupplier)new StatelessTransformerSupplier(), new String[0]).groupBy(KTableTransformValuesTest.toForceSendingOfOldValues(), Serialized.with((Serde)Serdes.String(), (Serde)Serdes.Integer())).reduce(MockReducer.INTEGER_ADDER, MockReducer.INTEGER_SUBTRACTOR).mapValues(KTableTransformValuesTest.mapBackToStrings()).toStream().process(this.capture, new String[0]);
        this.driver = new TopologyTestDriver(this.builder.build(), KTableTransformValuesTest.props());
        this.driver.pipeInput(this.recordFactory.create(INPUT_TOPIC, (Object)"A", (Object)"a", 0L));
        this.driver.pipeInput(this.recordFactory.create(INPUT_TOPIC, (Object)"A", (Object)"aa", 0L));
        this.driver.pipeInput(this.recordFactory.create(INPUT_TOPIC, (Object)"A", (Object)"aaa", 0L));
        MatcherAssert.assertThat(this.output(), (Matcher)CoreMatchers.hasItems((Object[])new String[]{"A:1", "A:0", "A:2", "A:0", "A:3"}));
    }

    private ArrayList<String> output() {
        return this.capture.capturedProcessors((int)1).get((int)0).processed;
    }

    private static KeyValueMapper<String, Integer, KeyValue<String, Integer>> toForceSendingOfOldValues() {
        return new KeyValueMapper<String, Integer, KeyValue<String, Integer>>(){

            public KeyValue<String, Integer> apply(String key, Integer value) {
                return new KeyValue((Object)key, (Object)value);
            }
        };
    }

    private static ValueMapper<Integer, String> mapBackToStrings() {
        return new ValueMapper<Integer, String>(){

            public String apply(Integer value) {
                return value.toString();
            }
        };
    }

    private static StoreBuilder<KeyValueStore<Long, Long>> storeBuilder(String storeName) {
        return Stores.keyValueStoreBuilder((KeyValueBytesStoreSupplier)Stores.persistentKeyValueStore((String)storeName), (Serde)Serdes.Long(), (Serde)Serdes.Long());
    }

    public static Properties props() {
        Properties props = new Properties();
        props.setProperty("application.id", "kstream-transform-values-test");
        props.setProperty("bootstrap.servers", "localhost:9091");
        props.setProperty("state.dir", TestUtils.tempDirectory().getAbsolutePath());
        props.setProperty("default.key.serde", Serdes.Integer().getClass().getName());
        props.setProperty("default.value.serde", Serdes.Integer().getClass().getName());
        return props;
    }

    private static void throwIfStoresNotAvailable(ProcessorContext context, List<String> expectedStoredNames) {
        ArrayList<String> missing = new ArrayList<String>();
        for (String storedName : expectedStoredNames) {
            if (context.getStateStore(storedName) != null) continue;
            missing.add(storedName);
        }
        if (!missing.isEmpty()) {
            throw new AssertionError((Object)("State stores are not accessible: " + missing));
        }
    }

    private static class StatelessTransformer
    implements ValueTransformerWithKey<String, String, Integer> {
        private StatelessTransformer() {
        }

        public void init(ProcessorContext context) {
        }

        public Integer transform(String readOnlyKey, String value) {
            return value == null ? null : Integer.valueOf(value.length());
        }

        public void close() {
        }
    }

    private static class StatelessTransformerSupplier
    implements ValueTransformerWithKeySupplier<String, String, Integer> {
        private StatelessTransformerSupplier() {
        }

        public ValueTransformerWithKey<String, String, Integer> get() {
            return new StatelessTransformer();
        }
    }

    private static class StatefulTransformer
    implements ValueTransformerWithKey<String, String, Integer> {
        private int counter;

        private StatefulTransformer() {
        }

        public void init(ProcessorContext context) {
        }

        public Integer transform(String readOnlyKey, String value) {
            return ++this.counter;
        }

        public void close() {
        }
    }

    private static class StatefulTransformerSupplier
    implements ValueTransformerWithKeySupplier<String, String, Integer> {
        private StatefulTransformerSupplier() {
        }

        public ValueTransformerWithKey<String, String, Integer> get() {
            return new StatefulTransformer();
        }
    }

    private static class NullSupplier
    implements ValueTransformerWithKeySupplier<String, String, String> {
        private NullSupplier() {
        }

        public ValueTransformerWithKey<String, String, String> get() {
            return null;
        }
    }

    public static class ExclamationValueTransformer
    implements ValueTransformerWithKey<Object, String, String> {
        private final List<String> expectedStoredNames;

        ExclamationValueTransformer(List<String> expectedStoredNames) {
            this.expectedStoredNames = expectedStoredNames;
        }

        public void init(ProcessorContext context) {
            KTableTransformValuesTest.throwIfStoresNotAvailable(context, this.expectedStoredNames);
        }

        public String transform(Object readOnlyKey, String value) {
            return readOnlyKey.toString() + "->" + value + "!";
        }

        public void close() {
        }
    }

    public static class ExclamationValueTransformerSupplier
    implements ValueTransformerWithKeySupplier<Object, String, String> {
        private final List<String> expectedStoredNames;

        ExclamationValueTransformerSupplier(String ... expectedStoreNames) {
            this.expectedStoredNames = Arrays.asList(expectedStoreNames);
        }

        public ExclamationValueTransformer get() {
            return new ExclamationValueTransformer(this.expectedStoredNames);
        }
    }
}

