/*
 * Decompiled with CFR 0.152.
 */
package com.mongodb.kafka.connect.sink;

import com.mongodb.MongoBulkWriteException;
import com.mongodb.MongoNamespace;
import com.mongodb.bulk.BulkWriteResult;
import com.mongodb.client.MongoClient;
import com.mongodb.client.model.BulkWriteOptions;
import com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData;
import com.mongodb.kafka.connect.sink.MongoSinkConfig;
import com.mongodb.kafka.connect.sink.MongoSinkRecordProcessor;
import com.mongodb.kafka.connect.sink.MongoSinkTask;
import com.mongodb.kafka.connect.sink.MongoSinkTopicConfig;
import com.mongodb.kafka.connect.sink.RateLimitSettings;
import com.mongodb.kafka.connect.sink.dlq.AnalyzedBatchFailedWithBulkWriteException;
import com.mongodb.kafka.connect.sink.dlq.ErrorReporter;
import com.mongodb.kafka.connect.source.statistics.JmxStatisticsManager;
import com.mongodb.kafka.connect.util.TimeseriesValidation;
import com.mongodb.kafka.connect.util.jmx.SinkTaskStatistics;
import com.mongodb.kafka.connect.util.jmx.internal.MBeanServerUtils;
import com.mongodb.kafka.connect.util.time.InnerOuterTimer;
import com.mongodb.kafka.connect.util.time.Timer;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.OptionalLong;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.flink.cdc.connectors.shaded.org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.flink.cdc.connectors.shaded.org.apache.kafka.connect.errors.DataException;
import org.apache.flink.cdc.connectors.shaded.org.apache.kafka.connect.sink.SinkRecord;
import org.bson.BsonDocument;

/**
 * State and behaviour of a sink task that has been started: owns the {@link MongoClient},
 * the JMX statistics bean and the timer that splits wall-clock time between this task's
 * {@link #put(Collection)} calls and the time spent outside of them.
 *
 * <p>Closing the instance unregisters the statistics bean and closes the client.
 */
final class StartedMongoSinkTask implements AutoCloseable {
    private final MongoSinkConfig sinkConfig;
    private final MongoClient mongoClient;
    private final ErrorReporter errorReporter;
    /** Namespaces that have already been through {@link #checkTimeseries}; each is checked once. */
    private final Set<MongoNamespace> checkedTimeseriesNamespaces;
    private final SinkTaskStatistics statistics;
    private final InnerOuterTimer inTaskPutInConnectFrameworkTimer;

    /**
     * Stores the collaborators, registers the statistics bean and starts the inner/outer timer.
     *
     * @param sinkConfig sink configuration; also used to derive the MBean name
     * @param mongoClient client used for all writes; closed by {@link #close()}
     * @param errorReporter receives records whose write failure is tolerated
     */
    StartedMongoSinkTask(MongoSinkConfig sinkConfig, MongoClient mongoClient, ErrorReporter errorReporter) {
        this.sinkConfig = sinkConfig;
        this.mongoClient = mongoClient;
        this.errorReporter = errorReporter;
        this.checkedTimeseriesNamespaces = new HashSet<>();
        // getMBeanName() reads this.sinkConfig, so it must run after the assignment above.
        this.statistics = new SinkTaskStatistics(getMBeanName());
        this.statistics.register();
        this.inTaskPutInConnectFrameworkTimer = InnerOuterTimer.start(
                putSample -> {
                    this.statistics.getInTaskPut().sample(putSample.toMillis());
                    // Guarded so the JSON rendering only happens when it will actually be logged.
                    if (MongoSinkTask.LOGGER.isDebugEnabled()) {
                        MongoSinkTask.LOGGER.debug(this.statistics.getName() + ": " + this.statistics.toJSON());
                    }
                },
                frameworkSample ->
                        this.statistics.getInConnectFramework().sample(frameworkSample.toMillis()));
    }

