/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.samza.runtime;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.Serializable;
import java.lang.ref.SoftReference;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.beam.runners.core.StateInternals;
import org.apache.beam.runners.core.StateInternalsFactory;
import org.apache.beam.runners.core.StateNamespace;
import org.apache.beam.runners.core.StateTag;
import org.apache.beam.runners.samza.SamzaPipelineOptions;
import org.apache.beam.runners.samza.state.SamzaMapState;
import org.apache.beam.runners.samza.state.SamzaSetState;
import org.apache.beam.runners.samza.transforms.UpdatingCombineFn;
import org.apache.beam.sdk.coders.BooleanCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.InstantCoder;
import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.CombiningState;
import org.apache.beam.sdk.state.MapState;
import org.apache.beam.sdk.state.ReadableState;
import org.apache.beam.sdk.state.ReadableStates;
import org.apache.beam.sdk.state.SetState;
import org.apache.beam.sdk.state.State;
import org.apache.beam.sdk.state.StateContext;
import org.apache.beam.sdk.state.StateContexts;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.state.WatermarkHoldState;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.CombineWithContext;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.primitives.Ints;
import org.apache.samza.context.TaskContext;
import org.apache.samza.storage.kv.Entry;
import org.apache.samza.storage.kv.KeyValueIterator;
import org.apache.samza.storage.kv.KeyValueStore;
import org.joda.time.Instant;

