/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.operators.sorted.state;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.AggregatingState;
import org.apache.flink.api.common.state.AggregatingStateDescriptor;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.runtime.state.internal.InternalAggregatingState;
import org.apache.flink.runtime.state.internal.InternalListState;
import org.apache.flink.runtime.state.internal.InternalReducingState;
import org.apache.flink.streaming.api.operators.sorted.state.BatchExecutionKeyedStateBackend;
import org.apache.flink.util.TestLogger;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;

public class BatchExecutionStateBackendTest
extends TestLogger {
    /** JUnit rule through which individual tests declare the exception type they expect to be thrown. */
    @Rule
    public final ExpectedException expectedException = ExpectedException.none();

    /**
     * Creates the backend under test for the given key serializer.
     *
     * <p>The backend is constructed with {@code new KeyGroupRange(0, 9)} and a fresh
     * {@link ExecutionConfig}.
     *
     * @param keySerializer serializer for the key type
     * @param <K> type of the keys
     * @return a new {@link BatchExecutionKeyedStateBackend} for keys of type {@code K}
     */
    private <K> CheckpointableKeyedStateBackend<K> createKeyedBackend(TypeSerializer<K> keySerializer) {
        // Diamond instead of a raw type: the key type stays checked and no unchecked conversion is needed.
        return new BatchExecutionKeyedStateBackend<>(keySerializer, new KeyGroupRange(0, 9), new ExecutionConfig());
    }

    /**
     * Verifies that adding a {@code null} element to list state ends the test with a
     * {@link NullPointerException}.
     *
     * <p>The backend is closed and disposed in a {@code finally} block regardless of the outcome.
     */
    @Test
    public void testListStateAddNull() throws Exception {
        CheckpointableKeyedStateBackend<String> keyedBackend = createKeyedBackend(StringSerializer.INSTANCE);
        ListStateDescriptor<Long> stateDescr = new ListStateDescriptor<>("my-state", Long.class);
        try {
            ListState<Long> state = keyedBackend.getPartitionedState(
                    VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, stateDescr);
            keyedBackend.setCurrentKey("abc");
            Assert.assertNull(state.get());

            // From here on the rule expects the test method to terminate with an NPE.
            expectedException.expect(NullPointerException.class);
            state.add(null);
        } finally {
            keyedBackend.close();
            keyedBackend.dispose();
        }
    }

    /**
     * Verifies that {@code addAll} with a list containing a {@code null} entry ends the test with a
     * {@link NullPointerException}.
     *
     * <p>The backend is closed and disposed in a {@code finally} block regardless of the outcome.
     */
    @Test
    public void testListStateAddAllNullEntries() throws Exception {
        CheckpointableKeyedStateBackend<String> keyedBackend = createKeyedBackend(StringSerializer.INSTANCE);
        ListStateDescriptor<Long> stateDescr = new ListStateDescriptor<>("my-state", Long.class);
        try {
            ListState<Long> state = keyedBackend.getPartitionedState(
                    VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, stateDescr);
            keyedBackend.setCurrentKey("abc");
            Assert.assertNull(state.get());

            // From here on the rule expects the test method to terminate with an NPE.
            expectedException.expect(NullPointerException.class);

            // A null entry sandwiched between two valid ones.
            ArrayList<Long> adding = new ArrayList<>();
            adding.add(3L);
            adding.add(null);
            adding.add(5L);
            state.addAll(adding);
        } finally {
            keyedBackend.close();
            keyedBackend.dispose();
        }
    }

    /**
     * Verifies that {@code addAll(null)} on list state ends the test with a
     * {@link NullPointerException}.
     *
     * <p>The backend is closed and disposed in a {@code finally} block regardless of the outcome.
     */
    @Test
    public void testListStateAddAllNull() throws Exception {
        CheckpointableKeyedStateBackend<String> keyedBackend = createKeyedBackend(StringSerializer.INSTANCE);
        ListStateDescriptor<Long> stateDescr = new ListStateDescriptor<>("my-state", Long.class);
        try {
            ListState<Long> state = keyedBackend.getPartitionedState(
                    VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, stateDescr);
            keyedBackend.setCurrentKey("abc");
            Assert.assertNull(state.get());

            // From here on the rule expects the test method to terminate with an NPE.
            expectedException.expect(NullPointerException.class);
            state.addAll(null);
        } finally {
            keyedBackend.close();
            keyedBackend.dispose();
        }
    }

    /**
     * Verifies that {@code update} with a list containing a {@code null} entry ends the test with a
     * {@link NullPointerException}.
     *
     * <p>The backend is closed and disposed in a {@code finally} block regardless of the outcome.
     */
    @Test
    public void testListStateUpdateNullEntries() throws Exception {
        CheckpointableKeyedStateBackend<String> keyedBackend = createKeyedBackend(StringSerializer.INSTANCE);
        ListStateDescriptor<Long> stateDescr = new ListStateDescriptor<>("my-state", Long.class);
        try {
            ListState<Long> state = keyedBackend.getPartitionedState(
                    VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, stateDescr);
            keyedBackend.setCurrentKey("abc");
            Assert.assertNull(state.get());

            // From here on the rule expects the test method to terminate with an NPE.
            expectedException.expect(NullPointerException.class);

            // A null entry sandwiched between two valid ones.
            ArrayList<Long> adding = new ArrayList<>();
            adding.add(3L);
            adding.add(null);
            adding.add(5L);
            state.update(adding);
        } finally {
            keyedBackend.close();
            keyedBackend.dispose();
        }
    }

    /**
     * Verifies that {@code update(null)} on list state ends the test with a
     * {@link NullPointerException}.
     *
     * <p>The backend is closed and disposed in a {@code finally} block regardless of the outcome.
     */
    @Test
    public void testListStateUpdateNull() throws Exception {
        CheckpointableKeyedStateBackend<String> keyedBackend = createKeyedBackend(StringSerializer.INSTANCE);
        ListStateDescriptor<Long> stateDescr = new ListStateDescriptor<>("my-state", Long.class);
        try {
            ListState<Long> state = keyedBackend.getPartitionedState(
                    VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, stateDescr);
            keyedBackend.setCurrentKey("abc");
            Assert.assertNull(state.get());

            // From here on the rule expects the test method to terminate with an NPE.
            expectedException.expect(NullPointerException.class);
            state.update(null);
        } finally {
            keyedBackend.close();
            keyedBackend.dispose();
        }
    }

    /**
     * Walks through {@code get}, {@code addAll} and {@code update} on list state for key {@code "g"}.
     *
     * <p>Asserts that the state reads as {@code null} before anything was added and after adding an
     * empty list, that adding an empty list to existing contents changes nothing, that
     * {@code addAll} keeps the earlier elements, and that {@code update} replaces the contents.
     */
    @Test
    public void testListStateAPIs() throws Exception {
        ListStateDescriptor<Long> stateDescr = new ListStateDescriptor<>("my-state", Long.class);
        try (CheckpointableKeyedStateBackend<String> keyedBackend = createKeyedBackend(StringSerializer.INSTANCE)) {
            ListState<Long> state = keyedBackend.getPartitionedState(
                    VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, stateDescr);

            keyedBackend.setCurrentKey("g");
            Assert.assertNull(state.get());
            Assert.assertNull(state.get());

            // Adding nothing to empty state must leave it empty.
            state.addAll(Collections.<Long>emptyList());
            Assert.assertNull(state.get());

            state.addAll(Arrays.asList(3L, 4L));
            Assert.assertThat(state.get(), Matchers.containsInAnyOrder(3L, 4L));
            Assert.assertThat(state.get(), Matchers.containsInAnyOrder(3L, 4L));

            // Adding nothing to non-empty state must keep the contents.
            state.addAll(new ArrayList<Long>());
            Assert.assertThat(state.get(), Matchers.containsInAnyOrder(3L, 4L));

            state.addAll(Arrays.asList(5L, 6L));
            Assert.assertThat(state.get(), Matchers.containsInAnyOrder(3L, 4L, 5L, 6L));
            state.addAll(new ArrayList<Long>());
            Assert.assertThat(state.get(), Matchers.containsInAnyOrder(3L, 4L, 5L, 6L));
            Assert.assertThat(state.get(), Matchers.containsInAnyOrder(3L, 4L, 5L, 6L));

            // update() replaces rather than appends.
            state.update(Arrays.asList(1L, 2L));
            Assert.assertThat(state.get(), Matchers.containsInAnyOrder(1L, 2L));
        }
    }

    /**
     * Verifies merging of list state whose elements are spread over three namespaces.
     *
     * <p>For key {@code "abc"}, elements are added under namespaces 1, 2 and 3; namespaces 2 and 3
     * are then merged into namespace 1, which must contain all five elements. Clearing namespace 1
     * afterwards must make the state read as {@code null}.
     */
    @Test
    public void testListStateMergingOverThreeNamespaces() throws Exception {
        ListStateDescriptor<Long> stateDescr = new ListStateDescriptor<>("my-state", Long.class);
        Integer namespace1 = 1;
        Integer namespace2 = 2;
        Integer namespace3 = 3;

        try (CheckpointableKeyedStateBackend<String> keyedBackend = createKeyedBackend(StringSerializer.INSTANCE)) {
            // Namespace handling and merging are only available on the internal state interface.
            @SuppressWarnings("unchecked")
            InternalListState<String, Integer, Long> state = (InternalListState<String, Integer, Long>)
                    keyedBackend.getPartitionedState(0, IntSerializer.INSTANCE, stateDescr);

            keyedBackend.setCurrentKey("abc");
            state.setCurrentNamespace(namespace1);
            state.add(33L);
            state.add(55L);
            state.setCurrentNamespace(namespace2);
            state.add(22L);
            state.add(11L);
            state.setCurrentNamespace(namespace3);
            state.add(44L);

            state.mergeNamespaces(namespace1, Arrays.asList(namespace2, namespace3));
            state.setCurrentNamespace(namespace1);
            Assert.assertThat(state.get(), Matchers.containsInAnyOrder(11L, 22L, 33L, 44L, 55L));

            keyedBackend.setCurrentKey("abc");
            state.setCurrentNamespace(namespace1);
            state.clear();
            Assert.assertNull(state.get());
        }
    }

    /**
     * Verifies merging of list state when one of the source namespaces holds no elements.
     *
     * <p>For key {@code "def"}, elements are added under namespaces 1 and 3 only; namespaces 2 and 3
     * are then merged into namespace 1, which must contain all five elements. Clearing namespace 1
     * afterwards must make the state read as {@code null}.
     */
    @Test
    public void testListStateMergingWithEmptyNamespace() throws Exception {
        ListStateDescriptor<Long> stateDescr = new ListStateDescriptor<>("my-state", Long.class);
        Integer namespace1 = 1;
        Integer namespace2 = 2;
        Integer namespace3 = 3;

        try (CheckpointableKeyedStateBackend<String> keyedBackend = createKeyedBackend(StringSerializer.INSTANCE)) {
            // Namespace handling and merging are only available on the internal state interface.
            @SuppressWarnings("unchecked")
            InternalListState<String, Integer, Long> state = (InternalListState<String, Integer, Long>)
                    keyedBackend.getPartitionedState(0, IntSerializer.INSTANCE, stateDescr);

            keyedBackend.setCurrentKey("def");
            state.setCurrentNamespace(namespace1);
            state.add(11L);
            state.add(44L);
            // namespace2 is deliberately left without elements.
            state.setCurrentNamespace(namespace3);
            state.add(22L);
            state.add(55L);
            state.add(33L);

            keyedBackend.setCurrentKey("def");
            state.mergeNamespaces(namespace1, Arrays.asList(namespace2, namespace3));
            state.setCurrentNamespace(namespace1);
            Assert.assertThat(state.get(), Matchers.containsInAnyOrder(11L, 22L, 33L, 44L, 55L));

            keyedBackend.setCurrentKey("def");
            state.setCurrentNamespace(namespace1);
            state.clear();
            Assert.assertNull(state.get());
        }
    }

    /**
     * Verifies merging of list state when no namespace holds any element.
     *
     * <p>For key {@code "ghi"}, namespaces 2 and 3 are merged into namespace 1 without any prior
     * {@code add}; the state must read as {@code null} both after the merge and after a subsequent
     * {@code clear}.
     */
    @Test
    public void testListStateMergingEmpty() throws Exception {
        ListStateDescriptor<Long> stateDescr = new ListStateDescriptor<>("my-state", Long.class);
        Integer namespace1 = 1;
        Integer namespace2 = 2;
        Integer namespace3 = 3;

        try (CheckpointableKeyedStateBackend<String> keyedBackend = createKeyedBackend(StringSerializer.INSTANCE)) {
            // Namespace handling and merging are only available on the internal state interface.
            @SuppressWarnings("unchecked")
            InternalListState<String, Integer, Long> state = (InternalListState<String, Integer, Long>)
                    keyedBackend.getPartitionedState(0, IntSerializer.INSTANCE, stateDescr);

            keyedBackend.setCurrentKey("ghi");
            state.mergeNamespaces(namespace1, Arrays.asList(namespace2, namespace3));
            state.setCurrentNamespace(namespace1);
            Assert.assertNull(state.get());

            keyedBackend.setCurrentKey("ghi");
            state.setCurrentNamespace(namespace1);
            state.clear();
            Assert.assertNull(state.get());
        }
    }

    /**
     * Verifies merging of list state when all elements already live in the target namespace.
     *
     * <p>For key {@code "jkl"}, five elements are added under namespace 1; merging namespaces 2 and
     * 3 into namespace 1 must leave exactly those elements. Clearing namespace 1 afterwards must
     * make the state read as {@code null}.
     */
    @Test
    public void testListStateMergingAllInTargetNamespace() throws Exception {
        ListStateDescriptor<Long> stateDescr = new ListStateDescriptor<>("my-state", Long.class);
        Integer namespace1 = 1;
        Integer namespace2 = 2;
        Integer namespace3 = 3;

        try (CheckpointableKeyedStateBackend<String> keyedBackend = createKeyedBackend(StringSerializer.INSTANCE)) {
            // Namespace handling and merging are only available on the internal state interface.
            @SuppressWarnings("unchecked")
            InternalListState<String, Integer, Long> state = (InternalListState<String, Integer, Long>)
                    keyedBackend.getPartitionedState(0, IntSerializer.INSTANCE, stateDescr);

            keyedBackend.setCurrentKey("jkl");
            state.setCurrentNamespace(namespace1);
            state.add(11L);
            state.add(22L);
            state.add(33L);
            state.add(44L);
            state.add(55L);

            keyedBackend.setCurrentKey("jkl");
            state.mergeNamespaces(namespace1, Arrays.asList(namespace2, namespace3));
            state.setCurrentNamespace(namespace1);
            Assert.assertThat(state.get(), Matchers.containsInAnyOrder(11L, 22L, 33L, 44L, 55L));

            keyedBackend.setCurrentKey("jkl");
            state.setCurrentNamespace(namespace1);
            state.clear();
            Assert.assertNull(state.get());
        }
    }

    /**
     * Verifies merging of list state when all elements live in a single source namespace.
     *
     * <p>For key {@code "mno"}, five elements are added under namespace 3; after merging namespaces
     * 2 and 3 into namespace 1, namespace 1 must contain all of them. Clearing namespace 1
     * afterwards must make the state read as {@code null}.
     */
    @Test
    public void testListStateMergingInASingleNamespace() throws Exception {
        ListStateDescriptor<Long> stateDescr = new ListStateDescriptor<>("my-state", Long.class);
        Integer namespace1 = 1;
        Integer namespace2 = 2;
        Integer namespace3 = 3;

        try (CheckpointableKeyedStateBackend<String> keyedBackend = createKeyedBackend(StringSerializer.INSTANCE)) {
            // Namespace handling and merging are only available on the internal state interface.
            @SuppressWarnings("unchecked")
            InternalListState<String, Integer, Long> state = (InternalListState<String, Integer, Long>)
                    keyedBackend.getPartitionedState(0, IntSerializer.INSTANCE, stateDescr);

            keyedBackend.setCurrentKey("mno");
            state.setCurrentNamespace(namespace3);
            state.add(11L);
            state.add(22L);
            state.add(33L);
            state.add(44L);
            state.add(55L);

            keyedBackend.setCurrentKey("mno");
            state.mergeNamespaces(namespace1, Arrays.asList(namespace2, namespace3));
            state.setCurrentNamespace(namespace1);
            Assert.assertThat(state.get(), Matchers.containsInAnyOrder(11L, 22L, 33L, 44L, 55L));

            keyedBackend.setCurrentKey("mno");
            state.setCurrentNamespace(namespace1);
            state.clear();
            Assert.assertNull(state.get());
        }
    }

    /**
     * Verifies {@code add}, {@code get} and {@code clear} on reducing state that uses
     * {@code Long::sum} as its reduce function.
     *
     * <p>For key {@code "def"}, adding 17 and 11 must read back as 28 and clearing must make the
     * state read as {@code null}. For key {@code "g"}, additions made across repeated
     * {@code setCurrentKey("g")} calls must accumulate to 9.
     */
    @Test
    public void testReducingStateAddAndGet() throws Exception {
        ReducingStateDescriptor<Long> stateDescr =
                new ReducingStateDescriptor<>("my-state", Long::sum, Long.class);
        try (CheckpointableKeyedStateBackend<String> keyedBackend = createKeyedBackend(StringSerializer.INSTANCE)) {
            ReducingState<Long> state = keyedBackend.getPartitionedState(
                    VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, stateDescr);

            keyedBackend.setCurrentKey("def");
            Assert.assertNull(state.get());
            state.add(17L);
            state.add(11L);
            Assert.assertEquals(28L, (long) state.get());

            // Re-selecting the same key must not lose the accumulated value.
            keyedBackend.setCurrentKey("def");
            Assert.assertEquals(28L, (long) state.get());
            state.clear();
            Assert.assertNull(state.get());

            keyedBackend.setCurrentKey("g");
            Assert.assertNull(state.get());
            state.add(1L);
            state.add(2L);
            keyedBackend.setCurrentKey("g");
            state.add(3L);
            state.add(2L);
            state.add(1L);
            keyedBackend.setCurrentKey("g");
            Assert.assertEquals(9L, (long) state.get());
        }
    }

    /**
     * Verifies merging of reducing state ({@code Long::sum}) whose values are spread over three
     * namespaces.
     *
     * <p>For key {@code "abc"}, values are added under namespaces 1, 2 and 3; after merging
     * namespaces 2 and 3 into namespace 1, namespace 1 must read as 165. Clearing namespace 1
     * afterwards must make the state read as {@code null}.
     */
    @Test
    public void testReducingStateMergingOverThreeNamespaces() throws Exception {
        ReducingStateDescriptor<Long> stateDescr =
                new ReducingStateDescriptor<>("my-state", Long::sum, Long.class);
        Integer namespace1 = 1;
        Integer namespace2 = 2;
        Integer namespace3 = 3;
        Long expectedResult = 165L;

        try (CheckpointableKeyedStateBackend<String> keyedBackend = createKeyedBackend(StringSerializer.INSTANCE)) {
            // Namespace handling and merging are only available on the internal state interface.
            @SuppressWarnings("unchecked")
            InternalReducingState<String, Integer, Long> state = (InternalReducingState<String, Integer, Long>)
                    keyedBackend.getPartitionedState(0, IntSerializer.INSTANCE, stateDescr);

            keyedBackend.setCurrentKey("abc");
            state.setCurrentNamespace(namespace1);
            state.add(33L);
            state.add(55L);
            state.setCurrentNamespace(namespace2);
            state.add(22L);
            state.add(11L);
            state.setCurrentNamespace(namespace3);
            state.add(44L);

            keyedBackend.setCurrentKey("abc");
            state.mergeNamespaces(namespace1, Arrays.asList(namespace2, namespace3));
            state.setCurrentNamespace(namespace1);
            Assert.assertEquals(expectedResult, state.get());

            keyedBackend.setCurrentKey("abc");
            state.setCurrentNamespace(namespace1);
            state.clear();
            Assert.assertNull(state.get());
        }
    }

    /**
     * Verifies merging of reducing state ({@code Long::sum}) when one of the source namespaces
     * holds no value.
     *
     * <p>For key {@code "def"}, values are added under namespaces 1 and 3 only; after merging
     * namespaces 2 and 3 into namespace 1, namespace 1 must read as 165. Clearing namespace 1
     * afterwards must make the state read as {@code null}.
     */
    @Test
    public void testReducingStateMergingWithEmpty() throws Exception {
        ReducingStateDescriptor<Long> stateDescr =
                new ReducingStateDescriptor<>("my-state", Long::sum, Long.class);
        Integer namespace1 = 1;
        Integer namespace2 = 2;
        Integer namespace3 = 3;
        Long expectedResult = 165L;

        try (CheckpointableKeyedStateBackend<String> keyedBackend = createKeyedBackend(StringSerializer.INSTANCE)) {
            // Namespace handling and merging are only available on the internal state interface.
            @SuppressWarnings("unchecked")
            InternalReducingState<String, Integer, Long> state = (InternalReducingState<String, Integer, Long>)
                    keyedBackend.getPartitionedState(0, IntSerializer.INSTANCE, stateDescr);

            keyedBackend.setCurrentKey("def");
            state.setCurrentNamespace(namespace1);
            state.add(11L);
            state.add(44L);
            // namespace2 is deliberately left without a value.
            state.setCurrentNamespace(namespace3);
            state.add(22L);
            state.add(55L);
            state.add(33L);

            keyedBackend.setCurrentKey("def");
            state.mergeNamespaces(namespace1, Arrays.asList(namespace2, namespace3));
            state.setCurrentNamespace(namespace1);
            Assert.assertEquals(expectedResult, state.get());

            keyedBackend.setCurrentKey("def");
            state.setCurrentNamespace(namespace1);
            state.clear();
            Assert.assertNull(state.get());
        }
    }

    /**
     * Verifies merging of reducing state when no namespace holds a value.
     *
     * <p>For key {@code "ghi"}, namespaces 2 and 3 are merged into namespace 1 without any prior
     * {@code add}; namespace 1 must read as {@code null} afterwards.
     */
    @Test
    public void testReducingStateMergingEmpty() throws Exception {
        ReducingStateDescriptor<Long> stateDescr =
                new ReducingStateDescriptor<>("my-state", Long::sum, Long.class);
        Integer namespace1 = 1;
        Integer namespace2 = 2;
        Integer namespace3 = 3;

        try (CheckpointableKeyedStateBackend<String> keyedBackend = createKeyedBackend(StringSerializer.INSTANCE)) {
            // Namespace handling and merging are only available on the internal state interface.
            @SuppressWarnings("unchecked")
            InternalReducingState<String, Integer, Long> state = (InternalReducingState<String, Integer, Long>)
                    keyedBackend.getPartitionedState(0, IntSerializer.INSTANCE, stateDescr);

            keyedBackend.setCurrentKey("ghi");
            state.mergeNamespaces(namespace1, Arrays.asList(namespace2, namespace3));
            state.setCurrentNamespace(namespace1);
            Assert.assertNull(state.get());
        }
    }

    /**
     * Verifies merging of reducing state ({@code Long::sum}) when all values were added under the
     * target namespace.
     *
     * <p>For key {@code "jkl"}, five values are added under namespace 1; merging namespaces 2 and 3
     * into namespace 1 must leave it reading 165. Clearing namespace 1 afterwards must make the
     * state read as {@code null}.
     */
    @Test
    public void testReducingStateMergingInTargetNamespace() throws Exception {
        ReducingStateDescriptor<Long> stateDescr =
                new ReducingStateDescriptor<>("my-state", Long::sum, Long.class);
        Integer namespace1 = 1;
        Integer namespace2 = 2;
        Integer namespace3 = 3;
        Long expectedResult = 165L;

        try (CheckpointableKeyedStateBackend<String> keyedBackend = createKeyedBackend(StringSerializer.INSTANCE)) {
            // Namespace handling and merging are only available on the internal state interface.
            @SuppressWarnings("unchecked")
            InternalReducingState<String, Integer, Long> state = (InternalReducingState<String, Integer, Long>)
                    keyedBackend.getPartitionedState(0, IntSerializer.INSTANCE, stateDescr);

            keyedBackend.setCurrentKey("jkl");
            state.setCurrentNamespace(namespace1);
            state.add(11L);
            state.add(22L);
            state.add(33L);
            state.add(44L);
            state.add(55L);

            keyedBackend.setCurrentKey("jkl");
            state.mergeNamespaces(namespace1, Arrays.asList(namespace2, namespace3));
            state.setCurrentNamespace(namespace1);
            Assert.assertEquals(expectedResult, state.get());

            keyedBackend.setCurrentKey("jkl");
            state.setCurrentNamespace(namespace1);
            state.clear();
            Assert.assertNull(state.get());
        }
    }

    /**
     * Verifies merging of reducing state ({@code Long::sum}) when all values were added under a
     * single source namespace.
     *
     * <p>For key {@code "mno"}, five values are added under namespace 3; after merging namespaces 2
     * and 3 into namespace 1, namespace 1 must read as 165. Clearing namespace 1 afterwards must
     * make the state read as {@code null}.
     */
    @Test
    public void testReducingStateMergingInASingleNamespace() throws Exception {
        ReducingStateDescriptor<Long> stateDescr =
                new ReducingStateDescriptor<>("my-state", Long::sum, Long.class);
        Integer namespace1 = 1;
        Integer namespace2 = 2;
        Integer namespace3 = 3;
        Long expectedResult = 165L;

        try (CheckpointableKeyedStateBackend<String> keyedBackend = createKeyedBackend(StringSerializer.INSTANCE)) {
            // Namespace handling and merging are only available on the internal state interface.
            @SuppressWarnings("unchecked")
            InternalReducingState<String, Integer, Long> state = (InternalReducingState<String, Integer, Long>)
                    keyedBackend.getPartitionedState(0, IntSerializer.INSTANCE, stateDescr);

            keyedBackend.setCurrentKey("mno");
            state.setCurrentNamespace(namespace3);
            state.add(11L);
            state.add(22L);
            state.add(33L);
            state.add(44L);
            state.add(55L);

            keyedBackend.setCurrentKey("mno");
            state.mergeNamespaces(namespace1, Arrays.asList(namespace2, namespace3));
            state.setCurrentNamespace(namespace1);
            Assert.assertEquals(expectedResult, state.get());

            keyedBackend.setCurrentKey("mno");
            state.setCurrentNamespace(namespace1);
            state.clear();
            Assert.assertNull(state.get());
        }
    }

    /**
     * Exercises {@code add}, {@code get} and {@code clear} on aggregating state built from
     * {@code MutableAggregatingAddingFunction} with a {@code MutableLong} accumulator.
     *
     * <p>For key {@code "def"}, adding 17 and 11 must read back as 28, and the state must read as
     * {@code null} after {@code clear}. For key {@code "g"}, additions made across repeated
     * {@code setCurrentKey("g")} calls must read back as 9, and {@code null} after {@code clear}.
     */
    @Test
    public void testAggregatingStateAddAndGetWithMutableAccumulator() throws Exception {
        AggregatingStateDescriptor descriptor = new AggregatingStateDescriptor(
                "my-state", (AggregateFunction) new MutableAggregatingAddingFunction(), MutableLong.class);

        try (CheckpointableKeyedStateBackend backend = createKeyedBackend(StringSerializer.INSTANCE)) {
            AggregatingState aggregate = (AggregatingState) backend.getPartitionedState(
                    VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, descriptor);

            // --- key "def" ---
            backend.setCurrentKey("def");
            Assert.assertNull(aggregate.get());
            aggregate.add(17L);
            aggregate.add(11L);
            Assert.assertEquals(28L, ((Long) aggregate.get()).longValue());

            backend.setCurrentKey("def");
            Assert.assertEquals(28L, ((Long) aggregate.get()).longValue());
            aggregate.clear();
            Assert.assertNull(aggregate.get());

            backend.setCurrentKey("def");
            Assert.assertNull(aggregate.get());

            // --- key "g" ---
            backend.setCurrentKey("g");
            Assert.assertNull(aggregate.get());
            aggregate.add(1L);
            aggregate.add(2L);

            backend.setCurrentKey("g");
            aggregate.add(3L);
            aggregate.add(2L);
            aggregate.add(1L);

            backend.setCurrentKey("g");
            Assert.assertEquals(9L, ((Long) aggregate.get()).longValue());
            aggregate.clear();
            Assert.assertNull(aggregate.get());
        }
    }

    /**
     * Exercises namespace merging on aggregating state (mutable accumulator) whose inputs are
     * spread over three namespaces.
     *
     * <p>For key {@code "abc"}, values are added under namespaces 1, 2 and 3; after merging
     * namespaces 2 and 3 into namespace 1, namespace 1 must read as 165, and as {@code null} once
     * cleared.
     */
    @Test
    public void testAggregatingStateMergingWithMutableAccumulatorOverThreeNamespaces() throws Exception {
        AggregatingStateDescriptor descriptor = new AggregatingStateDescriptor(
                "my-state", (AggregateFunction) new MutableAggregatingAddingFunction(), MutableLong.class);
        Integer target = 1;
        Integer sourceA = 2;
        Integer sourceB = 3;
        Long expected = 165L;

        try (CheckpointableKeyedStateBackend backend = createKeyedBackend(StringSerializer.INSTANCE)) {
            InternalAggregatingState aggregate = (InternalAggregatingState) backend.getPartitionedState(
                    0, IntSerializer.INSTANCE, descriptor);

            // Populate all three namespaces for the same key.
            backend.setCurrentKey("abc");
            aggregate.setCurrentNamespace(target);
            aggregate.add(33L);
            aggregate.add(55L);
            aggregate.setCurrentNamespace(sourceA);
            aggregate.add(22L);
            aggregate.add(11L);
            aggregate.setCurrentNamespace(sourceB);
            aggregate.add(44L);

            // Merge and check the result in the target namespace.
            backend.setCurrentKey("abc");
            aggregate.mergeNamespaces(target, Arrays.asList(sourceA, sourceB));
            aggregate.setCurrentNamespace(target);
            Assert.assertEquals(expected, aggregate.get());

            // Clearing the target must leave nothing behind.
            backend.setCurrentKey("abc");
            aggregate.setCurrentNamespace(target);
            aggregate.clear();
            Assert.assertNull(aggregate.get());
        }
    }

    /**
     * Exercises namespace merging on aggregating state (mutable accumulator) when one source
     * namespace received no input.
     *
     * <p>For key {@code "def"}, values are added under namespaces 1 and 3 only; after merging
     * namespaces 2 and 3 into namespace 1, namespace 1 must read as 165, and as {@code null} once
     * cleared.
     */
    @Test
    public void testAggregatingStateMergingWithMutableAccumulatorWithEmpty() throws Exception {
        AggregatingStateDescriptor descriptor = new AggregatingStateDescriptor(
                "my-state", (AggregateFunction) new MutableAggregatingAddingFunction(), MutableLong.class);
        Integer target = 1;
        Integer emptySource = 2;
        Integer filledSource = 3;
        Long expected = 165L;

        try (CheckpointableKeyedStateBackend backend = createKeyedBackend(StringSerializer.INSTANCE)) {
            InternalAggregatingState aggregate = (InternalAggregatingState) backend.getPartitionedState(
                    0, IntSerializer.INSTANCE, descriptor);

            // Only the target and one of the two sources receive input.
            backend.setCurrentKey("def");
            aggregate.setCurrentNamespace(target);
            aggregate.add(11L);
            aggregate.add(44L);
            aggregate.setCurrentNamespace(filledSource);
            aggregate.add(22L);
            aggregate.add(55L);
            aggregate.add(33L);

            // Merge and check the result in the target namespace.
            backend.setCurrentKey("def");
            aggregate.mergeNamespaces(target, Arrays.asList(emptySource, filledSource));
            aggregate.setCurrentNamespace(target);
            Assert.assertEquals(expected, aggregate.get());

            // Clearing the target must leave nothing behind.
            backend.setCurrentKey("def");
            aggregate.setCurrentNamespace(target);
            aggregate.clear();
            Assert.assertNull(aggregate.get());
        }
    }

    /**
     * Exercises namespace merging on aggregating state (mutable accumulator) when nothing was added.
     *
     * <p>For key {@code "ghi"}, namespaces 2 and 3 are merged into namespace 1 without any prior
     * {@code add}; namespace 1 must read as {@code null} afterwards.
     */
    @Test
    public void testAggregatingStateMergingWithMutableAccumulatorEmpty() throws Exception {
        AggregatingStateDescriptor descriptor = new AggregatingStateDescriptor(
                "my-state", (AggregateFunction) new MutableAggregatingAddingFunction(), MutableLong.class);
        Integer target = 1;
        Integer sourceA = 2;
        Integer sourceB = 3;

        try (CheckpointableKeyedStateBackend backend = createKeyedBackend(StringSerializer.INSTANCE)) {
            InternalAggregatingState aggregate = (InternalAggregatingState) backend.getPartitionedState(
                    0, IntSerializer.INSTANCE, descriptor);

            backend.setCurrentKey("ghi");
            aggregate.mergeNamespaces(target, Arrays.asList(sourceA, sourceB));
            aggregate.setCurrentNamespace(target);
            Assert.assertNull(aggregate.get());
        }
    }

    /**
     * Exercises namespace merging on aggregating state (mutable accumulator) when every input was
     * added under the target namespace.
     *
     * <p>For key {@code "jkl"}, five values are added under namespace 1; after merging namespaces 2
     * and 3 into namespace 1, namespace 1 must read as 165, and as {@code null} once cleared.
     */
    @Test
    public void testAggregatingStateMergingWithMutableAccumulatorInTargetNamespace() throws Exception {
        AggregatingStateDescriptor descriptor = new AggregatingStateDescriptor(
                "my-state", (AggregateFunction) new MutableAggregatingAddingFunction(), MutableLong.class);
        Integer target = 1;
        Integer sourceA = 2;
        Integer sourceB = 3;
        Long expected = 165L;

        try (CheckpointableKeyedStateBackend backend = createKeyedBackend(StringSerializer.INSTANCE)) {
            InternalAggregatingState aggregate = (InternalAggregatingState) backend.getPartitionedState(
                    0, IntSerializer.INSTANCE, descriptor);

            // All input goes into the target namespace, in the same order as before.
            backend.setCurrentKey("jkl");
            aggregate.setCurrentNamespace(target);
            for (long value : new long[] {11L, 22L, 33L, 44L, 55L}) {
                aggregate.add(value);
            }

            // Merge the two untouched namespaces into the target and check the result.
            backend.setCurrentKey("jkl");
            aggregate.mergeNamespaces(target, Arrays.asList(sourceA, sourceB));
            aggregate.setCurrentNamespace(target);
            Assert.assertEquals(expected, aggregate.get());

            // Clearing the target must leave nothing behind.
            backend.setCurrentKey("jkl");
            aggregate.setCurrentNamespace(target);
            aggregate.clear();
            Assert.assertNull(aggregate.get());
        }
    }

    /**
     * Exercises namespace merging on aggregating state (mutable accumulator) when every input was
     * added under one source namespace.
     *
     * <p>For key {@code "mno"}, five values are added under namespace 3; after merging namespaces 2
     * and 3 into namespace 1, namespace 1 must read as 165, and as {@code null} once cleared.
     */
    @Test
    public void testAggregatingStateMergingWithMutableAccumulatorInASingleNamespace() throws Exception {
        AggregatingStateDescriptor descriptor = new AggregatingStateDescriptor(
                "my-state", (AggregateFunction) new MutableAggregatingAddingFunction(), MutableLong.class);
        Integer target = 1;
        Integer emptySource = 2;
        Integer filledSource = 3;
        Long expected = 165L;

        try (CheckpointableKeyedStateBackend backend = createKeyedBackend(StringSerializer.INSTANCE)) {
            InternalAggregatingState aggregate = (InternalAggregatingState) backend.getPartitionedState(
                    0, IntSerializer.INSTANCE, descriptor);

            // All input goes into a single source namespace, in the same order as before.
            backend.setCurrentKey("mno");
            aggregate.setCurrentNamespace(filledSource);
            for (long value : new long[] {11L, 22L, 33L, 44L, 55L}) {
                aggregate.add(value);
            }

            // Merge both sources into the target and check the result.
            backend.setCurrentKey("mno");
            aggregate.mergeNamespaces(target, Arrays.asList(emptySource, filledSource));
            aggregate.setCurrentNamespace(target);
            Assert.assertEquals(expected, aggregate.get());

            // Clearing the target must leave nothing behind.
            backend.setCurrentKey("mno");
            aggregate.setCurrentNamespace(target);
            aggregate.clear();
            Assert.assertNull(aggregate.get());
        }
    }

    /**
     * Exercises add/get/clear on an aggregating state backed by the immutable
     * {@code Long} accumulator, using the void namespace and two different keys.
     */
    @Test
    public void testAggregatingStateAddAndGetWithImmutableAccumulator() throws Exception {
        AggregatingStateDescriptor descriptor = new AggregatingStateDescriptor("my-state", (AggregateFunction)new ImmutableAggregatingAddingFunction(), Long.class);
        try (CheckpointableKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)StringSerializer.INSTANCE)) {
            AggregatingState sum = (AggregatingState)backend.getPartitionedState(VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)descriptor);

            // Key "def": starts empty, accumulates 17 + 11, survives re-selecting the key, then clears.
            backend.setCurrentKey("def");
            Assert.assertNull(sum.get());
            sum.add(17L);
            sum.add(11L);
            Assert.assertEquals(28L, ((Long)sum.get()).longValue());
            backend.setCurrentKey("def");
            Assert.assertEquals(28L, ((Long)sum.get()).longValue());
            sum.clear();
            Assert.assertNull(sum.get());

            // Key "g": additions are interleaved with re-selecting the same key; 1 + 2 + 3 + 2 + 1 = 9.
            backend.setCurrentKey("g");
            Assert.assertNull(sum.get());
            sum.add(1L);
            sum.add(2L);
            backend.setCurrentKey("g");
            for (long addend : new long[]{3L, 2L, 1L}) {
                sum.add(addend);
            }
            backend.setCurrentKey("g");
            Assert.assertEquals(9L, ((Long)sum.get()).longValue());
            sum.clear();
            Assert.assertNull(sum.get());
        }
    }

    /**
     * Spreads the values over namespaces 1, 2 and 3, merges 2 and 3 into 1, and asserts
     * that namespace 1 then reports the sum of everything that was added.
     */
    @Test
    public void testAggregatingStateMergingWithImmutableAccumulatorOverThreeNamespaces() throws Exception {
        AggregatingStateDescriptor descriptor = new AggregatingStateDescriptor("my-state", (AggregateFunction)new ImmutableAggregatingAddingFunction(), Long.class);
        final Integer targetNamespace = 1;
        final Integer firstSource = 2;
        final Integer secondSource = 3;
        final Long expectedSum = 165L;
        try (CheckpointableKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)StringSerializer.INSTANCE)) {
            InternalAggregatingState aggregate = (InternalAggregatingState)backend.getPartitionedState(Integer.valueOf(0), (TypeSerializer)IntSerializer.INSTANCE, (StateDescriptor)descriptor);

            // 33 + 55 in namespace 1, 22 + 11 in namespace 2, 44 in namespace 3 (total 165).
            backend.setCurrentKey("abc");
            aggregate.setCurrentNamespace(targetNamespace);
            aggregate.add(33L);
            aggregate.add(55L);
            aggregate.setCurrentNamespace(firstSource);
            aggregate.add(22L);
            aggregate.add(11L);
            aggregate.setCurrentNamespace(secondSource);
            aggregate.add(44L);

            // Merge the two sources into the target and verify the combined sum.
            backend.setCurrentKey("abc");
            aggregate.mergeNamespaces(targetNamespace, Arrays.asList(firstSource, secondSource));
            aggregate.setCurrentNamespace(targetNamespace);
            Assert.assertEquals(expectedSum, aggregate.get());

            // After clear() the target namespace must read back as null.
            backend.setCurrentKey("abc");
            aggregate.setCurrentNamespace(targetNamespace);
            aggregate.clear();
            Assert.assertNull(aggregate.get());
        }
    }

    /**
     * Adds values to namespaces 1 and 3 only, leaving namespace 2 unwritten, then merges
     * 2 and 3 into 1 and asserts that namespace 1 reports the full sum.
     */
    @Test
    public void testAggregatingStateMergingWithImmutableAccumulatorWithEmpty() throws Exception {
        AggregatingStateDescriptor descriptor = new AggregatingStateDescriptor("my-state", (AggregateFunction)new ImmutableAggregatingAddingFunction(), Long.class);
        final Integer targetNamespace = 1;
        final Integer emptySource = 2;
        final Integer filledSource = 3;
        final Long expectedSum = 165L;
        try (CheckpointableKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)StringSerializer.INSTANCE)) {
            InternalAggregatingState aggregate = (InternalAggregatingState)backend.getPartitionedState(Integer.valueOf(0), (TypeSerializer)IntSerializer.INSTANCE, (StateDescriptor)descriptor);

            // 11 + 44 in namespace 1, 22 + 55 + 33 in namespace 3 (total 165).
            backend.setCurrentKey("def");
            aggregate.setCurrentNamespace(targetNamespace);
            aggregate.add(11L);
            aggregate.add(44L);
            aggregate.setCurrentNamespace(filledSource);
            for (long addend : new long[]{22L, 55L, 33L}) {
                aggregate.add(addend);
            }

            // Merge, including the never-written namespace 2, and verify the combined sum.
            backend.setCurrentKey("def");
            aggregate.mergeNamespaces(targetNamespace, Arrays.asList(emptySource, filledSource));
            aggregate.setCurrentNamespace(targetNamespace);
            Assert.assertEquals(expectedSum, aggregate.get());

            // After clear() the target namespace must read back as null.
            backend.setCurrentKey("def");
            aggregate.setCurrentNamespace(targetNamespace);
            aggregate.clear();
            Assert.assertNull(aggregate.get());
        }
    }

    /**
     * Merges namespaces 2 and 3 into namespace 1 without adding any value first and
     * asserts that reading namespace 1 afterwards yields null.
     */
    @Test
    public void testAggregatingStateMergingWithImmutableAccumulatorEmpty() throws Exception {
        AggregatingStateDescriptor descriptor = new AggregatingStateDescriptor("my-state", (AggregateFunction)new ImmutableAggregatingAddingFunction(), Long.class);
        final Integer targetNamespace = 1;
        final Integer firstSource = 2;
        final Integer secondSource = 3;
        try (CheckpointableKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)StringSerializer.INSTANCE)) {
            InternalAggregatingState aggregate = (InternalAggregatingState)backend.getPartitionedState(Integer.valueOf(0), (TypeSerializer)IntSerializer.INSTANCE, (StateDescriptor)descriptor);

            // Nothing has been added under key "ghi" in any namespace.
            backend.setCurrentKey("ghi");
            aggregate.mergeNamespaces(targetNamespace, Arrays.asList(firstSource, secondSource));
            aggregate.setCurrentNamespace(targetNamespace);
            Assert.assertNull(aggregate.get());
        }
    }

    /**
     * Adds all values directly to the merge target (namespace 1), merges the unwritten
     * namespaces 2 and 3 into it, and asserts that the sum in namespace 1 is unchanged.
     */
    @Test
    public void testAggregatingStateMergingWithImmutableAccumulatorInTargetNamespace() throws Exception {
        AggregatingStateDescriptor descriptor = new AggregatingStateDescriptor("my-state", (AggregateFunction)new ImmutableAggregatingAddingFunction(), Long.class);
        final Integer targetNamespace = 1;
        final Integer firstSource = 2;
        final Integer secondSource = 3;
        final Long expectedSum = 165L;
        try (CheckpointableKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)StringSerializer.INSTANCE)) {
            InternalAggregatingState aggregate = (InternalAggregatingState)backend.getPartitionedState(Integer.valueOf(0), (TypeSerializer)IntSerializer.INSTANCE, (StateDescriptor)descriptor);

            // Populate the target namespace; 11 + 22 + 33 + 44 + 55 = 165.
            backend.setCurrentKey("jkl");
            aggregate.setCurrentNamespace(targetNamespace);
            for (long addend : new long[]{11L, 22L, 33L, 44L, 55L}) {
                aggregate.add(addend);
            }

            // Merge the untouched sources into the target and check the sum is unchanged.
            backend.setCurrentKey("jkl");
            aggregate.mergeNamespaces(targetNamespace, Arrays.asList(firstSource, secondSource));
            aggregate.setCurrentNamespace(targetNamespace);
            Assert.assertEquals(expectedSum, aggregate.get());

            // After clear() the target namespace must read back as null.
            backend.setCurrentKey("jkl");
            aggregate.setCurrentNamespace(targetNamespace);
            aggregate.clear();
            Assert.assertNull(aggregate.get());
        }
    }

    /**
     * Adds all values to a single source namespace (3), merges namespaces 2 and 3 into
     * namespace 1, and asserts that namespace 1 then reports the full sum.
     */
    @Test
    public void testAggregatingStateMergingWithImmutableAccumulatorInASingleNamespace() throws Exception {
        AggregatingStateDescriptor descriptor = new AggregatingStateDescriptor("my-state", (AggregateFunction)new ImmutableAggregatingAddingFunction(), Long.class);
        final Integer targetNamespace = 1;
        final Integer emptySource = 2;
        final Integer filledSource = 3;
        final Long expectedSum = 165L;
        try (CheckpointableKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)StringSerializer.INSTANCE)) {
            InternalAggregatingState aggregate = (InternalAggregatingState)backend.getPartitionedState(Integer.valueOf(0), (TypeSerializer)IntSerializer.INSTANCE, (StateDescriptor)descriptor);

            // Only namespace 3 receives data; 11 + 22 + 33 + 44 + 55 = 165.
            backend.setCurrentKey("mno");
            aggregate.setCurrentNamespace(filledSource);
            for (long addend : new long[]{11L, 22L, 33L, 44L, 55L}) {
                aggregate.add(addend);
            }

            // Merge both sources into the (so far empty) target and verify the sum.
            backend.setCurrentKey("mno");
            aggregate.mergeNamespaces(targetNamespace, Arrays.asList(emptySource, filledSource));
            aggregate.setCurrentNamespace(targetNamespace);
            Assert.assertEquals(expectedSum, aggregate.get());

            // After clear() the target namespace must read back as null.
            backend.setCurrentKey("mno");
            aggregate.setCurrentNamespace(targetNamespace);
            aggregate.clear();
            Assert.assertNull(aggregate.get());
        }
    }

    /**
     * Checks {@code MapState.isEmpty()} while 1024 entries are inserted and then removed
     * one by one: empty before the first put, non-empty while any entry remains, and
     * empty again once the last entry has been removed.
     */
    @Test
    public void testMapStateIsEmpty() throws Exception {
        final int entryCount = 1024;
        MapStateDescriptor descriptor = new MapStateDescriptor("id", Integer.class, Long.class);
        CheckpointableKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE);
        try {
            MapState mapState = (MapState)backend.getPartitionedState(VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)descriptor);
            backend.setCurrentKey(1);
            Assert.assertTrue(mapState.isEmpty());

            // Fill phase: the state must be non-empty right after every put.
            for (int key = 0; key < entryCount; key++) {
                mapState.put(key, (long)key * 2L);
                Assert.assertFalse(mapState.isEmpty());
            }

            // Drain phase: the state must still be non-empty before every remove.
            for (int key = 0; key < entryCount; key++) {
                Assert.assertFalse(mapState.isEmpty());
                mapState.remove(key);
            }
            Assert.assertTrue(mapState.isEmpty());
        }
        finally {
            // Dispose the backend even when an assertion above fails.
            backend.dispose();
        }
    }

    /**
     * Iterates over a map state holding 4096 entries and, for every entry, randomly
     * picks one of three access patterns: remove twice (the second remove must throw
     * {@link IllegalStateException}), call {@code hasNext()} and then remove, or leave
     * the entry untouched. The test asserts that the i-th entry returned has key i and
     * that the iterator visits exactly as many entries as were inserted.
     */
    @Test
    public void testMapStateIteratorArbitraryAccess() throws Exception {
        MapStateDescriptor kvId = new MapStateDescriptor("id", Integer.class, Long.class);
        CheckpointableKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE);
        try {
            MapState state = (MapState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
            backend.setCurrentKey((Object)1);
            int stateSize = 4096;
            for (int i = 0; i < stateSize; ++i) {
                state.put((Object)i, (Object)((long)i * 2L));
            }
            Iterator iterator = state.iterator();
            int iteratorCount = 0;
            while (iterator.hasNext()) {
                Map.Entry entry = (Map.Entry)iterator.next();
                Assert.assertEquals((long)iteratorCount, (long)((Integer)entry.getKey()).intValue());
                // nextInt(3) draws uniformly from {0, 1, 2}. The previous nextInt() % 3
                // could also yield -1 and -2, which skewed the choice towards "do nothing".
                switch (ThreadLocalRandom.current().nextInt(3)) {
                    case 0: {
                        // Remove twice: the second call has no current element to remove.
                        iterator.remove();
                        try {
                            iterator.remove();
                            Assert.fail();
                        }
                        catch (IllegalStateException ignored) {
                            // Expected: remove() was already called for this element.
                        }
                        break;
                    }
                    case 1: {
                        // remove() must still target the current element after hasNext().
                        iterator.hasNext();
                        iterator.remove();
                        break;
                    }
                    default: {
                        // Leave the entry in place.
                        break;
                    }
                }
                ++iteratorCount;
            }
            Assert.assertEquals((long)stateSize, (long)iteratorCount);
        }
        finally {
            // Dispose the backend even when an assertion above fails.
            backend.dispose();
        }
    }

    /**
     * Asserts that a value state whose descriptor is created with a {@code null} default
     * reads as null before the first update and again after {@code clear()}.
     */
    @Test
    public void testValueStateNullAsDefaultValue() throws Exception {
        CheckpointableKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE);
        try {
            ValueStateDescriptor kvId = new ValueStateDescriptor("id", String.class, null);
            ValueState state = (ValueState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
            backend.setCurrentKey((Object)1);
            Assert.assertNull((Object)state.value());
            state.update((Object)"Ciao");
            Assert.assertEquals((Object)"Ciao", (Object)state.value());
            state.clear();
            Assert.assertNull((Object)state.value());
        }
        finally {
            // Dispose the backend even when an assertion above fails, so it is not leaked.
            backend.dispose();
        }
    }

    /**
     * Asserts that a value state whose descriptor is created with the default
     * {@code "Hello"} reads as that default before the first update and again after
     * {@code clear()}.
     */
    @Test
    public void testValueStateDefaultValue() throws Exception {
        CheckpointableKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE);
        try {
            ValueStateDescriptor kvId = new ValueStateDescriptor("id", String.class, (Object)"Hello");
            ValueState state = (ValueState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
            backend.setCurrentKey((Object)1);
            Assert.assertEquals((Object)"Hello", (Object)state.value());
            state.update((Object)"Ciao");
            Assert.assertEquals((Object)"Ciao", (Object)state.value());
            state.clear();
            Assert.assertEquals((Object)"Hello", (Object)state.value());
        }
        finally {
            // Dispose the backend even when an assertion above fails, so it is not leaked.
            backend.dispose();
        }
    }

    /**
     * Asserts that a reducing state reads as null before the first {@code add} and
     * again after {@code clear()}, and returns the single added element in between.
     */
    @Test
    public void testReducingStateDefaultValue() throws Exception {
        CheckpointableKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE);
        try {
            ReducingStateDescriptor kvId = new ReducingStateDescriptor("id", (ReduceFunction)new AppendingReduce(), String.class);
            ReducingState state = (ReducingState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
            backend.setCurrentKey((Object)1);
            Assert.assertNull((Object)state.get());
            state.add((Object)"Ciao");
            Assert.assertEquals((Object)"Ciao", (Object)state.get());
            state.clear();
            Assert.assertNull((Object)state.get());
        }
        finally {
            // Dispose the backend even when an assertion above fails, so it is not leaked.
            backend.dispose();
        }
    }

    /**
     * Asserts that a list state reads as null before the first update and again after
     * {@code clear()}, and contains exactly the updated elements in between.
     */
    @Test
    public void testListStateDefaultValue() throws Exception {
        CheckpointableKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE);
        try {
            ListStateDescriptor kvId = new ListStateDescriptor("id", String.class);
            ListState state = (ListState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
            backend.setCurrentKey((Object)1);
            Assert.assertNull((Object)state.get());
            state.update(Arrays.asList("Ciao", "Bello"));
            Assert.assertThat((Object)state.get(), (Matcher)Matchers.containsInAnyOrder((Object[])new String[]{"Ciao", "Bello"}));
            state.clear();
            Assert.assertNull((Object)state.get());
        }
        finally {
            // Dispose the backend even when an assertion above fails, so it is not leaked.
            backend.dispose();
        }
    }

    /**
     * Asserts that a map state exposes a non-null, empty {@code entries()} view before
     * the first put and again after {@code clear()}, and returns the stored values for
     * their keys in between.
     */
    @Test
    public void testMapStateDefaultValue() throws Exception {
        CheckpointableKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE);
        try {
            MapStateDescriptor kvId = new MapStateDescriptor("id", String.class, String.class);
            MapState state = (MapState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
            backend.setCurrentKey((Object)1);
            Assert.assertNotNull((Object)state.entries());
            Assert.assertFalse((boolean)state.entries().iterator().hasNext());
            state.put((Object)"Ciao", (Object)"Hello");
            state.put((Object)"Bello", (Object)"Nice");
            Assert.assertNotNull((Object)state.entries());
            // assertEquals takes (expected, actual); this order keeps failure messages accurate.
            Assert.assertEquals((Object)"Hello", (Object)state.get((Object)"Ciao"));
            Assert.assertEquals((Object)"Nice", (Object)state.get((Object)"Bello"));
            state.clear();
            Assert.assertNotNull((Object)state.entries());
            Assert.assertFalse((boolean)state.entries().iterator().hasNext());
        }
        finally {
            // Dispose the backend even when an assertion above fails, so it is not leaked.
            backend.dispose();
        }
    }

    /**
     * Minimal holder for a single primitive {@code long}, used as the accumulator type
     * of the mutable aggregate function in this test class.
     */
    private static final class MutableLong {
        /** Current value; starts at the field default of zero. */
        long value;
    }

    private static class ImmutableAggregatingAddingFunction
    implements AggregateFunction<Long, Long, Long> {
        private ImmutableAggregatingAddingFunction() {
        }

        public Long createAccumulator() {
            return 0L;
        }

        public Long add(Long value, Long accumulator) {
            accumulator = accumulator + value;
            return accumulator;
        }

        public Long getResult(Long accumulator) {
            return accumulator;
        }

        public Long merge(Long a, Long b) {
            return a + b;
        }
    }

    private static class MutableAggregatingAddingFunction
    implements AggregateFunction<Long, MutableLong, Long> {
        private MutableAggregatingAddingFunction() {
        }

        public MutableLong createAccumulator() {
            return new MutableLong();
        }

        public MutableLong add(Long value, MutableLong accumulator) {
            accumulator.value += value.longValue();
            return accumulator;
        }

        public Long getResult(MutableLong accumulator) {
            return accumulator.value;
        }

        public MutableLong merge(MutableLong a, MutableLong b) {
            a.value += b.value;
            return a;
        }
    }

    private static class AppendingReduce
    implements ReduceFunction<String> {
        private AppendingReduce() {
        }

        public String reduce(String value1, String value2) throws Exception {
            return value1 + "," + value2;
        }
    }
}

