/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.samza.runtime;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.Serializable;
import java.lang.ref.SoftReference;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;
import javax.annotation.Nonnull;
import org.apache.beam.runners.core.StateInternals;
import org.apache.beam.runners.core.StateInternalsFactory;
import org.apache.beam.runners.core.StateNamespace;
import org.apache.beam.runners.core.StateTag;
import org.apache.beam.runners.samza.SamzaPipelineOptions;
import org.apache.beam.runners.samza.state.SamzaMapState;
import org.apache.beam.runners.samza.state.SamzaSetState;
import org.apache.beam.runners.samza.transforms.UpdatingCombineFn;
import org.apache.beam.sdk.coders.BooleanCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.InstantCoder;
import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.CombiningState;
import org.apache.beam.sdk.state.MapState;
import org.apache.beam.sdk.state.OrderedListState;
import org.apache.beam.sdk.state.ReadableState;
import org.apache.beam.sdk.state.ReadableStates;
import org.apache.beam.sdk.state.SetState;
import org.apache.beam.sdk.state.State;
import org.apache.beam.sdk.state.StateContext;
import org.apache.beam.sdk.state.StateContexts;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.state.WatermarkHoldState;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.CombineWithContext;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.primitives.Ints;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.primitives.UnsignedBytes;
import org.apache.samza.config.Config;
import org.apache.samza.context.TaskContext;
import org.apache.samza.serializers.Serde;
import org.apache.samza.serializers.SerdeFactory;
import org.apache.samza.storage.kv.Entry;
import org.apache.samza.storage.kv.KeyValueIterator;
import org.apache.samza.storage.kv.KeyValueStore;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
import org.joda.time.Instant;