    /**
     * Builds the JMX object name for this task's statistics bean from the connector name
     * (taken from the original configuration) and the task id derived from the current thread.
     */
    private String getMBeanName() {
        String taskId = MBeanServerUtils.taskIdFromCurrentThread();
        String connector = JmxStatisticsManager.getConnectorName(sinkConfig.getOriginals());
        return "com.mongodb.kafka.connect:type=sink-task-metrics,connector=" + connector + ",task=sink-task-" + taskId;
    }

    /**
     * Unregisters the statistics bean and then closes the client. The client is declared as the
     * try-with-resources resource so that it is closed even if unregistering throws.
     */
    @Override
    public void close() {
        try (MongoClient clientToClose = mongoClient) {
            statistics.unregister();
        }
    }

    /**
     * Processes one poll's worth of records: samples record-level statistics, groups the records
     * into batches and bulk-writes each batch in order.
     *
     * @param records the records handed to the task; may be empty
     */
    void put(Collection<SinkRecord> records) {
        // The resource is held only for its close(), which ends the sample started by sampleOuter().
        try (InnerOuterTimer.InnerTimer putTimer = inTaskPutInConnectFrameworkTimer.sampleOuter()) {
            statistics.getRecords().sample(records.size());
            trackLatestRecordTimestampOffset(records);

            if (records.isEmpty()) {
                MongoSinkTask.LOGGER.debug("No sink records to process for current poll operation");
                return;
            }

            Timer groupingTimer = Timer.start();
            List<List<MongoProcessedSinkRecordData>> groupedBatches =
                    MongoSinkRecordProcessor.orderedGroupByTopicAndNamespace(records, sinkConfig, errorReporter);
            statistics.getProcessingPhases().sample(groupingTimer.getElapsedTime().toMillis());

            groupedBatches.forEach(this::bulkWriteBatch);
        }
    }

    /**
     * Samples the difference between the current time and the largest non-null record timestamp.
     * Nothing is sampled when no record carries a timestamp.
     */
    private void trackLatestRecordTimestampOffset(Collection<SinkRecord> records) {
        records.stream()
                .filter(record -> record.timestamp() != null)
                .mapToLong(ConnectRecord::timestamp)
                .max()
                .ifPresent(latestTimestamp -> {
                    long differenceMs = System.currentTimeMillis() - latestTimestamp;
                    statistics.getLatestKafkaTimeDifferenceMs().sample(differenceMs);
                });
    }

    /**
     * Writes one batch with a single bulk write. Namespace and topic configuration are taken from
     * the first element of the batch. Success and failure are both sampled into the statistics;
     * a failure is passed to {@link #handleTolerableWriteException}. The rate limit check runs
     * after every batch that did not end in a propagated exception.
     */
    private void bulkWriteBatch(List<MongoProcessedSinkRecordData> batch) {
        if (batch.isEmpty()) {
            return;
        }

        MongoProcessedSinkRecordData head = batch.get(0);
        MongoNamespace namespace = head.getNamespace();
        MongoSinkTopicConfig config = head.getConfig();
        checkTimeseries(namespace, config);

        // NOTE(review): raw List kept from the decompiled source; the element type is whatever
        // getWriteModel() returns. Parameterise it once that type is imported in this file.
        List writeModels = batch.stream()
                .map(MongoProcessedSinkRecordData::getWriteModel)
                .collect(Collectors.toList());
        boolean bulkWriteOrdered = config.getBoolean("bulk.write.ordered");

        Timer writeTimer = Timer.start();
        try {
            String mode = bulkWriteOrdered ? "ordered" : "unordered";
            MongoSinkTask.LOGGER.debug(
                    "Bulk writing {} document(s) into collection [{}] via an {} bulk write",
                    writeModels.size(), namespace.getFullName(), mode);
            BulkWriteResult writeResult = mongoClient
                    .getDatabase(namespace.getDatabaseName())
                    .getCollection(namespace.getCollectionName(), BsonDocument.class)
                    .bulkWrite(writeModels, new BulkWriteOptions().ordered(bulkWriteOrdered));
            statistics.getBatchWritesSuccessful().sample(writeTimer.getElapsedTime().toMillis());
            statistics.getRecordsSuccessful().sample(batch.size());
            MongoSinkTask.LOGGER.debug("Mongodb bulk write result: {}", writeResult);
        } catch (RuntimeException e) {
            statistics.getBatchWritesFailed().sample(writeTimer.getElapsedTime().toMillis());
            statistics.getRecordsFailed().sample(batch.size());
            List<SinkRecord> failedRecords = batch.stream()
                    .map(MongoProcessedSinkRecordData::getSinkRecord)
                    .collect(Collectors.toList());
            handleTolerableWriteException(
                    failedRecords, bulkWriteOrdered, e, config.logErrors(), config.tolerateErrors());
        }
        checkRateLimit(config);
    }

