/*
 * Decompiled with CFR 0.152.
 */
package org.axonframework.eventhandling.pooled;

import jakarta.annotation.Nonnull;
import java.lang.invoke.MethodHandles;
import java.time.Clock;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.UnaryOperator;
import org.axonframework.common.BuilderUtils;
import org.axonframework.common.FutureUtils;
import org.axonframework.common.infra.ComponentDescriptor;
import org.axonframework.common.infra.DescribableComponent;
import org.axonframework.eventhandling.ErrorContext;
import org.axonframework.eventhandling.EventHandlingComponent;
import org.axonframework.eventhandling.EventMessage;
import org.axonframework.eventhandling.EventProcessingException;
import org.axonframework.eventhandling.EventTrackerStatus;
import org.axonframework.eventhandling.GenericEventMessage;
import org.axonframework.eventhandling.ProcessorEventHandlingComponents;
import org.axonframework.eventhandling.ResetNotSupportedException;
import org.axonframework.eventhandling.Segment;
import org.axonframework.eventhandling.StreamingEventProcessor;
import org.axonframework.eventhandling.TrackerStatus;
import org.axonframework.eventhandling.TrackingToken;
import org.axonframework.eventhandling.pooled.Coordinator;
import org.axonframework.eventhandling.pooled.DefaultWorkPackageEventFilter;
import org.axonframework.eventhandling.pooled.MaxSegmentProvider;
import org.axonframework.eventhandling.pooled.PooledStreamingEventProcessorConfiguration;
import org.axonframework.eventhandling.pooled.WorkPackage;
import org.axonframework.eventhandling.tokenstore.TokenStore;
import org.axonframework.eventstreaming.EventCriteria;
import org.axonframework.eventstreaming.StreamableEventSource;
import org.axonframework.eventstreaming.TrackingTokenSource;
import org.axonframework.messaging.Message;
import org.axonframework.messaging.MessageStream;
import org.axonframework.messaging.QualifiedName;
import org.axonframework.messaging.unitofwork.ProcessingContext;
import org.axonframework.messaging.unitofwork.UnitOfWork;
import org.axonframework.messaging.unitofwork.UnitOfWorkFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link StreamingEventProcessor} that delegates lifecycle and segment management to a {@link Coordinator} and
 * supplies that coordinator with a factory building a {@link WorkPackage} for a given {@link Segment}.
 * <p>
 * Event source, token store, unit of work factory, executors, clock and all tuning values are taken from the
 * {@link PooledStreamingEventProcessorConfiguration} handed to the constructor. The per-segment
 * {@link TrackerStatus} entries are kept in a {@link ConcurrentHashMap} and exposed read-only through
 * {@link #processingStatus()}.
 * <p>
 * Resetting tokens is currently not supported: {@link #supportsReset()} returns {@code false} and every
 * {@code resetTokens} variant ends in a {@link ResetNotSupportedException}.
 */
public class PooledStreamingEventProcessor implements StreamingEventProcessor, DescribableComponent {

    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

    private final String name;
    private final PooledStreamingEventProcessorConfiguration configuration;
    private final StreamableEventSource<? extends EventMessage> eventSource;
    private final ProcessorEventHandlingComponents eventHandlingComponents;
    private final EventCriteria eventCriteria;
    private final UnitOfWorkFactory unitOfWorkFactory;
    private final TokenStore tokenStore;
    private final ScheduledExecutorService workerExecutor;
    private final Coordinator coordinator;
    private final WorkPackage.EventFilter workPackageEventFilter;
    // Lazily filled cache for the token store's storage identifier; see getTokenStoreIdentifier().
    private final AtomicReference<String> tokenStoreIdentifier = new AtomicReference<>();
    // Status per segment id; written by the coordinator and work package callbacks.
    private final Map<Integer, TrackerStatus> processingStatus = new ConcurrentHashMap<>();

    /**
     * Creates a processor whose configuration is obtained by applying {@code customization} to a fresh
     * {@link PooledStreamingEventProcessorConfiguration}.
     *
     * @param name                    the name of this processor; may not be {@code null} or empty
     * @param eventHandlingComponents the components that handle the events; may not be {@code null}
     * @param customization           function producing the configuration to use from a default instance; may not
     *                                be {@code null}
     * @throws NullPointerException if any argument is {@code null}, or if {@code customization} returns
     *                              {@code null}
     */
    public PooledStreamingEventProcessor(@Nonnull String name,
                                         @Nonnull List<EventHandlingComponent> eventHandlingComponents,
                                         @Nonnull UnaryOperator<PooledStreamingEventProcessorConfiguration> customization) {
        this(
                Objects.requireNonNull(name, "Name may not be null"),
                Objects.requireNonNull(eventHandlingComponents, "EventHandlingComponents may not be null"),
                Objects.requireNonNull(customization, "Customization may not be null")
                       .apply(new PooledStreamingEventProcessorConfiguration())
        );
    }

    /**
     * Creates a processor from an already assembled configuration. The configuration is validated through
     * {@link PooledStreamingEventProcessorConfiguration#validate()} before any of its values are read.
     *
     * @param name                    the name of this processor; may not be {@code null} or empty
     * @param eventHandlingComponents the components that handle the events; may not be {@code null}
     * @param configuration           the configuration to read all collaborators and settings from; may not be
     *                                {@code null}
     * @throws NullPointerException if any argument is {@code null}, or if the configured event criteria provider
     *                              returns {@code null}
     */
    public PooledStreamingEventProcessor(@Nonnull String name,
                                         @Nonnull List<EventHandlingComponent> eventHandlingComponents,
                                         @Nonnull PooledStreamingEventProcessorConfiguration configuration) {
        this.name = Objects.requireNonNull(name, "Name may not be null");
        // The name is known to be non-null here, so only emptiness remains to be checked.
        BuilderUtils.assertThat(name, n -> !n.isEmpty(), "Event Processor name may not be null or empty");
        this.configuration = Objects.requireNonNull(configuration, "Configuration may not be null");
        configuration.validate();

        this.eventSource = configuration.eventSource();
        this.tokenStore = configuration.tokenStore();
        this.unitOfWorkFactory = configuration.unitOfWorkFactory();
        // Same check and message as the customization-based constructor, so both entry points fail alike.
        this.eventHandlingComponents = new ProcessorEventHandlingComponents(
                Objects.requireNonNull(eventHandlingComponents, "EventHandlingComponents may not be null")
        );
        this.workPackageEventFilter = new DefaultWorkPackageEventFilter(
                this.name, this.eventHandlingComponents, configuration.errorHandler()
        );
        this.workerExecutor = configuration.workerExecutor();

        Set<QualifiedName> supportedEvents = this.eventHandlingComponents.supportedEvents();
        this.eventCriteria = Objects.requireNonNull(
                configuration.eventCriteriaProvider().apply(supportedEvents),
                "EventCriteriaProvider function must not return null"
        );

        this.coordinator = Coordinator.builder()
                                      .name(name)
                                      .eventSource(this.eventSource)
                                      .tokenStore(this.tokenStore)
                                      .unitOfWorkFactory(this.unitOfWorkFactory)
                                      .executorService(configuration.coordinatorExecutor())
                                      .workPackageFactory(this::spawnWorker)
                                      .onMessageIgnored(configuration.ignoredMessageHandler())
                                      .processingStatusUpdater(this::statusUpdater)
                                      .tokenClaimInterval(configuration.tokenClaimInterval())
                                      .claimExtensionThreshold(configuration.claimExtensionThreshold())
                                      .clock(configuration.clock())
                                      .maxSegmentProvider(configuration.maxSegmentProvider())
                                      .initialSegmentCount(configuration.initialSegmentCount())
                                      .initialToken(configuration.initialToken())
                                      .coordinatorClaimExtension(configuration.coordinatorExtendsClaims())
                                      .eventCriteria(this.eventCriteria)
                                      .build();
    }

    /**
     * @return the name this processor was constructed with
     */
    @Override
    public String name() {
        return this.name;
    }

    /**
     * Logs the start and starts the {@link Coordinator}.
     */
    @Override
    public void start() {
        logger.info("Starting PooledStreamingEventProcessor [{}].", this.name);
        this.coordinator.start();
    }

    /**
     * Blocking variant of {@link #shutdownAsync()}: waits until the returned future completes.
     */
    @Override
    public void shutDown() {
        this.shutdownAsync().join();
    }

    /**
     * Logs the stop and asks the {@link Coordinator} to stop.
     *
     * @return the future returned by the coordinator's stop operation
     */
    @Override
    public CompletableFuture<Void> shutdownAsync() {
        logger.info("Stopping PooledStreamingEventProcessor [{}]", this.name);
        return this.coordinator.stop();
    }

    /**
     * @return whatever the {@link Coordinator} reports as its running state
     */
    @Override
    public boolean isRunning() {
        return this.coordinator.isRunning();
    }

    /**
     * @return whatever the {@link Coordinator} reports as its error state
     */
    @Override
    public boolean isError() {
        return this.coordinator.isError();
    }

    /**
     * Returns the storage identifier of the token store, calculating it on first use and caching it afterwards.
     * <p>
     * The calculation runs outside of the atomic update on purpose: it executes a unit of work against the token
     * store, and functions given to {@link AtomicReference#updateAndGet} must be side-effect-free because they may
     * be re-applied under contention. When several threads race, the first published value wins and is returned
     * to all of them.
     *
     * @return the cached or freshly calculated token store identifier
     */
    @Override
    public String getTokenStoreIdentifier() {
        String cached = this.tokenStoreIdentifier.get();
        if (cached != null) {
            return cached;
        }
        String calculated = this.calculateIdentifier();
        return this.tokenStoreIdentifier.compareAndSet(null, calculated)
                ? calculated
                : this.tokenStoreIdentifier.get();
    }

    /**
     * Retrieves the storage identifier from the token store inside a new {@link UnitOfWork}, blocking until the
     * result is available.
     *
     * @return the storage identifier, or {@code "--unknown--"} when the token store does not provide one
     */
    private String calculateIdentifier() {
        UnitOfWork unitOfWork = this.unitOfWorkFactory.create();
        return (String) FutureUtils.joinAndUnwrap(unitOfWork.executeWithResult(
                context -> CompletableFuture.completedFuture(
                        this.tokenStore.retrieveStorageIdentifier().orElse("--unknown--")
                )
        ));
    }

    /**
     * Releases the given segment for twice the configured token claim interval (interpreted as milliseconds).
     *
     * @param segmentId the identifier of the segment to release
     */
    @Override
    public void releaseSegment(int segmentId) {
        long tokenClaimInterval = this.configuration.tokenClaimInterval();
        this.releaseSegment(segmentId, tokenClaimInterval * 2L, TimeUnit.MILLISECONDS);
    }

    /**
     * Asks the {@link Coordinator} to release the given segment until "now plus the given duration".
     *
     * @param segmentId       the identifier of the segment to release
     * @param releaseDuration how long the segment should stay released
     * @param unit            the unit of {@code releaseDuration}
     */
    @Override
    public void releaseSegment(int segmentId, long releaseDuration, TimeUnit unit) {
        // Use the configured clock: the coordinator is built with that same clock, so the deadline is expressed
        // on the time base the coordinator was given (presumably the one it evaluates the deadline against)
        // rather than on the static GenericEventMessage clock.
        this.coordinator.releaseUntil(
                segmentId,
                this.configuration.clock().instant().plusMillis(unit.toMillis(releaseDuration))
        );
    }

    /**
     * @param segmentId the identifier of the segment to claim
     * @return the future returned by the {@link Coordinator} for the claim attempt
     */
    @Override
    public CompletableFuture<Boolean> claimSegment(int segmentId) {
        return this.coordinator.claimSegment(segmentId);
    }

    /**
     * Asks the {@link Coordinator} to split the given segment, provided the token store requires explicit segment
     * initialization.
     *
     * @param segmentId the identifier of the segment to split
     * @return the coordinator's result, or a future completed exceptionally with an
     * {@link UnsupportedOperationException} when the token store does not require explicit initialization
     */
    @Override
    public CompletableFuture<Boolean> splitSegment(int segmentId) {
        if (this.tokenStore.requiresExplicitSegmentInitialization()) {
            return this.coordinator.splitSegment(segmentId);
        }
        return unsupported("TokenStore must require explicit initialization to safely split tokens.");
    }

    /**
     * Asks the {@link Coordinator} to merge the given segment, provided the token store requires explicit segment
     * initialization.
     *
     * @param segmentId the identifier of the segment to merge
     * @return the coordinator's result, or a future completed exceptionally with an
     * {@link UnsupportedOperationException} when the token store does not require explicit initialization
     */
    @Override
    public CompletableFuture<Boolean> mergeSegment(int segmentId) {
        if (this.tokenStore.requiresExplicitSegmentInitialization()) {
            return this.coordinator.mergeSegment(segmentId);
        }
        return unsupported("TokenStore must require explicit initialization to safely merge tokens.");
    }

    /**
     * Builds a future that is already completed exceptionally with an {@link UnsupportedOperationException}.
     *
     * @param message the exception message
     * @return the failed future
     */
    private static CompletableFuture<Boolean> unsupported(String message) {
        CompletableFuture<Boolean> failed = new CompletableFuture<>();
        failed.completeExceptionally(new UnsupportedOperationException(message));
        return failed;
    }

    /**
     * @return always {@code false}; resets are not supported by this processor at the moment
     */
    @Override
    public boolean supportsReset() {
        return false;
    }

    /**
     * Resets using the configured initial token function.
     *
     * @throws ResetNotSupportedException always, see {@link #resetTokens(TrackingToken, Object)}
     */
    @Override
    public void resetTokens() {
        Function<TrackingTokenSource, CompletableFuture<TrackingToken>> initialToken =
                this.configuration.initialToken();
        this.resetTokens(initialToken);
    }

    /**
     * Resets using the configured initial token function and the given reset context.
     *
     * @param resetContext context to pass along with the reset
     * @param <R>          the type of the reset context
     * @throws ResetNotSupportedException always, see {@link #resetTokens(TrackingToken, Object)}
     */
    @Override
    public <R> void resetTokens(R resetContext) {
        Function<TrackingTokenSource, CompletableFuture<TrackingToken>> initialToken =
                this.configuration.initialToken();
        this.resetTokens(initialToken, resetContext);
    }

    /**
     * Resolves the start position by applying the given function to the event source (blocking on the result) and
     * resets to it.
     *
     * @param initialTrackingTokenSupplier function providing the token to reset to
     * @throws ResetNotSupportedException always, see {@link #resetTokens(TrackingToken, Object)}
     */
    @Override
    public void resetTokens(
            @Nonnull Function<TrackingTokenSource, CompletableFuture<TrackingToken>> initialTrackingTokenSupplier) {
        this.resetTokens(FutureUtils.joinAndUnwrap(initialTrackingTokenSupplier.apply(this.eventSource)));
    }

    /**
     * Resolves the start position by applying the given function to the event source (blocking on the result) and
     * resets to it with the given reset context.
     *
     * @param initialTrackingTokenSupplier function providing the token to reset to
     * @param resetContext                 context to pass along with the reset
     * @param <R>                          the type of the reset context
     * @throws ResetNotSupportedException always, see {@link #resetTokens(TrackingToken, Object)}
     */
    @Override
    public <R> void resetTokens(
            @Nonnull Function<TrackingTokenSource, CompletableFuture<TrackingToken>> initialTrackingTokenSupplier,
            R resetContext) {
        this.resetTokens(
                FutureUtils.joinAndUnwrap(initialTrackingTokenSupplier.apply(this.eventSource)), resetContext
        );
    }

    /**
     * Resets to the given position without a reset context.
     *
     * @param startPosition the token to reset to
     * @throws ResetNotSupportedException always, see {@link #resetTokens(TrackingToken, Object)}
     */
    @Override
    public void resetTokens(@Nonnull TrackingToken startPosition) {
        this.resetTokens(startPosition, null);
    }

    /**
     * Not implemented yet; all other {@code resetTokens} variants end up here.
     *
     * @param startPosition the token to reset to
     * @param resetContext  context to pass along with the reset
     * @param <R>           the type of the reset context
     * @throws ResetNotSupportedException always
     */
    @Override
    public <R> void resetTokens(@Nonnull TrackingToken startPosition, R resetContext) {
        throw new ResetNotSupportedException(
                "TODO #3304 - Integrate event replay logic into Event Handling Component"
        );
    }

    /**
     * @return the maximum number of segments the configured {@link MaxSegmentProvider} reports for this
     * processor's name
     */
    @Override
    public int maxCapacity() {
        MaxSegmentProvider maxSegmentProvider = this.configuration.maxSegmentProvider();
        return maxSegmentProvider.getMaxSegments(this.name);
    }

    /**
     * @return an unmodifiable view on the status per segment id
     */
    @Override
    public Map<Integer, EventTrackerStatus> processingStatus() {
        return Collections.unmodifiableMap(this.processingStatus);
    }

    /**
     * Factory handed to the {@link Coordinator}: builds the {@link WorkPackage} for the given segment, wired with
     * this processor's token store, unit of work factory, worker executor, event filter and batch processing
     * logic.
     *
     * @param segment      the segment the work package is responsible for
     * @param initialToken the token the work package starts from
     * @return the newly built work package
     */
    private WorkPackage spawnWorker(Segment segment, TrackingToken initialToken) {
        WorkPackage.BatchProcessor batchProcessor =
                (events, context) -> this.processWithErrorHandling(events, context);
        Clock clock = this.configuration.clock();
        // A status entry for this segment is seeded from the segment and its initial token on the first update.
        Consumer<UnaryOperator<TrackerStatus>> segmentStatusUpdater =
                this.singleStatusUpdater(segment.getSegmentId(), new TrackerStatus(segment, initialToken));

        return WorkPackage.builder()
                          .name(this.name)
                          .tokenStore(this.tokenStore)
                          .unitOfWorkFactory(this.unitOfWorkFactory)
                          .executorService(this.workerExecutor)
                          .eventFilter(this.workPackageEventFilter)
                          .batchProcessor(batchProcessor)
                          .segment(segment)
                          .initialToken(initialToken)
                          .batchSize(this.configuration.batchSize())
                          .claimExtensionThreshold(this.configuration.claimExtensionThreshold())
                          .segmentStatusUpdater(segmentStatusUpdater)
                          .clock(clock)
                          .build();
    }

    /**
     * Lets the event handling components handle the batch and routes any failure through the configured error
     * handler.
     * <p>
     * If the error handler returns normally, the failure is considered dealt with and the stream continues empty.
     * If the error handler throws, the stream fails: a {@link RuntimeException} is propagated as is, any other
     * {@link Exception} is wrapped in an {@link EventProcessingException}.
     *
     * @param events  the batch of events to handle
     * @param context the processing context to handle the events in
     * @return an empty stream that completes, or fails, with the outcome of handling the batch
     */
    private MessageStream.Empty<Message> processWithErrorHandling(List<? extends EventMessage> events,
                                                                  ProcessingContext context) {
        return this.eventHandlingComponents.handle(events, context).onErrorContinue(ex -> {
            try {
                this.configuration.errorHandler().handleError(new ErrorContext(this.name, (Throwable) ex, events));
            } catch (RuntimeException re) {
                return MessageStream.failed(re);
            } catch (Exception e) {
                return MessageStream.failed(
                        new EventProcessingException("Exception occurred while processing events", e)
                );
            }
            return MessageStream.empty().cast();
        }).ignoreEntries().cast();
    }

    /**
     * Creates the status updater for a single segment. Unlike {@link #statusUpdater(int, UnaryOperator)}, it
     * creates the entry when it is missing by applying the update to {@code initialStatus}.
     *
     * @param segmentId     the identifier of the segment whose status is updated
     * @param initialStatus the status to start from when no entry exists yet
     * @return a consumer applying a given update to the status entry of the segment
     */
    private Consumer<UnaryOperator<TrackerStatus>> singleStatusUpdater(int segmentId, TrackerStatus initialStatus) {
        return update -> this.processingStatus.compute(
                segmentId,
                (id, current) -> update.apply(current == null ? initialStatus : current)
        );
    }

    /**
     * Applies the given update to the status of a segment, but only when an entry for that segment already
     * exists.
     *
     * @param segmentId      the identifier of the segment whose status is updated
     * @param segmentUpdater the update to apply to the existing status
     */
    private void statusUpdater(int segmentId, UnaryOperator<TrackerStatus> segmentUpdater) {
        this.processingStatus.computeIfPresent(segmentId, (id, current) -> segmentUpdater.apply(current));
    }

    /**
     * Describes this processor through its name, mode, event handling components, event criteria, configuration
     * and current processing status.
     *
     * @param descriptor the descriptor to describe the properties to
     */
    @Override
    public void describeTo(@Nonnull ComponentDescriptor descriptor) {
        descriptor.describeProperty("name", this.name);
        descriptor.describeProperty("mode", "pooled");
        descriptor.describeProperty("eventHandlingComponents", this.eventHandlingComponents);
        descriptor.describeProperty("eventCriteria", this.eventCriteria);
        descriptor.describeProperty("configuration", this.configuration);
        descriptor.describeProperty("processingStatus", this.processingStatus);
    }
}

