/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.state.internals;

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.errors.DefaultProductionExceptionHandler;
import org.apache.kafka.streams.errors.ProductionExceptionHandler;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.processor.internals.RecordCollector;
import org.apache.kafka.streams.processor.internals.RecordCollectorImpl;
import org.apache.kafka.streams.state.StateSerdes;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.streams.state.internals.StoreChangeLogger;
import org.apache.kafka.test.InternalMockProcessorContext;
import org.junit.Assert;
import org.junit.Test;

public class StoreChangeLoggerTest {
    private final String topic = "topic";
    private final Map<Integer, ValueAndTimestamp<String>> logged = new HashMap<Integer, ValueAndTimestamp<String>>();
    private final Map<Integer, Headers> loggedHeaders = new HashMap<Integer, Headers>();
    private final InternalMockProcessorContext context = new InternalMockProcessorContext(StateSerdes.withBuiltinTypes((String)"topic", Integer.class, String.class), (RecordCollector)new RecordCollectorImpl("StoreChangeLoggerTest", new LogContext("StoreChangeLoggerTest "), (ProductionExceptionHandler)new DefaultProductionExceptionHandler(), new Metrics().sensor("skipped-records")){

        public <K1, V1> void send(String topic, K1 key, V1 value, Headers headers, Integer partition, Long timestamp, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
            StoreChangeLoggerTest.this.logged.put((Integer)key, ValueAndTimestamp.make((Object)((String)value), (long)timestamp));
            StoreChangeLoggerTest.this.loggedHeaders.put((Integer)key, headers);
        }

        public <K1, V1> void send(String topic, K1 key, V1 value, Headers headers, Long timestamp, Serializer<K1> keySerializer, Serializer<V1> valueSerializer, StreamPartitioner<? super K1, ? super V1> partitioner) {
            throw new UnsupportedOperationException();
        }
    });
    private final StoreChangeLogger<Integer, String> changeLogger = new StoreChangeLogger("topic", (ProcessorContext)this.context, StateSerdes.withBuiltinTypes((String)"topic", Integer.class, String.class));

    @Test
    public void testAddRemove() {
        this.context.setTime(1L);
        this.changeLogger.logChange((Object)0, (Object)"zero");
        this.context.setTime(5L);
        this.changeLogger.logChange((Object)1, (Object)"one");
        this.changeLogger.logChange((Object)2, (Object)"two");
        this.changeLogger.logChange((Object)3, (Object)"three", 42L);
        Assert.assertEquals((Object)ValueAndTimestamp.make((Object)"zero", (long)1L), this.logged.get(0));
        Assert.assertEquals((Object)ValueAndTimestamp.make((Object)"one", (long)5L), this.logged.get(1));
        Assert.assertEquals((Object)ValueAndTimestamp.make((Object)"two", (long)5L), this.logged.get(2));
        Assert.assertEquals((Object)ValueAndTimestamp.make((Object)"three", (long)42L), this.logged.get(3));
        this.changeLogger.logChange((Object)0, null);
        Assert.assertNull(this.logged.get(0));
    }

    @Test
    public void shouldNotSendRecordHeadersToChangelogTopic() {
        this.context.headers().add((Header)new RecordHeader("key", "value".getBytes()));
        this.changeLogger.logChange((Object)0, (Object)"zero");
        this.changeLogger.logChange((Object)0, (Object)"zero", 42L);
        Assert.assertNull((Object)this.loggedHeaders.get(0));
    }
}

