/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.sink;

import java.io.IOException;
import java.util.List;
import javax.annotation.Nullable;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.tasks.TaskOperatorEventGateway;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.operators.CoordinatedOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.flink.sink.Committable;
import org.apache.paimon.flink.sink.PrepareCommitOperator;
import org.apache.paimon.flink.sink.StateUtils;
import org.apache.paimon.flink.sink.StoreSinkWrite;
import org.apache.paimon.flink.sink.StoreSinkWriteState;
import org.apache.paimon.flink.sink.StoreSinkWriteStateImpl;
import org.apache.paimon.flink.sink.WriterRefresher;
import org.apache.paimon.flink.sink.coordinator.CoordinatedWriteRestore;
import org.apache.paimon.flink.sink.coordinator.WriteOperatorCoordinator;
import org.apache.paimon.flink.utils.RuntimeContextUtils;
import org.apache.paimon.operation.WriteRestore;
import org.apache.paimon.options.Options;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.ChannelComputer;

/**
 * Base operator that writes incoming records into a {@link FileStoreTable} through a
 * {@link StoreSinkWrite} and hands the resulting {@link Committable}s to the
 * {@link PrepareCommitOperator} machinery.
 *
 * <p>The writer, its state and the commit user are created in
 * {@link #initializeState(StateInitializationContext)}; they are {@code transient} and therefore
 * not part of the serialized operator.
 *
 * @param <IN> type of the input records
 */
public abstract class TableWriteOperator<IN> extends PrepareCommitOperator<IN, Committable> {
    private static final long serialVersionUID = 1L;

    protected FileStoreTable table;
    protected final StoreSinkWrite.Provider storeSinkWriteProvider;
    protected final String initialCommitUser;

    /** Optional restore hook; when set it is forwarded to the writer during initialization. */
    @Nullable
    protected transient WriteRestore writeRestore;
    /** Lazily resolved by {@link #getCommitUser(StateInitializationContext)}. */
    protected transient String commitUser;
    protected transient StoreSinkWriteState state;
    protected transient StoreSinkWrite write;
    /** May be {@code null}; see {@link #tryRefreshWrite()}. */
    @Nullable
    protected transient WriterRefresher writeRefresher;

    /**
     * @param parameters Flink operator parameters, forwarded to the super class
     * @param table table to write to; its options are also passed to the super class
     * @param storeSinkWriteProvider factory used to build the {@link StoreSinkWrite}
     * @param initialCommitUser commit user handed to the state lookup in
     *     {@link #getCommitUser(StateInitializationContext)}
     */
    public TableWriteOperator(
            StreamOperatorParameters<Committable> parameters,
            FileStoreTable table,
            StoreSinkWrite.Provider storeSinkWriteProvider,
            String initialCommitUser) {
        super(parameters, Options.fromMap(table.options()));
        this.table = table;
        this.storeSinkWriteProvider = storeSinkWriteProvider;
        this.initialCommitUser = initialCommitUser;
    }

    /**
     * Builds the write state, the writer and the writer refresher for this subtask.
     *
     * <p>The state filter keeps only entries whose computed channel equals this subtask's index.
     * When {@link #containLogSystem()} is {@code true} the channel is derived from the bucket
     * alone, otherwise from partition and bucket.
     *
     * @param context Flink state initialization context
     * @throws Exception if the super class, state creation or writer creation fails
     */
    @Override
    public void initializeState(StateInitializationContext context) throws Exception {
        super.initializeState(context);

        boolean containLogSystem = this.containLogSystem();
        RuntimeContext runtimeContext = this.getRuntimeContext();
        int numTasks = RuntimeContextUtils.getNumberOfParallelSubtasks(runtimeContext);
        int subtaskId = RuntimeContextUtils.getIndexOfThisSubtask(runtimeContext);

        StoreSinkWriteState.StateValueFilter stateFilter =
                (tableName, partition, bucket) -> {
                    int task =
                            containLogSystem
                                    ? ChannelComputer.select(bucket, numTasks)
                                    : ChannelComputer.select(partition, bucket, numTasks);
                    return task == subtaskId;
                };

        this.state = this.createState(subtaskId, context, stateFilter);
        this.write =
                this.storeSinkWriteProvider.provide(
                        this.table,
                        this.getCommitUser(context),
                        this.state,
                        this.getContainingTask().getEnvironment().getIOManager(),
                        this.memoryPoolFactory,
                        this.getMetricGroup());

        // The restore hook is optional; only forward it when one was injected beforehand.
        if (this.writeRestore != null) {
            this.write.setWriteRestore(this.writeRestore);
        }

        this.writeRefresher =
                WriterRefresher.create(this.write.streamingMode(), this.table, this.write::replace);
    }

