/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.kstream.internals;

import org.apache.kafka.streams.kstream.Window;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.internals.Change;
import org.apache.kafka.streams.kstream.internals.SessionCacheFlushListener;
import org.apache.kafka.streams.kstream.internals.SessionTupleForwarder;
import org.apache.kafka.streams.kstream.internals.SessionWindow;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.To;
import org.apache.kafka.streams.state.internals.CacheFlushListener;
import org.apache.kafka.streams.state.internals.WrappedStateStore;
import org.easymock.EasyMock;
import org.junit.Test;

/**
 * Unit tests for {@link SessionTupleForwarder}.
 *
 * <p>The wrapped state store and the processor context are EasyMock mocks. Each test records the
 * interactions it expects, replays the mocks, exercises the forwarder and then verifies that the
 * recorded interactions took place.
 */
public class SessionTupleForwarderTest {

    /**
     * Constructing a forwarder must hand the flush listener and the {@code sendOldValues} flag to
     * the wrapped store, for both values of the flag.
     */
    @Test
    public void shouldSetFlushListenerOnWrappedStateStore() {
        setFlushListener(true);
        setFlushListener(false);
    }

    /**
     * Builds a forwarder around a mocked store and verifies that the constructor called
     * {@code store.setFlushListener(flushListener, sendOldValues)}.
     *
     * @param sendOldValues flag passed to the forwarder and expected to reach the store unchanged
     */
    private void setFlushListener(final boolean sendOldValues) {
        final WrappedStateStore<StateStore, Windowed<Object>, Object> store =
            EasyMock.mock(WrappedStateStore.class);
        final SessionCacheFlushListener<Object, Object> flushListener =
            EasyMock.mock(SessionCacheFlushListener.class);

        EasyMock.expect(store.setFlushListener(flushListener, sendOldValues)).andReturn(false);
        EasyMock.replay(store);

        // Only the constructor's side effect on the store is under test; the instance is discarded.
        new SessionTupleForwarder<>(store, null, flushListener, sendOldValues);

        EasyMock.verify(store);
    }

    /**
     * When the store's {@code setFlushListener} returns {@code false}, {@code maybeForward} must
     * forward the record through the context, with and without old values.
     */
    @Test
    public void shouldForwardRecordsIfWrappedStateStoreDoesNotCache() {
        shouldForwardRecordsIfWrappedStateStoreDoesNotCache(false);
        shouldForwardRecordsIfWrappedStateStoreDoesNotCache(true);
    }

    /**
     * Expects exactly one {@code context.forward} call carrying the session key, a {@link Change}
     * whose old value is {@code "oldValue"} only when {@code sendOldValues} is set (otherwise
     * {@code null}), and a {@link To} stamped with {@code 42L}.
     *
     * @param sendOldValues whether the forwarded {@link Change} is expected to carry the old value
     */
    private void shouldForwardRecordsIfWrappedStateStoreDoesNotCache(final boolean sendOldValues) {
        final WrappedStateStore<StateStore, String, String> store = EasyMock.mock(WrappedStateStore.class);
        final ProcessorContext context = EasyMock.mock(ProcessorContext.class);

        // A false return makes the forwarder emit records itself.
        EasyMock.expect(store.setFlushListener(null, sendOldValues)).andReturn(false);

        final String expectedOldValue = sendOldValues ? "oldValue" : null;
        context.forward(
            sessionKey(),
            new Change<>("value", expectedOldValue),
            To.all().withTimestamp(42L));
        EasyMock.expectLastCall();
        EasyMock.replay(store, context);

        final SessionTupleForwarder<String, String> forwarder =
            new SessionTupleForwarder<>(store, context, null, sendOldValues);
        forwarder.maybeForward(sessionKey(), "value", "oldValue");

        EasyMock.verify(store, context);
    }

    /**
     * When the store's {@code setFlushListener} returns {@code true}, {@code maybeForward} must
     * not touch the context. No expectations are recorded on the context mock, so under
     * EasyMock's default mock semantics any call on it fails the test.
     */
    @Test
    public void shouldNotForwardRecordsIfWrappedStateStoreDoesCache() {
        final WrappedStateStore<StateStore, String, String> store = EasyMock.mock(WrappedStateStore.class);
        final ProcessorContext context = EasyMock.mock(ProcessorContext.class);

        EasyMock.expect(store.setFlushListener(null, false)).andReturn(true);
        EasyMock.replay(store, context);

        final SessionTupleForwarder<String, String> forwarder =
            new SessionTupleForwarder<>(store, context, null, false);
        forwarder.maybeForward(sessionKey(), "value", "oldValue");

        EasyMock.verify(store, context);
    }

    /**
     * Creates the windowed key shared by the forwarding tests: key {@code "key"} wrapped in
     * {@code new SessionWindow(21L, 42L)}. A fresh instance is returned on every call, so the
     * expectation match relies on the argument's {@code equals} rather than on identity.
     */
    private static Windowed<String> sessionKey() {
        return new Windowed<>("key", new SessionWindow(21L, 42L));
    }
}

