/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.kstream.internals;

import org.apache.kafka.streams.kstream.internals.Change;
import org.apache.kafka.streams.kstream.internals.TimestampedCacheFlushListener;
import org.apache.kafka.streams.kstream.internals.TimestampedTupleForwarder;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.To;
import org.apache.kafka.streams.state.internals.CacheFlushListener;
import org.apache.kafka.streams.state.internals.WrappedStateStore;
import org.easymock.EasyMock;
import org.junit.Test;

public class TimestampedTupleForwarderTest {
    @Test
    public void shouldSetFlushListenerOnWrappedStateStore() {
        this.setFlushListener(true);
        this.setFlushListener(false);
    }

    private void setFlushListener(boolean sendOldValues) {
        WrappedStateStore store = (WrappedStateStore)EasyMock.mock(WrappedStateStore.class);
        TimestampedCacheFlushListener flushListener = (TimestampedCacheFlushListener)EasyMock.mock(TimestampedCacheFlushListener.class);
        EasyMock.expect((Object)store.setFlushListener((CacheFlushListener)flushListener, sendOldValues)).andReturn((Object)false);
        EasyMock.replay((Object[])new Object[]{store});
        new TimestampedTupleForwarder((StateStore)store, null, flushListener, sendOldValues);
        EasyMock.verify((Object[])new Object[]{store});
    }

    @Test
    public void shouldForwardRecordsIfWrappedStateStoreDoesNotCache() {
        this.shouldForwardRecordsIfWrappedStateStoreDoesNotCache(false);
        this.shouldForwardRecordsIfWrappedStateStoreDoesNotCache(true);
    }

    private void shouldForwardRecordsIfWrappedStateStoreDoesNotCache(boolean sendOldValues) {
        WrappedStateStore store = (WrappedStateStore)EasyMock.mock(WrappedStateStore.class);
        ProcessorContext context = (ProcessorContext)EasyMock.mock(ProcessorContext.class);
        EasyMock.expect((Object)store.setFlushListener(null, sendOldValues)).andReturn((Object)false);
        if (sendOldValues) {
            context.forward((Object)"key1", (Object)new Change((Object)"newValue1", (Object)"oldValue1"));
            context.forward((Object)"key2", (Object)new Change((Object)"newValue2", (Object)"oldValue2"), To.all().withTimestamp(42L));
        } else {
            context.forward((Object)"key1", (Object)new Change((Object)"newValue1", null));
            context.forward((Object)"key2", (Object)new Change((Object)"newValue2", null), To.all().withTimestamp(42L));
        }
        EasyMock.expectLastCall();
        EasyMock.replay((Object[])new Object[]{store, context});
        TimestampedTupleForwarder forwarder = new TimestampedTupleForwarder((StateStore)store, context, null, sendOldValues);
        forwarder.maybeForward((Object)"key1", (Object)"newValue1", (Object)"oldValue1");
        forwarder.maybeForward((Object)"key2", (Object)"newValue2", (Object)"oldValue2", 42L);
        EasyMock.verify((Object[])new Object[]{store, context});
    }

    @Test
    public void shouldNotForwardRecordsIfWrappedStateStoreDoesCache() {
        WrappedStateStore store = (WrappedStateStore)EasyMock.mock(WrappedStateStore.class);
        ProcessorContext context = (ProcessorContext)EasyMock.mock(ProcessorContext.class);
        EasyMock.expect((Object)store.setFlushListener(null, false)).andReturn((Object)true);
        EasyMock.replay((Object[])new Object[]{store, context});
        TimestampedTupleForwarder forwarder = new TimestampedTupleForwarder((StateStore)store, context, null, false);
        forwarder.maybeForward((Object)"key", (Object)"newValue", (Object)"oldValue");
        forwarder.maybeForward((Object)"key", (Object)"newValue", (Object)"oldValue", 42L);
        EasyMock.verify((Object[])new Object[]{store, context});
    }
}

