package org.factcast.store.internal;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.eventbus.EventBus;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import lombok.Generated;
import lombok.NonNull;
import org.factcast.core.Fact;
import org.factcast.core.subscription.FactStreamInfo;
import org.factcast.core.subscription.SubscriptionImpl;
import org.factcast.core.subscription.SubscriptionRequest;
import org.factcast.core.subscription.SubscriptionRequestTO;
import org.factcast.core.subscription.observer.FastForwardTarget;
import org.factcast.store.internal.StoreMetrics;
import org.factcast.store.internal.catchup.PgCatchupFactory;
import org.factcast.store.internal.query.PgFactIdToSerialMapper;
import org.factcast.store.internal.query.PgLatestSerialFetcher;
import org.factcast.store.internal.query.PgQueryBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.RowCallbackHandler;

/* loaded from: input_file:org/factcast/store/internal/PgFactStream.class */
public class PgFactStream {

    @SuppressFBWarnings(justification = "generated code")
    @Generated
    private static final Logger log = LoggerFactory.getLogger(PgFactStream.class);
    private final JdbcTemplate jdbcTemplate;
    private final EventBus eventBus;
    private final PgFactIdToSerialMapper idToSerMapper;
    private final SubscriptionImpl subscription;
    private final AtomicLong serial = new AtomicLong(0);
    private final AtomicBoolean disconnected = new AtomicBoolean(false);
    private final PgLatestSerialFetcher fetcher;
    private final PgCatchupFactory pgCatchupFactory;
    private final FastForwardTarget ffwdTarget;
    private final PgMetrics metrics;
    private CondensedQueryExecutor condensedExecutor;
    private SubscriptionRequestTO request;
    private PgPostQueryMatcher postQueryMatcher;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/factcast/store/internal/PgFactStream$FactRowCallbackHandler.class */
    public static class FactRowCallbackHandler implements RowCallbackHandler {
        private final SubscriptionImpl subscription;
        private final PgPostQueryMatcher postQueryMatcher;
        private final Supplier<Boolean> isConnectedSupplier;
        private final AtomicLong serial;
        private final SubscriptionRequestTO request;

        public void processRow(ResultSet resultSet) throws SQLException {
            if (this.isConnectedSupplier.get().booleanValue()) {
                if (resultSet.isClosed()) {
                    throw new IllegalStateException("ResultSet already closed. We should not have got here. THIS IS A BUG!");
                }
                Fact from = PgFact.from(resultSet);
                UUID id = from.id();
                if (this.postQueryMatcher.test(from)) {
                    try {
                        this.subscription.notifyElement(from);
                        PgFactStream.log.trace("{} notifyElement called with id={}", this.request, id);
                    } catch (Throwable th) {
                        resultSet.close();
                        this.subscription.notifyError(th);
                    }
                } else {
                    PgFactStream.log.trace("{} filtered id={}", this.request, id);
                }
                this.serial.set(resultSet.getLong(PgConstants.COLUMN_SER));
            }
        }