public class SamzaStoreStateInternals<K>
implements StateInternals {
    /** Name of the store used for any state whose own id has no entry in {@link #stores}. */
    static final String BEAM_STORE = "beamStore";
    /** Per-thread, softly referenced scratch buffer handed out (after a reset) by {@code getThreadLocalBaos()}. */
    private static ThreadLocal<SoftReference<ByteArrayOutputStream>> threadLocalBaos = new ThreadLocal();
    /** Stores looked up by state id when a state cell is created; {@link #BEAM_STORE} is the fallback entry. */
    private final Map<String, KeyValueStore<byte[], byte[]>> stores;
    /** Key returned by {@link #getKey()}; the constructor accepts {@code null}. */
    private final K key;
    /** Bytes written first into the encoded store key of every state cell created by this instance. */
    private final byte[] keyBytes;
    /** Number of keys requested per {@code getAll} call when a bag state is read. */
    private final int batchGetSize;
    /** Written into the encoded store key of states that fall back to {@link #BEAM_STORE}. */
    private final String stageId;

    /**
     * Creates state internals scoped to a single key.
     *
     * @param stores stores consulted by state id, including the {@link #BEAM_STORE} entry
     * @param key the key exposed through {@link #getKey()}, possibly {@code null}
     * @param keyBytes bytes that prefix every encoded store key built by this instance
     * @param stageId identifier written into keys of states kept in {@link #BEAM_STORE}
     * @param batchGetSize number of keys fetched per batch when reading a bag
     */
    private SamzaStoreStateInternals(Map<String, KeyValueStore<byte[], byte[]>> stores, @Nullable K key, @Nullable byte[] keyBytes, String stageId, int batchGetSize) {
        this.stageId = stageId;
        this.batchGetSize = batchGetSize;
        this.keyBytes = keyBytes;
        this.key = key;
        this.stores = stores;
    }

    /**
     * Fetches the store registered under {@link #BEAM_STORE} from the given task context.
     *
     * @param context task context the store is looked up in
     * @return whatever {@code context.getStore(BEAM_STORE)} yields, viewed as a byte-array store
     */
    @SuppressWarnings("unchecked")
    static KeyValueStore<byte[], byte[]> getBeamStore(TaskContext context) {
        final KeyValueStore<byte[], byte[]> beamStore = (KeyValueStore<byte[], byte[]>)context.getStore(BEAM_STORE);
        return beamStore;
    }

    /**
     * Builds a {@link Factory} for one stage.
     *
     * <p>The {@link #BEAM_STORE} store is always registered. When a key coder is supplied, one store
     * per declared state id is registered as well and that coder is used for keys; otherwise
     * {@link VoidCoder} is used.
     *
     * @param id stage identifier, passed through {@link Objects#toString(Object)}
     * @param keyCoder coder for state keys, or {@code null} for unkeyed state
     * @param context task context the stores are looked up in
     * @param pipelineOptions source of the batch-get size
     * @param signature DoFn signature whose state declarations name the per-state stores
     * @return the configured factory
     */
    @SuppressWarnings("unchecked")
    static Factory createStateInternalFactory(String id, Coder<?> keyCoder, TaskContext context, SamzaPipelineOptions pipelineOptions, DoFnSignature signature) {
        final int batchGetSize = pipelineOptions.getStoreBatchGetSize();
        final Map<String, KeyValueStore<byte[], byte[]>> stores = new HashMap<>();
        stores.put(BEAM_STORE, SamzaStoreStateInternals.getBeamStore(context));

        final boolean keyed = keyCoder != null;
        if (keyed) {
            // Keyed state: every declared state id gets its own store entry.
            for (String stateId : signature.stateDeclarations().keySet()) {
                stores.put(stateId, (KeyValueStore<byte[], byte[]>)context.getStore(stateId));
            }
        }
        final Coder<?> stateKeyCoder = keyed ? keyCoder : VoidCoder.of();
        return new Factory(Objects.toString(id), stores, stateKeyCoder, batchGetSize);
    }

    /**
     * Returns the key these state internals were created for.
     *
     * @return the key handed to the constructor, possibly {@code null}
     */
    public K getKey() {
        return key;
    }

    /**
     * Resolves a state cell using the null state context.
     *
     * @param stateNamespace namespace the state lives in
     * @param stateTag tag identifying the state and its kind
     * @return the state bound for the given namespace and tag
     */
    public <T extends State> T state(StateNamespace stateNamespace, StateTag<T> stateTag) {
        final StateContext<?> noContext = StateContexts.nullContext();
        return state(stateNamespace, stateTag, noContext);
    }

    /**
     * Resolves a state cell by binding the tag to the store-backed implementation of its kind.
     *
     * <p>The supplied {@code stateContext} is not consulted. Combining state with context is not
     * supported and raises {@link UnsupportedOperationException}.
     *
     * @param namespace namespace the state lives in
     * @param address tag identifying the state and its kind
     * @param stateContext unused
     * @return the store-backed state for the given namespace and tag
     */
    public <V extends State> V state(final StateNamespace namespace, final StateTag<V> address, StateContext<?> stateContext) {
        // One factory method per state kind; each creates a fresh cell for (namespace, address).
        final StateTag.StateBinder binder = new StateTag.StateBinder(){

            public <T> ValueState<T> bindValue(StateTag<ValueState<T>> spec, Coder<T> coder) {
                return new SamzaValueState<>(namespace, address, coder);
            }

            public <T> BagState<T> bindBag(StateTag<BagState<T>> spec, Coder<T> elemCoder) {
                return new SamzaBagState<>(namespace, address, elemCoder);
            }

            public <T> SetState<T> bindSet(StateTag<SetState<T>> spec, Coder<T> elemCoder) {
                return new SamzaSetStateImpl<>(namespace, address, elemCoder);
            }

            public <KeyT, ValueT> MapState<KeyT, ValueT> bindMap(StateTag<MapState<KeyT, ValueT>> spec, Coder<KeyT> mapKeyCoder, Coder<ValueT> mapValueCoder) {
                return new SamzaMapStateImpl<>(namespace, address, mapKeyCoder, mapValueCoder);
            }

            public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT> bindCombiningValue(StateTag<CombiningState<InputT, AccumT, OutputT>> spec, Coder<AccumT> accumCoder, Combine.CombineFn<InputT, AccumT, OutputT> combineFn) {
                return new SamzaAccumulatorCombiningState<>(namespace, address, accumCoder, combineFn);
            }

            public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT> bindCombiningValueWithContext(StateTag<CombiningState<InputT, AccumT, OutputT>> spec, Coder<AccumT> accumCoder, CombineWithContext.CombineFnWithContext<InputT, AccumT, OutputT> combineFn) {
                final String stateKind = CombiningState.class.getSimpleName();
                throw new UnsupportedOperationException(String.format("%s is not supported", stateKind));
            }

            public WatermarkHoldState bindWatermark(StateTag<WatermarkHoldState> spec, TimestampCombiner timestampCombiner) {
                return new SamzaWatermarkHoldState(namespace, address, timestampCombiner);
            }
        };
        return (V)address.bind(binder);
    }

    /**
     * Returns the calling thread's scratch buffer, emptied and ready for writing.
     *
     * <p>The buffer is held through a {@link SoftReference}; when the reference is missing or has
     * been cleared, a new buffer is created and stored for the thread.
     *
     * @return a reset {@link ByteArrayOutputStream} for the current thread
     */
    private static ByteArrayOutputStream getThreadLocalBaos() {
        final SoftReference<ByteArrayOutputStream> cachedRef = threadLocalBaos.get();
        ByteArrayOutputStream buffer = null;
        if (cachedRef != null) {
            buffer = cachedRef.get();
        }
        if (buffer == null) {
            buffer = new ByteArrayOutputStream();
            threadLocalBaos.set(new SoftReference<ByteArrayOutputStream>(buffer));
        }
        buffer.reset();
        return buffer;
    }

    /**
     * {@link WatermarkHoldState} kept as a single {@link Instant} entry in the store; new holds are
     * folded into the stored value with the configured {@link TimestampCombiner}.
     */
    private class SamzaWatermarkHoldState
    extends AbstractSamzaState<Instant>
    implements WatermarkHoldState {
        private final TimestampCombiner timestampCombiner;

        public <V extends State> SamzaWatermarkHoldState(StateNamespace namespace, StateTag<V> address, TimestampCombiner timestampCombiner) {
            super(namespace, address, InstantCoder.of());
            this.timestampCombiner = timestampCombiner;
        }

        /** Combines {@code value} with the stored hold and writes the result only if it differs. */
        public void add(Instant value) {
            final Instant existing = readInternal();
            final Instant merged;
            if (existing == null) {
                merged = value;
            } else {
                merged = timestampCombiner.combine(new Instant[]{existing, value});
            }
            if (merged.equals(existing)) {
                // Nothing changed, so skip the store write.
                return;
            }
            writeInternal(merged);
        }

        public ReadableState<Boolean> isEmpty() {
            return isEmptyInternal();
        }

        /** Returns the stored hold, or {@code null} when nothing is stored. */
        public Instant read() {
            return readInternal();
        }

        public TimestampCombiner getTimestampCombiner() {
            return timestampCombiner;
        }

        public WatermarkHoldState readLater() {
            return this;
        }

        public void clear() {
            clearInternal();
        }
    }

    /**
     * {@link CombiningState} that keeps one accumulator in the store and delegates all combining
     * logic to the supplied {@link Combine.CombineFn}.
     */
    private class SamzaAccumulatorCombiningState<InT, AccumT, OutT>
    extends AbstractSamzaState<AccumT>
    implements CombiningState<InT, AccumT, OutT> {
        private final Combine.CombineFn<InT, AccumT, OutT> combineFn;

        protected SamzaAccumulatorCombiningState(StateNamespace namespace, StateTag<? extends State> address, Coder<AccumT> coder, Combine.CombineFn<InT, AccumT, OutT> combineFn) {
            super(namespace, address, coder);
            this.combineFn = combineFn;
        }

        public void clear() {
            clearInternal();
        }

        /** Adds one input to the current accumulator and stores the result. */
        public void add(InT value) {
            writeInternal(combineFn.addInput(getAccum(), value));
        }

        public ReadableState<Boolean> isEmpty() {
            return isEmptyInternal();
        }

        /** Returns the stored accumulator, or a newly created one when nothing is stored. */
        public AccumT getAccum() {
            final AccumT stored = readInternal();
            if (stored != null) {
                return stored;
            }
            return combineFn.createAccumulator();
        }

        /** Merges {@code accum} into the current accumulator and stores the result. */
        public void addAccum(AccumT accum) {
            final List<AccumT> toMerge = Arrays.asList(getAccum(), accum);
            writeInternal(mergeAccumulators(toMerge));
        }

        public AccumT mergeAccumulators(Iterable<AccumT> accumulators) {
            return combineFn.mergeAccumulators(accumulators);
        }

        public CombiningState<InT, AccumT, OutT> readLater() {
            return this;
        }

        /**
         * Extracts the output from the current accumulator. When the combine function is an
         * {@link UpdatingCombineFn}, the accumulator returned by its {@code updateAfterFiring} is
         * written back afterwards.
         */
        @Nonnull
        @SuppressWarnings("unchecked")
        public OutT read() {
            final AccumT accum = getAccum();
            final OutT output = combineFn.extractOutput(accum);
            if (combineFn instanceof UpdatingCombineFn) {
                final UpdatingCombineFn<InT, AccumT, OutT> updatingFn = (UpdatingCombineFn<InT, AccumT, OutT>)combineFn;
                writeInternal(updatingFn.updateAfterFiring(accum));
            }
            return output;
        }
    }

    /**
     * Map state stored as one store entry per map key.
     *
     * <p>Each entry's store key is this state's encoded store key followed by the coder-encoded map
     * key, so all entries of one map share a prefix and are enumerated with a range scan from the
     * prefix up to the prefix padded with {@code 0xFF} bytes (see {@link #createMaxKey()}).
     *
     * <p>Iterators returned through {@link #readIterator()} hold an open store iterator; they are
     * tracked in {@code openIterators} and released either on exhaustion or via
     * {@link #closeIterators()}.
     */
    private class SamzaMapStateImpl<KeyT, ValueT>
    extends AbstractSamzaState<ValueT>
    implements SamzaMapState<KeyT, ValueT>,
    KeyValueIteratorState {
        private final Coder<KeyT> keyCoder;
        /** Length of the shared key prefix; stripped again when a map key is decoded. */
        private final int storeKeySize;
        private final List<KeyValueIterator<byte[], byte[]>> openIterators;
        /** Length of the upper-bound key used for range scans; grows with the largest key written. */
        private int maxKeySize;

        protected SamzaMapStateImpl(StateNamespace namespace, StateTag<? extends State> address, Coder<KeyT> keyCoder, Coder<ValueT> valueCoder) {
            super(namespace, address, valueCoder);
            this.openIterators = Collections.synchronizedList(new ArrayList<KeyValueIterator<byte[], byte[]>>());
            this.keyCoder = keyCoder;
            this.storeKeySize = this.getEncodedStoreKey().length;
            this.maxKeySize = this.storeKeySize + 100000;
        }

        /** Stores {@code value} under {@code key}, replacing any previous value. */
        public void put(KeyT key, ValueT value) {
            final byte[] encodedKey = encodeKey(key);
            maxKeySize = Math.max(maxKeySize, encodedKey.length);
            store.put(encodedKey, encodeValue(value));
        }

        /**
         * Stores {@code value} only when no value is present for {@code key}.
         *
         * @return {@code null} when the value was stored, otherwise the existing value
         */
        @Nullable
        public ReadableState<ValueT> putIfAbsent(KeyT key, ValueT value) {
            final ValueT current = decodeValue(store.get(encodeKey(key)));
            if (current == null) {
                put(key, value);
                return null;
            }
            return ReadableStates.immediate(current);
        }

        public void remove(KeyT key) {
            store.delete(encodeKey(key));
        }

        /** Returns the value for {@code key}; the wrapped value is {@code null} when absent. */
        public ReadableState<ValueT> get(KeyT key) {
            final ValueT value = decodeValue(store.get(encodeKey(key)));
            return ReadableStates.immediate(value);
        }

        /** Each {@code read()} takes a fresh snapshot of the entries and decodes keys lazily. */
        public ReadableState<Iterable<KeyT>> keys() {
            return new ReadableState<Iterable<KeyT>>(){

                public Iterable<KeyT> read() {
                    return SamzaMapStateImpl.this.createIterable(entry -> SamzaMapStateImpl.this.decodeKey(entry.getKey()));
                }

                public ReadableState<Iterable<KeyT>> readLater() {
                    return this;
                }
            };
        }

        /** Each {@code read()} takes a fresh snapshot of the entries and decodes values lazily. */
        public ReadableState<Iterable<ValueT>> values() {
            return new ReadableState<Iterable<ValueT>>(){

                public Iterable<ValueT> read() {
                    return SamzaMapStateImpl.this.createIterable(entry -> SamzaMapStateImpl.this.decodeValue(entry.getValue()));
                }

                public ReadableState<Iterable<ValueT>> readLater() {
                    return this;
                }
            };
        }

        /** Each {@code read()} takes a fresh snapshot of the entries and decodes them lazily. */
        public ReadableState<Iterable<Map.Entry<KeyT, ValueT>>> entries() {
            return new ReadableState<Iterable<Map.Entry<KeyT, ValueT>>>(){

                public Iterable<Map.Entry<KeyT, ValueT>> read() {
                    return SamzaMapStateImpl.this.createIterable(entry -> new AbstractMap.SimpleEntry<KeyT, ValueT>(SamzaMapStateImpl.this.decodeKey(entry.getKey()), SamzaMapStateImpl.this.decodeValue(entry.getValue())));
                }

                public ReadableState<Iterable<Map.Entry<KeyT, ValueT>>> readLater() {
                    return this;
                }
            };
        }

        /**
         * Opens a store iterator over this map's entries without copying them.
         *
         * <p>The store iterator is opened immediately, registered in {@code openIterators}, and
         * closed and deregistered once {@code hasNext()} reports exhaustion. Callers that stop
         * early rely on {@link #closeIterators()}.
         */
        @Override
        public ReadableState<Iterator<Map.Entry<KeyT, ValueT>>> readIterator() {
            final byte[] maxKey = createMaxKey();
            final KeyValueIterator<byte[], byte[]> kvIter = store.range(getEncodedStoreKey(), maxKey);
            openIterators.add(kvIter);
            return new ReadableState<Iterator<Map.Entry<KeyT, ValueT>>>(){

                @Nullable
                public Iterator<Map.Entry<KeyT, ValueT>> read() {
                    return new Iterator<Map.Entry<KeyT, ValueT>>(){

                        @Override
                        public boolean hasNext() {
                            final boolean hasNext = kvIter.hasNext();
                            if (!hasNext) {
                                kvIter.close();
                                SamzaMapStateImpl.this.openIterators.remove(kvIter);
                            }
                            return hasNext;
                        }

                        @Override
                        public Map.Entry<KeyT, ValueT> next() {
                            final Entry<byte[], byte[]> entry = kvIter.next();
                            return new AbstractMap.SimpleEntry<KeyT, ValueT>(SamzaMapStateImpl.this.decodeKey(entry.getKey()), SamzaMapStateImpl.this.decodeValue(entry.getValue()));
                        }
                    };
                }

                public ReadableState<Iterator<Map.Entry<KeyT, ValueT>>> readLater() {
                    return this;
                }
            };
        }

        /**
         * Copies all raw entries of this map out of the store and returns an iterable that applies
         * {@code fn} to each entry on iteration.
         */
        private <OutputT> Iterable<OutputT> createIterable(final SerializableFunction<Entry<byte[], byte[]>, OutputT> fn) {
            final byte[] maxKey = createMaxKey();
            final KeyValueIterator<byte[], byte[]> kvIter = store.range(getEncodedStoreKey(), maxKey);
            final List<Entry<byte[], byte[]>> snapshot;
            try {
                snapshot = ImmutableList.copyOf(kvIter);
            } finally {
                // Release the store iterator even when copying fails part-way.
                kvIter.close();
            }
            return new Iterable<OutputT>(){

                @Override
                public Iterator<OutputT> iterator() {
                    final Iterator<Entry<byte[], byte[]>> iter = snapshot.iterator();
                    return new Iterator<OutputT>(){

                        @Override
                        public boolean hasNext() {
                            return iter.hasNext();
                        }

                        @Override
                        public OutputT next() {
                            return fn.apply(iter.next());
                        }
                    };
                }
            };
        }

        /** Deletes every entry of this map from the store. */
        public void clear() {
            final byte[] maxKey = createMaxKey();
            final KeyValueIterator<byte[], byte[]> kvIter = store.range(getEncodedStoreKey(), maxKey);
            try {
                while (kvIter.hasNext()) {
                    store.delete(kvIter.next().getKey());
                }
            } finally {
                // Release the store iterator even when a delete fails.
                kvIter.close();
            }
        }

        /** Encodes a map key as the shared prefix followed by the coder-encoded key. */
        private byte[] encodeKey(KeyT key) {
            try {
                final ByteArrayOutputStream baos = SamzaStoreStateInternals.getThreadLocalBaos();
                baos.write(getEncodedStoreKey());
                keyCoder.encode(key, baos);
                return baos.toByteArray();
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        }

        /** Strips the shared prefix from a store key and decodes the remainder as a map key. */
        private KeyT decodeKey(byte[] keyBytes) {
            try {
                final byte[] realKey = Arrays.copyOfRange(keyBytes, storeKeySize, keyBytes.length);
                return keyCoder.decode(new ByteArrayInputStream(realKey));
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        }

        /** Builds the range-scan upper bound: the shared prefix padded with {@code 0xFF} bytes. */
        private byte[] createMaxKey() {
            final byte[] maxKey = new byte[maxKeySize];
            Arrays.fill(maxKey, (byte)-1);
            final byte[] encodedKey = getEncodedStoreKey();
            System.arraycopy(encodedKey, 0, maxKey, 0, encodedKey.length);
            return maxKey;
        }

        /** Closes and forgets every store iterator still registered by {@link #readIterator()}. */
        @Override
        public void closeIterators() {
            openIterators.forEach(KeyValueIterator::close);
            openIterators.clear();
        }
    }

    /**
     * Set state implemented on top of a {@link SamzaMapStateImpl} whose keys are the set elements
     * and whose values are always {@code true}.
     */
    private class SamzaSetStateImpl<T>
    implements SamzaSetState<T>,
    KeyValueIteratorState {
        private final SamzaMapStateImpl<T, Boolean> mapState;

        private SamzaSetStateImpl(StateNamespace namespace, StateTag<? extends State> address, Coder<T> coder) {
            this.mapState = new SamzaMapStateImpl<>(namespace, address, coder, BooleanCoder.of());
        }

        /** Returns the map lookup for {@code t}; the wrapped value is {@code null} when absent. */
        public ReadableState<Boolean> contains(T t) {
            return mapState.get(t);
        }

        /** Adds {@code t} unless present; returns {@code null} when the element was added. */
        @Nullable
        public ReadableState<Boolean> addIfAbsent(T t) {
            return mapState.putIfAbsent(t, true);
        }

        public void remove(T t) {
            mapState.remove(t);
        }

        public void add(T value) {
            mapState.put(value, true);
        }

        /** Reports emptiness by reading the backing map's entries at {@code read()} time. */
        public ReadableState<Boolean> isEmpty() {
            return new ReadableState<Boolean>(){

                public Boolean read() {
                    final Iterable<Map.Entry<T, Boolean>> entries = SamzaSetStateImpl.this.mapState.entries().read();
                    return Iterables.isEmpty(entries);
                }

                public ReadableState<Boolean> readLater() {
                    return this;
                }
            };
        }

        public Iterable<T> read() {
            return mapState.keys().read();
        }

        public SetState<T> readLater() {
            return this;
        }

        public void clear() {
            mapState.clear();
        }

        /**
         * Exposes the backing map's entry iterator as an iterator over elements. The underlying
         * iterator is obtained when this method is called, not when the result is read.
         */
        @Override
        public ReadableState<Iterator<T>> readIterator() {
            final Iterator<Map.Entry<T, Boolean>> entryIter = mapState.readIterator().read();
            return new ReadableState<Iterator<T>>(){

                @Nullable
                public Iterator<T> read() {
                    return new Iterator<T>(){

                        @Override
                        public boolean hasNext() {
                            return entryIter.hasNext();
                        }

                        @Override
                        public T next() {
                            return entryIter.next().getKey();
                        }
                    };
                }

                public ReadableState<Iterator<T>> readLater() {
                    return this;
                }
            };
        }

        @Override
        public void closeIterators() {
            mapState.closeIterators();
        }
    }

    /**
     * Bag state stored as a size counter plus one entry per element.
     *
     * <p>The counter lives under this state's encoded store key; element {@code i} lives under that
     * key followed by {@code i} as a 4-byte int. All operations synchronize on the store.
     */
    private class SamzaBagState<T>
    extends AbstractSamzaState<T>
    implements BagState<T> {
        private SamzaBagState(StateNamespace namespace, StateTag<? extends State> address, Coder<T> coder) {
            super(namespace, address, coder);
        }

        /** Appends {@code value} at the current size index and bumps the stored size. */
        public void add(T value) {
            synchronized (store) {
                final int currentSize = getSize();
                store.put(encodeKey(currentSize), encodeValue(value));
                store.put(getEncodedStoreKey(), Ints.toByteArray(currentSize + 1));
            }
        }

        public ReadableState<Boolean> isEmpty() {
            synchronized (store) {
                return isEmptyInternal();
            }
        }

        /** Reads all elements, fetching their keys from the store in batches. */
        @Nonnull
        public List<T> read() {
            synchronized (store) {
                final int total = getSize();
                if (total == 0) {
                    return Collections.emptyList();
                }
                final int batchSize = SamzaStoreStateInternals.this.batchGetSize;
                final List<T> values = new ArrayList<>(total);
                final List<byte[]> batchKeys = new ArrayList<>(total);
                int start = 0;
                while (start < total) {
                    final int end = Math.min(total, start + batchSize);
                    for (int i = start; i < end; ++i) {
                        batchKeys.add(encodeKey(i));
                    }
                    for (byte[] encoded : store.getAll(batchKeys).values()) {
                        values.add(decodeValue(encoded));
                    }
                    batchKeys.clear();
                    start += batchSize;
                }
                return values;
            }
        }

        public BagState<T> readLater() {
            return this;
        }

        /** Deletes every element entry and the size counter; does nothing when the bag is empty. */
        public void clear() {
            synchronized (store) {
                final int total = getSize();
                if (total == 0) {
                    return;
                }
                final List<byte[]> elementKeys = new ArrayList<>(total);
                for (int i = 0; i < total; ++i) {
                    elementKeys.add(encodeKey(i));
                }
                store.deleteAll(elementKeys);
                store.delete(getEncodedStoreKey());
            }
        }

        /** Returns the stored element count, or 0 when no counter is stored. */
        private int getSize() {
            final byte[] sizeBytes = store.get(getEncodedStoreKey());
            if (sizeBytes == null) {
                return 0;
            }
            return Ints.fromByteArray(sizeBytes);
        }

        /** Builds the store key of the element at {@code index}. */
        private byte[] encodeKey(int index) {
            final ByteArrayOutputStream baos = SamzaStoreStateInternals.getThreadLocalBaos();
            try (DataOutputStream dos = new DataOutputStream(baos)) {
                dos.write(getEncodedStoreKey());
                dos.writeInt(index);
                return baos.toByteArray();
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    }

    /** {@link ValueState} that maps directly onto a single store entry. */
    private class SamzaValueState<T>
    extends AbstractSamzaState<T>
    implements ValueState<T> {
        private SamzaValueState(StateNamespace namespace, StateTag<? extends State> address, Coder<T> coder) {
            super(namespace, address, coder);
        }

        /** Encodes {@code input} and stores it, replacing any previous value. */
        public void write(T input) {
            writeInternal(input);
        }

        /** Returns the stored value, or {@code null} when nothing is stored. */
        public T read() {
            return readInternal();
        }

        public ValueState<T> readLater() {
            return this;
        }

        /** Deletes the store entry. */
        public void clear() {
            clearInternal();
        }
    }

    /**
     * Base for all store-backed state cells: resolves the backing store, builds the encoded store
     * key once, and offers encode/decode plus single-entry read/write/clear helpers.
     *
     * <p>The encoded key is the enclosing instance's key bytes followed by the namespace string;
     * when no store is registered under the state's own id, the state lives in
     * {@link SamzaStoreStateInternals#BEAM_STORE} and the stage id and state id are appended too.
     */
    private abstract class AbstractSamzaState<T> {
        private final Coder<T> coder;
        private final byte[] encodedStoreKey;
        private final String namespace;
        protected final KeyValueStore<byte[], byte[]> store;

        protected AbstractSamzaState(StateNamespace namespace, StateTag<? extends State> address, Coder<T> coder) {
            this.coder = coder;
            this.namespace = namespace.stringKey();

            final KeyValueStore<byte[], byte[]> userStore = SamzaStoreStateInternals.this.stores.get(address.getId());
            final boolean usesSharedStore = userStore == null;
            if (usesSharedStore) {
                this.store = SamzaStoreStateInternals.this.stores.get(SamzaStoreStateInternals.BEAM_STORE);
            } else {
                this.store = userStore;
            }

            final ByteArrayOutputStream baos = SamzaStoreStateInternals.getThreadLocalBaos();
            try (DataOutputStream dos = new DataOutputStream(baos)) {
                dos.write(SamzaStoreStateInternals.this.keyBytes);
                dos.writeUTF(namespace.stringKey());
                if (usesSharedStore) {
                    // In the shared store the key also carries the stage id and the state id.
                    dos.writeUTF(SamzaStoreStateInternals.this.stageId);
                    dos.writeUTF(address.getId());
                }
            }
            catch (IOException e) {
                throw new RuntimeException("Could not encode full address for state: " + address.getId(), e);
            }
            this.encodedStoreKey = baos.toByteArray();
        }

        /** Deletes the entry stored under the encoded store key. */
        protected void clearInternal() {
            store.delete(getEncodedStoreKey());
        }

        /** Encodes {@code value} and stores it under the encoded store key. */
        protected void writeInternal(T value) {
            final byte[] encodedValue = encodeValue(value);
            store.put(getEncodedStoreKey(), encodedValue);
        }

        /** Reads and decodes the entry under the encoded store key; {@code null} when absent. */
        protected T readInternal() {
            return decodeValue(store.get(getEncodedStoreKey()));
        }

        /** Returns a state whose {@code read()} checks whether the store has no entry for the key. */
        protected ReadableState<Boolean> isEmptyInternal() {
            return new ReadableState<Boolean>(){

                public Boolean read() {
                    final byte[] stored = AbstractSamzaState.this.store.get(AbstractSamzaState.this.getEncodedStoreKey());
                    return stored == null;
                }

                public ReadableState<Boolean> readLater() {
                    return this;
                }
            };
        }

        protected byte[] getEncodedStoreKey() {
            return encodedStoreKey;
        }

        /** Encodes {@code value} with this state's coder into a fresh byte array. */
        protected byte[] encodeValue(T value) {
            final ByteArrayOutputStream baos = SamzaStoreStateInternals.getThreadLocalBaos();
            try {
                coder.encode(value, baos);
            }
            catch (IOException e) {
                throw new RuntimeException("Could not encode state value: " + value, e);
            }
            return baos.toByteArray();
        }

        /** Decodes {@code valueBytes} with this state's coder; {@code null} maps to {@code null}. */
        protected T decodeValue(byte[] valueBytes) {
            if (valueBytes == null) {
                return null;
            }
            try {
                return coder.decode(new ByteArrayInputStream(valueBytes));
            }
            catch (IOException e) {
                throw new RuntimeException("Could not decode state", e);
            }
        }

        /** Two states are equal when they have the same class and the same encoded store key. */
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || getClass() != o.getClass()) {
                return false;
            }
            @SuppressWarnings("unchecked")
            final AbstractSamzaState<?> other = (AbstractSamzaState<?>)o;
            return Arrays.equals(encodedStoreKey, other.encodedStoreKey);
        }

        public int hashCode() {
            return 31 * namespace.hashCode() + Arrays.hashCode(encodedStoreKey);
        }
    }

    /**
     * Implemented by the state kinds in this file that hand out store-backed iterators
     * (the map and set states), so that those iterators can be released on demand.
     */
    static interface KeyValueIteratorState {
        /** Closes the store iterators this state still holds open. */
        public void closeIterators();
    }

    /**
     * Creates {@link SamzaStoreStateInternals} instances for individual keys, sharing one set of
     * stores, one key coder, one stage id and one batch-get size.
     */
    public static class Factory<K>
    implements StateInternalsFactory<K> {
        private final String stageId;
        private final Map<String, KeyValueStore<byte[], byte[]>> stores;
        private final Coder<K> keyCoder;
        private final int batchGetSize;

        public Factory(String stageId, Map<String, KeyValueStore<byte[], byte[]>> stores, Coder<K> keyCoder, int batchGetSize) {
            this.batchGetSize = batchGetSize;
            this.keyCoder = keyCoder;
            this.stores = stores;
            this.stageId = stageId;
        }

        /**
         * Builds state internals for {@code key}.
         *
         * <p>The key is encoded with the key coder (a {@code null} key encodes to zero bytes) and
         * prefixed with a single length byte; the result becomes the key-bytes prefix of every
         * state cell created by the returned instance.
         *
         * @param key the key to scope state to, or {@code null}
         * @return state internals bound to {@code key}
         * @throws RuntimeException if the key cannot be encoded
         */
        public StateInternals stateInternalsForKey(@Nullable K key) {
            final ByteArrayOutputStream baos = new ByteArrayOutputStream();
            final DataOutputStream dos = new DataOutputStream(baos);
            try {
                if (key != null) {
                    keyCoder.encode(key, baos);
                }
                final byte[] rawKey = baos.toByteArray();
                baos.reset();
                // NOTE(review): DataOutputStream.write(int) emits only the low-order byte, so this
                // length prefix wraps for keys of 256+ bytes. It is left unchanged because the
                // prefix is part of the stored key layout; confirm before altering.
                dos.write(rawKey.length);
                dos.write(rawKey);
            }
            catch (IOException e) {
                throw new RuntimeException("Cannot encode key for state store", e);
            }
            final byte[] prefixedKey = baos.toByteArray();
            return new SamzaStoreStateInternals<>(stores, key, prefixedKey, stageId, batchGetSize);
        }
    }
}

