/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.internals.ChangelogRecordDeserializationHelper;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.ProcessorContextImpl;
import org.apache.kafka.streams.processor.internals.ProcessorNode;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.streams.processor.internals.RecordCollector;
import org.apache.kafka.streams.processor.internals.StreamTask;
import org.apache.kafka.streams.processor.internals.Task;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.query.Position;
import org.apache.kafka.streams.state.internals.PositionSerde;
import org.apache.kafka.streams.state.internals.ThreadCache;
import org.easymock.EasyMock;
import org.junit.Before;
import org.junit.Test;

public class WriteConsistencyVectorTest {
    private ProcessorContextImpl context;
    private final StreamsConfig streamsConfig = this.streamsConfigMock();
    private final RecordCollector recordCollector = (RecordCollector)EasyMock.mock(RecordCollector.class);
    private static final String KEY = "key";
    private static final Bytes KEY_BYTES = Bytes.wrap((byte[])"key".getBytes());
    private static final long VALUE = 42L;
    private static final byte[] VALUE_BYTES = String.valueOf(42L).getBytes();
    private static final long TIMESTAMP = 21L;
    private static final long STREAM_TIME = 50L;
    private static final String REGISTERED_STORE_NAME = "registered-store";
    private static final TopicPartition CHANGELOG_PARTITION = new TopicPartition("store-changelog", 1);
    private static final String INPUT_TOPIC_NAME = "input-topic";
    private static final Integer INPUT_PARTITION = 0;
    private static final Long INPUT_OFFSET = 100L;

    @Before
    public void setup() {
        ProcessorStateManager stateManager = (ProcessorStateManager)EasyMock.mock(ProcessorStateManager.class);
        EasyMock.expect((Object)stateManager.taskType()).andStubReturn((Object)Task.TaskType.ACTIVE);
        EasyMock.expect((Object)stateManager.registeredChangelogPartitionFor(REGISTERED_STORE_NAME)).andStubReturn((Object)CHANGELOG_PARTITION);
        EasyMock.replay((Object[])new Object[]{stateManager});
        this.context = new ProcessorContextImpl((TaskId)EasyMock.mock(TaskId.class), this.streamsConfig, stateManager, (StreamsMetricsImpl)EasyMock.mock(StreamsMetricsImpl.class), (ThreadCache)EasyMock.mock(ThreadCache.class));
        StreamTask task = (StreamTask)EasyMock.mock(StreamTask.class);
        EasyMock.expect((Object)task.streamTime()).andReturn((Object)50L);
        EasyMock.expect((Object)task.recordCollector()).andStubReturn((Object)this.recordCollector);
        EasyMock.replay((Object[])new Object[]{task});
        this.context.transitionToActive(task, null, null);
        this.context.setCurrentNode(new ProcessorNode("fake", (Processor)null, new HashSet<String>(Arrays.asList("LocalKeyValueStore", "LocalTimestampedKeyValueStore", "LocalWindowStore", "LocalTimestampedWindowStore", "LocalSessionStore"))));
    }

    @Test
    public void shouldSendConsistencyVectorToChangelogTopic() {
        Position position = Position.emptyPosition();
        position.withComponent(INPUT_TOPIC_NAME, INPUT_PARTITION.intValue(), INPUT_OFFSET.longValue());
        this.context.setRecordContext(new ProcessorRecordContext(-1L, INPUT_OFFSET.longValue(), INPUT_PARTITION.intValue(), INPUT_TOPIC_NAME, (Headers)new RecordHeaders()));
        RecordHeaders headers = new RecordHeaders();
        headers.add((Header)ChangelogRecordDeserializationHelper.CHANGELOG_VERSION_HEADER_RECORD_CONSISTENCY);
        headers.add((Header)new RecordHeader("c", PositionSerde.serialize((Position)position).array()));
        this.recordCollector.send(CHANGELOG_PARTITION.topic(), (Object)KEY_BYTES, (Object)VALUE_BYTES, (Headers)headers, Integer.valueOf(CHANGELOG_PARTITION.partition()), Long.valueOf(21L), (Serializer)InternalProcessorContext.BYTES_KEY_SERIALIZER, (Serializer)InternalProcessorContext.BYTEARRAY_VALUE_SERIALIZER);
        StreamTask task = (StreamTask)EasyMock.createNiceMock(StreamTask.class);
        EasyMock.replay((Object[])new Object[]{this.recordCollector, task});
        this.context.transitionToActive(task, this.recordCollector, null);
        this.context.logChange(REGISTERED_STORE_NAME, KEY_BYTES, VALUE_BYTES, 21L, position);
        EasyMock.verify((Object[])new Object[]{this.recordCollector});
    }

    private StreamsConfig streamsConfigMock() {
        StreamsConfig streamsConfig = (StreamsConfig)EasyMock.mock(StreamsConfig.class);
        HashMap<String, Boolean> myValues = new HashMap<String, Boolean>();
        myValues.put("__iq.consistency.offset.vector.enabled__", true);
        EasyMock.expect((Object)streamsConfig.originals()).andStubReturn(myValues);
        EasyMock.expect((Object)streamsConfig.values()).andStubReturn(Collections.emptyMap());
        EasyMock.expect((Object)streamsConfig.getString("application.id")).andStubReturn((Object)"add-id");
        EasyMock.expect((Object)streamsConfig.defaultValueSerde()).andStubReturn((Object)Serdes.ByteArray());
        EasyMock.expect((Object)streamsConfig.defaultKeySerde()).andStubReturn((Object)Serdes.ByteArray());
        EasyMock.replay((Object[])new Object[]{streamsConfig});
        return streamsConfig;
    }
}

