/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.samza.runtime;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.Serializable;
import java.lang.ref.SoftReference;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import org.apache.beam.runners.core.StateInternals;
import org.apache.beam.runners.core.StateInternalsFactory;
import org.apache.beam.runners.core.StateNamespace;
import org.apache.beam.runners.core.StateTag;
import org.apache.beam.runners.core.construction.graph.ExecutableStage;
import org.apache.beam.runners.core.construction.graph.UserStateReference;
import org.apache.beam.runners.samza.SamzaPipelineOptions;
import org.apache.beam.runners.samza.state.SamzaMapState;
import org.apache.beam.runners.samza.state.SamzaSetState;
import org.apache.beam.runners.samza.transforms.UpdatingCombineFn;
import org.apache.beam.sdk.coders.BooleanCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.InstantCoder;
import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.CombiningState;
import org.apache.beam.sdk.state.MapState;
import org.apache.beam.sdk.state.MultimapState;
import org.apache.beam.sdk.state.OrderedListState;
import org.apache.beam.sdk.state.ReadableState;
import org.apache.beam.sdk.state.ReadableStates;
import org.apache.beam.sdk.state.SetState;
import org.apache.beam.sdk.state.State;
import org.apache.beam.sdk.state.StateContext;
import org.apache.beam.sdk.state.StateContexts;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.state.WatermarkHoldState;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.primitives.Ints;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.primitives.UnsignedBytes;
import org.apache.samza.config.Config;
import org.apache.samza.context.TaskContext;
import org.apache.samza.serializers.Serde;
import org.apache.samza.serializers.SerdeFactory;
import org.apache.samza.storage.kv.Entry;
import org.apache.samza.storage.kv.KeyValueIterator;
import org.apache.samza.storage.kv.KeyValueStore;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.EnsuresNonNullIf;
import org.checkerframework.checker.nullness.qual.KeyForBottom;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
import org.checkerframework.dataflow.qual.Pure;
import org.joda.time.Instant;

