/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.operators.sink;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.api.connector.sink2.StatefulSinkWriter;
import org.apache.flink.api.connector.sink2.SupportsWriterState;
import org.apache.flink.api.connector.sink2.WriterInitContext;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.shaded.guava32.com.google.common.collect.Iterables;
import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState;
import org.apache.flink.streaming.runtime.operators.sink.SinkWriterStateHandler;
import org.apache.flink.util.CollectionUtil;
import org.apache.flink.util.Preconditions;

@Internal
final class StatefulSinkWriterStateHandler<InputT, WriterStateT>
implements SinkWriterStateHandler<InputT> {
    private static final ListStateDescriptor<byte[]> WRITER_RAW_STATES_DESC = new ListStateDescriptor<byte[]>("writer_raw_states", BytePrimitiveArraySerializer.INSTANCE);
    private final SimpleVersionedSerializer<WriterStateT> writerStateSimpleVersionedSerializer;
    private final Collection<String> previousSinkStateNames;
    private final Sink<InputT> sink;
    private List<ListState<WriterStateT>> previousSinkStates = new ArrayList<ListState<WriterStateT>>();
    private ListState<WriterStateT> writerState;
    private StatefulSinkWriter<InputT, WriterStateT> sinkWriter;

    public StatefulSinkWriterStateHandler(SupportsWriterState<InputT, WriterStateT> sink) {
        Preconditions.checkArgument(sink instanceof Sink, "Should be an instance of " + Sink.class.getName());
        this.sink = (Sink)((Object)sink);
        List previousSinkStateNames = sink instanceof SupportsWriterState.WithCompatibleState ? ((SupportsWriterState.WithCompatibleState)((Object)sink)).getCompatibleWriterStateNames() : Collections.emptyList();
        this.writerStateSimpleVersionedSerializer = sink.getWriterStateSerializer();
        this.previousSinkStateNames = previousSinkStateNames;
    }

    @Override
    public SinkWriter<InputT> createWriter(WriterInitContext initContext, StateInitializationContext context) throws Exception {
        ListState<byte[]> rawState = context.getOperatorStateStore().getListState(WRITER_RAW_STATES_DESC);
        this.writerState = new SimpleVersionedListState<WriterStateT>(rawState, this.writerStateSimpleVersionedSerializer);
        if (context.isRestored()) {
            List writerStates = CollectionUtil.iterableToList((Iterable)this.writerState.get());
            ArrayList states = new ArrayList(writerStates);
            for (String previousSinkStateName : this.previousSinkStateNames) {
                ListStateDescriptor<byte[]> preSinkStateDesc = new ListStateDescriptor<byte[]>(previousSinkStateName, BytePrimitiveArraySerializer.INSTANCE);
                ListState<byte[]> preRawState = context.getOperatorStateStore().getListState(preSinkStateDesc);
                SimpleVersionedListState<WriterStateT> previousSinkState = new SimpleVersionedListState<WriterStateT>(preRawState, this.writerStateSimpleVersionedSerializer);
                this.previousSinkStates.add(previousSinkState);
                Iterables.addAll(states, previousSinkState.get());
            }
            this.sinkWriter = ((SupportsWriterState)((Object)this.sink)).restoreWriter(initContext, states);
        } else {
            this.sinkWriter = StatefulSinkWriterStateHandler.cast(this.sink.createWriter(initContext));
        }
        return this.sinkWriter;
    }

    @Override
    public void snapshotState(long checkpointId) throws Exception {
        this.writerState.update(this.sinkWriter.snapshotState(checkpointId));
        this.previousSinkStates.forEach(State::clear);
    }

    private static StatefulSinkWriter cast(SinkWriter writer) {
        Preconditions.checkArgument(writer instanceof StatefulSinkWriter, "The writer should implement StatefulSinkWriter");
        return (StatefulSinkWriter)writer;
    }
}

