/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.io.File;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.StateStoreSupplier;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.processor.TimestampExtractor;
import org.apache.kafka.streams.processor.TopologyBuilder;
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.ProcessorTopologyTestDriver;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class ProcessorTopologyTest {
    // String (de)serializers shared by all tests to feed records into the driver and read them back.
    private static final Serializer<String> STRING_SERIALIZER = new StringSerializer();
    private static final Deserializer<String> STRING_DESERIALIZER = new StringDeserializer();
    // Topic names wired into the topologies produced by the create*Topology() helpers.
    private static final String INPUT_TOPIC_1 = "input-topic-1";
    private static final String INPUT_TOPIC_2 = "input-topic-2";
    private static final String OUTPUT_TOPIC_1 = "output-topic-1";
    private static final String OUTPUT_TOPIC_2 = "output-topic-2";
    // Internal topic used by the repartitioning topologies.
    private static final String THROUGH_TOPIC_1 = "through-topic-1";
    // Last-resort timestamp returned by CustomTimestampExtractor when neither the record value
    // nor the record itself supplies one. Only read, never reassigned, within this class.
    private static long timestamp = 1000L;
    // Builder that the tests and the create*Topology() helpers add nodes to.
    private final TopologyBuilder builder = new TopologyBuilder();
    // Supplier reused by the toString() tests, where the processor behavior is irrelevant.
    private final MockProcessorSupplier mockProcessorSupplier = new MockProcessorSupplier();
    // Driver created by individual tests; closed and cleared in cleanup().
    private ProcessorTopologyTestDriver driver;
    // Streams configuration assembled in setup().
    private StreamsConfig config;

    /**
     * Assembles the {@link StreamsConfig} shared by the tests: a state directory obtained from
     * {@code TestUtils.tempDirectory()}, the String serde class as default key and value serde,
     * and {@link CustomTimestampExtractor} as the default timestamp extractor.
     */
    @Before
    public void setup() {
        final String stringSerdeClassName = Serdes.String().getClass().getName();
        final File stateDir = TestUtils.tempDirectory();

        final Properties settings = new Properties();
        settings.setProperty("application.id", "processor-topology-test");
        settings.setProperty("bootstrap.servers", "localhost:9091");
        settings.setProperty("state.dir", stateDir.getAbsolutePath());
        settings.setProperty("default.key.serde", stringSerdeClassName);
        settings.setProperty("default.value.serde", stringSerdeClassName);
        settings.setProperty("default.timestamp.extractor", CustomTimestampExtractor.class.getName());

        config = new StreamsConfig(settings);
    }

    /**
     * Closes the driver created by the test (if any) and drops the reference to it.
     */
    @After
    public void cleanup() {
        if (driver == null) {
            // Nothing was created by this test; the field is already cleared.
            return;
        }
        driver.close();
        driver = null;
    }

    /**
     * Builds a topology with two sources, two processors and two sinks and verifies the
     * node/topic counts reported by the resulting {@link ProcessorTopology}, and that every
     * source topic resolves to a source node ("topic-2" and "topic-3" to the same one).
     */
    @Test
    public void testTopologyMetadata() {
        builder.setApplicationId("X");

        builder.addSource("source-1", "topic-1");
        builder.addSource("source-2", "topic-2", "topic-3");
        builder.addProcessor("processor-1", new MockProcessorSupplier(), "source-1");
        builder.addProcessor("processor-2", new MockProcessorSupplier(), "source-1", "source-2");
        builder.addSink("sink-1", "topic-3", "processor-1");
        builder.addSink("sink-2", "topic-4", "processor-1", "processor-2");

        final ProcessorTopology built = builder.build(null);

        Assert.assertEquals(6, built.processors().size());
        Assert.assertEquals(2, built.sources().size());
        Assert.assertEquals(3, built.sourceTopics().size());

        for (final String sourceTopic : new String[]{"topic-1", "topic-2", "topic-3"}) {
            Assert.assertNotNull(built.source(sourceTopic));
        }
        // Both topics were registered on "source-2", so they must map to the same node.
        Assert.assertEquals(built.source("topic-2"), built.source("topic-3"));
    }

    /**
     * Drives the source -> forwarding processor -> sink topology and checks that every input
     * record shows up on OUTPUT_TOPIC_1 with the fixed partition, in order, while
     * OUTPUT_TOPIC_2 never receives anything.
     */
    @Test
    public void testDrivingSimpleTopology() {
        final int fixedPartition = 10;
        driver = new ProcessorTopologyTestDriver(config, createSimpleTopology(fixedPartition));

        // Records 1 and 2: process one, read it back immediately.
        for (int n = 1; n <= 2; n++) {
            driver.process(INPUT_TOPIC_1, "key" + n, "value" + n, STRING_SERIALIZER, STRING_SERIALIZER);
            assertNextOutputRecord(OUTPUT_TOPIC_1, "key" + n, "value" + n, fixedPartition);
            assertNoOutputRecord(OUTPUT_TOPIC_2);
        }

        // Records 3..5: process all of them first, then read them back in the same order.
        for (int n = 3; n <= 5; n++) {
            driver.process(INPUT_TOPIC_1, "key" + n, "value" + n, STRING_SERIALIZER, STRING_SERIALIZER);
        }
        assertNoOutputRecord(OUTPUT_TOPIC_2);
        for (int n = 3; n <= 5; n++) {
            assertNextOutputRecord(OUTPUT_TOPIC_1, "key" + n, "value" + n, fixedPartition);
        }
    }

    /**
     * Drives the topology whose processor forwards each record to both sinks by child index,
     * tagging the value with "(1)" for OUTPUT_TOPIC_1 and "(2)" for OUTPUT_TOPIC_2.
     */
    @Test
    public void testDrivingMultiplexingTopology() {
        driver = new ProcessorTopologyTestDriver(config, createMultiplexingTopology());

        // Records 1 and 2: each input yields one record on each output topic.
        for (int n = 1; n <= 2; n++) {
            driver.process(INPUT_TOPIC_1, "key" + n, "value" + n, STRING_SERIALIZER, STRING_SERIALIZER);
            assertNextOutputRecord(OUTPUT_TOPIC_1, "key" + n, "value" + n + "(1)");
            assertNextOutputRecord(OUTPUT_TOPIC_2, "key" + n, "value" + n + "(2)");
        }

        // Records 3..5: process the whole batch, then drain each output topic in turn.
        for (int n = 3; n <= 5; n++) {
            driver.process(INPUT_TOPIC_1, "key" + n, "value" + n, STRING_SERIALIZER, STRING_SERIALIZER);
        }
        for (int n = 3; n <= 5; n++) {
            assertNextOutputRecord(OUTPUT_TOPIC_1, "key" + n, "value" + n + "(1)");
        }
        for (int n = 3; n <= 5; n++) {
            assertNextOutputRecord(OUTPUT_TOPIC_2, "key" + n, "value" + n + "(2)");
        }
    }

    /**
     * Same expectations as the index-based multiplexing test, but for the topology whose
     * processor forwards to its children by sink name ("sink0", "sink1").
     */
    @Test
    public void testDrivingMultiplexByNameTopology() {
        driver = new ProcessorTopologyTestDriver(config, createMultiplexByNameTopology());

        final String[] immediate = {"1", "2"};
        final String[] batched = {"3", "4", "5"};

        // Process one record at a time and read both outputs right away.
        for (final String id : immediate) {
            driver.process(INPUT_TOPIC_1, "key" + id, "value" + id, STRING_SERIALIZER, STRING_SERIALIZER);
            assertNextOutputRecord(OUTPUT_TOPIC_1, "key" + id, "value" + id + "(1)");
            assertNextOutputRecord(OUTPUT_TOPIC_2, "key" + id, "value" + id + "(2)");
        }

        // Process a batch, then drain the first output topic followed by the second.
        for (final String id : batched) {
            driver.process(INPUT_TOPIC_1, "key" + id, "value" + id, STRING_SERIALIZER, STRING_SERIALIZER);
        }
        for (final String id : batched) {
            assertNextOutputRecord(OUTPUT_TOPIC_1, "key" + id, "value" + id + "(1)");
        }
        for (final String id : batched) {
            assertNextOutputRecord(OUTPUT_TOPIC_2, "key" + id, "value" + id + "(2)");
        }
    }

    /**
     * Drives the stateful topology and checks that the processor wrote every record into the
     * "entries" store (the later value for "key1" replacing the earlier one) without
     * emitting anything to OUTPUT_TOPIC_1.
     */
    @Test
    public void testDrivingStatefulTopology() {
        final String storeName = "entries";
        driver = new ProcessorTopologyTestDriver(config, createStatefulTopology(storeName));

        final String[][] inputs = {
            {"key1", "value1"},
            {"key2", "value2"},
            {"key3", "value3"},
            {"key1", "value4"},
        };
        for (final String[] keyAndValue : inputs) {
            driver.process(INPUT_TOPIC_1, keyAndValue[0], keyAndValue[1], STRING_SERIALIZER, STRING_SERIALIZER);
        }
        assertNoOutputRecord(OUTPUT_TOPIC_1);

        final KeyValueStore<String, String> entries = driver.getKeyValueStore(storeName);
        Assert.assertEquals("value4", entries.get("key1"));
        Assert.assertEquals("value2", entries.get("key2"));
        Assert.assertEquals("value3", entries.get("key3"));
        Assert.assertNull(entries.get("key4"));
    }

    /**
     * Registers an in-memory, non-logged global store fed from "topic" through a
     * {@link StatefulProcessor} and checks that processed records end up in that store.
     */
    @Test
    public void shouldDriveGlobalStore() throws Exception {
        final String storeName = "my-store";
        final String global = "global";
        final String topic = "topic";

        final StateStoreSupplier storeSupplier = Stores.create(storeName)
            .withStringKeys()
            .withStringValues()
            .inMemory()
            .disableLogging()
            .build();
        final TopologyBuilder topologyBuilder = builder.addGlobalStore(
            storeSupplier,
            global,
            STRING_DESERIALIZER,
            STRING_DESERIALIZER,
            topic,
            "processor",
            define(new StatefulProcessor(storeName)));

        driver = new ProcessorTopologyTestDriver(config, topologyBuilder);

        @SuppressWarnings("unchecked")
        final KeyValueStore<String, String> globalStore =
            (KeyValueStore<String, String>) topologyBuilder.globalStateStores().get(storeName);

        driver.process(topic, "key1", "value1", STRING_SERIALIZER, STRING_SERIALIZER);
        driver.process(topic, "key2", "value2", STRING_SERIALIZER, STRING_SERIALIZER);

        Assert.assertEquals("value1", globalStore.get("key1"));
        Assert.assertEquals("value2", globalStore.get("key2"));
    }

    /**
     * Drives two independent source -> processor -> sink pipelines and checks that a record
     * fed to one input topic appears only on the matching output topic.
     */
    @Test
    public void testDrivingSimpleMultiSourceTopology() {
        final int sinkPartition = 10;
        driver = new ProcessorTopologyTestDriver(config, createSimpleMultiSourceTopology(sinkPartition));

        // First pipeline: INPUT_TOPIC_1 -> OUTPUT_TOPIC_1 only.
        driver.process(INPUT_TOPIC_1, "key1", "value1", STRING_SERIALIZER, STRING_SERIALIZER);
        assertNextOutputRecord(OUTPUT_TOPIC_1, "key1", "value1", sinkPartition);
        assertNoOutputRecord(OUTPUT_TOPIC_2);

        // Second pipeline: INPUT_TOPIC_2 -> OUTPUT_TOPIC_2 only.
        driver.process(INPUT_TOPIC_2, "key2", "value2", STRING_SERIALIZER, STRING_SERIALIZER);
        assertNextOutputRecord(OUTPUT_TOPIC_2, "key2", "value2", sinkPartition);
        assertNoOutputRecord(OUTPUT_TOPIC_1);
    }

    /**
     * Drives the topology in which the first sink writes to OUTPUT_TOPIC_1, which is itself
     * the input of a second source, and checks that all records arrive on OUTPUT_TOPIC_2.
     */
    @Test
    public void testDrivingForwardToSourceTopology() {
        driver = new ProcessorTopologyTestDriver(config, createForwardToSourceTopology());

        for (int n = 1; n <= 3; n++) {
            driver.process(INPUT_TOPIC_1, "key" + n, "value" + n, STRING_SERIALIZER, STRING_SERIALIZER);
        }
        for (int n = 1; n <= 3; n++) {
            assertNextOutputRecord(OUTPUT_TOPIC_2, "key" + n, "value" + n);
        }
    }

    /**
     * Drives the topology that routes records through the internal THROUGH_TOPIC_1 and
     * checks that they come out on OUTPUT_TOPIC_1 unchanged and in order.
     */
    @Test
    public void testDrivingInternalRepartitioningTopology() {
        driver = new ProcessorTopologyTestDriver(config, createInternalRepartitioningTopology());

        final String[] ids = {"1", "2", "3"};
        for (final String id : ids) {
            driver.process(INPUT_TOPIC_1, "key" + id, "value" + id, STRING_SERIALIZER, STRING_SERIALIZER);
        }
        for (final String id : ids) {
            assertNextOutputRecord(OUTPUT_TOPIC_1, "key" + id, "value" + id);
        }
    }

    /**
     * Feeds values of the form {@code "<value>@<timestamp>"} through the repartitioning
     * topology and checks that each output record carries the stripped value and the
     * timestamp that was embedded in the input value.
     */
    @Test
    public void testDrivingInternalRepartitioningForwardingTimestampTopology() {
        driver = new ProcessorTopologyTestDriver(config, createInternalRepartitioningWithValueTimestampTopology());

        // Inputs: key1/value1@1000, key2/value2@2000, key3/value3@3000.
        for (int n = 1; n <= 3; n++) {
            driver.process(INPUT_TOPIC_1, "key" + n, "value" + n + "@" + (n * 1000), STRING_SERIALIZER, STRING_SERIALIZER);
        }

        for (int n = 1; n <= 3; n++) {
            final ProducerRecord<String, String> expected = new ProducerRecord<String, String>(
                OUTPUT_TOPIC_1, null, Long.valueOf(n * 1000L), "key" + n, "value" + n);
            MatcherAssert.assertThat(
                driver.readOutput(OUTPUT_TOPIC_1, STRING_DESERIALIZER, STRING_DESERIALIZER),
                CoreMatchers.equalTo(expected));
        }
    }

    /**
     * Checks that the topology's string form lists a source together with its topics.
     */
    @Test
    public void shouldCreateStringWithSourceAndTopics() throws Exception {
        builder.addSource("source", "topic1", "topic2");

        final String description = builder.build(null).toString();

        MatcherAssert.assertThat(
            description,
            CoreMatchers.containsString("source:\n\t\ttopics:\t\t[topic1, topic2]\n"));
    }

    /**
     * Checks that the topology's string form lists every source with its own topics.
     */
    @Test
    public void shouldCreateStringWithMultipleSourcesAndTopics() throws Exception {
        builder.addSource("source", "topic1", "topic2");
        builder.addSource("source2", "t", "t1", "t2");

        final String description = builder.build(null).toString();

        MatcherAssert.assertThat(
            description,
            CoreMatchers.containsString("source:\n\t\ttopics:\t\t[topic1, topic2]\n"));
        MatcherAssert.assertThat(
            description,
            CoreMatchers.containsString("source2:\n\t\ttopics:\t\t[t, t1, t2]\n"));
    }

    /**
     * Checks that the topology's string form lists the children of the source and contains
     * an entry for each processor.
     */
    @Test
    public void shouldCreateStringWithProcessors() throws Exception {
        builder
            .addSource("source", "t")
            .addProcessor("processor", mockProcessorSupplier, "source")
            .addProcessor("other", mockProcessorSupplier, "source");

        final String description = builder.build(null).toString();

        MatcherAssert.assertThat(description, CoreMatchers.containsString("\t\tchildren:\t[processor, other]"));
        MatcherAssert.assertThat(description, CoreMatchers.containsString("processor:\n"));
        MatcherAssert.assertThat(description, CoreMatchers.containsString("other:\n"));
    }

    /**
     * Checks that the topology's string form also describes nodes below the first level:
     * each of the two child processors must be printed with its own child.
     */
    @Test
    public void shouldRecursivelyPrintChildren() throws Exception {
        builder
            .addSource("source", "t")
            .addProcessor("processor", mockProcessorSupplier, "source")
            .addProcessor("child-one", mockProcessorSupplier, "processor")
            .addProcessor("child-one-one", mockProcessorSupplier, "child-one")
            .addProcessor("child-two", mockProcessorSupplier, "processor")
            .addProcessor("child-two-one", mockProcessorSupplier, "child-two");

        final String description = builder.build(null).toString();

        MatcherAssert.assertThat(
            description,
            CoreMatchers.containsString("child-one:\n\t\tchildren:\t[child-one-one]"));
        MatcherAssert.assertThat(
            description,
            CoreMatchers.containsString("child-two:\n\t\tchildren:\t[child-two-one]"));
    }

    /**
     * Reads the next record the driver produced to {@code topic} and asserts its topic, key
     * and value, and that no explicit partition was set on it.
     *
     * <p>A missing record is reported as an assertion failure naming the topic rather than
     * as a bare {@link NullPointerException} on the first accessor call.
     *
     * @param topic the output topic to read from
     * @param key   the expected record key
     * @param value the expected record value
     */
    private void assertNextOutputRecord(String topic, String key, String value) {
        final ProducerRecord<String, String> record =
            driver.readOutput(topic, STRING_DESERIALIZER, STRING_DESERIALIZER);
        // NOTE(review): presumably readOutput yields null when nothing is queued, as the
        // single-argument variant used by assertNoOutputRecord does -- confirm against the driver.
        Assert.assertNotNull("expected an output record on topic " + topic + " but none was available", record);
        Assert.assertEquals(topic, record.topic());
        Assert.assertEquals(key, record.key());
        Assert.assertEquals(value, record.value());
        Assert.assertNull(record.partition());
    }

    /**
     * Reads the next record the driver produced to {@code topic} and asserts its topic, key,
     * value and partition.
     *
     * <p>A missing record is reported as an assertion failure naming the topic rather than
     * as a bare {@link NullPointerException} on the first accessor call.
     *
     * @param topic     the output topic to read from
     * @param key       the expected record key
     * @param value     the expected record value
     * @param partition the expected partition of the record
     */
    private void assertNextOutputRecord(String topic, String key, String value, Integer partition) {
        final ProducerRecord<String, String> record =
            driver.readOutput(topic, STRING_DESERIALIZER, STRING_DESERIALIZER);
        // NOTE(review): presumably readOutput yields null when nothing is queued, as the
        // single-argument variant used by assertNoOutputRecord does -- confirm against the driver.
        Assert.assertNotNull("expected an output record on topic " + topic + " but none was available", record);
        Assert.assertEquals(topic, record.topic());
        Assert.assertEquals(key, record.key());
        Assert.assertEquals(value, record.value());
        Assert.assertEquals(partition, record.partition());
    }

    /**
     * Asserts that the driver has no further record to hand out for {@code topic}.
     *
     * @param topic the output topic expected to be empty
     */
    private void assertNoOutputRecord(String topic) {
        final Object pending = driver.readOutput(topic);
        Assert.assertNull(pending);
    }

    /**
     * Creates a partitioner that ignores the record and the partition count and always
     * answers with the given partition.
     *
     * @param partition the partition to return for every record
     * @return a partitioner pinned to {@code partition}
     */
    private StreamPartitioner<Object, Object> constantPartitioner(final Integer partition) {
        final StreamPartitioner<Object, Object> pinned = new StreamPartitioner<Object, Object>() {
            public Integer partition(Object recordKey, Object recordValue, int partitionCount) {
                return partition;
            }
        };
        return pinned;
    }

    /**
     * Adds source(INPUT_TOPIC_1) -> forwarding processor -> sink(OUTPUT_TOPIC_1) to the
     * shared builder, with the sink pinned to the given partition.
     *
     * @param partition partition the sink assigns to every record
     * @return the shared builder
     */
    private TopologyBuilder createSimpleTopology(int partition) {
        return builder
            .addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
            .addProcessor("processor", define(new ForwardingProcessor()), "source")
            .addSink("sink", OUTPUT_TOPIC_1, constantPartitioner(partition), "processor");
    }

    /**
     * Adds a source on INPUT_TOPIC_1 feeding a {@link MultiplexingProcessor} with two
     * children: "sink1" (OUTPUT_TOPIC_1) and "sink2" (OUTPUT_TOPIC_2).
     *
     * @return the shared builder
     */
    private TopologyBuilder createMultiplexingTopology() {
        return builder
            .addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
            .addProcessor("processor", define(new MultiplexingProcessor(2)), "source")
            .addSink("sink1", OUTPUT_TOPIC_1, "processor")
            .addSink("sink2", OUTPUT_TOPIC_2, "processor");
    }

    /**
     * Adds a source on INPUT_TOPIC_1 feeding a {@link MultiplexByNameProcessor} with two
     * children named "sink0" (OUTPUT_TOPIC_1) and "sink1" (OUTPUT_TOPIC_2); the names match
     * the "sink" + index targets the processor forwards to.
     *
     * @return the shared builder
     */
    private TopologyBuilder createMultiplexByNameTopology() {
        return builder
            .addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
            .addProcessor("processor", define(new MultiplexByNameProcessor(2)), "source")
            .addSink("sink0", OUTPUT_TOPIC_1, "processor")
            .addSink("sink1", OUTPUT_TOPIC_2, "processor");
    }

    /**
     * Adds a source on INPUT_TOPIC_1 feeding a {@link StatefulProcessor} that is connected to
     * an in-memory String/String store of the given name, plus a "counts" sink on
     * OUTPUT_TOPIC_1.
     *
     * @param storeName name of the store to create and attach to the processor
     * @return the shared builder
     */
    private TopologyBuilder createStatefulTopology(String storeName) {
        return builder
            .addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
            .addProcessor("processor", define(new StatefulProcessor(storeName)), "source")
            .addStateStore(
                Stores.create(storeName).withStringKeys().withStringValues().inMemory().build(),
                "processor")
            .addSink("counts", OUTPUT_TOPIC_1, "processor");
    }

    /**
     * Adds INPUT_TOPIC_1 -> THROUGH_TOPIC_1 (registered as an internal topic) ->
     * OUTPUT_TOPIC_1, using two source/sink pairs and no processors.
     *
     * @return the shared builder
     */
    private TopologyBuilder createInternalRepartitioningTopology() {
        return builder
            .addSource("source", INPUT_TOPIC_1)
            .addInternalTopic(THROUGH_TOPIC_1)
            .addSink("sink0", THROUGH_TOPIC_1, "source")
            .addSource("source1", THROUGH_TOPIC_1)
            .addSink("sink1", OUTPUT_TOPIC_1, "source1");
    }

    /**
     * Same shape as the plain internal-repartitioning topology, but with a
     * {@link ValueTimestampProcessor} between the first source and the sink that writes to
     * THROUGH_TOPIC_1.
     *
     * @return the shared builder
     */
    private TopologyBuilder createInternalRepartitioningWithValueTimestampTopology() {
        return builder
            .addSource("source", INPUT_TOPIC_1)
            .addInternalTopic(THROUGH_TOPIC_1)
            .addProcessor("processor", define(new ValueTimestampProcessor()), "source")
            .addSink("sink0", THROUGH_TOPIC_1, "processor")
            .addSource("source1", THROUGH_TOPIC_1)
            .addSink("sink1", OUTPUT_TOPIC_1, "source1");
    }

    /**
     * Adds INPUT_TOPIC_1 -> OUTPUT_TOPIC_1 and a second source that reads OUTPUT_TOPIC_1
     * again and sinks into OUTPUT_TOPIC_2.
     *
     * @return the shared builder
     */
    private TopologyBuilder createForwardToSourceTopology() {
        return builder
            .addSource("source-1", INPUT_TOPIC_1)
            .addSink("sink-1", OUTPUT_TOPIC_1, "source-1")
            .addSource("source-2", OUTPUT_TOPIC_1)
            .addSink("sink-2", OUTPUT_TOPIC_2, "source-2");
    }

    /**
     * Adds two parallel source -> forwarding processor -> sink pipelines
     * (INPUT_TOPIC_1 -> OUTPUT_TOPIC_1 and INPUT_TOPIC_2 -> OUTPUT_TOPIC_2), both sinks
     * pinned to the given partition.
     *
     * @param partition partition both sinks assign to every record
     * @return the shared builder
     */
    private TopologyBuilder createSimpleMultiSourceTopology(int partition) {
        return builder
            // first pipeline
            .addSource("source-1", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
            .addProcessor("processor-1", define(new ForwardingProcessor()), "source-1")
            .addSink("sink-1", OUTPUT_TOPIC_1, constantPartitioner(partition), "processor-1")
            // second pipeline
            .addSource("source-2", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_2)
            .addProcessor("processor-2", define(new ForwardingProcessor()), "source-2")
            .addSink("sink-2", OUTPUT_TOPIC_2, constantPartitioner(partition), "processor-2");
    }

    /**
     * Wraps an already-constructed processor in a {@link ProcessorSupplier}.
     *
     * <p>Every call to {@code get()} on the returned supplier hands back the same
     * {@code processor} instance; no new processor is created per call.
     *
     * @param processor the instance the supplier should return
     * @return a supplier that always returns {@code processor}
     */
    private <K, V> ProcessorSupplier<K, V> define(final Processor<K, V> processor) {
        final ProcessorSupplier<K, V> fixedSupplier = new ProcessorSupplier<K, V>() {
            public Processor<K, V> get() {
                return processor;
            }
        };
        return fixedSupplier;
    }

    /**
     * Timestamp extractor used by the tests.
     *
     * <p>Resolution order:
     * <ol>
     *   <li>if the record value's string form ends in {@code @<digits>}, those digits are the timestamp;</li>
     *   <li>otherwise, if the record's own timestamp is positive, that timestamp;</li>
     *   <li>otherwise the enclosing class's static {@code timestamp} fallback.</li>
     * </ol>
     */
    public static class CustomTimestampExtractor implements TimestampExtractor {
        /**
         * @param record            the record to extract a timestamp from
         * @param previousTimestamp unused
         * @return the timestamp chosen according to the order described on the class
         * @throws NumberFormatException if the trailing digits do not fit into a {@code long}
         */
        public long extract(ConsumerRecord<Object, Object> record, long previousTimestamp) {
            final Object value = record.value();
            // A record without a value cannot embed a timestamp; fall through to the record's own.
            if (value != null) {
                final String text = value.toString();
                if (text.matches(".*@[0-9]+")) {
                    // The pattern guarantees that everything after the LAST '@' is digits, so
                    // parse exactly that suffix. Splitting on '@' and taking element 1 would pick
                    // the segment after the FIRST '@' and fail for values such as "a@b@42".
                    return Long.parseLong(text.substring(text.lastIndexOf('@') + 1));
                }
            }
            if (record.timestamp() > 0L) {
                return record.timestamp();
            }
            return timestamp;
        }
    }

    /**
     * Processor that stores every record it receives in a key-value store looked up by name
     * from the processor context during {@code init}.
     */
    protected static class StatefulProcessor extends AbstractProcessor<String, String> {
        private final String storeName;
        private KeyValueStore<String, String> store;

        /**
         * @param storeName name of the state store to fetch from the context in {@code init}
         */
        public StatefulProcessor(String storeName) {
            this.storeName = storeName;
        }

        /** Initializes the base processor and resolves the backing store by name. */
        @SuppressWarnings("unchecked")
        public void init(ProcessorContext context) {
            super.init(context);
            store = (KeyValueStore<String, String>) context.getStateStore(storeName);
        }

        /** Writes the record into the store under its key. */
        public void process(String key, String value) {
            store.put(key, value);
        }

        /** Counts the entries currently in the store and forwards (streamTime as string, count). */
        public void punctuate(long streamTime) {
            int entryCount = 0;
            try (KeyValueIterator<String, String> entries = store.all()) {
                while (entries.hasNext()) {
                    entries.next();
                    entryCount++;
                }
            }
            context().forward(Long.toString(streamTime), entryCount);
        }

        /** Closes the backing store. */
        public void close() {
            store.close();
        }
    }

    /**
     * Processor that forwards each record to children addressed by name: child {@code i}
     * (0-based) is named {@code "sink" + i} and receives the value tagged with
     * {@code "(" + (i + 1) + ")"}.
     */
    protected static class MultiplexByNameProcessor extends AbstractProcessor<String, String> {
        private final int numChildren;

        /**
         * @param numChildren number of named children to forward to
         */
        public MultiplexByNameProcessor(int numChildren) {
            this.numChildren = numChildren;
        }

        /** Forwards the record once per child, to "sink0", "sink1", ... in that order. */
        public void process(String key, String value) {
            int child = 0;
            while (child != numChildren) {
                final int ordinal = child + 1;
                context().forward(key, value + "(" + ordinal + ")", "sink" + child);
                child++;
            }
        }

        /** Forwards a "punctuate(n)" marker keyed by the stream time to every named child. */
        public void punctuate(long streamTime) {
            int child = 0;
            while (child != numChildren) {
                final int ordinal = child + 1;
                context().forward(Long.toString(streamTime), "punctuate(" + ordinal + ")", "sink" + child);
                child++;
            }
        }
    }

    /**
     * Processor that forwards each record to children addressed by index: child {@code i}
     * (0-based) receives the value tagged with {@code "(" + (i + 1) + ")"}.
     */
    protected static class MultiplexingProcessor extends AbstractProcessor<String, String> {
        private final int numChildren;

        /**
         * @param numChildren number of child indexes to forward to
         */
        public MultiplexingProcessor(int numChildren) {
            this.numChildren = numChildren;
        }

        /** Forwards the record once per child index, in ascending index order. */
        public void process(String key, String value) {
            int childIndex = 0;
            while (childIndex != numChildren) {
                final String tagged = value + "(" + (childIndex + 1) + ")";
                context().forward(key, tagged, childIndex);
                childIndex++;
            }
        }

        /** Forwards a "punctuate(n)" marker keyed by the stream time to every child index. */
        public void punctuate(long streamTime) {
            int childIndex = 0;
            while (childIndex != numChildren) {
                final String marker = "punctuate(" + (childIndex + 1) + ")";
                context().forward(Long.toString(streamTime), marker, childIndex);
                childIndex++;
            }
        }
    }

    /**
     * Processor that forwards each record with its value cut down to the part before the
     * first '@' (the first element of {@code value.split("@")}).
     */
    protected static class ValueTimestampProcessor extends AbstractProcessor<String, String> {
        protected ValueTimestampProcessor() {
        }

        /** Forwards the key unchanged together with the leading segment of the value. */
        public void process(String key, String value) {
            final String[] segments = value.split("@");
            context().forward(key, segments[0]);
        }
    }

    /**
     * Processor that passes every record on unchanged and, when punctuated, forwards a
     * "punctuate" marker keyed by the stream time.
     */
    protected static class ForwardingProcessor extends AbstractProcessor<String, String> {
        protected ForwardingProcessor() {
        }

        /** Forwards the record as-is to the downstream nodes. */
        public void process(String key, String value) {
            context().forward(key, value);
        }

        /** Forwards (streamTime as string, "punctuate"). */
        public void punctuate(long streamTime) {
            final String timeKey = Long.toString(streamTime);
            context().forward(timeKey, "punctuate");
        }
    }
}