    /**
     * Validates the collection the first time a namespace is seen, if the topic is configured as
     * timeseries. The namespace is remembered only after the check completes without throwing.
     */
    private void checkTimeseries(MongoNamespace namespace, MongoSinkTopicConfig config) {
        if (checkedTimeseriesNamespaces.contains(namespace)) {
            return;
        }
        if (config.isTimeseries()) {
            TimeseriesValidation.validateCollection(mongoClient, namespace, config);
        }
        checkedTimeseriesNamespaces.add(namespace);
    }

    /**
     * Sleeps for the configured timeout when the topic's rate limit settings report that they
     * have been triggered.
     *
     * @throws DataException if the sleep is interrupted; the interrupt flag is restored first
     */
    private static void checkRateLimit(MongoSinkTopicConfig config) {
        RateLimitSettings rateLimit = config.getRateLimitSettings();
        if (!rateLimit.isTriggered()) {
            return;
        }
        MongoSinkTask.LOGGER.debug(
                "Rate limit settings triggering {}ms defer timeout after processing {} further batches for topic {}",
                rateLimit.getTimeoutMs(), rateLimit.getEveryN(), config.getTopic());
        try {
            Thread.sleep(rateLimit.getTimeoutMs());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new DataException("Rate limiting was interrupted", e);
        }
    }

    /**
     * Applies the topic's error policy to a failed bulk write.
     *
     * <p>A {@link MongoBulkWriteException} is wrapped in an
     * {@link AnalyzedBatchFailedWithBulkWriteException}, which does the logging and reporting for
     * that case; any other exception is logged and reported for every record of the batch.
     *
     * @param batch the records of the failed batch
     * @param ordered whether the bulk write was ordered
     * @param e the failure raised by the bulk write
     * @param logErrors whether the failure should be logged
     * @param tolerateErrors whether to report the failure instead of propagating it
     * @throws DataException wrapping {@code e} when {@code tolerateErrors} is {@code false}
     */
    private void handleTolerableWriteException(List<SinkRecord> batch, boolean ordered, RuntimeException e, boolean logErrors, boolean tolerateErrors) {
        if (e instanceof MongoBulkWriteException) {
            AnalyzedBatchFailedWithBulkWriteException analysis = new AnalyzedBatchFailedWithBulkWriteException(
                    batch, ordered, (MongoBulkWriteException) e, errorReporter, StartedMongoSinkTask::log);
            if (logErrors) {
                MongoSinkTask.LOGGER.error(
                        "Failed to put into the sink some records, see log entries below for the details", e);
                analysis.log();
            }
            if (tolerateErrors) {
                analysis.report();
            } else {
                throw new DataException(e);
            }
        } else {
            if (logErrors) {
                log(batch, e);
            }
            if (tolerateErrors) {
                for (SinkRecord failed : batch) {
                    errorReporter.report(failed, e);
                }
            } else {
                throw new DataException(e);
            }
        }
    }

    /** Logs, at error level, the given records together with the exception that failed them. */
    private static void log(Collection<SinkRecord> records, RuntimeException e) {
        MongoSinkTask.LOGGER.error("Failed to put into the sink the following records: {}", records, e);
    }
}

