/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.seatunnel.mongodb.sink.commit;

import com.mongodb.ReadConcern;
import com.mongodb.ReadPreference;
import com.mongodb.TransactionOptions;
import com.mongodb.WriteConcern;
import com.mongodb.client.ClientSession;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoCollection;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
import org.apache.seatunnel.connectors.seatunnel.mongodb.internal.MongodbClientProvider;
import org.apache.seatunnel.connectors.seatunnel.mongodb.internal.MongodbCollectionProvider;
import org.apache.seatunnel.connectors.seatunnel.mongodb.sink.MongodbWriterOptions;
import org.apache.seatunnel.connectors.seatunnel.mongodb.sink.commit.CommittableTransaction;
import org.apache.seatunnel.connectors.seatunnel.mongodb.sink.commit.CommittableUpsertTransaction;
import org.apache.seatunnel.connectors.seatunnel.mongodb.sink.state.DocumentBulk;
import org.apache.seatunnel.connectors.seatunnel.mongodb.sink.state.MongodbAggregatedCommitInfo;
import org.apache.seatunnel.connectors.seatunnel.mongodb.sink.state.MongodbCommitInfo;
import org.bson.BsonDocument;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MongodbSinkAggregatedCommitter
implements SinkAggregatedCommitter<MongodbCommitInfo, MongodbAggregatedCommitInfo> {
    private static final Logger log = LoggerFactory.getLogger(MongodbSinkAggregatedCommitter.class);
    private static final long waitingTime = 5000L;
    private static final long TRANSACTION_TIMEOUT_MS = 60000L;
    private final boolean enableUpsert;
    private final String[] upsertKeys;
    private final MongodbClientProvider collectionProvider;
    private ClientSession clientSession;
    private MongoClient client;

    public MongodbSinkAggregatedCommitter(MongodbWriterOptions options) {
        this.enableUpsert = options.isUpsertEnable();
        this.upsertKeys = options.getPrimaryKey();
        this.collectionProvider = MongodbCollectionProvider.builder().connectionString(options.getConnectString()).database(options.getDatabase()).collection(options.getCollection()).build();
    }

    public List<MongodbAggregatedCommitInfo> commit(List<MongodbAggregatedCommitInfo> aggregatedCommitInfo) {
        return aggregatedCommitInfo.stream().map(this::processAggregatedCommitInfo).filter(failedAggregatedCommitInfo -> !failedAggregatedCommitInfo.getCommitInfos().isEmpty()).collect(Collectors.toList());
    }

    private MongodbAggregatedCommitInfo processAggregatedCommitInfo(MongodbAggregatedCommitInfo aggregatedCommitInfo) {
        List<MongodbCommitInfo> failedCommitInfos = aggregatedCommitInfo.getCommitInfos().stream().flatMap(this::processCommitInfo).filter(failedDocumentBulks -> !failedDocumentBulks.isEmpty()).map(MongodbCommitInfo::new).collect(Collectors.toList());
        return new MongodbAggregatedCommitInfo(failedCommitInfos);
    }

    private Stream<List<DocumentBulk>> processCommitInfo(MongodbCommitInfo commitInfo) {
        this.client = this.collectionProvider.getClient();
        this.clientSession = this.client.startSession();
        MongoCollection<BsonDocument> collection = this.collectionProvider.getDefaultCollection();
        return Stream.of(commitInfo.getDocumentBulks().stream().filter(bulk -> !bulk.getDocuments().isEmpty()).filter(bulk -> {
            try {
                CommittableTransaction transaction = this.enableUpsert ? new CommittableUpsertTransaction(collection, bulk.getDocuments(), this.upsertKeys) : new CommittableTransaction(collection, bulk.getDocuments());
                int insertedDocs = this.clientSession.withTransaction(transaction, TransactionOptions.builder().readPreference(ReadPreference.primary()).readConcern(ReadConcern.LOCAL).writeConcern(WriteConcern.MAJORITY).build());
                log.info("Inserted {} documents into collection {}.", (Object)insertedDocs, (Object)collection.getNamespace());
                return false;
            }
            catch (Exception e) {
                log.error("Failed to commit with Mongo transaction.", (Throwable)e);
                return true;
            }
        }).collect(Collectors.toList()));
    }

    public MongodbAggregatedCommitInfo combine(List<MongodbCommitInfo> commitInfos) {
        return new MongodbAggregatedCommitInfo(commitInfos);
    }

    public void abort(List<MongodbAggregatedCommitInfo> aggregatedCommitInfo) {
    }

    public void close() {
        long deadline = System.currentTimeMillis() + 60000L;
        while (this.clientSession.hasActiveTransaction() && System.currentTimeMillis() < deadline) {
            Thread.sleep(5000L);
        }
        if (this.clientSession != null) {
            this.clientSession.close();
        }
        if (this.client != null) {
            this.client.close();
        }
    }
}

