/*
 * Decompiled with CFR 0.152.
 */
package io.helidon.dbclient.mongodb;

import io.helidon.dbclient.DbRow;
import io.helidon.dbclient.common.DbClientContext;
import io.helidon.dbclient.mongodb.MongoDbColumn;
import io.helidon.dbclient.mongodb.MongoDbRow;
import io.helidon.dbclient.mongodb.MongoDbStatement;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Logger;
import org.bson.Document;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

final class MongoDbQueryProcessor
implements Subscriber<Document>,
Flow.Publisher<DbRow>,
Flow.Subscription {
    private static final Logger LOGGER = Logger.getLogger(MongoDbQueryProcessor.class.getName());
    private final AtomicLong count = new AtomicLong();
    private final CompletableFuture<Long> queryFuture;
    private final MongoDbStatement dbStatement;
    private final CompletableFuture<Void> statementFuture;
    private final DbClientContext clientContext;
    private Flow.Subscriber<? super DbRow> subscriber;
    private Subscription subscription;

    MongoDbQueryProcessor(DbClientContext clientContext, MongoDbStatement dbStatement, CompletableFuture<Void> statementFuture, CompletableFuture<Long> queryFuture) {
        this.clientContext = clientContext;
        this.statementFuture = statementFuture;
        this.queryFuture = queryFuture;
        this.dbStatement = dbStatement;
    }

    public void onSubscribe(Subscription subscription) {
        this.subscription = subscription;
    }

    public void onNext(Document doc) {
        MongoDbRow dbRow = new MongoDbRow(this.clientContext.dbMapperManager(), this.clientContext.mapperManager(), doc.size());
        doc.forEach((name, value) -> {
            LOGGER.finest(() -> String.format("Column name = %s, value = %s", name, value != null ? value.toString() : "N/A"));
            dbRow.add((String)name, new MongoDbColumn(this.clientContext.dbMapperManager(), this.clientContext.mapperManager(), (String)name, value));
        });
        this.count.incrementAndGet();
        this.subscriber.onNext(dbRow);
    }

    public void onError(Throwable t) {
        LOGGER.finest(() -> String.format("Query error: %s", t.getMessage()));
        this.statementFuture.completeExceptionally(t);
        this.queryFuture.completeExceptionally(t);
        if (this.dbStatement.txManager() != null) {
            this.dbStatement.txManager().stmtFailed(this.dbStatement);
        }
        this.subscriber.onError(t);
        LOGGER.finest(() -> String.format("Query %s execution failed", this.dbStatement.statementName()));
    }

    public void onComplete() {
        LOGGER.finest(() -> "Query finished");
        this.statementFuture.complete(null);
        this.queryFuture.complete(this.count.get());
        if (this.dbStatement.txManager() != null) {
            this.dbStatement.txManager().stmtFinished(this.dbStatement);
        }
        this.subscriber.onComplete();
        LOGGER.finest(() -> String.format("Query %s execution succeeded", this.dbStatement.statementName()));
    }

    @Override
    public void subscribe(Flow.Subscriber<? super DbRow> subscriber) {
        this.subscriber = subscriber;
        LOGGER.finest(() -> "Calling onSubscribe on subscriber");
        subscriber.onSubscribe(this);
    }

    @Override
    public void request(long n) {
        LOGGER.finest(() -> String.format("Requesting %d records from MongoDB", n));
        this.subscription.request(n);
    }

    @Override
    public void cancel() {
        LOGGER.finest(() -> "Cancelling MongoDB result processing");
        this.subscription.cancel();
    }
}