public class SamzaStoreStateInternals<@UnknownKeyFor K>
implements StateInternals {
    static final @UnknownKeyFor @NonNull @Initialized String BEAM_STORE = "beamStore";
    private static final @UnknownKeyFor @NonNull @Initialized ThreadLocal<@UnknownKeyFor @NonNull @Initialized SoftReference<@UnknownKeyFor @NonNull @Initialized ByteArrayOutputStream>> threadLocalBaos = new ThreadLocal();
    private final /*
     * Issues handling annotations - annotations may be inaccurate
     */
    @UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized KeyValueStore<@UnknownKeyFor @NonNull @Initialized ByteArray, @UnknownKeyFor @NonNull @Initialized StateValue<@UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?>>> stores;
    private final K key;
    private final @UnknownKeyFor @NonNull @Initialized byte @UnknownKeyFor @NonNull @Initialized [] keyBytes;
    private final @UnknownKeyFor @NonNull @Initialized int batchGetSize;
    private final @UnknownKeyFor @NonNull @Initialized String stageId;

    private SamzaStoreStateInternals(/*
     * Issues handling annotations - annotations may be inaccurate
     */
    @UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized KeyValueStore<@UnknownKeyFor @NonNull @Initialized ByteArray, @UnknownKeyFor @NonNull @Initialized StateValue<@UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?>>> stores, @Nullable K key, @UnknownKeyFor @NonNull @Initialized byte @Nullable @UnknownKeyFor @Initialized [] keyBytes, @UnknownKeyFor @NonNull @Initialized String stageId, @UnknownKeyFor @NonNull @Initialized int batchGetSize) {
        this.stores = stores;
        this.key = key;
        this.keyBytes = keyBytes;
        this.batchGetSize = batchGetSize;
        this.stageId = stageId;
    }

    static /*
     * Issues handling annotations - annotations may be inaccurate
     */
    @UnknownKeyFor @NonNull @Initialized KeyValueStore<@UnknownKeyFor @NonNull @Initialized ByteArray, @UnknownKeyFor @NonNull @Initialized StateValue<@UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?>> getBeamStore(@UnknownKeyFor @NonNull @Initialized TaskContext context) {
        return context.getStore(BEAM_STORE);
    }

    static <K> @UnknownKeyFor @NonNull @Initialized Factory<K> createNonKeyedStateInternalsFactory(@UnknownKeyFor @NonNull @Initialized String id, @UnknownKeyFor @NonNull @Initialized TaskContext context, @UnknownKeyFor @NonNull @Initialized SamzaPipelineOptions pipelineOptions) {
        return SamzaStoreStateInternals.createStateInternalsFactory(id, null, context, pipelineOptions, Collections.emptyMap());
    }

    static <K> @UnknownKeyFor @NonNull @Initialized Factory<K> createStateInternalsFactory(@UnknownKeyFor @NonNull @Initialized String id, @UnknownKeyFor @NonNull @Initialized Coder<K> keyCoder, @UnknownKeyFor @NonNull @Initialized TaskContext context, @UnknownKeyFor @NonNull @Initialized SamzaPipelineOptions pipelineOptions, @UnknownKeyFor @NonNull @Initialized ExecutableStage executableStage) {
        Map<String, String> stateIdToStoreMap = executableStage.getUserStates().stream().collect(Collectors.toMap(UserStateReference::localName, UserStateReference::localName));
        return SamzaStoreStateInternals.createStateInternalsFactory(id, keyCoder, context, pipelineOptions, stateIdToStoreMap);
    }

    static <K> @UnknownKeyFor @NonNull @Initialized Factory<K> createStateInternalsFactory(@UnknownKeyFor @NonNull @Initialized String id, @Nullable @UnknownKeyFor @Initialized Coder<K> keyCoder, @UnknownKeyFor @NonNull @Initialized TaskContext context, @UnknownKeyFor @NonNull @Initialized SamzaPipelineOptions pipelineOptions, @UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized String> stateIdToStoreMap) {
        Coder<K> stateKeyCoder;
        int batchGetSize = pipelineOptions.getStoreBatchGetSize();
        HashMap stores = new HashMap();
        stores.put(BEAM_STORE, SamzaStoreStateInternals.getBeamStore(context));
        if (keyCoder != null) {
            stateIdToStoreMap.keySet().forEach(stateId -> stores.put((String)stateId, (KeyValueStore<ByteArray, StateValue<?>>)context.getStore((String)stateIdToStoreMap.get(stateId))));
            stateKeyCoder = keyCoder;
        } else {
            stateKeyCoder = VoidCoder.of();
        }
        return new Factory<K>(Objects.toString(id), stores, stateKeyCoder, batchGetSize);
    }

    public K getKey() {
        return this.key;
    }

    public <T extends State> T state(@UnknownKeyFor @NonNull @Initialized StateNamespace stateNamespace, @UnknownKeyFor @NonNull @Initialized StateTag<T> stateTag) {
        return this.state(stateNamespace, stateTag, StateContexts.nullContext());
    }

    public <V extends State> V state(final @UnknownKeyFor @NonNull @Initialized StateNamespace namespace, final @UnknownKeyFor @NonNull @Initialized StateTag<V> address, /*
     * Issues handling annotations - annotations may be inaccurate
     */
    @UnknownKeyFor @NonNull @Initialized StateContext<@UnknownKeyFor @UnknownKeyFor @NonNull @Initialized @NonNull @Initialized ?> stateContext) {
        return (V)address.bind(new StateTag.StateBinder(){

            public <T> @UnknownKeyFor @NonNull @Initialized ValueState<T> bindValue(@UnknownKeyFor @NonNull @Initialized StateTag<@UnknownKeyFor @NonNull @Initialized ValueState<T>> spec, @UnknownKeyFor @NonNull @Initialized Coder<T> coder) {
                return new SamzaValueState(namespace, address, coder);
            }

            public <T> @UnknownKeyFor @NonNull @Initialized BagState<T> bindBag(@UnknownKeyFor @NonNull @Initialized StateTag<@UnknownKeyFor @NonNull @Initialized BagState<T>> spec, @UnknownKeyFor @NonNull @Initialized Coder<T> elemCoder) {
                return new SamzaBagState(namespace, address, elemCoder);
            }

            public <T> @UnknownKeyFor @NonNull @Initialized SetState<T> bindSet(@UnknownKeyFor @NonNull @Initialized StateTag<@UnknownKeyFor @NonNull @Initialized SetState<T>> spec, @UnknownKeyFor @NonNull @Initialized Coder<T> elemCoder) {
                return new SamzaSetStateImpl(namespace, address, elemCoder);
            }

            public <KeyT, ValueT> @UnknownKeyFor @NonNull @Initialized MapState<KeyT, ValueT> bindMap(@UnknownKeyFor @NonNull @Initialized StateTag<@UnknownKeyFor @NonNull @Initialized MapState<KeyT, ValueT>> spec, @UnknownKeyFor @NonNull @Initialized Coder<KeyT> mapKeyCoder, @UnknownKeyFor @NonNull @Initialized Coder<ValueT> mapValueCoder) {
                return new SamzaMapStateImpl<KeyT, ValueT>(namespace, (StateTag<? extends State>)address, mapKeyCoder, mapValueCoder);
            }

            public <KeyT, ValueT> @UnknownKeyFor @NonNull @Initialized MultimapState<KeyT, ValueT> bindMultimap(@UnknownKeyFor @NonNull @Initialized StateTag<@UnknownKeyFor @NonNull @Initialized MultimapState<KeyT, ValueT>> spec, @UnknownKeyFor @NonNull @Initialized Coder<KeyT> keyCoder, @UnknownKeyFor @NonNull @Initialized Coder<ValueT> valueCoder) {
                throw new UnsupportedOperationException(String.format("%s is not supported", MultimapState.class.getSimpleName()));
            }

            public <T> @UnknownKeyFor @NonNull @Initialized OrderedListState<T> bindOrderedList(@UnknownKeyFor @NonNull @Initialized StateTag<@UnknownKeyFor @NonNull @Initialized OrderedListState<T>> spec, @UnknownKeyFor @NonNull @Initialized Coder<T> elemCoder) {
                throw new UnsupportedOperationException(String.format("%s is not supported", OrderedListState.class.getSimpleName()));
            }

            public <InputT, AccumT, OutputT> @UnknownKeyFor @NonNull @Initialized CombiningState<InputT, AccumT, OutputT> bindCombiningValue(@UnknownKeyFor @NonNull @Initialized StateTag<@UnknownKeyFor @NonNull @Initialized CombiningState<InputT, AccumT, OutputT>> spec, @UnknownKeyFor @NonNull @Initialized Coder<AccumT> accumCoder, // Could not load outer class - annotation placement on inner may be incorrect
             @UnknownKeyFor @NonNull @Initialized Combine.CombineFn<InputT, AccumT, OutputT> combineFn) {
                return new SamzaAccumulatorCombiningState<InputT, AccumT, OutputT>(namespace, (StateTag<? extends State>)address, accumCoder, combineFn);
            }

            public <InputT, AccumT, OutputT> @UnknownKeyFor @NonNull @Initialized CombiningState<InputT, AccumT, OutputT> bindCombiningValueWithContext(@UnknownKeyFor @NonNull @Initialized StateTag<@UnknownKeyFor @NonNull @Initialized CombiningState<InputT, AccumT, OutputT>> spec, @UnknownKeyFor @NonNull @Initialized Coder<AccumT> accumCoder, // Could not load outer class - annotation placement on inner may be incorrect
             @UnknownKeyFor @NonNull @Initialized CombineWithContext.CombineFnWithContext<InputT, AccumT, OutputT> combineFn) {
                throw new UnsupportedOperationException(String.format("%s is not supported", CombiningState.class.getSimpleName()));
            }

            public @UnknownKeyFor @NonNull @Initialized WatermarkHoldState bindWatermark(@UnknownKeyFor @NonNull @Initialized StateTag<@UnknownKeyFor @NonNull @Initialized WatermarkHoldState> spec, @UnknownKeyFor @NonNull @Initialized TimestampCombiner timestampCombiner) {
                return new SamzaWatermarkHoldState(namespace, address, timestampCombiner);
            }
        });
    }

    private static @UnknownKeyFor @NonNull @Initialized ByteArrayOutputStream getThreadLocalBaos() {
        ByteArrayOutputStream baos;
        SoftReference<ByteArrayOutputStream> refBaos = threadLocalBaos.get();
        ByteArrayOutputStream byteArrayOutputStream = baos = refBaos == null ? null : refBaos.get();
        if (baos == null) {
            baos = new ByteArrayOutputStream();
            threadLocalBaos.set(new SoftReference<ByteArrayOutputStream>(baos));
        }
        baos.reset();
        return baos;
    }

    public static class StateValueSerdeFactory
    implements SerdeFactory<StateValue<?>> {
        public /*
         * Issues handling annotations - annotations may be inaccurate
         */
        @UnknownKeyFor @NonNull @Initialized Serde<@UnknownKeyFor @NonNull @Initialized StateValue<@UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?>> getSerde(@UnknownKeyFor @NonNull @Initialized String name, @UnknownKeyFor @NonNull @Initialized Config config) {
            return new StateValueSerde();
        }

        public static class StateValueSerde
        implements Serde<StateValue<?>> {
            public /*
             * Issues handling annotations - annotations may be inaccurate
             */
            @UnknownKeyFor @NonNull @Initialized StateValue<@UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?> fromBytes(@UnknownKeyFor @NonNull @Initialized byte @UnknownKeyFor @NonNull @Initialized [] bytes) {
                return StateValue.of(bytes);
            }

            public @UnknownKeyFor @NonNull @Initialized byte @UnknownKeyFor @NonNull @Initialized [] toBytes(/*
             * Issues handling annotations - annotations may be inaccurate
             */
            @UnknownKeyFor @NonNull @Initialized StateValue<@UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?> stateValue) {
                return stateValue == null ? null : stateValue.getValueBytes();
            }
        }
    }

    public static class StateValue<@UnknownKeyFor T>
    implements Serializable {
        private T value;
        private @UnknownKeyFor @NonNull @Initialized Coder<T> valueCoder;
        private @UnknownKeyFor @NonNull @Initialized byte @UnknownKeyFor @NonNull @Initialized [] valueBytes;

        private StateValue(T value, @UnknownKeyFor @NonNull @Initialized Coder<T> valueCoder, @UnknownKeyFor @NonNull @Initialized byte @UnknownKeyFor @NonNull @Initialized [] valueBytes) {
            this.value = value;
            this.valueCoder = valueCoder;
            this.valueBytes = valueBytes;
        }

        public static <T> @UnknownKeyFor @NonNull @Initialized StateValue<T> of(T value, @UnknownKeyFor @NonNull @Initialized Coder<T> valueCoder) {
            return new StateValue<T>(value, valueCoder, null);
        }

        public static <T> @UnknownKeyFor @NonNull @Initialized StateValue<T> of(@UnknownKeyFor @NonNull @Initialized byte @UnknownKeyFor @NonNull @Initialized [] valueBytes) {
            return new StateValue<Object>(null, null, valueBytes);
        }

        public T getValue(@UnknownKeyFor @NonNull @Initialized Coder<T> coder) {
            if (this.value == null && this.valueBytes != null) {
                if (this.valueCoder == null) {
                    this.valueCoder = coder;
                }
                try {
                    this.value = this.valueCoder.decode((InputStream)new ByteArrayInputStream(this.valueBytes));
                }
                catch (IOException e) {
                    throw new RuntimeException("Could not decode state", e);
                }
            }
            return this.value;
        }

        public @UnknownKeyFor @NonNull @Initialized byte @UnknownKeyFor @NonNull @Initialized [] getValueBytes() {
            if (this.valueBytes == null && this.value != null) {
                ByteArrayOutputStream baos = new ByteArrayOutputStream();
                try {
                    this.valueCoder.encode(this.value, (OutputStream)baos);
                }
                catch (IOException e) {
                    throw new RuntimeException("Could not encode state value: " + this.value, e);
                }
                this.valueBytes = baos.toByteArray();
            }
            return this.valueBytes;
        }
    }

    public static class ByteArraySerdeFactory
    implements SerdeFactory<ByteArray> {
        public @UnknownKeyFor @NonNull @Initialized Serde<@UnknownKeyFor @NonNull @Initialized ByteArray> getSerde(@UnknownKeyFor @NonNull @Initialized String name, @UnknownKeyFor @NonNull @Initialized Config config) {
            return new ByteArraySerde();
        }

        public static class ByteArraySerde
        implements Serde<ByteArray> {
            public @UnknownKeyFor @NonNull @Initialized byte @UnknownKeyFor @NonNull @Initialized [] toBytes(@UnknownKeyFor @NonNull @Initialized ByteArray byteArray) {
                return byteArray.value;
            }

            public @UnknownKeyFor @NonNull @Initialized ByteArray fromBytes(@UnknownKeyFor @NonNull @Initialized byte @UnknownKeyFor @NonNull @Initialized [] bytes) {
                return ByteArray.of(bytes);
            }
        }
    }

    public static class ByteArray
    implements Serializable,
    Comparable<ByteArray> {
        private final @UnknownKeyFor @NonNull @Initialized byte @UnknownKeyFor @NonNull @Initialized [] value;

        public static @UnknownKeyFor @NonNull @Initialized ByteArray of(@UnknownKeyFor @NonNull @Initialized byte @UnknownKeyFor @NonNull @Initialized [] value) {
            return new ByteArray(value);
        }

        private ByteArray(@UnknownKeyFor @NonNull @Initialized byte @UnknownKeyFor @NonNull @Initialized [] value) {
            this.value = value;
        }

        public @UnknownKeyFor @NonNull @Initialized byte @UnknownKeyFor @NonNull @Initialized [] getValue() {
            return this.value;
        }

        @EnsuresNonNullIf(expression={"#1"}, result=true)
        @Pure
        public @UnknownKeyFor @NonNull @Initialized boolean equals(@Nullable @UnknownKeyFor @Initialized Object o) {
            if (o == null || this.getClass() != o.getClass()) {
                return false;
            }
            ByteArray byteArray = (ByteArray)o;
            return Arrays.equals(this.value, byteArray.value);
        }

        @Pure
        public @UnknownKeyFor @NonNull @Initialized int hashCode() {
            return this.value != null ? Arrays.hashCode(this.value) : 0;
        }

        @Override
        @Pure
        public @UnknownKeyFor @NonNull @Initialized int compareTo(@UnknownKeyFor @NonNull @Initialized ByteArray other) {
            return UnsignedBytes.lexicographicalComparator().compare(this.value, other.value);
        }
    }

    private class SamzaWatermarkHoldState
    extends AbstractSamzaState<Instant>
    implements WatermarkHoldState {
        private final @UnknownKeyFor @NonNull @Initialized TimestampCombiner timestampCombiner;

        public <V extends State> SamzaWatermarkHoldState(@UnknownKeyFor @NonNull @Initialized StateNamespace namespace, @UnknownKeyFor @NonNull @Initialized StateTag<V> address, TimestampCombiner timestampCombiner) {
            super(namespace, (StateTag<? extends State>)address, InstantCoder.of());
            this.timestampCombiner = timestampCombiner;
        }

        public void add(@UnknownKeyFor @NonNull @Initialized Instant value) {
            Instant combinedValue;
            Instant currentValue = (Instant)this.readInternal();
            Instant instant = combinedValue = currentValue == null ? value : this.timestampCombiner.combine(new Instant[]{currentValue, value});
            if (!combinedValue.equals((Object)currentValue)) {
                this.writeInternal(combinedValue);
            }
        }

        public @UnknownKeyFor @NonNull @Initialized ReadableState<@UnknownKeyFor @NonNull @Initialized Boolean> isEmpty() {
            return this.isEmptyInternal();
        }

        public @UnknownKeyFor @NonNull @Initialized Instant read() {
            return (Instant)this.readInternal();
        }

        public @UnknownKeyFor @NonNull @Initialized TimestampCombiner getTimestampCombiner() {
            return this.timestampCombiner;
        }

        public @UnknownKeyFor @NonNull @Initialized WatermarkHoldState readLater() {
            return this;
        }

        public void clear() {
            this.clearInternal();
        }
    }

    private class SamzaAccumulatorCombiningState<@UnknownKeyFor InT, @UnknownKeyFor AccumT, @UnknownKeyFor OutT>
    extends AbstractSamzaState<AccumT>
    implements CombiningState<InT, AccumT, OutT> {
        private final // Could not load outer class - annotation placement on inner may be incorrect
         @UnknownKeyFor @NonNull @Initialized Combine.CombineFn<InT, AccumT, OutT> combineFn;

        protected SamzaAccumulatorCombiningState(/*
         * Issues handling annotations - annotations may be inaccurate
         */
        @UnknownKeyFor @UnknownKeyFor @UnknownKeyFor @NonNull @Initialized @NonNull @Initialized @NonNull @Initialized StateNamespace namespace, @UnknownKeyFor @NonNull @Initialized StateTag<? extends State> address, @UnknownKeyFor @NonNull @Initialized Coder<AccumT> coder, Combine.CombineFn<InT, AccumT, OutT> combineFn) {
            super(namespace, address, coder);
            this.combineFn = combineFn;
        }

        public void clear() {
            this.clearInternal();
        }

        public void add(InT value) {
            AccumT accum = this.getAccum();
            Object current = this.combineFn.addInput(accum, value);
            this.writeInternal(current);
        }

        public @UnknownKeyFor @NonNull @Initialized ReadableState<@UnknownKeyFor @NonNull @Initialized Boolean> isEmpty() {
            return this.isEmptyInternal();
        }

        public AccumT getAccum() {
            Object accum = this.readInternal();
            return (AccumT)(accum != null ? accum : this.combineFn.createAccumulator());
        }

        public void addAccum(AccumT accum) {
            AccumT currentAccum = this.getAccum();
            Object mergedAccum = this.mergeAccumulators(Arrays.asList(currentAccum, accum));
            this.writeInternal(mergedAccum);
        }

        public AccumT mergeAccumulators(@UnknownKeyFor @NonNull @Initialized Iterable<AccumT> accumulators) {
            return (AccumT)this.combineFn.mergeAccumulators(accumulators);
        }

        public @UnknownKeyFor @NonNull @Initialized CombiningState<InT, AccumT, OutT> readLater() {
            return this;
        }

        @Nonnull
        public @NonNull OutT read() {
            AccumT accum = this.getAccum();
            Object output = this.combineFn.extractOutput(accum);
            if (this.combineFn instanceof UpdatingCombineFn) {
                AccumT updatedAccum = ((UpdatingCombineFn)this.combineFn).updateAfterFiring(accum);
                this.writeInternal(updatedAccum);
            }
            return (OutT)output;
        }
    }

    private class SamzaMapStateImpl<@UnknownKeyFor KeyT, @UnknownKeyFor ValueT>
    extends AbstractSamzaState<ValueT>
    implements SamzaMapState<KeyT, ValueT> {
        private final @UnknownKeyFor @NonNull @Initialized Coder<KeyT> keyCoder;
        private final @UnknownKeyFor @NonNull @Initialized int storeKeySize;
        private final @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized KeyValueIterator<@UnknownKeyFor @NonNull @Initialized ByteArray, @UnknownKeyFor @NonNull @Initialized StateValue<ValueT>>> openIterators;
        private @UnknownKeyFor @NonNull @Initialized int maxKeySize;

        protected SamzaMapStateImpl(/*
         * Issues handling annotations - annotations may be inaccurate
         */
        @UnknownKeyFor @UnknownKeyFor @UnknownKeyFor @NonNull @Initialized @NonNull @Initialized @NonNull @Initialized StateNamespace namespace, @UnknownKeyFor @NonNull @Initialized StateTag<? extends State> address, @UnknownKeyFor @NonNull @Initialized Coder<KeyT> keyCoder, Coder<ValueT> valueCoder) {
            super(namespace, address, valueCoder);
            this.openIterators = Collections.synchronizedList(new ArrayList());
            this.keyCoder = keyCoder;
            this.storeKeySize = this.getEncodedStoreKeyBytes().length;
            this.maxKeySize = this.storeKeySize + 100000;
        }

        public void put(KeyT key, ValueT value) {
            ByteArray encodedKey = this.encodeKey(key);
            this.maxKeySize = Math.max(this.maxKeySize, encodedKey.getValue().length);
            this.store.put((Object)encodedKey, StateValue.of(value, this.coder));
        }

        public @Nullable @UnknownKeyFor @Initialized ReadableState<ValueT> computeIfAbsent(KeyT key, @UnknownKeyFor @NonNull @Initialized Function<@UnknownKeyFor @Nullable @Initialized ? super KeyT, @KeyForBottom @NonNull @Initialized ? extends ValueT> mappingFunction) {
            ByteArray encodedKey = this.encodeKey(key);
            Object current = this.decodeValue((StateValue)this.store.get((Object)encodedKey));
            if (current == null) {
                this.put(key, mappingFunction.apply(key));
            }
            return current == null ? null : ReadableStates.immediate(current);
        }

        public void remove(KeyT key) {
            this.store.delete((Object)this.encodeKey(key));
        }

        public @UnknownKeyFor @NonNull @Initialized ReadableState<ValueT> get(KeyT key) {
            return this.getOrDefault(key, null);
        }

        public @UnknownKeyFor @NonNull @Initialized ReadableState<ValueT> getOrDefault(final KeyT key, final @Nullable ValueT defaultValue) {
            return new ReadableState<ValueT>(){

                public @Nullable ValueT read() {
                    Object value = SamzaMapStateImpl.this.decodeValue((StateValue)SamzaMapStateImpl.this.store.get((Object)SamzaMapStateImpl.this.encodeKey(key)));
                    return value != null ? value : defaultValue;
                }

                public @UnknownKeyFor @NonNull @Initialized ReadableState<ValueT> readLater() {
                    return this;
                }
            };
        }

        public @UnknownKeyFor @NonNull @Initialized ReadableState<@UnknownKeyFor @NonNull @Initialized Iterable<KeyT>> keys() {
            return new ReadableState<Iterable<KeyT>>(){

                public @UnknownKeyFor @NonNull @Initialized Iterable<KeyT> read() {
                    return SamzaMapStateImpl.this.createIterable((SerializableFunction & Serializable)entry -> SamzaMapStateImpl.this.decodeKey((ByteArray)entry.getKey()));
                }

                public @UnknownKeyFor @NonNull @Initialized ReadableState<@UnknownKeyFor @NonNull @Initialized Iterable<KeyT>> readLater() {
                    return this;
                }
            };
        }

        public @UnknownKeyFor @NonNull @Initialized ReadableState<@UnknownKeyFor @NonNull @Initialized Iterable<ValueT>> values() {
            return new ReadableState<Iterable<ValueT>>(){

                public @UnknownKeyFor @NonNull @Initialized Iterable<ValueT> read() {
                    return SamzaMapStateImpl.this.createIterable((SerializableFunction & Serializable)entry -> SamzaMapStateImpl.this.decodeValue((StateValue)entry.getValue()));
                }

                public @UnknownKeyFor @NonNull @Initialized ReadableState<@UnknownKeyFor @NonNull @Initialized Iterable<ValueT>> readLater() {
                    return this;
                }
            };
        }

        public @UnknownKeyFor @NonNull @Initialized ReadableState<@UnknownKeyFor @NonNull @Initialized Iterable<@UnknownKeyFor @NonNull @Initialized Map.Entry<KeyT, ValueT>>> entries() {
            return new ReadableState<Iterable<Map.Entry<KeyT, ValueT>>>(){

                public @UnknownKeyFor @NonNull @Initialized Iterable<@UnknownKeyFor @NonNull @Initialized Map.Entry<KeyT, ValueT>> read() {
                    return SamzaMapStateImpl.this.createIterable((SerializableFunction & Serializable)entry -> new AbstractMap.SimpleEntry(SamzaMapStateImpl.this.decodeKey((ByteArray)entry.getKey()), SamzaMapStateImpl.this.decodeValue((StateValue)entry.getValue())));
                }

                public @UnknownKeyFor @NonNull @Initialized ReadableState<@UnknownKeyFor @NonNull @Initialized Iterable<@UnknownKeyFor @NonNull @Initialized Map.Entry<KeyT, ValueT>>> readLater() {
                    return this;
                }
            };
        }

        public @UnknownKeyFor @NonNull @Initialized ReadableState<@UnknownKeyFor @NonNull @Initialized Boolean> isEmpty() {
            final ReadableState<Iterable<KeyT>> keys = this.keys();
            return new ReadableState<Boolean>(){

                public @Nullable @UnknownKeyFor @Initialized Boolean read() {
                    return Iterables.isEmpty((Iterable)((Iterable)keys.read()));
                }

                public @UnknownKeyFor @NonNull @Initialized ReadableState<@UnknownKeyFor @NonNull @Initialized Boolean> readLater() {
                    keys.readLater();
                    return this;
                }
            };
        }

        @Override
        public @UnknownKeyFor @NonNull @Initialized ReadableState<@UnknownKeyFor @NonNull @Initialized Iterator<@UnknownKeyFor @NonNull @Initialized Map.Entry<KeyT, ValueT>>> readIterator() {
            ByteArray maxKey = this.createMaxKey();
            final KeyValueIterator kvIter = this.store.range((Object)this.getEncodedStoreKey(), (Object)maxKey);
            this.openIterators.add(kvIter);
            return new ReadableState<Iterator<Map.Entry<KeyT, ValueT>>>(){

                public @Nullable @UnknownKeyFor @Initialized Iterator<@UnknownKeyFor @NonNull @Initialized Map.Entry<KeyT, ValueT>> read() {
                    return new Iterator<Map.Entry<KeyT, ValueT>>(){

                        @Override
                        @Pure
                        public @UnknownKeyFor @NonNull @Initialized boolean hasNext() {
                            boolean hasNext = kvIter.hasNext();
                            if (!hasNext) {
                                kvIter.close();
                                SamzaMapStateImpl.this.openIterators.remove(kvIter);
                            }
                            return hasNext;
                        }

                        @Override
                        public @UnknownKeyFor @NonNull @Initialized Map.Entry<KeyT, ValueT> next() {
                            Entry entry = (Entry)kvIter.next();
                            return new AbstractMap.SimpleEntry(SamzaMapStateImpl.this.decodeKey((ByteArray)entry.getKey()), SamzaMapStateImpl.this.decodeValue((StateValue)entry.getValue()));
                        }
                    };
                }

                public @UnknownKeyFor @NonNull @Initialized ReadableState<@UnknownKeyFor @NonNull @Initialized Iterator<@UnknownKeyFor @NonNull @Initialized Map.Entry<KeyT, ValueT>>> readLater() {
                    return this;
                }
            };
        }

        private <OutputT> @UnknownKeyFor @NonNull @Initialized Iterable<OutputT> createIterable(@UnknownKeyFor @NonNull @Initialized SerializableFunction<@UnknownKeyFor @NonNull @Initialized Entry<@UnknownKeyFor @NonNull @Initialized ByteArray, @UnknownKeyFor @NonNull @Initialized StateValue<ValueT>>, OutputT> fn) {
            ByteArray maxKey = this.createMaxKey();
            KeyValueIterator kvIter = this.store.range((Object)this.getEncodedStoreKey(), (Object)maxKey);
            ImmutableList iterable = ImmutableList.copyOf((Iterator)kvIter);
            kvIter.close();
            return new Iterable<OutputT>((List)iterable, fn){
                final /* synthetic */ List val$iterable;
                final /* synthetic */ SerializableFunction val$fn;
                {
                    this.val$iterable = list;
                    this.val$fn = serializableFunction;
                }

                @Override
                public @UnknownKeyFor @NonNull @Initialized Iterator<OutputT> iterator() {
                    final Iterator iter = this.val$iterable.iterator();
                    return new Iterator<OutputT>(){

                        @Override
                        @Pure
                        public @UnknownKeyFor @NonNull @Initialized boolean hasNext() {
                            return iter.hasNext();
                        }

                        @Override
                        public OutputT next() {
                            return val$fn.apply((Object)((Entry)iter.next()));
                        }
                    };
                }
            };
        }

        public void clear() {
            ByteArray maxKey = this.createMaxKey();
            KeyValueIterator kvIter = this.store.range((Object)this.getEncodedStoreKey(), (Object)maxKey);
            while (kvIter.hasNext()) {
                this.store.delete((Object)((ByteArray)((Entry)kvIter.next()).getKey()));
            }
            kvIter.close();
        }

        private @UnknownKeyFor @NonNull @Initialized ByteArray encodeKey(KeyT key) {
            try {
                ByteArrayOutputStream baos = SamzaStoreStateInternals.getThreadLocalBaos();
                baos.write(this.getEncodedStoreKeyBytes());
                this.keyCoder.encode(key, (OutputStream)baos);
                return ByteArray.of(baos.toByteArray());
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        }

        private KeyT decodeKey(@UnknownKeyFor @NonNull @Initialized ByteArray keyBytes) {
            try {
                byte[] realKey = Arrays.copyOfRange(keyBytes.value, this.storeKeySize, keyBytes.value.length);
                return (KeyT)this.keyCoder.decode((InputStream)new ByteArrayInputStream(realKey));
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        }

        private @UnknownKeyFor @NonNull @Initialized ByteArray createMaxKey() {
            byte[] maxKey = new byte[this.maxKeySize];
            Arrays.fill(maxKey, (byte)-1);
            byte[] encodedKey = this.getEncodedStoreKeyBytes();
            System.arraycopy(encodedKey, 0, maxKey, 0, encodedKey.length);
            return ByteArray.of(maxKey);
        }

        @Override
        public void closeIterators() {
            this.openIterators.forEach(KeyValueIterator::close);
            this.openIterators.clear();
        }
    }

    private class SamzaSetStateImpl<@UnknownKeyFor T>
    implements SamzaSetState<T> {
        private final /*
         * Issues handling annotations - annotations may be inaccurate
         */
        @UnknownKeyFor @NonNull @Initialized SamzaStoreStateInternals. @UnknownKeyFor @UnknownKeyFor @NonNull @Initialized @NonNull @Initialized SamzaMapStateImpl<T, Boolean> mapState;

        private SamzaSetStateImpl(/*
         * Issues handling annotations - annotations may be inaccurate
         */
        @UnknownKeyFor @UnknownKeyFor @UnknownKeyFor @NonNull @Initialized @NonNull @Initialized @NonNull @Initialized StateNamespace namespace, @UnknownKeyFor @NonNull @Initialized StateTag<? extends State> address, Coder<T> coder) {
            this.mapState = new SamzaMapStateImpl(namespace, (StateTag<? extends State>)((StateTag<State>)address), coder, BooleanCoder.of());
        }

        public @UnknownKeyFor @NonNull @Initialized ReadableState<@UnknownKeyFor @NonNull @Initialized Boolean> contains(T t) {
            return this.mapState.get(t);
        }

        public @Nullable @UnknownKeyFor @Initialized ReadableState<@UnknownKeyFor @NonNull @Initialized Boolean> addIfAbsent(T t) {
            return this.mapState.putIfAbsent(t, true);
        }

        public void remove(T t) {
            this.mapState.remove(t);
        }

        public void add(T value) {
            this.mapState.put(value, true);
        }

        public @UnknownKeyFor @NonNull @Initialized ReadableState<@UnknownKeyFor @NonNull @Initialized Boolean> isEmpty() {
            return new ReadableState<Boolean>(){

                public @UnknownKeyFor @NonNull @Initialized Boolean read() {
                    return Iterables.isEmpty((Iterable)((Iterable)SamzaSetStateImpl.this.mapState.entries().read()));
                }

                public @UnknownKeyFor @NonNull @Initialized ReadableState<@UnknownKeyFor @NonNull @Initialized Boolean> readLater() {
                    return this;
                }
            };
        }

        public @UnknownKeyFor @NonNull @Initialized Iterable<T> read() {
            return (Iterable)this.mapState.keys().read();
        }

        public @UnknownKeyFor @NonNull @Initialized SetState<T> readLater() {
            return this;
        }

        public void clear() {
            this.mapState.clear();
        }

        @Override
        public @UnknownKeyFor @NonNull @Initialized ReadableState<@UnknownKeyFor @NonNull @Initialized Iterator<T>> readIterator() {
            final Iterator iter = (Iterator)this.mapState.readIterator().read();
            return new ReadableState<Iterator<T>>(){

                public @Nullable @UnknownKeyFor @Initialized Iterator<T> read() {
                    return new Iterator<T>(){

                        @Override
                        @Pure
                        public @UnknownKeyFor @NonNull @Initialized boolean hasNext() {
                            return iter.hasNext();
                        }

                        @Override
                        public T next() {
                            return ((Map.Entry)iter.next()).getKey();
                        }
                    };
                }

                public @UnknownKeyFor @NonNull @Initialized ReadableState<@UnknownKeyFor @NonNull @Initialized Iterator<T>> readLater() {
                    return this;
                }
            };
        }

        @Override
        public void closeIterators() {
            this.mapState.closeIterators();
        }
    }

    private class SamzaBagState<@UnknownKeyFor T>
    extends AbstractSamzaState<T>
    implements BagState<T> {
        private SamzaBagState(/*
         * Issues handling annotations - annotations may be inaccurate
         */
        @UnknownKeyFor @UnknownKeyFor @UnknownKeyFor @NonNull @Initialized @NonNull @Initialized @NonNull @Initialized StateNamespace namespace, @UnknownKeyFor @NonNull @Initialized StateTag<? extends State> address, Coder<T> coder) {
            super(namespace, address, coder);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void add(T value) {
            KeyValueStore keyValueStore = this.store;
            synchronized (keyValueStore) {
                int size = this.getSize();
                ByteArray encodedKey = this.encodeKey(size);
                this.store.put((Object)encodedKey, StateValue.of(value, this.coder));
                this.store.put((Object)this.getEncodedStoreKey(), StateValue.of(Ints.toByteArray((int)(size + 1))));
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public @UnknownKeyFor @NonNull @Initialized ReadableState<@UnknownKeyFor @NonNull @Initialized Boolean> isEmpty() {
            KeyValueStore keyValueStore = this.store;
            synchronized (keyValueStore) {
                return this.isEmptyInternal();
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Nonnull
        public @UnknownKeyFor @NonNull @Initialized List<T> read() {
            KeyValueStore keyValueStore = this.store;
            synchronized (keyValueStore) {
                int size = this.getSize();
                if (size == 0) {
                    return Collections.emptyList();
                }
                ArrayList values = new ArrayList(size);
                ArrayList<ByteArray> keys = new ArrayList<ByteArray>(size);
                for (int start = 0; start < size; start += SamzaStoreStateInternals.this.batchGetSize) {
                    int end = Math.min(size, start + SamzaStoreStateInternals.this.batchGetSize);
                    for (int i = start; i < end; ++i) {
                        keys.add(this.encodeKey(i));
                    }
                    this.store.getAll(keys).values().forEach(value -> values.add(this.decodeValue(value)));
                    keys.clear();
                }
                return values;
            }
        }

        public @UnknownKeyFor @NonNull @Initialized BagState<T> readLater() {
            return this;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void clear() {
            KeyValueStore keyValueStore = this.store;
            synchronized (keyValueStore) {
                int size = this.getSize();
                if (size != 0) {
                    ArrayList<ByteArray> keys = new ArrayList<ByteArray>(size);
                    for (int i = 0; i < size; ++i) {
                        keys.add(this.encodeKey(i));
                    }
                    this.store.deleteAll(keys);
                    this.store.delete((Object)this.getEncodedStoreKey());
                }
            }
        }

        private @UnknownKeyFor @NonNull @Initialized int getSize() {
            StateValue stateSize = (StateValue)this.store.get((Object)this.getEncodedStoreKey());
            return stateSize == null || stateSize.valueBytes == null ? 0 : Ints.fromByteArray((byte[])stateSize.valueBytes);
        }

        /*
         * Enabled aggressive block sorting
         * Enabled unnecessary exception pruning
         * Enabled aggressive exception aggregation
         */
        private @UnknownKeyFor @NonNull @Initialized ByteArray encodeKey(@UnknownKeyFor @NonNull @Initialized int size) {
            ByteArrayOutputStream baos = SamzaStoreStateInternals.getThreadLocalBaos();
            try (DataOutputStream dos = new DataOutputStream(baos);){
                dos.write(this.getEncodedStoreKeyBytes());
                dos.writeInt(size);
                ByteArray byteArray = ByteArray.of(baos.toByteArray());
                return byteArray;
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    }

    private class SamzaValueState<@UnknownKeyFor T>
    extends AbstractSamzaState<T>
    implements ValueState<T> {
        private SamzaValueState(/*
         * Issues handling annotations - annotations may be inaccurate
         */
        @UnknownKeyFor @UnknownKeyFor @UnknownKeyFor @NonNull @Initialized @NonNull @Initialized @NonNull @Initialized StateNamespace namespace, @UnknownKeyFor @NonNull @Initialized StateTag<? extends State> address, Coder<T> coder) {
            super(namespace, address, coder);
        }

        public void write(T input) {
            this.writeInternal(input);
        }

        public T read() {
            return this.readInternal();
        }

        public @UnknownKeyFor @NonNull @Initialized ValueState<T> readLater() {
            return this;
        }

        public void clear() {
            this.clearInternal();
        }
    }

    private abstract class AbstractSamzaState<@UnknownKeyFor T> {
        private final @UnknownKeyFor @NonNull @Initialized StateNamespace namespace;
        private final @UnknownKeyFor @NonNull @Initialized String addressId;
        private final @UnknownKeyFor @NonNull @Initialized boolean isBeamStore;
        private final @UnknownKeyFor @NonNull @Initialized String stageId;
        private final @UnknownKeyFor @NonNull @Initialized byte @UnknownKeyFor @NonNull @Initialized [] keyBytes;
        private @UnknownKeyFor @NonNull @Initialized byte @UnknownKeyFor @NonNull @Initialized [] encodedStoreKey;
        protected final @UnknownKeyFor @NonNull @Initialized Coder<T> coder;
        protected final @UnknownKeyFor @NonNull @Initialized KeyValueStore<@UnknownKeyFor @NonNull @Initialized ByteArray, @UnknownKeyFor @NonNull @Initialized StateValue<T>> store;

        protected AbstractSamzaState(/*
         * Issues handling annotations - annotations may be inaccurate
         */
        @UnknownKeyFor @UnknownKeyFor @UnknownKeyFor @NonNull @Initialized @NonNull @Initialized @NonNull @Initialized StateNamespace namespace, @UnknownKeyFor @NonNull @Initialized StateTag<? extends State> address, Coder<T> coder) {
            this.coder = coder;
            this.namespace = namespace;
            this.addressId = address.getId();
            this.isBeamStore = !SamzaStoreStateInternals.this.stores.containsKey(address.getId());
            this.store = this.isBeamStore ? (KeyValueStore)SamzaStoreStateInternals.this.stores.get(SamzaStoreStateInternals.BEAM_STORE) : (KeyValueStore)SamzaStoreStateInternals.this.stores.get(address.getId());
            this.stageId = SamzaStoreStateInternals.this.stageId;
            this.keyBytes = SamzaStoreStateInternals.this.keyBytes;
        }

        protected void clearInternal() {
            this.store.delete((Object)this.getEncodedStoreKey());
        }

        protected void writeInternal(T value) {
            this.store.put((Object)this.getEncodedStoreKey(), StateValue.of(value, this.coder));
        }

        protected T readInternal() {
            StateValue stateValue = (StateValue)this.store.get((Object)this.getEncodedStoreKey());
            return this.decodeValue(stateValue);
        }

        protected @UnknownKeyFor @NonNull @Initialized ReadableState<@UnknownKeyFor @NonNull @Initialized Boolean> isEmptyInternal() {
            return new ReadableState<Boolean>(){

                public @UnknownKeyFor @NonNull @Initialized Boolean read() {
                    return AbstractSamzaState.this.store.get((Object)AbstractSamzaState.this.getEncodedStoreKey()) == null;
                }

                public @UnknownKeyFor @NonNull @Initialized ReadableState<@UnknownKeyFor @NonNull @Initialized Boolean> readLater() {
                    return this;
                }
            };
        }

        protected @UnknownKeyFor @NonNull @Initialized ByteArray getEncodedStoreKey() {
            return ByteArray.of(this.getEncodedStoreKeyBytes());
        }

        protected @UnknownKeyFor @NonNull @Initialized byte @UnknownKeyFor @NonNull @Initialized [] getEncodedStoreKeyBytes() {
            if (this.encodedStoreKey == null) {
                ByteArrayOutputStream baos = SamzaStoreStateInternals.getThreadLocalBaos();
                try (DataOutputStream dos = new DataOutputStream(baos);){
                    dos.write(this.keyBytes);
                    dos.writeUTF(this.namespace.stringKey());
                    if (this.isBeamStore) {
                        dos.writeUTF(this.stageId);
                        dos.writeUTF(this.addressId);
                    }
                }
                catch (IOException e) {
                    throw new RuntimeException("Could not encode full address for state: " + this.addressId, e);
                }
                this.encodedStoreKey = baos.toByteArray();
            }
            return this.encodedStoreKey;
        }

        protected T decodeValue(@UnknownKeyFor @NonNull @Initialized StateValue<T> stateValue) {
            return stateValue == null ? null : (T)stateValue.getValue(this.coder);
        }

        @EnsuresNonNullIf(expression={"#1"}, result=true)
        @Pure
        public @UnknownKeyFor @NonNull @Initialized boolean equals(@Nullable @UnknownKeyFor @Initialized Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || this.getClass() != o.getClass()) {
                return false;
            }
            AbstractSamzaState that = (AbstractSamzaState)o;
            if (!(!this.isBeamStore && !that.isBeamStore || this.isBeamStore && that.isBeamStore && this.stageId.equals(that.stageId))) {
                return false;
            }
            return Arrays.equals(this.keyBytes, that.keyBytes) && this.addressId.equals(that.addressId) && this.namespace.equals(that.namespace);
        }

        @Pure
        public @UnknownKeyFor @NonNull @Initialized int hashCode() {
            int result = this.namespace.hashCode();
            result = 31 * result + Arrays.hashCode(this.getEncodedStoreKeyBytes());
            return result;
        }
    }

    public static class Factory<@UnknownKeyFor K>
    implements StateInternalsFactory<K> {
        private final @UnknownKeyFor @NonNull @Initialized String stageId;
        private final /*
         * Issues handling annotations - annotations may be inaccurate
         */
        @UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized KeyValueStore<@UnknownKeyFor @NonNull @Initialized ByteArray, @UnknownKeyFor @NonNull @Initialized StateValue<@UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?>>> stores;
        private final @UnknownKeyFor @NonNull @Initialized Coder<K> keyCoder;
        private final @UnknownKeyFor @NonNull @Initialized int batchGetSize;

        public Factory(@UnknownKeyFor @NonNull @Initialized String stageId, /*
         * Issues handling annotations - annotations may be inaccurate
         */
        @UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized KeyValueStore<@UnknownKeyFor @NonNull @Initialized ByteArray, @UnknownKeyFor @NonNull @Initialized StateValue<@UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?>>> stores, @UnknownKeyFor @NonNull @Initialized Coder<K> keyCoder, @UnknownKeyFor @NonNull @Initialized int batchGetSize) {
            this.stageId = stageId;
            this.stores = stores;
            this.keyCoder = keyCoder;
            this.batchGetSize = batchGetSize;
        }

        public @UnknownKeyFor @NonNull @Initialized StateInternals stateInternalsForKey(@Nullable K key) {
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            DataOutputStream dos = new DataOutputStream(baos);
            try {
                if (key != null) {
                    this.keyCoder.encode(key, (OutputStream)baos);
                }
                byte[] keyBytes = baos.toByteArray();
                baos.reset();
                dos.write(keyBytes.length);
                dos.write(keyBytes);
            }
            catch (IOException e) {
                throw new RuntimeException("Cannot encode key for state store", e);
            }
            return new SamzaStoreStateInternals(this.stores, key, baos.toByteArray(), this.stageId, this.batchGetSize);
        }
    }
}