    /**
     * Sets the restore hook that {@link #initializeState(StateInitializationContext)} forwards to
     * the writer. Has an effect only if called before state initialization.
     *
     * @param writeRestore restore hook, or {@code null} for none
     */
    public void setWriteRestore(@Nullable WriteRestore writeRestore) {
        this.writeRestore = writeRestore;
    }

    /**
     * Creates the write state for this subtask. Subclasses may override to supply a different
     * implementation.
     *
     * @param subtaskId index of this subtask
     * @param context Flink state initialization context
     * @param stateFilter filter selecting the state entries that belong to this subtask
     * @return a new {@link StoreSinkWriteStateImpl}
     * @throws Exception if the state cannot be created
     */
    protected StoreSinkWriteState createState(
            int subtaskId,
            StateInitializationContext context,
            StoreSinkWriteState.StateValueFilter stateFilter)
            throws Exception {
        return new StoreSinkWriteStateImpl(subtaskId, context, stateFilter);
    }

    /**
     * Returns the commit user, resolving it on first call from the {@code "commit_user_state"}
     * state entry and caching the result.
     *
     * <p>{@link #initialCommitUser} is passed as the last argument of the lookup; presumably it
     * is the value used when nothing was restored (confirm against {@code StateUtils}).
     *
     * @param context Flink state initialization context
     * @return the cached commit user
     * @throws Exception if the state lookup fails
     */
    protected String getCommitUser(StateInitializationContext context) throws Exception {
        if (this.commitUser == null) {
            this.commitUser =
                    StateUtils.getSingleValueFromState(
                            context, "commit_user_state", String.class, this.initialCommitUser);
        }
        return this.commitUser;
    }

    /**
     * @return whether a log system is involved; selects the channel computation used by the state
     *     filter in {@link #initializeState(StateInitializationContext)}
     */
    protected abstract boolean containLogSystem();

    /**
     * Snapshots the super class first, then the writer, then the write state.
     *
     * @param context Flink state snapshot context
     * @throws Exception if any of the snapshot steps fails
     */
    @Override
    public void snapshotState(StateSnapshotContext context) throws Exception {
        super.snapshotState(context);
        this.write.snapshotState();
        this.state.snapshotState();
    }

    /**
     * Closes the super class and then the writer.
     *
     * <p>The writer is closed in a {@code finally} block so that a failure in
     * {@code super.close()} does not leave the writer open.
     *
     * @throws Exception if closing the super class or the writer fails
     */
    @Override
    public void close() throws Exception {
        try {
            super.close();
        } finally {
            // write stays null when initializeState never ran or failed before creating it.
            if (this.write != null) {
                this.write.close();
            }
        }
    }

    /**
     * Delegates to {@link StoreSinkWrite#prepareCommit(boolean, long)}.
     *
     * @param waitCompaction forwarded to the writer
     * @param checkpointId forwarded to the writer
     * @return the committables produced by the writer
     * @throws IOException if the writer fails
     */
    @Override
    protected List<Committable> prepareCommit(boolean waitCompaction, long checkpointId)
            throws IOException {
        return this.write.prepareCommit(waitCompaction, checkpointId);
    }

    /** @return the current writer; {@code null} before state initialization */
    @VisibleForTesting
    public StoreSinkWrite getWrite() {
        return this.write;
    }

    /** Asks the writer refresher to refresh, if a refresher exists. */
    protected void tryRefreshWrite() {
        if (this.writeRefresher != null) {
            this.writeRefresher.tryRefresh();
        }
    }

