/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.sink.listener;

import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.sink.Committer;
import org.apache.paimon.flink.sink.listener.CommitListener;
import org.apache.paimon.flink.sink.listener.CommitListenerFactory;
import org.apache.paimon.flink.sink.listener.PartitionMarkDoneListener;
import org.apache.paimon.flink.sink.listener.ReportPartStatsListener;
import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.options.Options;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.utils.IOUtils;
import org.apache.paimon.utils.StringUtils;

/**
 * Holds an ordered list of {@link CommitListener}s and forwards commit notifications, state
 * snapshots and close requests to each of them.
 *
 * <p>Instances are obtained through {@link #create(Committer.Context, FileStoreTable)}.
 */
public class CommitListeners implements Closeable {
    /** Listeners in the order they were registered by {@link #create}. */
    private final List<CommitListener> listeners;

    private CommitListeners(List<CommitListener> listeners) {
        this.listeners = listeners;
    }

    /**
     * Forwards the given committables to every registered listener, in registration order.
     *
     * @param committables the committables to pass to each listener
     */
    public void notifyCommittable(List<ManifestCommittable> committables) {
        for (CommitListener listener : listeners) {
            listener.notifyCommittable(committables);
        }
    }

    /**
     * Forwards the given committables to the registered listeners, optionally skipping
     * {@link PartitionMarkDoneListener} instances.
     *
     * @param committables the committables to pass to each listener
     * @param partitionMarkDoneRecoverFromState when {@code false}, listeners that are instances of
     *     {@link PartitionMarkDoneListener} are not notified; all other listeners always are
     */
    public void notifyCommittable(List<ManifestCommittable> committables, boolean partitionMarkDoneRecoverFromState) {
        for (CommitListener listener : listeners) {
            if (listener instanceof PartitionMarkDoneListener && !partitionMarkDoneRecoverFromState) {
                continue;
            }
            listener.notifyCommittable(committables);
        }
    }

    /**
     * Calls {@code snapshotState()} on every registered listener, in registration order.
     *
     * @throws Exception if any listener fails; listeners after the failing one are not invoked
     */
    public void snapshotState() throws Exception {
        for (CommitListener listener : listeners) {
            listener.snapshotState();
        }
    }

    /** Closes all registered listeners via {@code IOUtils.closeAllQuietly}. */
    @Override
    public void close() throws IOException {
        IOUtils.closeAllQuietly(listeners);
    }

    /**
     * Builds the listener set for a table: the partition-statistics listener and the
     * partition-mark-done listener when their factories return one, followed by every custom
     * listener named in {@link FlinkConnectorOptions#COMMIT_CUSTOM_LISTENERS} (a comma-separated
     * list; blank entries are ignored).
     *
     * <p>If creating any listener fails, the listeners created so far are closed before the
     * failure is propagated, so a partially built set does not leak resources.
     *
     * @param context committer context supplying restore status, state store and checkpoint mode
     * @param table the table whose options select the listeners
     * @return a new {@code CommitListeners} wrapping the created listeners
     * @throws Exception if a built-in listener cannot be created
     * @throws RuntimeException wrapping the cause if a custom listener cannot be created
     */
    public static CommitListeners create(Committer.Context context, FileStoreTable table) throws Exception {
        List<CommitListener> listeners = new ArrayList<>();
        try {
            ReportPartStatsListener.create(context.isRestored(), context.stateStore(), table).ifPresent(listeners::add);
            PartitionMarkDoneListener.create(context.getClass().getClassLoader(), context.streamingCheckpointEnabled(), context.isRestored(), context.stateStore(), table).ifPresent(listeners::add);
            String identifiers = Options.fromMap(table.options()).get(FlinkConnectorOptions.COMMIT_CUSTOM_LISTENERS);
            // NOTE(review): the option presumably has a non-null default; the guard only protects
            // against a null value reaching split().
            if (identifiers != null) {
                for (String identifier : identifiers.split(",")) {
                    if (StringUtils.isNullOrWhitespaceOnly(identifier)) {
                        continue;
                    }
                    createCustomListener(context, table, identifier).ifPresent(listeners::add);
                }
            }
        } catch (Throwable t) {
            // Release whatever was already constructed, then rethrow the original failure unchanged.
            IOUtils.closeAllQuietly(listeners);
            throw t;
        }
        return new CommitListeners(listeners);
    }

    /** Creates one custom listener, wrapping any failure in a {@link RuntimeException}. */
    private static Optional<CommitListener> createCustomListener(Committer.Context context, FileStoreTable table, String identifier) {
        try {
            return CommitListenerFactory.create(context, table, identifier);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}