        @SuppressFBWarnings(justification = "generated code")
        @Generated
        public FactRowCallbackHandler(SubscriptionImpl subscriptionImpl, PgPostQueryMatcher pgPostQueryMatcher, Supplier<Boolean> supplier, AtomicLong atomicLong, SubscriptionRequestTO subscriptionRequestTO) {
            this.subscription = subscriptionImpl;
            this.postQueryMatcher = pgPostQueryMatcher;
            this.isConnectedSupplier = supplier;
            this.serial = atomicLong;
            this.request = subscriptionRequestTO;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @VisibleForTesting
    /* loaded from: input_file:org/factcast/store/internal/PgFactStream$RatioLogLevel.class */
    public enum RatioLogLevel {
        DEBUG,
        INFO,
        WARN
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void connect(@NonNull SubscriptionRequestTO subscriptionRequestTO) {
        Objects.requireNonNull(subscriptionRequestTO, "request is marked non-null but is null");
        this.request = subscriptionRequestTO;
        log.debug("{} connect subscription {}", subscriptionRequestTO, subscriptionRequestTO.dump());
        this.postQueryMatcher = new PgPostQueryMatcher(subscriptionRequestTO);
        PgQueryBuilder pgQueryBuilder = new PgQueryBuilder(subscriptionRequestTO.specs());
        initializeSerialToStartAfter();
        if (subscriptionRequestTO.streamInfo()) {
            this.subscription.notifyFactStreamInfo(new FactStreamInfo(this.serial.get(), this.fetcher.retrieveLatestSer()));
        }
        catchupAndFollow(subscriptionRequestTO, this.subscription, new PgSynchronizedQuery(this.jdbcTemplate, pgQueryBuilder.createSQL(), pgQueryBuilder.createStatementSetter(this.serial), new FactRowCallbackHandler(this.subscription, this.postQueryMatcher, this::isConnected, this.serial, subscriptionRequestTO), this.serial, this.fetcher));
    }

    private void initializeSerialToStartAfter() {
        Optional startingAfter = this.request.startingAfter();
        PgFactIdToSerialMapper pgFactIdToSerialMapper = this.idToSerMapper;
        Objects.requireNonNull(pgFactIdToSerialMapper);
        Long l = (Long) startingAfter.map(pgFactIdToSerialMapper::retrieve).orElse(0L);
        this.serial.set(l.longValue());
        log.trace("{} setting starting point to SER={}", this.request, l);
    }

    @VisibleForTesting
    void catchupAndFollow(SubscriptionRequest subscriptionRequest, SubscriptionImpl subscriptionImpl, PgSynchronizedQuery pgSynchronizedQuery) {
        long maxBatchDelayInMs;
        if (subscriptionRequest.ephemeral()) {
            this.serial.set(this.fetcher.retrieveLatestSer());
        } else {
            catchup(this.postQueryMatcher);
            logCatchupTransformationStats();
        }
        fastForward(subscriptionRequest, subscriptionImpl);
        if (isConnected()) {
            log.trace("{} signaling catchup", subscriptionRequest);
            subscriptionImpl.notifyCatchup();
        }
        if (isConnected()) {
            if (!subscriptionRequest.continuous()) {
                subscriptionImpl.notifyComplete();
                log.debug("{} completed", subscriptionRequest);
                return;
            }
            log.debug("{} entering follow mode", subscriptionRequest);
            if (subscriptionRequest.maxBatchDelayInMs() < 1) {
                maxBatchDelayInMs = 0;
            } else {
                maxBatchDelayInMs = ((subscriptionRequest.maxBatchDelayInMs() / 4) * 3) + ((long) Math.abs(Math.random() * (subscriptionRequest.maxBatchDelayInMs() / 4.0d)));
                log.trace("{} setting delay to {}, maxDelay was {}", new Object[]{subscriptionRequest, Long.valueOf(maxBatchDelayInMs), Long.valueOf(subscriptionRequest.maxBatchDelayInMs())});
            }
            this.condensedExecutor = new CondensedQueryExecutor(maxBatchDelayInMs, pgSynchronizedQuery, this::isConnected, subscriptionRequest.specs());
            this.eventBus.register(this.condensedExecutor);
            this.condensedExecutor.trigger();
        }
    }

    @VisibleForTesting
    void fastForward(SubscriptionRequest subscriptionRequest, SubscriptionImpl subscriptionImpl) {
        if (isConnected()) {
            long j = 0;
            if (subscriptionRequest.startingAfter().isPresent()) {
                j = this.idToSerMapper.retrieve((UUID) subscriptionRequest.startingAfter().get());
            }
            UUID targetId = this.ffwdTarget.targetId();
            long targetSer = this.ffwdTarget.targetSer();
            if (targetId == null || targetSer <= j) {
                return;
            }
            subscriptionImpl.notifyFastForward(targetId);
        }
    }

    @VisibleForTesting
    void catchup(PgPostQueryMatcher pgPostQueryMatcher) {
        if (isConnected()) {
            log.trace("{} catchup phase1 - historic facts staring with SER={}", this.request, Long.valueOf(this.serial.get()));
            this.pgCatchupFactory.create(this.request, pgPostQueryMatcher, this.subscription, this.serial, this.metrics).run();
        }
        if (isConnected()) {
            log.trace("{} catchup phase2 - facts since connect (SER={})", this.request, Long.valueOf(this.serial.get()));
            this.pgCatchupFactory.create(this.request, pgPostQueryMatcher, this.subscription, this.serial, this.metrics).run();
        }
    }

    @VisibleForTesting
    void logCatchupTransformationStats() {
        if (this.subscription.factsTransformed().get() > 0) {
            long j = this.subscription.factsTransformed().get() + this.subscription.factsNotTransformed().get();
            long j2 = this.subscription.factsTransformed().get();
            long round = Math.round((100.0d / j) * j2);
            RatioLogLevel calculateLogLevel = calculateLogLevel(j, round);
            switch (calculateLogLevel) {
                case DEBUG:
                    log.debug("{} CatchupTransformationRatio: {}%, ({}/{})", new Object[]{this.request, Long.valueOf(round), Long.valueOf(j2), Long.valueOf(j)});
                    return;
                case INFO:
                    log.info("{} CatchupTransformationRatio: {}%, ({}/{})", new Object[]{this.request, Long.valueOf(round), Long.valueOf(j2), Long.valueOf(j)});
                    return;
                case WARN:
                    log.warn("{} CatchupTransformationRatio: {}%, ({}/{})", new Object[]{this.request, Long.valueOf(round), Long.valueOf(j2), Long.valueOf(j)});
                    return;
                default:
                    throw new IllegalArgumentException("switch fall-through. THIS IS A BUG! " + calculateLogLevel);
            }
        }
    }

    @VisibleForTesting
    RatioLogLevel calculateLogLevel(long j, long j2) {
        RatioLogLevel ratioLogLevel = RatioLogLevel.DEBUG;
        if (j >= 50) {
            this.metrics.distributionSummary(StoreMetrics.VALUE.CATCHUP_TRANSFORMATION_RATIO).record(j2);
            if (j2 >= 20.0d) {
                ratioLogLevel = RatioLogLevel.WARN;
            } else if (j2 >= 10.0d) {
                ratioLogLevel = RatioLogLevel.INFO;
            }
        }
        return ratioLogLevel;
    }

    private boolean isConnected() {
        return !this.disconnected.get();
    }

    public synchronized void close() {
        log.trace("{} disconnecting ", this.request);
        this.disconnected.set(true);
        if (this.condensedExecutor != null) {
            this.eventBus.unregister(this.condensedExecutor);
            this.condensedExecutor.cancel();
            this.condensedExecutor = null;
        }
        log.debug("{} disconnected ", this.request);
    }

    @SuppressFBWarnings(justification = "generated code")
    @Generated
    public PgFactStream(JdbcTemplate jdbcTemplate, EventBus eventBus, PgFactIdToSerialMapper pgFactIdToSerialMapper, SubscriptionImpl subscriptionImpl, PgLatestSerialFetcher pgLatestSerialFetcher, PgCatchupFactory pgCatchupFactory, FastForwardTarget fastForwardTarget, PgMetrics pgMetrics) {
        this.jdbcTemplate = jdbcTemplate;
        this.eventBus = eventBus;
        this.idToSerMapper = pgFactIdToSerialMapper;
        this.subscription = subscriptionImpl;
        this.fetcher = pgLatestSerialFetcher;
        this.pgCatchupFactory = pgCatchupFactory;
        this.ffwdTarget = fastForwardTarget;
        this.metrics = pgMetrics;
    }
}