    /**
     * Factory for operators that talk to a {@link WriteOperatorCoordinator}: every operator it
     * creates gets a {@link CoordinatedWriteRestore} injected.
     *
     * @param <IN> type of the input records
     */
    protected abstract static class CoordinatedFactory<IN>
            extends PrepareCommitOperator.Factory<IN, Committable>
            implements CoordinatedOperatorFactory<Committable> {
        private static final long serialVersionUID = 1L;

        protected final FileStoreTable table;
        protected final StoreSinkWrite.Provider storeSinkWriteProvider;
        protected final String initialCommitUser;

        /**
         * @param table table to write to; its options are also passed to the super class
         * @param storeSinkWriteProvider factory used to build the {@link StoreSinkWrite}
         * @param initialCommitUser commit user made available to created operators
         */
        protected CoordinatedFactory(
                FileStoreTable table,
                StoreSinkWrite.Provider storeSinkWriteProvider,
                String initialCommitUser) {
            super(Options.fromMap(table.options()));
            this.table = table;
            this.storeSinkWriteProvider = storeSinkWriteProvider;
            this.initialCommitUser = initialCommitUser;
        }

        /**
         * @param operatorName name of the operator (unused here)
         * @param operatorID id of the operator the coordinator belongs to
         * @return a {@link WriteOperatorCoordinator.Provider} for this factory's table
         */
        @Override
        public OperatorCoordinator.Provider getCoordinatorProvider(
                String operatorName, OperatorID operatorID) {
            return new WriteOperatorCoordinator.Provider(operatorID, this.table);
        }

        /**
         * Creates the operator via {@link #createStreamOperatorImpl(StreamOperatorParameters)} and
         * injects a {@link CoordinatedWriteRestore} built from the task's operator event gateway
         * and the operator id.
         *
         * @param parameters Flink operator parameters
         * @return the created operator
         */
        @Override
        @SuppressWarnings("unchecked")
        public final <T extends StreamOperator<Committable>> T createStreamOperator(
                StreamOperatorParameters<Committable> parameters) {
            OperatorID operatorID = parameters.getStreamConfig().getOperatorID();
            TaskOperatorEventGateway gateway =
                    parameters
                            .getContainingTask()
                            .getEnvironment()
                            .getOperatorCoordinatorEventGateway();

            TableWriteOperator<IN> operator = this.createStreamOperatorImpl(parameters);
            operator.setWriteRestore(new CoordinatedWriteRestore(gateway, operatorID));
            // Unchecked: the caller chooses T, but the concrete operator type is decided by
            // createStreamOperatorImpl.
            return (T) operator;
        }

        /**
         * Creates the concrete operator; called by
         * {@link #createStreamOperator(StreamOperatorParameters)}.
         *
         * @param parameters Flink operator parameters
         * @return the created operator
         */
        public abstract <T extends TableWriteOperator<IN>> T createStreamOperatorImpl(
                StreamOperatorParameters<Committable> parameters);
    }

    /**
     * Plain factory base that carries the table, the writer provider and the initial commit user
     * for subclasses.
     *
     * @param <IN> type of the input records
     */
    protected abstract static class Factory<IN>
            extends PrepareCommitOperator.Factory<IN, Committable> {
        private static final long serialVersionUID = 1L;

        protected final FileStoreTable table;
        protected final StoreSinkWrite.Provider storeSinkWriteProvider;
        protected final String initialCommitUser;

        /**
         * @param table table to write to; its options are also passed to the super class
         * @param storeSinkWriteProvider factory used to build the {@link StoreSinkWrite}
         * @param initialCommitUser commit user made available to created operators
         */
        protected Factory(
                FileStoreTable table,
                StoreSinkWrite.Provider storeSinkWriteProvider,
                String initialCommitUser) {
            super(Options.fromMap(table.options()));
            this.table = table;
            this.storeSinkWriteProvider = storeSinkWriteProvider;
            this.initialCommitUser = initialCommitUser;
        }
    }
}

