/*
 * Decompiled with CFR 0.152.
 */
package org.axonframework.eventhandling.processors.streaming.pooled;

import jakarta.annotation.Nonnull;
import java.lang.invoke.MethodHandles;
import java.time.Clock;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.function.UnaryOperator;
import org.axonframework.common.Assert;
import org.axonframework.common.FutureUtils;
import org.axonframework.eventhandling.EventMessage;
import org.axonframework.eventhandling.GenericEventMessage;
import org.axonframework.eventhandling.processors.streaming.pooled.EventSchedulingProcessingContext;
import org.axonframework.eventhandling.processors.streaming.segmenting.Segment;
import org.axonframework.eventhandling.processors.streaming.segmenting.TrackerStatus;
import org.axonframework.eventhandling.processors.streaming.token.TrackingToken;
import org.axonframework.eventhandling.processors.streaming.token.WrappedToken;
import org.axonframework.eventhandling.processors.streaming.token.store.TokenStore;
import org.axonframework.messaging.Context;
import org.axonframework.messaging.EmptyApplicationContext;
import org.axonframework.messaging.Message;
import org.axonframework.messaging.MessageStream;
import org.axonframework.messaging.unitofwork.ProcessingContext;
import org.axonframework.messaging.unitofwork.UnitOfWork;
import org.axonframework.messaging.unitofwork.UnitOfWorkFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class WorkPackage {
    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
    static final int BUFFER_SIZE = 1024;
    private final String name;
    private final TokenStore tokenStore;
    private final UnitOfWorkFactory unitOfWorkFactory;
    private final ExecutorService executorService;
    private final EventFilter eventFilter;
    private final BatchProcessor batchProcessor;
    private final Segment segment;
    private final int batchSize;
    private final long claimExtensionThreshold;
    private final Consumer<UnaryOperator<TrackerStatus>> segmentStatusUpdater;
    private final Supplier<ProcessingContext> schedulingProcessingContextProvider;
    private Runnable batchProcessedCallback;
    private final Clock clock;
    private TrackingToken lastDeliveredToken;
    private TrackingToken lastConsumedToken;
    private TrackingToken lastStoredToken;
    private final AtomicLong nextClaimExtension;
    private final AtomicBoolean processingEvents;
    private final Queue<ProcessingEntry> processingQueue = new ConcurrentLinkedQueue<ProcessingEntry>();
    private final AtomicBoolean scheduled = new AtomicBoolean();
    private final AtomicReference<CompletableFuture<Exception>> abortFlag = new AtomicReference();
    private final AtomicReference<Exception> abortException = new AtomicReference();

    protected static Builder builder() {
        return new Builder();
    }

    private WorkPackage(Builder builder) {
        this.name = builder.name;
        this.tokenStore = builder.tokenStore;
        this.unitOfWorkFactory = builder.unitOfWorkFactory;
        this.executorService = builder.executorService;
        this.eventFilter = builder.eventFilter;
        this.batchProcessor = builder.batchProcessor;
        this.segment = builder.segment;
        this.lastDeliveredToken = builder.initialToken;
        this.batchSize = builder.batchSize;
        this.claimExtensionThreshold = builder.claimExtensionThreshold;
        this.segmentStatusUpdater = builder.segmentStatusUpdater;
        this.clock = builder.clock;
        this.lastConsumedToken = builder.initialToken;
        this.nextClaimExtension = new AtomicLong(this.now() + this.claimExtensionThreshold);
        this.processingEvents = new AtomicBoolean(false);
        this.schedulingProcessingContextProvider = builder.schedulingProcessingContextProvider;
    }

    private long now() {
        return this.clock.instant().toEpochMilli();
    }

    public boolean scheduleEvents(List<MessageStream.Entry<? extends EventMessage>> eventEntries) {
        if (eventEntries.isEmpty()) {
            return false;
        }
        this.assertEqualTokens(eventEntries);
        if (eventEntries.stream().allMatch(this::shouldNotSchedule)) {
            if (logger.isTraceEnabled()) {
                eventEntries.forEach(eventEntry -> {
                    TrackingToken eventToken = TrackingToken.fromContext(eventEntry).orElse(null);
                    logger.trace("Ignoring event [{}] with position [{}] for work package [{}]. The last token [{}] covers event's token [{}].", new Object[]{((EventMessage)eventEntry.message()).identifier(), eventToken != null ? eventToken.position().orElse(-1L) : -1L, this.segment.getSegmentId(), this.lastDeliveredToken, eventToken});
                });
            }
            return false;
        }
        BatchProcessingEntry batchProcessingEntry = new BatchProcessingEntry();
        boolean canHandleAny = eventEntries.stream().map(eventEntry -> {
            boolean canHandle = this.canHandleMessage((MessageStream.Entry<? extends EventMessage>)eventEntry);
            batchProcessingEntry.add(new DefaultProcessingEntry((MessageStream.Entry<? extends EventMessage>)eventEntry, canHandle));
            return canHandle;
        }).reduce(Boolean::logicalOr).orElse(false);
        this.processingQueue.add(batchProcessingEntry);
        this.lastDeliveredToken = batchProcessingEntry.trackingToken();
        this.scheduleWorker();
        return canHandleAny;
    }

    private void assertEqualTokens(List<MessageStream.Entry<? extends EventMessage>> eventEntries) {
        TrackingToken expectedToken = TrackingToken.fromContext(eventEntries.getFirst()).orElse(null);
        Assert.isTrue(eventEntries.stream().map(entry -> TrackingToken.fromContext(entry).orElse(null)).allMatch(token -> Objects.equals(expectedToken, token)), () -> "All tokens should match when scheduling multiple events in one go.");
    }

    public boolean scheduleEvent(MessageStream.Entry<? extends EventMessage> eventEntry) {
        TrackingToken eventToken = TrackingToken.fromContext(eventEntry).orElse(null);
        EventMessage message = eventEntry.message();
        if (this.shouldNotSchedule(eventEntry)) {
            logger.trace("Ignoring event [{}] with position [{}] for work package [{}]. The last token [{}] covers event's token [{}].", new Object[]{message.identifier(), eventToken != null ? eventToken.position().orElse(-1L) : -1L, this.segment.getSegmentId(), this.lastDeliveredToken, eventToken});
            return false;
        }
        logger.debug("Assigned event [{}] with position [{}] to work package [{}].", new Object[]{message.identifier(), eventToken != null ? eventToken.position().orElse(-1L) : -1L, this.segment.getSegmentId()});
        boolean canHandle = this.canHandleMessage(eventEntry);
        this.processingQueue.add(new DefaultProcessingEntry(eventEntry, canHandle));
        this.lastDeliveredToken = eventToken;
        this.scheduleWorker();
        return canHandle;
    }

    private boolean canHandleMessage(MessageStream.Entry<? extends EventMessage> eventEntry) {
        ProcessingContext processingContext = Message.addToContext(WorkPackage.copyResources(eventEntry, this.schedulingProcessingContextProvider.get()), eventEntry.message());
        return this.canHandle(eventEntry.message(), processingContext);
    }

    private static ProcessingContext copyResources(Context from, ProcessingContext to) {
        from.resources().forEach((k, v) -> to.putResource(k, v));
        return to;
    }

    private boolean shouldNotSchedule(MessageStream.Entry<? extends EventMessage> eventEntry) {
        TrackingToken eventToken = TrackingToken.fromContext(eventEntry).orElse(null);
        return this.lastDeliveredToken != null && eventToken != null && this.lastDeliveredToken.covers(eventToken);
    }

    private boolean canHandle(EventMessage eventMessage, ProcessingContext processingContext) {
        try {
            return this.eventFilter.canHandle(eventMessage, processingContext, this.segment);
        }
        catch (Exception e) {
            logger.warn("Error while detecting whether event can be handled in Work Package [{}]-[{}]. Aborting Work Package...", new Object[]{this.segment.getSegmentId(), this.name, e});
            this.abort(e);
            return false;
        }
    }

    public void scheduleWorker() {
        if (!this.scheduled.compareAndSet(false, true)) {
            return;
        }
        logger.debug("Scheduling Work Package [{}]-[{}] to process events.", (Object)this.segment.getSegmentId(), (Object)this.name);
        this.executorService.submit(() -> {
            CompletableFuture<Exception> aborting = this.abortFlag.get();
            if (aborting != null) {
                logger.debug("Work Package [{}]-[{}] should be aborted. Will shutdown this work package.", (Object)this.segment.getSegmentId(), (Object)this.name);
                this.segmentStatusUpdater.accept(previousStatus -> null);
                aborting.complete(this.abortException.get());
                return;
            }
            try {
                this.processEvents();
            }
            catch (Exception e) {
                logger.warn("Error while processing batch in Work Package [{}]-[{}]. Aborting Work Package...", new Object[]{this.segment.getSegmentId(), this.name, e});
                this.abort(e);
            }
            this.scheduled.set(false);
            if (!this.processingQueue.isEmpty() || this.abortFlag.get() != null) {
                logger.debug("Rescheduling Work Package [{}]-[{}] since there are events left.", (Object)this.segment.getSegmentId(), (Object)this.name);
                this.scheduleWorker();
            }
        });
    }

    private void processEvents() {
        ArrayList<EventMessage> eventBatch = new ArrayList<EventMessage>();
        while (!this.isAbortTriggered() && eventBatch.size() < this.batchSize && !this.processingQueue.isEmpty()) {
            ProcessingEntry entry = this.processingQueue.poll();
            this.lastConsumedToken = WrappedToken.advance(this.lastConsumedToken, entry.trackingToken());
            entry.addToBatch(eventBatch);
        }
        if (!eventBatch.isEmpty()) {
            logger.debug("Work Package [{}]-[{}] is processing a batch of {} events.", new Object[]{this.segment.getSegmentId(), this.name, eventBatch.size()});
            try {
                this.processingEvents.set(true);
                UnitOfWork unitOfWork = this.unitOfWorkFactory.create();
                unitOfWork.runOnPreInvocation(ctx -> {
                    ctx.putResource(Segment.RESOURCE_KEY, this.segment);
                    ctx.putResource(TrackingToken.RESOURCE_KEY, this.lastConsumedToken);
                });
                unitOfWork.onInvocation(ctx -> this.batchProcessor.process((List<? extends EventMessage>)eventBatch, (ProcessingContext)ctx).asCompletableFuture());
                unitOfWork.runOnPrepareCommit(ctx -> this.storeToken(this.lastConsumedToken, (ProcessingContext)ctx));
                unitOfWork.runOnAfterCommit(ctx -> {
                    this.segmentStatusUpdater.accept(status -> status.advancedTo(this.lastConsumedToken));
                    if (this.batchProcessedCallback != null) {
                        this.batchProcessedCallback.run();
                    }
                });
                FutureUtils.joinAndUnwrap(unitOfWork.execute());
            }
            finally {
                this.processingEvents.set(false);
            }
        } else {
            this.segmentStatusUpdater.accept(status -> status.advancedTo(this.lastConsumedToken));
            if (this.lastStoredToken != this.lastConsumedToken && this.now() > this.nextClaimExtension.get()) {
                FutureUtils.joinAndUnwrap(this.unitOfWorkFactory.create().executeWithResult(context -> {
                    this.storeToken(this.lastConsumedToken, (ProcessingContext)context);
                    return FutureUtils.emptyCompletedFuture();
                }));
            } else {
                this.extendClaimIfThresholdIsMet();
            }
        }
    }

    public void extendClaimIfThresholdIsMet() {
        if (this.now() > this.nextClaimExtension.get()) {
            logger.debug("Work Package [{}]-[{}] will extend its token claim.", (Object)this.name, (Object)this.segment.getSegmentId());
            FutureUtils.joinAndUnwrap(this.unitOfWorkFactory.create().executeWithResult(context -> this.tokenStore.extendClaim(this.name, this.segment.getSegmentId(), (ProcessingContext)context)));
            this.nextClaimExtension.set(this.now() + this.claimExtensionThreshold);
        }
    }

    private void storeToken(TrackingToken token, @Nonnull ProcessingContext processingContext) {
        logger.debug("Work Package [{}]-[{}] will store token [{}].", new Object[]{this.name, this.segment.getSegmentId(), token});
        FutureUtils.joinAndUnwrap(this.tokenStore.storeToken(token, this.name, this.segment.getSegmentId(), processingContext));
        this.lastStoredToken = token;
        this.nextClaimExtension.set(this.now() + this.claimExtensionThreshold);
    }

    public boolean hasRemainingCapacity() {
        return this.processingQueue.size() < 1024;
    }

    public boolean isDone() {
        return this.processingQueue.isEmpty() && !this.scheduled.get();
    }

    public Segment segment() {
        return this.segment;
    }

    public TrackingToken lastDeliveredToken() {
        return this.lastDeliveredToken;
    }

    public boolean isAbortTriggered() {
        return this.abortFlag.get() != null;
    }

    public CompletableFuture<Exception> abort(Exception abortReason) {
        if (abortReason != null) {
            logger.debug("Abort request received for Work Package [{}]-[{}].", new Object[]{this.name, this.segment.getSegmentId(), abortReason});
            this.segmentStatusUpdater.accept(status -> {
                if (status != null) {
                    return status.isErrorState() ? status : status.markError(abortReason);
                }
                return null;
            });
        }
        CompletableFuture<Exception> abortTask = this.abortFlag.updateAndGet(currentFlag -> {
            if (currentFlag == null) {
                this.abortException.set(abortReason);
                return new CompletableFuture();
            }
            this.abortException.updateAndGet(currentReason -> currentReason == null ? abortReason : currentReason);
            return currentFlag;
        });
        this.scheduleWorker();
        return abortTask;
    }

    void onBatchProcessed(Runnable batchProcessedCallback) {
        this.batchProcessedCallback = batchProcessedCallback;
    }

    public boolean isProcessingEvents() {
        return this.processingEvents.get();
    }

    static class Builder {
        private String name;
        private TokenStore tokenStore;
        private UnitOfWorkFactory unitOfWorkFactory;
        private ExecutorService executorService;
        private EventFilter eventFilter;
        private BatchProcessor batchProcessor;
        private Segment segment;
        private TrackingToken initialToken;
        private int batchSize = 1;
        private long claimExtensionThreshold = 5000L;
        private Consumer<UnaryOperator<TrackerStatus>> segmentStatusUpdater;
        private Clock clock = GenericEventMessage.clock;
        private Supplier<ProcessingContext> schedulingProcessingContextProvider = () -> new EventSchedulingProcessingContext(EmptyApplicationContext.INSTANCE);

        Builder() {
        }

        Builder name(String name) {
            this.name = name;
            return this;
        }

        Builder tokenStore(TokenStore tokenStore) {
            this.tokenStore = tokenStore;
            return this;
        }

        Builder unitOfWorkFactory(UnitOfWorkFactory unitOfWorkFactory) {
            this.unitOfWorkFactory = unitOfWorkFactory;
            return this;
        }

        Builder executorService(ExecutorService executorService) {
            this.executorService = executorService;
            return this;
        }

        Builder eventFilter(EventFilter eventFilter) {
            this.eventFilter = eventFilter;
            return this;
        }

        Builder batchProcessor(BatchProcessor batchProcessor) {
            this.batchProcessor = batchProcessor;
            return this;
        }

        Builder segment(Segment segment) {
            this.segment = segment;
            return this;
        }

        Builder initialToken(TrackingToken initialToken) {
            this.initialToken = initialToken;
            return this;
        }

        Builder batchSize(int batchSize) {
            this.batchSize = batchSize;
            return this;
        }

        Builder claimExtensionThreshold(long claimExtensionThreshold) {
            this.claimExtensionThreshold = claimExtensionThreshold;
            return this;
        }

        Builder segmentStatusUpdater(Consumer<UnaryOperator<TrackerStatus>> segmentStatusUpdater) {
            this.segmentStatusUpdater = segmentStatusUpdater;
            return this;
        }

        Builder clock(Clock clock) {
            this.clock = clock;
            return this;
        }

        Builder schedulingProcessingContextProvider(@Nonnull Supplier<ProcessingContext> schedulingProcessingContextProvider) {
            Objects.requireNonNull(schedulingProcessingContextProvider, "schedulingProcessingContextProvider may not be null.");
            this.schedulingProcessingContextProvider = schedulingProcessingContextProvider;
            return this;
        }

        WorkPackage build() {
            return new WorkPackage(this);
        }
    }

    @FunctionalInterface
    static interface EventFilter {
        public boolean canHandle(EventMessage var1, ProcessingContext var2, Segment var3) throws Exception;
    }

    @FunctionalInterface
    static interface BatchProcessor {
        public MessageStream.Empty<Message> process(@Nonnull List<? extends EventMessage> var1, @Nonnull ProcessingContext var2);
    }

    private static class BatchProcessingEntry
    implements ProcessingEntry {
        private final List<ProcessingEntry> processingEntries = new ArrayList<ProcessingEntry>();

        public void add(ProcessingEntry processingEntry) {
            this.processingEntries.add(processingEntry);
        }

        @Override
        public TrackingToken trackingToken() {
            return this.processingEntries.getFirst().trackingToken();
        }

        @Override
        public void addToBatch(List<EventMessage> eventBatch) {
            this.processingEntries.forEach(entry -> entry.addToBatch(eventBatch));
        }
    }

    private static class DefaultProcessingEntry
    implements ProcessingEntry {
        private final MessageStream.Entry<? extends EventMessage> eventEntry;
        private final boolean canHandle;

        public DefaultProcessingEntry(MessageStream.Entry<? extends EventMessage> eventEntry, boolean canHandle) {
            this.eventEntry = eventEntry;
            this.canHandle = canHandle;
        }

        @Override
        public TrackingToken trackingToken() {
            return TrackingToken.fromContext(this.eventEntry).orElse(null);
        }

        @Override
        public void addToBatch(List<EventMessage> eventBatch) {
            if (this.canHandle) {
                eventBatch.add(this.eventEntry.message());
            }
        }
    }

    private static interface ProcessingEntry {
        public TrackingToken trackingToken();

        public void addToBatch(List<EventMessage> var1);
    }
}

