/*
 * Decompiled with CFR 0.152.
 */
package org.axonframework.eventhandling.pooled;

import java.lang.invoke.MethodHandles;
import java.time.Clock;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.UnaryOperator;
import org.axonframework.common.Assert;
import org.axonframework.common.transaction.TransactionManager;
import org.axonframework.eventhandling.EventMessage;
import org.axonframework.eventhandling.GenericEventMessage;
import org.axonframework.eventhandling.Segment;
import org.axonframework.eventhandling.TrackedEventMessage;
import org.axonframework.eventhandling.TrackerStatus;
import org.axonframework.eventhandling.TrackingToken;
import org.axonframework.eventhandling.WrappedToken;
import org.axonframework.eventhandling.tokenstore.TokenStore;
import org.axonframework.messaging.unitofwork.BatchingUnitOfWork;
import org.axonframework.messaging.unitofwork.UnitOfWork;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class WorkPackage {
    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

    /** Queue size (1024) below which {@link #hasRemainingCapacity()} reports remaining capacity. */
    static final int BUFFER_SIZE = 1024;

    /** Processor name; used in log output, token store calls and the unit of work resource keys. */
    private final String name;
    /** Store used to persist the token and to extend the claim of this package's segment. */
    private final TokenStore tokenStore;
    /** Attached to each batch's unit of work and used for out-of-batch token store calls. */
    private final TransactionManager transactionManager;
    /** Executor on which the worker task of {@link #scheduleWorker()} is submitted. */
    private final ExecutorService executorService;
    /** Decides per event whether it should be handled for {@link #segment}. */
    private final EventFilter eventFilter;
    /** Receives every non-empty batch of events together with its unit of work. */
    private final BatchProcessor batchProcessor;
    /** The segment this work package processes events for. */
    private final Segment segment;
    /** Maximum number of events collected into a single batch by the worker. */
    private final int batchSize;
    /** Idle time in milliseconds after which the worker stores the token or extends the claim. */
    private final long claimExtensionThreshold;
    /** Callback used to replace the {@link TrackerStatus} of this package's segment. */
    private final Consumer<UnaryOperator<TrackerStatus>> segmentStatusUpdater;
    /** Time source used for claim-extension bookkeeping. */
    private final Clock clock;
    /** Unit of work resource key under which the segment id is registered. */
    private final String segmentIdResourceKey;
    /** Unit of work resource key under which the last consumed token is registered. */
    private final String lastTokenResourceKey;

    /** Token of the most recently scheduled event(s); events covered by it are not scheduled again. */
    private TrackingToken lastDeliveredToken;
    /** Token the worker has advanced to while draining the processing queue. */
    private TrackingToken lastConsumedToken;
    /** Token instance most recently written to the token store by this package, {@code null} before that. */
    private TrackingToken lastStoredToken;
    /** Epoch milliseconds of the last token store or claim extension. */
    private long lastClaimExtension;

    /** Entries scheduled but not yet picked up by the worker. */
    private final Queue<ProcessingEntry> processingQueue = new ConcurrentLinkedQueue<>();
    /** {@code true} while a worker task is submitted or running; see {@link #scheduleWorker()}. */
    private final AtomicBoolean scheduled = new AtomicBoolean();
    /** Non-null once an abort was requested; completed by the worker when it shuts the package down. */
    private final AtomicReference<CompletableFuture<Exception>> abortFlag = new AtomicReference<>();
    /** First non-null reason passed to {@link #abort(Exception)}, if any. */
    private final AtomicReference<Exception> abortException = new AtomicReference<>();

    /**
     * Creates a fresh {@link Builder} for configuring and constructing a {@link WorkPackage}.
     *
     * @return a new, unconfigured builder instance
     */
    protected static Builder builder() {
        Builder freshBuilder = new Builder();
        return freshBuilder;
    }

    /**
     * Creates a work package from the values collected by the given {@code builder}.
     * <p>
     * Both the last delivered and the last consumed token start at the builder's initial token.
     *
     * @param builder the builder holding the configuration for this work package
     */
    private WorkPackage(Builder builder) {
        this.name = builder.name;
        this.tokenStore = builder.tokenStore;
        this.transactionManager = builder.transactionManager;
        this.executorService = builder.executorService;
        this.eventFilter = builder.eventFilter;
        this.batchProcessor = builder.batchProcessor;
        this.segment = builder.segment;
        this.lastDeliveredToken = builder.initialToken;
        this.batchSize = builder.batchSize;
        this.claimExtensionThreshold = builder.claimExtensionThreshold;
        this.segmentStatusUpdater = builder.segmentStatusUpdater;
        this.clock = builder.clock;
        this.segmentIdResourceKey = "Processor[" + builder.name + "]/SegmentId";
        this.lastTokenResourceKey = "Processor[" + builder.name + "]/Token";
        this.lastConsumedToken = builder.initialToken;
        // Seed the timestamp from the injected clock: it is later compared against
        // clock.instant().toEpochMilli(), so both sides must come from the same time source.
        this.lastClaimExtension = this.clock.instant().toEpochMilli();
    }

    /**
     * Schedules a group of events that all carry the same tracking token as a single queue entry.
     * <p>
     * Nothing is scheduled when {@code events} is empty or when every event's token is already covered by the
     * last delivered token. Otherwise all events are queued together (including those the filter rejects, so
     * that their token is still consumed), the last delivered token is updated and the worker is scheduled.
     *
     * @param events the events to schedule; all must carry an equal tracking token, otherwise the
     *               {@code Assert.isTrue} check in {@code assertEqualTokens} fails
     * @return {@code true} if at least one of the events was accepted by the event filter, {@code false} if
     * nothing was scheduled or none of the events can be handled
     */
    public boolean scheduleEvents(List<TrackedEventMessage<?>> events) {
        if (events.isEmpty()) {
            return false;
        }
        assertEqualTokens(events);

        if (events.stream().allMatch(this::shouldNotSchedule)) {
            if (logger.isTraceEnabled()) {
                for (TrackedEventMessage<?> event : events) {
                    logger.trace("Ignoring event [{}] with position [{}] for work package [{}]. The last token [{}] covers event's token [{}].",
                                 event.getIdentifier(),
                                 event.trackingToken().position().orElse(-1L),
                                 segment.getSegmentId(),
                                 lastDeliveredToken,
                                 event.trackingToken());
                }
            }
            return false;
        }

        // A plain loop instead of a stream: building the batch entry is a side effect, which does not
        // belong inside a Stream.map() function. Every event is still filtered and added, in order.
        BatchProcessingEntry batchProcessingEntry = new BatchProcessingEntry();
        boolean canHandleAny = false;
        for (TrackedEventMessage<?> event : events) {
            boolean canHandle = canHandle(event);
            batchProcessingEntry.add(new DefaultProcessingEntry(event, canHandle));
            canHandleAny |= canHandle;
        }

        processingQueue.add(batchProcessingEntry);
        lastDeliveredToken = batchProcessingEntry.trackingToken();
        scheduleWorker();
        return canHandleAny;
    }

    /**
     * Verifies that every event in {@code events} carries a tracking token equal to that of the first event.
     *
     * @param events the non-empty list of events whose tokens are compared
     */
    private void assertEqualTokens(List<TrackedEventMessage<?>> events) {
        TrackingToken referenceToken = events.get(0).trackingToken();
        boolean tokensMatch = true;
        for (TrackedEventMessage<?> candidate : events) {
            if (!Objects.equals(referenceToken, candidate.trackingToken())) {
                // Stop at the first mismatch, just like a short-circuiting allMatch would.
                tokensMatch = false;
                break;
            }
        }
        Assert.isTrue(tokensMatch, () -> "All tokens should match when scheduling multiple events in one go.");
    }

    /**
     * Schedules a single event for processing by this work package.
     * <p>
     * The event is ignored when its token is already covered by the last delivered token. Otherwise it is
     * queued (even when the filter rejects it, so that its token is still consumed), the last delivered token
     * is updated and the worker is scheduled.
     *
     * @param event the event to schedule
     * @return {@code true} if the event was queued and accepted by the event filter, {@code false} otherwise
     */
    public boolean scheduleEvent(TrackedEventMessage<?> event) {
        if (shouldNotSchedule(event)) {
            // Guarded like the equivalent trace in scheduleEvents, so the arguments are only built when needed.
            if (logger.isTraceEnabled()) {
                logger.trace("Ignoring event [{}] with position [{}] for work package [{}]. The last token [{}] covers event's token [{}].",
                             event.getIdentifier(),
                             event.trackingToken().position().orElse(-1L),
                             segment.getSegmentId(),
                             lastDeliveredToken,
                             event.trackingToken());
            }
            return false;
        }
        if (logger.isDebugEnabled()) {
            logger.debug("Assigned event [{}] with position [{}] to work package [{}].",
                         event.getIdentifier(),
                         event.trackingToken().position().orElse(-1L),
                         segment.getSegmentId());
        }

        boolean canHandle = canHandle(event);
        processingQueue.add(new DefaultProcessingEntry(event, canHandle));
        lastDeliveredToken = event.trackingToken();
        scheduleWorker();
        return canHandle;
    }

    /**
     * Tells whether the given event must be skipped because the last delivered token already covers it.
     *
     * @param event the event to check
     * @return {@code true} if a last delivered token exists and covers the event's token
     */
    private boolean shouldNotSchedule(TrackedEventMessage<?> event) {
        if (lastDeliveredToken == null) {
            // Nothing delivered yet, so nothing can be covered.
            return false;
        }
        return lastDeliveredToken.covers(event.trackingToken());
    }

    /**
     * Asks the event filter whether the given event can be handled for this package's segment.
     * <p>
     * If the filter throws, the failure is logged, this work package is aborted with that exception and the
     * event is reported as not handleable.
     *
     * @param event the event to check
     * @return the filter's verdict, or {@code false} if the filter threw an exception
     */
    private boolean canHandle(TrackedEventMessage<?> event) {
        boolean handleable;
        try {
            handleable = eventFilter.canHandle(event, segment);
        } catch (Exception e) {
            logger.warn("Error while detecting whether event can be handled in Work Package [{}]-[{}]. Aborting Work Package...",
                        segment.getSegmentId(), name, e);
            abort(e);
            handleable = false;
        }
        return handleable;
    }

    /**
     * Submits the worker task of this work package to the executor, unless one is already submitted or running.
     * <p>
     * The worker either shuts the package down (when an abort was requested) or processes the next batch and
     * then reschedules itself if entries remain in the queue or an abort was requested in the meantime.
     */
    public void scheduleWorker() {
        // Only the caller that flips the flag from false to true submits a task.
        if (!this.scheduled.compareAndSet(false, true)) {
            return;
        }
        logger.debug("Scheduling Work Package [{}]-[{}] to process events.", (Object)this.segment.getSegmentId(), (Object)this.name);
        this.executorService.submit(() -> {
            CompletableFuture<Exception> aborting = this.abortFlag.get();
            if (aborting != null) {
                logger.debug("Work Package [{}]-[{}] should be aborted. Will shutdown this work package.", (Object)this.segment.getSegmentId(), (Object)this.name);
                // Replace the segment's status with null and hand the recorded abort reason to the abort future.
                this.segmentStatusUpdater.accept(previousStatus -> null);
                aborting.complete(this.abortException.get());
                // 'scheduled' is deliberately left true here, so later scheduleWorker() calls are no-ops.
                return;
            }
            try {
                this.processEvents();
            }
            catch (Exception e) {
                logger.warn("Error while processing batch in Work Package [{}]-[{}]. Aborting Work Package...", new Object[]{this.segment.getSegmentId(), this.name, e});
                this.abort(e);
            }
            // Release the flag before checking the queue, so entries added in between are not left unprocessed.
            this.scheduled.set(false);
            if (!this.processingQueue.isEmpty() || this.abortFlag.get() != null) {
                logger.debug("Rescheduling Work Package [{}]-[{}] since there are events left.", (Object)this.segment.getSegmentId(), (Object)this.name);
                this.scheduleWorker();
            }
        });
    }

    /**
     * Drains up to {@code batchSize} events from the processing queue and hands them to the batch processor.
     * <p>
     * For a non-empty batch a {@link BatchingUnitOfWork} is created that stores the last consumed token on
     * prepare-commit and advances the segment status after commit. When no handleable events were collected,
     * the segment status is advanced directly and, if the claim extension threshold has elapsed, either the
     * last consumed token is stored or the claim is extended.
     *
     * @throws Exception when the batch processor fails to process the batch
     */
    private void processEvents() throws Exception {
        List<TrackedEventMessage<?>> eventBatch = new ArrayList<>();
        while (!isAbortTriggered() && eventBatch.size() < batchSize && !processingQueue.isEmpty()) {
            ProcessingEntry entry = processingQueue.poll();
            lastConsumedToken = WrappedToken.advance(lastConsumedToken, entry.trackingToken());
            entry.addToBatch(eventBatch, lastConsumedToken);
        }

        if (!eventBatch.isEmpty()) {
            logger.debug("Work Package [{}]-[{}] is processing a batch of {} events.",
                         segment.getSegmentId(), name, eventBatch.size());
            BatchingUnitOfWork<TrackedEventMessage<?>> unitOfWork = new BatchingUnitOfWork<>(eventBatch);
            unitOfWork.attachTransaction(transactionManager);
            unitOfWork.resources().put(segmentIdResourceKey, segment.getSegmentId());
            unitOfWork.resources().put(lastTokenResourceKey, lastConsumedToken);
            // The token is stored as part of the batch's commit; the status only advances once committed.
            unitOfWork.onPrepareCommit(u -> storeToken(lastConsumedToken));
            unitOfWork.afterCommit(
                    u -> segmentStatusUpdater.accept(status -> status.advancedTo(lastConsumedToken))
            );
            batchProcessor.processBatch(eventBatch, unitOfWork, Collections.singleton(segment));
        } else {
            segmentStatusUpdater.accept(status -> status.advancedTo(lastConsumedToken));
            // Nothing to handle: keep the claim alive once the threshold has passed since the last extension.
            if (lastClaimExtension < clock.instant().toEpochMilli() - claimExtensionThreshold) {
                // Reference comparison: storeToken() records the exact instance it stored.
                if (lastStoredToken != lastConsumedToken) {
                    transactionManager.executeInTransaction(() -> storeToken(lastConsumedToken));
                } else {
                    transactionManager.executeInTransaction(this::extendClaim);
                }
            }
        }
    }

    /**
     * Extends this package's claim on its segment in the token store and records the time of the extension.
     */
    private void extendClaim() {
        int segmentId = segment.getSegmentId();
        logger.debug("Work Package [{}]-[{}] will extend its token claim.", name, segmentId);
        tokenStore.extendClaim(name, segmentId);
        lastClaimExtension = clock.instant().toEpochMilli();
    }

    /**
     * Writes the given token to the token store for this package's segment, remembers it as the last stored
     * token and records the time of the store as the last claim extension.
     *
     * @param token the token to store
     */
    private void storeToken(TrackingToken token) {
        int segmentId = segment.getSegmentId();
        logger.debug("Work Package [{}]-[{}] will store token [{}].", name, segmentId, token);
        tokenStore.storeToken(token, name, segmentId);
        lastStoredToken = token;
        lastClaimExtension = clock.instant().toEpochMilli();
    }

    /**
     * Indicates whether this work package can accept more entries.
     *
     * @return {@code true} while the processing queue holds fewer than {@link #BUFFER_SIZE} entries
     */
    public boolean hasRemainingCapacity() {
        // Use the named constant rather than repeating its literal value.
        return processingQueue.size() < BUFFER_SIZE;
    }

    /**
     * Indicates whether this work package has nothing left to do.
     *
     * @return {@code true} if the processing queue is empty and no worker is currently scheduled
     */
    public boolean isDone() {
        if (!processingQueue.isEmpty()) {
            return false;
        }
        return !scheduled.get();
    }

    /**
     * Returns the segment this work package is processing events for.
     *
     * @return the segment of this work package
     */
    public Segment segment() {
        return segment;
    }

    /**
     * Returns the token of the event(s) most recently scheduled on this work package, or the initial token
     * if nothing has been scheduled yet.
     *
     * @return the last delivered token, possibly {@code null}
     */
    public TrackingToken lastDeliveredToken() {
        return lastDeliveredToken;
    }

    /**
     * Indicates whether an abort has been requested for this work package.
     *
     * @return {@code true} once {@link #abort(Exception)} has been invoked
     */
    public boolean isAbortTriggered() {
        CompletableFuture<Exception> pendingAbort = abortFlag.get();
        return pendingAbort != null;
    }

    /**
     * Requests this work package to abort and schedules the worker so that it can shut the package down.
     * <p>
     * When a non-null {@code abortReason} is given, the segment's status is marked with that error unless it
     * is already in an error state. Repeated invocations return the same future; the first non-null reason
     * is the one that is retained.
     *
     * @param abortReason the cause of the abort, or {@code null} if there is none
     * @return a future the worker completes with the recorded abort reason (possibly {@code null}) once it
     * has shut this work package down
     */
    public CompletableFuture<Exception> abort(Exception abortReason) {
        if (abortReason != null) {
            logger.debug("Abort request received for Work Package [{}]-[{}].",
                         name, segment.getSegmentId(), abortReason);
            segmentStatusUpdater.accept(status -> {
                if (status != null) {
                    return status.isErrorState() ? status : status.markError(abortReason);
                }
                return null;
            });
        }

        // NOTE(review): AtomicReference#updateAndGet may re-apply its function under contention, yet this
        // function writes to abortException as a side effect. The writes look safe to repeat; confirm.
        CompletableFuture<Exception> abortTask = abortFlag.updateAndGet(currentFlag -> {
            if (currentFlag == null) {
                abortException.set(abortReason);
                return new CompletableFuture<>();
            }
            // An abort is already pending: only fill in the reason if none was recorded so far.
            abortException.updateAndGet(currentReason -> currentReason == null ? abortReason : currentReason);
            return currentFlag;
        });

        // Ensure a worker runs to observe the abort flag, even if the queue is empty.
        scheduleWorker();
        return abortTask;
    }

    /**
     * A {@link ProcessingEntry} that groups several entries so they are polled from the queue as one unit.
     */
    private static class BatchProcessingEntry implements ProcessingEntry {

        private final List<ProcessingEntry> processingEntries = new ArrayList<>();

        /**
         * Appends the given entry to this group.
         *
         * @param processingEntry the entry to add
         */
        public void add(ProcessingEntry processingEntry) {
            processingEntries.add(processingEntry);
        }

        /**
         * Returns the tracking token of the first entry in this group.
         */
        @Override
        public TrackingToken trackingToken() {
            ProcessingEntry firstEntry = processingEntries.get(0);
            return firstEntry.trackingToken();
        }

        /**
         * Lets every grouped entry, in insertion order, add itself to the given batch with the given token.
         */
        @Override
        public void addToBatch(List<TrackedEventMessage<?>> eventBatch, TrackingToken wrappedToken) {
            for (ProcessingEntry groupedEntry : processingEntries) {
                groupedEntry.addToBatch(eventBatch, wrappedToken);
            }
        }
    }

    /**
     * A {@link ProcessingEntry} for a single event, remembering whether the event filter accepted it.
     */
    private static class DefaultProcessingEntry implements ProcessingEntry {

        private final TrackedEventMessage<?> eventMessage;
        private final boolean canHandle;

        /**
         * @param eventMessage the event this entry represents
         * @param canHandle    whether the event should be added to a batch when this entry is processed
         */
        public DefaultProcessingEntry(TrackedEventMessage<?> eventMessage, boolean canHandle) {
            this.eventMessage = eventMessage;
            this.canHandle = canHandle;
        }

        /**
         * Returns the tracking token of the wrapped event.
         */
        @Override
        public TrackingToken trackingToken() {
            return eventMessage.trackingToken();
        }

        /**
         * Adds the wrapped event, re-tagged with {@code wrappedToken}, to the batch if it can be handled;
         * does nothing otherwise.
         */
        @Override
        public void addToBatch(List<TrackedEventMessage<?>> eventBatch, TrackingToken wrappedToken) {
            if (!canHandle) {
                return;
            }
            TrackedEventMessage<?> retagged = eventMessage.withTrackingToken(wrappedToken);
            eventBatch.add(retagged);
        }
    }

    private static interface ProcessingEntry {
        public TrackingToken trackingToken();

        public void addToBatch(List<TrackedEventMessage<?>> var1, TrackingToken var2);
    }

    /**
     * Builder for {@link WorkPackage} instances.
     * <p>
     * Defaults: a batch size of {@code 1}, a claim extension threshold of {@code 5000} milliseconds and
     * {@code GenericEventMessage.clock} as the clock. No validation is performed on the configured values.
     */
    static class Builder {

        private String name;
        private TokenStore tokenStore;
        private TransactionManager transactionManager;
        private ExecutorService executorService;
        private EventFilter eventFilter;
        private BatchProcessor batchProcessor;
        private Segment segment;
        private TrackingToken initialToken;
        private int batchSize = 1;
        private long claimExtensionThreshold = 5000L;
        private Consumer<UnaryOperator<TrackerStatus>> segmentStatusUpdater;
        private Clock clock = GenericEventMessage.clock;

        /** Sets the processor name used for logging, token store calls and resource keys. */
        Builder name(String name) {
            this.name = name;
            return this;
        }

        /** Sets the token store used to store tokens and extend claims. */
        Builder tokenStore(TokenStore tokenStore) {
            this.tokenStore = tokenStore;
            return this;
        }

        /** Sets the transaction manager used for batches and token store calls. */
        Builder transactionManager(TransactionManager transactionManager) {
            this.transactionManager = transactionManager;
            return this;
        }

        /** Sets the executor on which the worker task is submitted. */
        Builder executorService(ExecutorService executorService) {
            this.executorService = executorService;
            return this;
        }

        /** Sets the filter deciding whether an event can be handled for the segment. */
        Builder eventFilter(EventFilter eventFilter) {
            this.eventFilter = eventFilter;
            return this;
        }

        /** Sets the processor that receives each batch of events. */
        Builder batchProcessor(BatchProcessor batchProcessor) {
            this.batchProcessor = batchProcessor;
            return this;
        }

        /** Sets the segment the work package processes events for. */
        Builder segment(Segment segment) {
            this.segment = segment;
            return this;
        }

        /** Sets the token used as both the initial last-delivered and last-consumed token. */
        Builder initialToken(TrackingToken initialToken) {
            this.initialToken = initialToken;
            return this;
        }

        /** Sets the maximum number of events per batch. */
        Builder batchSize(int batchSize) {
            this.batchSize = batchSize;
            return this;
        }

        /** Sets the idle time in milliseconds after which the token is stored or the claim is extended. */
        Builder claimExtensionThreshold(long claimExtensionThreshold) {
            this.claimExtensionThreshold = claimExtensionThreshold;
            return this;
        }

        /** Sets the callback used to update the status of the segment. */
        Builder segmentStatusUpdater(Consumer<UnaryOperator<TrackerStatus>> segmentStatusUpdater) {
            this.segmentStatusUpdater = segmentStatusUpdater;
            return this;
        }

        /** Sets the clock used for claim-extension bookkeeping. */
        Builder clock(Clock clock) {
            this.clock = clock;
            return this;
        }

        /**
         * Creates the {@link WorkPackage} from the values configured on this builder.
         *
         * @return a new work package
         */
        WorkPackage build() {
            return new WorkPackage(this);
        }
    }

    /**
     * Callback that processes a batch of events within a unit of work on behalf of a set of segments.
     */
    @FunctionalInterface
    interface BatchProcessor {

        /**
         * Processes the given batch of events.
         *
         * @param eventMessages      the events to process
         * @param unitOfWork         the unit of work in which the events are processed
         * @param processingSegments the segments for which the events are processed
         * @throws Exception when processing the batch fails
         */
        void processBatch(List<? extends EventMessage<?>> eventMessages,
                          UnitOfWork<? extends EventMessage<?>> unitOfWork,
                          Collection<Segment> processingSegments) throws Exception;
    }

    /**
     * Predicate deciding whether an event can be handled for a given segment.
     */
    @FunctionalInterface
    interface EventFilter {

        /**
         * Checks whether the given event can be handled for the given segment.
         *
         * @param eventMessage the event to check
         * @param segment      the segment the event would be handled for
         * @return {@code true} if the event can be handled, {@code false} otherwise
         * @throws Exception when the check itself fails
         */
        boolean canHandle(TrackedEventMessage<?> eventMessage, Segment segment) throws Exception;
    }
}

