/*
 * Decompiled with CFR 0.152.
 */
package org.axonframework.eventsourcing.eventstore.jpa;

import jakarta.annotation.Nonnull;
import jakarta.persistence.EntityManager;
import java.time.Duration;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicBoolean;
import org.axonframework.common.annotation.Internal;
import org.axonframework.common.jpa.EntityManagerProvider;
import org.axonframework.eventsourcing.eventstore.EventCoordinator;
import org.axonframework.eventsourcing.eventstore.TaggedEventMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
public class JpaPollingEventCoordinator
implements EventCoordinator {
    private static final Logger LOGGER = LoggerFactory.getLogger(JpaPollingEventCoordinator.class);
    private static final ThreadFactory THREAD_FACTORY = Thread.ofPlatform().daemon().name(JpaPollingEventCoordinator.class.getSimpleName(), 1L).factory();
    private static final String COUNT_EVENTS = "SELECT COUNT(*) FROM AggregateEventEntry\n";
    private final EntityManagerProvider entityManagerProvider;
    private final Duration pollingInterval;

    public JpaPollingEventCoordinator(@Nonnull EntityManagerProvider entityManagerProvider, @Nonnull Duration pollingInterval) {
        this.entityManagerProvider = Objects.requireNonNull(entityManagerProvider, "entityManagerProvider");
        this.pollingInterval = Objects.requireNonNull(pollingInterval, "pollingInterval");
        if (!pollingInterval.isPositive()) {
            throw new IllegalArgumentException("pollingInterval must be positive: " + String.valueOf(pollingInterval));
        }
    }

    @Override
    public EventCoordinator.Handle startCoordination(Runnable onAppendDetected) {
        final AtomicBoolean terminated = new AtomicBoolean();
        final Thread pollingThread = THREAD_FACTORY.newThread(() -> {
            long lastTotalEvents = 0L;
            while (true) {
                try {
                    long totalEvents;
                    do {
                        Thread.sleep(this.pollingInterval);
                        totalEvents = this.countEvents();
                    } while (totalEvents == lastTotalEvents);
                    lastTotalEvents = totalEvents;
                }
                catch (InterruptedException e) {
                    if (terminated.get()) {
                        break;
                    }
                }
                catch (Exception e) {
                    LOGGER.warn("Exception while polling AggregateEventEntry, retrying next poll interval", (Throwable)e);
                }
                onAppendDetected.run();
            }
            LOGGER.info("Exiting " + String.valueOf(this));
        });
        pollingThread.start();
        return new EventCoordinator.Handle(){

            @Override
            public void onEventsAppended(List<TaggedEventMessage<?>> events) {
                if (events.size() > 0) {
                    pollingThread.interrupt();
                }
            }

            @Override
            public void terminate() {
                terminated.set(true);
                pollingThread.interrupt();
            }
        };
    }

    private long countEvents() {
        try (EntityManager entityManager = this.entityManagerProvider.getEntityManager();){
            long l = (Long)entityManager.createQuery(COUNT_EVENTS, Long.class).getSingleResult();
            return l;
        }
    }
}

