/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.spark.stateful;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.beam.runners.core.StateInternals;
import org.apache.beam.runners.core.StateNamespace;
import org.apache.beam.runners.core.StateTag;
import org.apache.beam.runners.spark.coders.CoderHelpers;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.InstantCoder;
import org.apache.beam.sdk.coders.ListCoder;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.CombineWithContext;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
import org.apache.beam.sdk.util.CombineFnUtil;
import org.apache.beam.sdk.util.state.AccumulatorCombiningState;
import org.apache.beam.sdk.util.state.BagState;
import org.apache.beam.sdk.util.state.MapState;
import org.apache.beam.sdk.util.state.ReadableState;
import org.apache.beam.sdk.util.state.SetState;
import org.apache.beam.sdk.util.state.State;
import org.apache.beam.sdk.util.state.StateContext;
import org.apache.beam.sdk.util.state.StateContexts;
import org.apache.beam.sdk.util.state.ValueState;
import org.apache.beam.sdk.util.state.WatermarkHoldState;
import org.apache.beam.spark.repackaged.com.google.common.collect.HashBasedTable;
import org.apache.beam.spark.repackaged.com.google.common.collect.Table;
import org.joda.time.Instant;

class SparkStateInternals<K>
implements StateInternals<K> {
    private final K key;
    private final Table<String, String, byte[]> stateTable;

    private SparkStateInternals(K key) {
        this.key = key;
        this.stateTable = HashBasedTable.create();
    }

    private SparkStateInternals(K key, Table<String, String, byte[]> stateTable) {
        this.key = key;
        this.stateTable = stateTable;
    }

    static <K> SparkStateInternals<K> forKey(K key) {
        return new SparkStateInternals<K>(key);
    }

    static <K> SparkStateInternals<K> forKeyAndState(K key, Table<String, String, byte[]> stateTable) {
        return new SparkStateInternals<K>(key, stateTable);
    }

    public Table<String, String, byte[]> getState() {
        return this.stateTable;
    }

    public K getKey() {
        return this.key;
    }

    public <T extends State> T state(StateNamespace namespace, StateTag<? super K, T> address) {
        return this.state(namespace, address, StateContexts.nullContext());
    }

    public <T extends State> T state(StateNamespace namespace, StateTag<? super K, T> address, StateContext<?> c) {
        return (T)address.bind((StateTag.StateBinder)new SparkStateBinder(this.key, namespace, c));
    }

    private final class SparkBagState<T>
    extends AbstractState<List<T>>
    implements BagState<T> {
        private SparkBagState(StateNamespace namespace, StateTag<?, BagState<T>> address, Coder<T> coder) {
            super(namespace, address, (Coder)ListCoder.of(coder));
        }

        public SparkBagState<T> readLater() {
            return this;
        }

        public List<T> read() {
            ArrayList value = (ArrayList)super.readValue();
            if (value == null) {
                value = new ArrayList();
            }
            return value;
        }

        public void add(T input) {
            Object value = this.read();
            value.add(input);
            this.writeValue(value);
        }

        public ReadableState<Boolean> isEmpty() {
            return new ReadableState<Boolean>(){

                public ReadableState<Boolean> readLater() {
                    return this;
                }

                public Boolean read() {
                    return SparkStateInternals.this.stateTable.get(SparkBagState.this.namespace.stringKey(), SparkBagState.this.address.getId()) == null;
                }
            };
        }
    }

    private class SparkAccumulatorCombiningState<K, InputT, AccumT, OutputT>
    extends AbstractState<AccumT>
    implements AccumulatorCombiningState<InputT, AccumT, OutputT> {
        private final K key;
        private final Combine.KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn;

        private SparkAccumulatorCombiningState(StateNamespace namespace, StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address, Coder<AccumT> coder, K key, Combine.KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn) {
            super(namespace, address, coder);
            this.key = key;
            this.combineFn = combineFn;
        }

        public SparkAccumulatorCombiningState<K, InputT, AccumT, OutputT> readLater() {
            return this;
        }

        public OutputT read() {
            return (OutputT)this.combineFn.extractOutput(this.key, this.getAccum());
        }

        public void add(InputT input) {
            AccumT accum = this.getAccum();
            this.combineFn.addInput(this.key, accum, input);
            this.writeValue(accum);
        }

        public AccumT getAccum() {
            Object accum = this.readValue();
            if (accum == null) {
                accum = this.combineFn.createAccumulator(this.key);
            }
            return (AccumT)accum;
        }

        public ReadableState<Boolean> isEmpty() {
            return new ReadableState<Boolean>(){

                public ReadableState<Boolean> readLater() {
                    return this;
                }

                public Boolean read() {
                    return SparkStateInternals.this.stateTable.get(SparkAccumulatorCombiningState.this.namespace.stringKey(), SparkAccumulatorCombiningState.this.address.getId()) == null;
                }
            };
        }

        public void addAccum(AccumT accum) {
            accum = this.combineFn.mergeAccumulators(this.key, Arrays.asList(this.getAccum(), accum));
            this.writeValue(accum);
        }

        public AccumT mergeAccumulators(Iterable<AccumT> accumulators) {
            return (AccumT)this.combineFn.mergeAccumulators(this.key, accumulators);
        }
    }

    private class SparkWatermarkHoldState<W extends BoundedWindow>
    extends AbstractState<Instant>
    implements WatermarkHoldState<W> {
        private final OutputTimeFn<? super W> outputTimeFn;

        public SparkWatermarkHoldState(StateNamespace namespace, StateTag<?, WatermarkHoldState<W>> address, OutputTimeFn<? super W> outputTimeFn) {
            super(namespace, address, (Coder)InstantCoder.of());
            this.outputTimeFn = outputTimeFn;
        }

        public SparkWatermarkHoldState<W> readLater() {
            return this;
        }

        public Instant read() {
            return (Instant)this.readValue();
        }

        public void add(Instant outputTime) {
            Instant combined = this.read();
            combined = combined == null ? outputTime : this.outputTimeFn.combine(combined, outputTime);
            this.writeValue(combined);
        }

        public ReadableState<Boolean> isEmpty() {
            return new ReadableState<Boolean>(){

                public ReadableState<Boolean> readLater() {
                    return this;
                }

                public Boolean read() {
                    return SparkStateInternals.this.stateTable.get(SparkWatermarkHoldState.this.namespace.stringKey(), SparkWatermarkHoldState.this.address.getId()) == null;
                }
            };
        }

        public OutputTimeFn<? super W> getOutputTimeFn() {
            return this.outputTimeFn;
        }
    }

    private class SparkValueState<T>
    extends AbstractState<T>
    implements ValueState<T> {
        private SparkValueState(StateNamespace namespace, StateTag<?, ValueState<T>> address, Coder<T> coder) {
            super(namespace, address, coder);
        }

        public SparkValueState<T> readLater() {
            return this;
        }

        public T read() {
            return this.readValue();
        }

        public void write(T input) {
            this.writeValue(input);
        }
    }

    private class AbstractState<T> {
        final StateNamespace namespace;
        final StateTag<?, ? extends State> address;
        final Coder<T> coder;

        private AbstractState(StateNamespace namespace, StateTag<?, ? extends State> address, Coder<T> coder) {
            this.namespace = namespace;
            this.address = address;
            this.coder = coder;
        }

        T readValue() {
            byte[] buf = (byte[])SparkStateInternals.this.stateTable.get(this.namespace.stringKey(), this.address.getId());
            if (buf != null) {
                return CoderHelpers.fromByteArray(buf, this.coder);
            }
            return null;
        }

        void writeValue(T input) {
            SparkStateInternals.this.stateTable.put(this.namespace.stringKey(), this.address.getId(), CoderHelpers.toByteArray(input, this.coder));
        }

        public void clear() {
            SparkStateInternals.this.stateTable.remove(this.namespace.stringKey(), this.address.getId());
        }

        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || this.getClass() != o.getClass()) {
                return false;
            }
            AbstractState that = (AbstractState)o;
            return this.namespace.equals(that.namespace) && this.address.equals(that.address);
        }

        public int hashCode() {
            int result = this.namespace.hashCode();
            result = 31 * result + this.address.hashCode();
            return result;
        }
    }

    private class SparkStateBinder
    implements StateTag.StateBinder<K> {
        private final K key;
        private final StateNamespace namespace;
        private final StateContext<?> c;

        private SparkStateBinder(K key, StateNamespace namespace, StateContext<?> c) {
            this.key = key;
            this.namespace = namespace;
            this.c = c;
        }

        public <T> ValueState<T> bindValue(StateTag<? super K, ValueState<T>> address, Coder<T> coder) {
            return new SparkValueState(this.namespace, address, coder);
        }

        public <T> BagState<T> bindBag(StateTag<? super K, BagState<T>> address, Coder<T> elemCoder) {
            return new SparkBagState(this.namespace, address, elemCoder);
        }

        public <T> SetState<T> bindSet(StateTag<? super K, SetState<T>> spec, Coder<T> elemCoder) {
            throw new UnsupportedOperationException(String.format("%s is not supported", SetState.class.getSimpleName()));
        }

        public <KeyT, ValueT> MapState<KeyT, ValueT> bindMap(StateTag<? super K, MapState<KeyT, ValueT>> spec, Coder<KeyT> mapKeyCoder, Coder<ValueT> mapValueCoder) {
            throw new UnsupportedOperationException(String.format("%s is not supported", MapState.class.getSimpleName()));
        }

        public <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, OutputT> bindCombiningValue(StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address, Coder<AccumT> accumCoder, Combine.CombineFn<InputT, AccumT, OutputT> combineFn) {
            return new SparkAccumulatorCombiningState(this.namespace, address, accumCoder, this.key, combineFn.asKeyedFn());
        }

        public <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValue(StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address, Coder<AccumT> accumCoder, Combine.KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn) {
            return new SparkAccumulatorCombiningState(this.namespace, address, accumCoder, this.key, combineFn);
        }

        public <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValueWithContext(StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address, Coder<AccumT> accumCoder, CombineWithContext.KeyedCombineFnWithContext<? super K, InputT, AccumT, OutputT> combineFn) {
            return new SparkAccumulatorCombiningState(this.namespace, address, accumCoder, this.key, CombineFnUtil.bindContext(combineFn, this.c));
        }

        public <W extends BoundedWindow> WatermarkHoldState<W> bindWatermark(StateTag<? super K, WatermarkHoldState<W>> address, OutputTimeFn<? super W> outputTimeFn) {
            return new SparkWatermarkHoldState<W>(this.namespace, address, outputTimeFn);
        }
    }
}