public class SamzaStoreStateInternals<K>
implements StateInternals {
    /** Name of the store used for any state whose id has no dedicated entry in {@link #stores}. */
    static final String BEAM_STORE = "beamStore";
    /** Per-thread, softly referenced scratch buffer handed out (after a reset) by getThreadLocalBaos(). */
    private static ThreadLocal<SoftReference<ByteArrayOutputStream>> threadLocalBaos = new ThreadLocal<>();
    /** Stores by name: always contains {@link #BEAM_STORE}, plus any per-state-id stores registered by the factory. */
    private final Map<String, KeyValueStore<ByteArray, StateValue<?>>> stores;
    /** The key these state internals are scoped to; may be null. */
    private final K key;
    /** Bytes written first into every encoded store key; presumably the encoded form of {@link #key} (TODO confirm against the factory). */
    private final byte[] keyBytes;
    /** Number of bag elements fetched per {@code getAll} call when a bag is read. */
    private final int batchGetSize;
    /** Stage identifier handed to every state object created by this instance. */
    private final String stageId;

    /**
     * Creates state internals bound to one key.
     *
     * @param stores stores by name, shared with the state objects created by this instance
     * @param key the key this instance is scoped to, or null
     * @param keyBytes bytes prefixed to every encoded store key, or null
     * @param stageId identifier of the stage owning this state
     * @param batchGetSize number of entries fetched per batch when reading a bag
     */
    private SamzaStoreStateInternals(Map<String, KeyValueStore<ByteArray, StateValue<?>>> stores, @Nullable K key, byte @Nullable [] keyBytes, String stageId, int batchGetSize) {
        this.stageId = stageId;
        this.batchGetSize = batchGetSize;
        this.keyBytes = keyBytes;
        this.key = key;
        this.stores = stores;
    }

    /**
     * Looks up the store registered under {@link #BEAM_STORE} in the given task context.
     *
     * @param context task context to query
     * @return whatever the context returns for that store name
     */
    static KeyValueStore<ByteArray, StateValue<?>> getBeamStore(TaskContext context) {
        final String storeName = BEAM_STORE;
        return context.getStore(storeName);
    }

    /**
     * Builds a factory of state internals for one stage.
     *
     * <p>The store map always contains {@link #BEAM_STORE}. When a key coder is supplied, one store
     * per state id declared in {@code signature} is additionally looked up in the task context and
     * registered under that state id. Without a key coder, {@link VoidCoder} is used as the key coder
     * and only {@link #BEAM_STORE} is registered.
     *
     * @param id stage identifier; converted with {@link Objects#toString(Object)}, so null becomes "null"
     * @param keyCoder coder for state keys, or null for un-keyed state
     * @param context task context the stores are fetched from
     * @param pipelineOptions source of the store batch-get size
     * @param signature signature whose state declarations name the per-state stores
     * @return a factory wired with the collected stores
     */
    @SuppressWarnings("unchecked")
    static <K> Factory<K> createStateInternalFactory(String id, Coder<K> keyCoder, TaskContext context, SamzaPipelineOptions pipelineOptions, DoFnSignature signature) {
        final int batchGetSize = pipelineOptions.getStoreBatchGetSize();
        final Map<String, KeyValueStore<ByteArray, StateValue<?>>> stores = new HashMap<>();
        stores.put(BEAM_STORE, SamzaStoreStateInternals.getBeamStore(context));
        final Coder<K> stateKeyCoder;
        if (keyCoder != null) {
            for (String stateId : signature.stateDeclarations().keySet()) {
                // The context hands stores back without their type parameters; the store contents
                // are only ever accessed through ByteArray keys and StateValue values here.
                stores.put(stateId, (KeyValueStore<ByteArray, StateValue<?>>) context.getStore(stateId));
            }
            stateKeyCoder = keyCoder;
        } else {
            // No key coder means there is no key to encode; VoidCoder stands in for it.
            stateKeyCoder = (Coder<K>) VoidCoder.of();
        }
        return new Factory<K>(Objects.toString(id), stores, stateKeyCoder, batchGetSize);
    }

    /** Returns the key this instance was created for (may be null). */
    public K getKey() {
        return key;
    }

    /**
     * Resolves the state for the given namespace and tag using the null state context.
     *
     * @param stateNamespace namespace the state lives in
     * @param stateTag tag identifying the state and its type
     * @return the state object produced by the three-argument overload
     */
    public <T extends State> T state(StateNamespace stateNamespace, StateTag<T> stateTag) {
        final StateContext<?> noContext = StateContexts.nullContext();
        return state(stateNamespace, stateTag, noContext);
    }

    /**
     * Binds the given tag to a store-backed state implementation.
     *
     * <p>Value, bag, set, map, combining (plain {@code CombineFn}) and watermark-hold state are
     * supported. Ordered-list state and combining state with a context-aware combine function throw
     * {@link UnsupportedOperationException}.
     *
     * @param namespace namespace the state lives in
     * @param address tag identifying the state and its type
     * @param stateContext not used by this implementation
     * @return the state object created for {@code address}
     */
    public <V extends State> V state(final StateNamespace namespace, final StateTag<V> address, StateContext<?> stateContext) {
        // Each bind method ignores the spec it is handed and uses the enclosing address instead.
        final StateTag.StateBinder binder = new StateTag.StateBinder(){

            public <T> ValueState<T> bindValue(StateTag<ValueState<T>> spec, Coder<T> coder) {
                return new SamzaValueState(namespace, address, coder);
            }

            public <T> BagState<T> bindBag(StateTag<BagState<T>> spec, Coder<T> elemCoder) {
                return new SamzaBagState(namespace, address, elemCoder);
            }

            public <T> SetState<T> bindSet(StateTag<SetState<T>> spec, Coder<T> elemCoder) {
                return new SamzaSetStateImpl(namespace, address, elemCoder);
            }

            public <KeyT, ValueT> MapState<KeyT, ValueT> bindMap(StateTag<MapState<KeyT, ValueT>> spec, Coder<KeyT> mapKeyCoder, Coder<ValueT> mapValueCoder) {
                return new SamzaMapStateImpl<KeyT, ValueT>(namespace, (StateTag<? extends State>)address, mapKeyCoder, mapValueCoder);
            }

            public <T> OrderedListState<T> bindOrderedList(StateTag<OrderedListState<T>> spec, Coder<T> elemCoder) {
                final String unsupported = OrderedListState.class.getSimpleName();
                throw new UnsupportedOperationException(String.format("%s is not supported", unsupported));
            }

            public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT> bindCombiningValue(StateTag<CombiningState<InputT, AccumT, OutputT>> spec, Coder<AccumT> accumCoder, Combine.CombineFn<InputT, AccumT, OutputT> combineFn) {
                return new SamzaAccumulatorCombiningState<InputT, AccumT, OutputT>(namespace, (StateTag<? extends State>)address, accumCoder, combineFn);
            }

            public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT> bindCombiningValueWithContext(StateTag<CombiningState<InputT, AccumT, OutputT>> spec, Coder<AccumT> accumCoder, CombineWithContext.CombineFnWithContext<InputT, AccumT, OutputT> combineFn) {
                final String unsupported = CombiningState.class.getSimpleName();
                throw new UnsupportedOperationException(String.format("%s is not supported", unsupported));
            }

            public WatermarkHoldState bindWatermark(StateTag<WatermarkHoldState> spec, TimestampCombiner timestampCombiner) {
                return new SamzaWatermarkHoldState(namespace, address, timestampCombiner);
            }
        };
        return (V)address.bind(binder);
    }

    /**
     * Returns the calling thread's reusable scratch buffer, emptied and ready for writing.
     *
     * <p>The buffer is held through a {@link SoftReference}; a fresh one is created and cached when
     * none exists yet for this thread or the previous one has been collected. Because the same
     * instance is handed out on every call from a thread, the contents must be copied out (for
     * example with {@code toByteArray()}) before the next call.
     *
     * @return the reset per-thread buffer
     */
    private static ByteArrayOutputStream getThreadLocalBaos() {
        final SoftReference<ByteArrayOutputStream> cached = threadLocalBaos.get();
        ByteArrayOutputStream buffer = null;
        if (cached != null) {
            buffer = cached.get();
        }
        if (buffer == null) {
            buffer = new ByteArrayOutputStream();
            threadLocalBaos.set(new SoftReference<ByteArrayOutputStream>(buffer));
        }
        buffer.reset();
        return buffer;
    }

    /** Factory producing the serde used for {@link StateValue} store values. */
    public static class StateValueSerdeFactory implements SerdeFactory<StateValue<?>> {
        /** Returns a new {@link StateValueSerde}; neither argument is consulted. */
        public Serde<StateValue<?>> getSerde(String name, Config config) {
            final Serde<StateValue<?>> serde = new StateValueSerde();
            return serde;
        }

        /** Serde that moves {@link StateValue}s to and from their encoded bytes. */
        public static class StateValueSerde implements Serde<StateValue<?>> {
            /** Wraps the raw bytes without decoding them; decoding happens on {@code getValue}. */
            public StateValue<?> fromBytes(byte[] bytes) {
                return StateValue.of(bytes);
            }

            /** Returns the value's encoded bytes, or null when the state value itself is null. */
            public byte[] toBytes(StateValue<?> stateValue) {
                if (stateValue == null) {
                    return null;
                }
                return stateValue.getValueBytes();
            }
        }
    }

    /**
     * Holder for a state value that can exist as a decoded object, as encoded bytes, or both.
     *
     * <p>Whichever form is missing is produced on first request and then cached in the instance.
     */
    public static class StateValue<T> implements Serializable {
        private T value;
        private Coder<T> valueCoder;
        private byte[] valueBytes;

        private StateValue(T value, Coder<T> valueCoder, byte[] valueBytes) {
            this.value = value;
            this.valueCoder = valueCoder;
            this.valueBytes = valueBytes;
        }

        /**
         * Wraps an already-decoded value together with the coder used to encode it later.
         *
         * @param value the decoded value
         * @param valueCoder coder used by {@link #getValueBytes()}
         * @return a state value with no encoded bytes yet
         */
        public static <T> StateValue<T> of(T value, Coder<T> valueCoder) {
            return new StateValue<>(value, valueCoder, null);
        }

        /**
         * Wraps encoded bytes; the coder is supplied later through {@link #getValue(Coder)}.
         *
         * @param valueBytes the encoded value
         * @return a state value with no decoded value and no coder yet
         */
        public static <T> StateValue<T> of(byte[] valueBytes) {
            // Diamond lets the result take the caller's T instead of a fixed StateValue<Object>.
            return new StateValue<>(null, null, valueBytes);
        }

        /**
         * Returns the decoded value, decoding the stored bytes on first use.
         *
         * @param coder coder to adopt when this instance does not have one yet; an existing coder
         *     takes precedence
         * @return the decoded value, or null when there is neither a value nor bytes
         * @throws RuntimeException wrapping the {@link IOException} if decoding fails
         */
        public T getValue(Coder<T> coder) {
            if (this.value == null && this.valueBytes != null) {
                if (this.valueCoder == null) {
                    this.valueCoder = coder;
                }
                try {
                    this.value = this.valueCoder.decode(new ByteArrayInputStream(this.valueBytes));
                }
                catch (IOException e) {
                    throw new RuntimeException("Could not decode state", e);
                }
            }
            return this.value;
        }

        /**
         * Returns the encoded bytes, encoding the value with its coder on first use.
         *
         * @return the encoded bytes, or null when there is neither bytes nor a value
         * @throws RuntimeException wrapping the {@link IOException} if encoding fails
         */
        public byte[] getValueBytes() {
            if (this.valueBytes == null && this.value != null) {
                ByteArrayOutputStream baos = new ByteArrayOutputStream();
                try {
                    this.valueCoder.encode(this.value, baos);
                }
                catch (IOException e) {
                    throw new RuntimeException("Could not encode state value: " + this.value, e);
                }
                this.valueBytes = baos.toByteArray();
            }
            return this.valueBytes;
        }
    }

    /** Factory producing the serde used for {@link ByteArray} store keys. */
    public static class ByteArraySerdeFactory implements SerdeFactory<ByteArray> {
        /** Returns a new {@link ByteArraySerde}; neither argument is consulted. */
        public Serde<ByteArray> getSerde(String name, Config config) {
            final Serde<ByteArray> serde = new ByteArraySerde();
            return serde;
        }

        /** Serde that exposes or wraps the underlying byte array without copying it. */
        public static class ByteArraySerde implements Serde<ByteArray> {
            /** Returns the wrapped array itself (not a copy). */
            public byte[] toBytes(ByteArray byteArray) {
                final byte[] raw = byteArray.value;
                return raw;
            }

            /** Wraps the given array (not a copy) in a {@link ByteArray}. */
            public ByteArray fromBytes(byte[] bytes) {
                return ByteArray.of(bytes);
            }
        }
    }

    /**
     * Wrapper giving a {@code byte[]} content-based equality, hashing and ordering so it can serve
     * as a store key. The array is neither copied on the way in nor on the way out.
     */
    public static class ByteArray implements Serializable, Comparable<ByteArray> {
        private final byte[] value;

        private ByteArray(byte[] value) {
            this.value = value;
        }

        /** Wraps the given array without copying it. */
        public static ByteArray of(byte[] value) {
            return new ByteArray(value);
        }

        /** Returns the wrapped array itself. */
        public byte[] getValue() {
            return value;
        }

        /** Two instances are equal when they are of the same class and their arrays have equal contents. */
        public boolean equals(@Nullable Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || getClass() != o.getClass()) {
                return false;
            }
            final ByteArray that = (ByteArray) o;
            return Arrays.equals(value, that.value);
        }

        /** Content-based hash of the wrapped array; 0 when the array is null. */
        public int hashCode() {
            if (value == null) {
                return 0;
            }
            return Arrays.hashCode(value);
        }

        /** Orders instances by comparing their arrays lexicographically as unsigned bytes. */
        @Override
        public int compareTo(ByteArray other) {
            return UnsignedBytes.lexicographicalComparator().compare(value, other.value);
        }
    }

    /**
     * Watermark hold kept as a single {@link Instant} under this state's encoded store key and
     * folded with the configured {@link TimestampCombiner}.
     */
    private class SamzaWatermarkHoldState extends AbstractSamzaState<Instant> implements WatermarkHoldState {
        private final TimestampCombiner timestampCombiner;

        public <V extends State> SamzaWatermarkHoldState(StateNamespace namespace, StateTag<V> address, TimestampCombiner timestampCombiner) {
            super(namespace, address, InstantCoder.of());
            this.timestampCombiner = timestampCombiner;
        }

        /**
         * Combines {@code value} with the stored hold and persists the result only when it differs
         * from what is already stored.
         */
        public void add(Instant value) {
            final Instant previous = readInternal();
            final Instant merged;
            if (previous == null) {
                // Nothing stored yet: the new value becomes the hold as-is.
                merged = value;
            } else {
                merged = timestampCombiner.combine(previous, value);
            }
            if (!merged.equals(previous)) {
                writeInternal(merged);
            }
        }

        public ReadableState<Boolean> isEmpty() {
            return isEmptyInternal();
        }

        /** Returns the stored hold, as produced by {@code readInternal()}. */
        public Instant read() {
            return readInternal();
        }

        public TimestampCombiner getTimestampCombiner() {
            return timestampCombiner;
        }

        /** No prefetching is performed; returns this state unchanged. */
        public WatermarkHoldState readLater() {
            return this;
        }

        public void clear() {
            clearInternal();
        }
    }

    /**
     * Combining state that stores a single accumulator under this state's encoded store key and
     * delegates all combining logic to the supplied {@link Combine.CombineFn}.
     */
    private class SamzaAccumulatorCombiningState<InT, AccumT, OutT> extends AbstractSamzaState<AccumT> implements CombiningState<InT, AccumT, OutT> {
        private final Combine.CombineFn<InT, AccumT, OutT> combineFn;

        protected SamzaAccumulatorCombiningState(StateNamespace namespace, StateTag<? extends State> address, Coder<AccumT> coder, Combine.CombineFn<InT, AccumT, OutT> combineFn) {
            super(namespace, address, coder);
            this.combineFn = combineFn;
        }

        public void clear() {
            this.clearInternal();
        }

        /** Folds {@code value} into the current accumulator and writes the result back. */
        public void add(InT value) {
            final AccumT accum = this.getAccum();
            final AccumT current = this.combineFn.addInput(accum, value);
            this.writeInternal(current);
        }

        public ReadableState<Boolean> isEmpty() {
            return this.isEmptyInternal();
        }

        /** Returns the stored accumulator, or a freshly created one when nothing is stored. */
        public AccumT getAccum() {
            final AccumT accum = this.readInternal();
            return accum != null ? accum : this.combineFn.createAccumulator();
        }

        /** Merges {@code accum} into the current accumulator and writes the result back. */
        public void addAccum(AccumT accum) {
            final AccumT currentAccum = this.getAccum();
            final AccumT mergedAccum = this.mergeAccumulators(Arrays.asList(currentAccum, accum));
            this.writeInternal(mergedAccum);
        }

        public AccumT mergeAccumulators(Iterable<AccumT> accumulators) {
            return this.combineFn.mergeAccumulators(accumulators);
        }

        /** No prefetching is performed; returns this state unchanged. */
        public CombiningState<InT, AccumT, OutT> readLater() {
            return this;
        }

        /**
         * Extracts the output from the current accumulator.
         *
         * <p>Reading is not side-effect free for an {@link UpdatingCombineFn}: the accumulator
         * returned by {@code updateAfterFiring} is written back to the store after the output has
         * been extracted.
         */
        @Nonnull
        public OutT read() {
            final AccumT accum = this.getAccum();
            final OutT output = this.combineFn.extractOutput(accum);
            if (this.combineFn instanceof UpdatingCombineFn) {
                final AccumT updatedAccum = ((UpdatingCombineFn<InT, AccumT, OutT>) this.combineFn).updateAfterFiring(accum);
                this.writeInternal(updatedAccum);
            }
            return output;
        }
    }

    /**
     * Map state stored as one store entry per map key.
     *
     * <p>Each entry's store key is this state's encoded store key followed by the coder-encoded map
     * key. Bulk operations (keys, values, entries, clear, iteration) scan the store range from the
     * bare encoded store key up to a "max key": the same prefix padded with {@code 0xFF} bytes to
     * {@code maxKeySize}. {@code maxKeySize} starts at the prefix length plus 100000 and grows to the
     * longest encoded key put through this instance.
     */
    private class SamzaMapStateImpl<KeyT, ValueT> extends AbstractSamzaState<ValueT> implements SamzaMapState<KeyT, ValueT>, KeyValueIteratorState {
        private final Coder<KeyT> keyCoder;
        /** Length of the encoded store key that prefixes every entry key. */
        private final int storeKeySize;
        /** Store iterators handed out by {@link #readIterator()} that have not been closed yet. */
        private final List<KeyValueIterator<ByteArray, StateValue<ValueT>>> openIterators;
        private int maxKeySize;

        protected SamzaMapStateImpl(StateNamespace namespace, StateTag<? extends State> address, Coder<KeyT> keyCoder, Coder<ValueT> valueCoder) {
            super(namespace, address, valueCoder);
            this.openIterators = Collections.synchronizedList(new ArrayList<KeyValueIterator<ByteArray, StateValue<ValueT>>>());
            this.keyCoder = keyCoder;
            this.storeKeySize = this.getEncodedStoreKeyBytes().length;
            this.maxKeySize = this.storeKeySize + 100000;
        }

        /** Stores {@code value} under {@code key}, widening the range-scan bound if the encoded key is longer than any seen so far. */
        public void put(KeyT key, ValueT value) {
            final ByteArray encodedKey = this.encodeKey(key);
            this.maxKeySize = Math.max(this.maxKeySize, encodedKey.getValue().length);
            this.store.put(encodedKey, StateValue.of(value, this.coder));
        }

        /**
         * Stores {@code mappingFunction.apply(key)} when no value is present for {@code key}.
         *
         * @return null when the key was absent (and has now been written), otherwise an immediate
         *     state holding the value that was already present
         */
        public @Nullable ReadableState<ValueT> computeIfAbsent(KeyT key, Function<? super KeyT, ? extends ValueT> mappingFunction) {
            final ByteArray encodedKey = this.encodeKey(key);
            final ValueT current = this.decodeValue(this.store.get(encodedKey));
            if (current == null) {
                this.put(key, mappingFunction.apply(key));
                return null;
            }
            return ReadableStates.immediate(current);
        }

        public void remove(KeyT key) {
            this.store.delete(this.encodeKey(key));
        }

        public ReadableState<ValueT> get(KeyT key) {
            return this.getOrDefault(key, null);
        }

        /** Returns a state whose {@code read()} looks the key up in the store each time and falls back to {@code defaultValue} when nothing is stored. */
        public @UnknownKeyFor @NonNull @Initialized ReadableState<ValueT> getOrDefault(final KeyT key, final @Nullable ValueT defaultValue) {
            return new ReadableState<ValueT>(){

                public @Nullable ValueT read() {
                    final ValueT value = SamzaMapStateImpl.this.decodeValue(SamzaMapStateImpl.this.store.get(SamzaMapStateImpl.this.encodeKey(key)));
                    return value != null ? value : defaultValue;
                }

                public @UnknownKeyFor @NonNull @Initialized ReadableState<ValueT> readLater() {
                    return this;
                }
            };
        }

        public ReadableState<Iterable<KeyT>> keys() {
            return new ReadableState<Iterable<KeyT>>(){

                public Iterable<KeyT> read() {
                    return SamzaMapStateImpl.this.createIterable(entry -> SamzaMapStateImpl.this.decodeKey(entry.getKey()));
                }

                public ReadableState<Iterable<KeyT>> readLater() {
                    return this;
                }
            };
        }

        public ReadableState<Iterable<ValueT>> values() {
            return new ReadableState<Iterable<ValueT>>(){

                public Iterable<ValueT> read() {
                    return SamzaMapStateImpl.this.createIterable(entry -> SamzaMapStateImpl.this.decodeValue(entry.getValue()));
                }

                public ReadableState<Iterable<ValueT>> readLater() {
                    return this;
                }
            };
        }

        public ReadableState<Iterable<Map.Entry<KeyT, ValueT>>> entries() {
            return new ReadableState<Iterable<Map.Entry<KeyT, ValueT>>>(){

                public Iterable<Map.Entry<KeyT, ValueT>> read() {
                    return SamzaMapStateImpl.this.createIterable(entry -> new AbstractMap.SimpleEntry<>(SamzaMapStateImpl.this.decodeKey(entry.getKey()), SamzaMapStateImpl.this.decodeValue(entry.getValue())));
                }

                public ReadableState<Iterable<Map.Entry<KeyT, ValueT>>> readLater() {
                    return this;
                }
            };
        }

        /** Returns a state whose {@code read()} reports whether the key range currently holds no entries. */
        public @UnknownKeyFor @NonNull @Initialized ReadableState<@UnknownKeyFor @NonNull @Initialized Boolean> isEmpty() {
            final ReadableState<Iterable<KeyT>> keys = this.keys();
            return new ReadableState<Boolean>(){

                public @Nullable Boolean read() {
                    return Iterables.isEmpty(keys.read());
                }

                public @UnknownKeyFor @NonNull @Initialized ReadableState<Boolean> readLater() {
                    keys.readLater();
                    return this;
                }
            };
        }

        /**
         * Opens a store range iterator over this map's entries and exposes it as decoded map entries.
         *
         * <p>The store iterator is opened immediately (not on {@code read()}) and tracked in
         * {@code openIterators}. It is closed and untracked once {@code hasNext()} reports
         * exhaustion; iterators abandoned earlier are released by {@link #closeIterators()}. Every
         * {@code read()} on the returned state wraps the same underlying store iterator.
         */
        @Override
        public ReadableState<Iterator<Map.Entry<KeyT, ValueT>>> readIterator() {
            final ByteArray maxKey = this.createMaxKey();
            final KeyValueIterator<ByteArray, StateValue<ValueT>> kvIter = this.store.range(this.getEncodedStoreKey(), maxKey);
            this.openIterators.add(kvIter);
            return new ReadableState<Iterator<Map.Entry<KeyT, ValueT>>>(){

                public @Nullable Iterator<Map.Entry<KeyT, ValueT>> read() {
                    return new Iterator<Map.Entry<KeyT, ValueT>>(){

                        @Override
                        public boolean hasNext() {
                            final boolean hasNext = kvIter.hasNext();
                            if (!hasNext) {
                                kvIter.close();
                                SamzaMapStateImpl.this.openIterators.remove(kvIter);
                            }
                            return hasNext;
                        }

                        @Override
                        public Map.Entry<KeyT, ValueT> next() {
                            final Entry<ByteArray, StateValue<ValueT>> entry = kvIter.next();
                            return new AbstractMap.SimpleEntry<>(SamzaMapStateImpl.this.decodeKey(entry.getKey()), SamzaMapStateImpl.this.decodeValue(entry.getValue()));
                        }
                    };
                }

                public ReadableState<Iterator<Map.Entry<KeyT, ValueT>>> readLater() {
                    return this;
                }
            };
        }

        /**
         * Snapshots all entries in this map's key range and returns a view that applies {@code fn}
         * to each snapshotted entry lazily, on every iteration.
         */
        private <OutputT> Iterable<OutputT> createIterable(final SerializableFunction<Entry<ByteArray, StateValue<ValueT>>, OutputT> fn) {
            final ByteArray maxKey = this.createMaxKey();
            final KeyValueIterator<ByteArray, StateValue<ValueT>> kvIter = this.store.range(this.getEncodedStoreKey(), maxKey);
            final List<Entry<ByteArray, StateValue<ValueT>>> snapshot;
            try {
                snapshot = ImmutableList.copyOf(kvIter);
            } finally {
                // Release the store iterator even when copying fails part-way through.
                kvIter.close();
            }
            return new Iterable<OutputT>(){

                @Override
                public Iterator<OutputT> iterator() {
                    final Iterator<Entry<ByteArray, StateValue<ValueT>>> iter = snapshot.iterator();
                    return new Iterator<OutputT>(){

                        @Override
                        public boolean hasNext() {
                            return iter.hasNext();
                        }

                        @Override
                        public OutputT next() {
                            return fn.apply(iter.next());
                        }
                    };
                }
            };
        }

        /** Deletes every entry in this map's key range. */
        public void clear() {
            final ByteArray maxKey = this.createMaxKey();
            final KeyValueIterator<ByteArray, StateValue<ValueT>> kvIter = this.store.range(this.getEncodedStoreKey(), maxKey);
            try {
                while (kvIter.hasNext()) {
                    this.store.delete(kvIter.next().getKey());
                }
            } finally {
                // Release the store iterator even when a delete fails.
                kvIter.close();
            }
        }

        /** Builds the store key for a map key: encoded store key bytes followed by the coder-encoded map key. */
        private ByteArray encodeKey(KeyT key) {
            try {
                final ByteArrayOutputStream baos = SamzaStoreStateInternals.getThreadLocalBaos();
                baos.write(this.getEncodedStoreKeyBytes());
                this.keyCoder.encode(key, baos);
                return ByteArray.of(baos.toByteArray());
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        }

        /** Strips the {@code storeKeySize}-byte prefix from a store key and decodes the remainder as a map key. */
        private KeyT decodeKey(ByteArray keyBytes) {
            try {
                final byte[] realKey = Arrays.copyOfRange(keyBytes.value, this.storeKeySize, keyBytes.value.length);
                return this.keyCoder.decode(new ByteArrayInputStream(realKey));
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        }

        /** Builds the upper bound for range scans: the encoded store key padded with 0xFF bytes up to {@code maxKeySize}. */
        private ByteArray createMaxKey() {
            final byte[] maxKey = new byte[this.maxKeySize];
            Arrays.fill(maxKey, (byte)-1);
            final byte[] encodedKey = this.getEncodedStoreKeyBytes();
            System.arraycopy(encodedKey, 0, maxKey, 0, encodedKey.length);
            return ByteArray.of(maxKey);
        }

        /** Closes and forgets every store iterator still tracked from {@link #readIterator()}. */
        @Override
        public void closeIterators() {
            this.openIterators.forEach(KeyValueIterator::close);
            this.openIterators.clear();
        }
    }

    /**
     * Set state implemented on top of a {@code SamzaMapStateImpl<T, Boolean>}: each element is a map
     * key whose value is always {@code true}.
     */
    private class SamzaSetStateImpl<T> implements SamzaSetState<T>, KeyValueIteratorState {
        private final SamzaMapStateImpl<T, Boolean> mapState;

        private SamzaSetStateImpl(StateNamespace namespace, StateTag<? extends State> address, Coder<T> coder) {
            this.mapState = new SamzaMapStateImpl<T, Boolean>(namespace, address, coder, BooleanCoder.of());
        }

        /**
         * Returns a state that reads {@code true} when {@code t} is in the set and {@code false}
         * otherwise. Absent elements read as {@code false}, never null, so the result can be
         * unboxed safely.
         */
        public ReadableState<Boolean> contains(T t) {
            return this.mapState.getOrDefault(t, false);
        }

        /** Adds {@code t} when absent; the return value is whatever the backing map's {@code putIfAbsent} returns. */
        public @Nullable ReadableState<Boolean> addIfAbsent(T t) {
            return this.mapState.putIfAbsent(t, true);
        }

        public void remove(T t) {
            this.mapState.remove(t);
        }

        public void add(T value) {
            this.mapState.put(value, true);
        }

        /** Returns a state whose {@code read()} reports whether the backing map currently has no entries. */
        public ReadableState<Boolean> isEmpty() {
            return new ReadableState<Boolean>(){

                public Boolean read() {
                    return Iterables.isEmpty(SamzaSetStateImpl.this.mapState.entries().read());
                }

                public ReadableState<Boolean> readLater() {
                    return this;
                }
            };
        }

        /** Returns the elements of the set, i.e. the keys of the backing map. */
        public Iterable<T> read() {
            return this.mapState.keys().read();
        }

        /** No prefetching is performed; returns this state unchanged. */
        public SetState<T> readLater() {
            return this;
        }

        public void clear() {
            this.mapState.clear();
        }

        /**
         * Exposes the backing map's entry iterator as an iterator over elements.
         *
         * <p>The backing iterator is obtained when this method is called, not on {@code read()}, and
         * every {@code read()} on the returned state wraps that same iterator.
         */
        @Override
        public ReadableState<Iterator<T>> readIterator() {
            final Iterator<Map.Entry<T, Boolean>> iter = this.mapState.readIterator().read();
            return new ReadableState<Iterator<T>>(){

                public @Nullable Iterator<T> read() {
                    return new Iterator<T>(){

                        @Override
                        public boolean hasNext() {
                            return iter.hasNext();
                        }

                        @Override
                        public T next() {
                            return iter.next().getKey();
                        }
                    };
                }

                public ReadableState<Iterator<T>> readLater() {
                    return this;
                }
            };
        }

        @Override
        public void closeIterators() {
            this.mapState.closeIterators();
        }
    }

    /**
     * Bag state stored as indexed entries plus a size counter.
     *
     * <p>The element count is kept as a 4-byte int under this state's encoded store key. Element
     * {@code i} is stored under that same key followed by {@code i} written as a 4-byte int.
     * Mutating and reading operations synchronize on the store.
     */
    private class SamzaBagState<T> extends AbstractSamzaState<T> implements BagState<T> {
        private SamzaBagState(StateNamespace namespace, StateTag<? extends State> address, Coder<T> coder) {
            super(namespace, address, coder);
        }

        /** Appends {@code value} at index {@code size} and then stores {@code size + 1} as the new count. */
        public void add(T value) {
            synchronized (this.store) {
                final int size = this.getSize();
                final ByteArray encodedKey = this.encodeKey(size);
                this.store.put(encodedKey, StateValue.of(value, this.coder));
                this.store.put(this.getEncodedStoreKey(), StateValue.<T>of(Ints.toByteArray(size + 1)));
            }
        }

        /**
         * Returns a state reporting whether anything is stored under the size key.
         *
         * <p>NOTE(review): the lock is held only while the lazy state object is created; the store
         * lookup performed later by its {@code read()} runs outside this synchronized block.
         */
        public ReadableState<Boolean> isEmpty() {
            synchronized (this.store) {
                return this.isEmptyInternal();
            }
        }

        /**
         * Reads all elements, fetching them from the store in batches of the configured batch-get
         * size. A configured size below 1 is treated as 1 so the batching loop always advances.
         *
         * @return the decoded elements, or an empty list when the bag is empty
         */
        @Nonnull
        public List<T> read() {
            synchronized (this.store) {
                final int size = this.getSize();
                if (size == 0) {
                    return Collections.emptyList();
                }
                final int batchSize = Math.max(1, SamzaStoreStateInternals.this.batchGetSize);
                final List<T> values = new ArrayList<T>(size);
                final List<ByteArray> keys = new ArrayList<ByteArray>(Math.min(size, batchSize));
                int start = 0;
                while (start < size) {
                    // Computed by subtraction so the bound cannot overflow for large batch sizes.
                    final int end = size - start > batchSize ? start + batchSize : size;
                    for (int i = start; i < end; ++i) {
                        keys.add(this.encodeKey(i));
                    }
                    this.store.getAll(keys).values().forEach(value -> values.add(this.decodeValue(value)));
                    keys.clear();
                    start = end;
                }
                return values;
            }
        }

        /** No prefetching is performed; returns this state unchanged. */
        public BagState<T> readLater() {
            return this;
        }

        /** Deletes every element entry and the size entry; does nothing when the bag is empty. */
        public void clear() {
            synchronized (this.store) {
                final int size = this.getSize();
                if (size != 0) {
                    final List<ByteArray> keys = new ArrayList<ByteArray>(size);
                    for (int i = 0; i < size; ++i) {
                        keys.add(this.encodeKey(i));
                    }
                    this.store.deleteAll(keys);
                    this.store.delete(this.getEncodedStoreKey());
                }
            }
        }

        /** Returns the stored element count, or 0 when no size entry (or no bytes) is present. */
        private int getSize() {
            final StateValue<T> stateSize = this.store.get(this.getEncodedStoreKey());
            if (stateSize == null || stateSize.valueBytes == null) {
                return 0;
            }
            return Ints.fromByteArray(stateSize.valueBytes);
        }

        /** Builds the store key of the element at the given index: encoded store key bytes followed by the index as an int. */
        private ByteArray encodeKey(int size) {
            final ByteArrayOutputStream baos = SamzaStoreStateInternals.getThreadLocalBaos();
            try (DataOutputStream dos = new DataOutputStream(baos)) {
                dos.write(this.getEncodedStoreKeyBytes());
                dos.writeInt(size);
                return ByteArray.of(baos.toByteArray());
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    }

    /** Single-value state; every operation delegates to the corresponding {@code *Internal} helper of the base class. */
    private class SamzaValueState<T> extends AbstractSamzaState<T> implements ValueState<T> {
        private SamzaValueState(StateNamespace namespace, StateTag<? extends State> address, Coder<T> coder) {
            super(namespace, address, coder);
        }

        /** Stores {@code input} as the current value. */
        public void write(T input) {
            writeInternal(input);
        }

        /** Returns the current value as produced by {@code readInternal()}. */
        public T read() {
            final T current = readInternal();
            return current;
        }

        /** No prefetching is performed; returns this state unchanged. */
        public ValueState<T> readLater() {
            return this;
        }

        /** Removes the stored value. */
        public void clear() {
            clearInternal();
        }
    }

    /**
     * Common base for state cells stored in a {@code KeyValueStore}.
     *
     * <p>A state uses the store registered under its address id when one exists;
     * otherwise it falls back to the shared store registered under
     * {@code BEAM_STORE}. The store key is derived from the enclosing instance's key
     * bytes and the state namespace, plus the stage id and address id when the shared
     * store is used (see {@link #getEncodedStoreKeyBytes()}).
     *
     * @param <T> type of the value encoded with {@link #coder}
     */
    private abstract class AbstractSamzaState<T> {
        private final StateNamespace namespace;
        private final String addressId;
        /** True when no store is registered under the address id, so the shared store is used. */
        private final boolean isBeamStore;
        private final String stageId;
        private final byte[] keyBytes;
        /** Lazily computed by {@link #getEncodedStoreKeyBytes()}; {@code null} until first use. */
        private byte[] encodedStoreKey;
        protected final Coder<T> coder;
        protected final KeyValueStore<ByteArray, StateValue<T>> store;

        /**
         * @param namespace namespace of this state; contributes to the store key
         * @param address tag whose id selects the store and, for the shared store, is part of the key
         * @param coder coder used to encode and decode values
         */
        // The store map is not typed on T, so narrowing to StateValue<T> is
        // necessarily an unchecked raw cast (same cast the original performed).
        @SuppressWarnings({"unchecked", "rawtypes"})
        protected AbstractSamzaState(StateNamespace namespace, StateTag<? extends State> address, Coder<T> coder) {
            this.coder = coder;
            this.namespace = namespace;
            this.addressId = address.getId();
            this.isBeamStore = !SamzaStoreStateInternals.this.stores.containsKey(this.addressId);
            this.store = (KeyValueStore)SamzaStoreStateInternals.this.stores.get(
                    this.isBeamStore ? SamzaStoreStateInternals.BEAM_STORE : this.addressId);
            this.stageId = SamzaStoreStateInternals.this.stageId;
            this.keyBytes = SamzaStoreStateInternals.this.keyBytes;
        }

        /** Deletes the entry stored under this state's encoded key. */
        protected void clearInternal() {
            this.store.delete(this.getEncodedStoreKey());
        }

        /** Stores {@code value}, wrapped via {@code StateValue.of}, under this state's encoded key. */
        protected void writeInternal(T value) {
            this.store.put(this.getEncodedStoreKey(), StateValue.of(value, this.coder));
        }

        /** Returns the decoded value stored under this state's encoded key, or {@code null} if absent. */
        protected T readInternal() {
            return this.decodeValue(this.store.get(this.getEncodedStoreKey()));
        }

        /**
         * Returns a {@link ReadableState} whose {@code read()} queries the store each
         * time it is called and yields {@code true} when no entry exists for this
         * state's encoded key.
         */
        protected ReadableState<Boolean> isEmptyInternal() {
            return new ReadableState<Boolean>(){

                public Boolean read() {
                    return AbstractSamzaState.this.store.get(AbstractSamzaState.this.getEncodedStoreKey()) == null;
                }

                public ReadableState<Boolean> readLater() {
                    return this;
                }
            };
        }

        /** Wraps {@link #getEncodedStoreKeyBytes()} in a {@code ByteArray}. */
        protected ByteArray getEncodedStoreKey() {
            return ByteArray.of(this.getEncodedStoreKeyBytes());
        }

        /**
         * Returns the encoded store key, computing and caching it on first call.
         *
         * <p>Layout: the raw key bytes, then the namespace string key written with
         * {@link DataOutputStream#writeUTF(String)}; when the shared store is used,
         * the stage id and the address id follow, each also written with
         * {@code writeUTF}.
         *
         * <p>The cached array itself is returned (no copy), so callers must not modify it.
         *
         * @throws RuntimeException if encoding fails
         */
        protected byte[] getEncodedStoreKeyBytes() {
            if (this.encodedStoreKey == null) {
                final ByteArrayOutputStream baos = SamzaStoreStateInternals.getThreadLocalBaos();
                try (DataOutputStream dos = new DataOutputStream(baos)) {
                    dos.write(this.keyBytes);
                    dos.writeUTF(this.namespace.stringKey());
                    if (this.isBeamStore) {
                        // The shared store holds many states, so the key must also
                        // carry the stage id and address id.
                        dos.writeUTF(this.stageId);
                        dos.writeUTF(this.addressId);
                    }
                }
                catch (IOException e) {
                    throw new RuntimeException("Could not encode full address for state: " + this.addressId, e);
                }
                this.encodedStoreKey = baos.toByteArray();
            }
            return this.encodedStoreKey;
        }

        /** Decodes {@code stateValue} with {@link #coder}; a {@code null} input yields {@code null}. */
        protected T decodeValue(StateValue<T> stateValue) {
            return stateValue == null ? null : (T)stateValue.getValue(this.coder);
        }

        /**
         * Two states are equal when they have the same runtime class, agree on whether
         * they use the shared store (and, if they do, have equal stage ids), and have
         * equal key bytes, address id and namespace.
         */
        public boolean equals(@Nullable Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || this.getClass() != o.getClass()) {
                return false;
            }
            final AbstractSamzaState<?> that = (AbstractSamzaState<?>)o;
            if (this.isBeamStore != that.isBeamStore) {
                return false;
            }
            // The stage id is part of the identity only for shared-store states.
            if (this.isBeamStore && !this.stageId.equals(that.stageId)) {
                return false;
            }
            return Arrays.equals(this.keyBytes, that.keyBytes) && this.addressId.equals(that.addressId) && this.namespace.equals(that.namespace);
        }

        /**
         * Combines the namespace hash with the hash of the encoded store key. May
         * trigger the lazy computation in {@link #getEncodedStoreKeyBytes()}.
         */
        public int hashCode() {
            int result = this.namespace.hashCode();
            result = 31 * result + Arrays.hashCode(this.getEncodedStoreKeyBytes());
            return result;
        }
    }

    /**
     * Contract for state objects that can close iterators on request.
     * NOTE(review): which iterators an implementer tracks is not visible in this
     * part of the file; see the implementing classes.
     */
    interface KeyValueIteratorState {
        /** Closes the iterators held by this state. */
        void closeIterators();
    }

    /**
     * {@link StateInternalsFactory} that creates a {@code SamzaStoreStateInternals}
     * per key, passing along the stage id, store map and batch-get size it was
     * constructed with.
     *
     * @param <K> key type, encoded with the supplied key coder
     */
    public static class Factory<K> implements StateInternalsFactory<K> {
        private final String stageId;
        private final Map<String, KeyValueStore<ByteArray, StateValue<?>>> stores;
        private final Coder<K> keyCoder;
        private final int batchGetSize;

        /**
         * @param stageId stage id handed to every created state internals
         * @param stores store map handed to every created state internals
         * @param keyCoder coder used to serialize non-null keys
         * @param batchGetSize value handed to every created state internals
         */
        public Factory(String stageId, Map<String, KeyValueStore<ByteArray, StateValue<?>>> stores, Coder<K> keyCoder, int batchGetSize) {
            this.stageId = stageId;
            this.stores = stores;
            this.keyCoder = keyCoder;
            this.batchGetSize = batchGetSize;
        }

        /**
         * Creates state internals for {@code key}.
         *
         * <p>The key bytes passed on are a one-byte prefix followed by the
         * coder-encoded key; a {@code null} key is encoded as zero bytes.
         *
         * @throws RuntimeException if the key cannot be encoded
         */
        public StateInternals stateInternalsForKey(@Nullable K key) {
            final byte[] prefixedKey;
            try {
                final ByteArrayOutputStream rawBuffer = new ByteArrayOutputStream();
                if (key != null) {
                    keyCoder.encode(key, (OutputStream)rawBuffer);
                }
                final byte[] rawKey = rawBuffer.toByteArray();

                final ByteArrayOutputStream prefixedBuffer = new ByteArrayOutputStream();
                try (DataOutputStream out = new DataOutputStream(prefixedBuffer)) {
                    // write(int) emits only the low-order 8 bits of the length. This is
                    // kept deliberately: the prefix is part of the existing key layout.
                    out.write(rawKey.length);
                    out.write(rawKey);
                }
                prefixedKey = prefixedBuffer.toByteArray();
            }
            catch (IOException e) {
                throw new RuntimeException("Cannot encode key for state store", e);
            }
            return new SamzaStoreStateInternals(stores, key, prefixedKey, stageId, batchGetSize);
        }
    }
}

