/*
 * Decompiled with CFR 0.152.
 */
package org.axonframework.eventhandling;

import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import org.axonframework.common.Assert;
import org.axonframework.common.AxonConfigurationException;
import org.axonframework.common.AxonNonTransientException;
import org.axonframework.common.BuilderUtils;
import org.axonframework.common.ExceptionUtils;
import org.axonframework.common.ProcessUtils;
import org.axonframework.common.io.IOUtils;
import org.axonframework.common.stream.BlockingStream;
import org.axonframework.common.transaction.NoTransactionManager;
import org.axonframework.common.transaction.TransactionManager;
import org.axonframework.eventhandling.AbstractEventProcessor;
import org.axonframework.eventhandling.AddedTrackerStatus;
import org.axonframework.eventhandling.ErrorHandler;
import org.axonframework.eventhandling.EventHandlerInvoker;
import org.axonframework.eventhandling.EventMessage;
import org.axonframework.eventhandling.EventTrackerStatus;
import org.axonframework.eventhandling.EventTrackerStatusChangeListener;
import org.axonframework.eventhandling.GenericEventMessage;
import org.axonframework.eventhandling.MergedTrackingToken;
import org.axonframework.eventhandling.RemovedTrackerStatus;
import org.axonframework.eventhandling.ReplayToken;
import org.axonframework.eventhandling.Segment;
import org.axonframework.eventhandling.StreamingEventProcessor;
import org.axonframework.eventhandling.TrackedEventMessage;
import org.axonframework.eventhandling.TrackerStatus;
import org.axonframework.eventhandling.TrackingEventProcessorConfiguration;
import org.axonframework.eventhandling.TrackingToken;
import org.axonframework.eventhandling.WrappedToken;
import org.axonframework.eventhandling.tokenstore.TokenStore;
import org.axonframework.eventhandling.tokenstore.UnableToClaimTokenException;
import org.axonframework.lifecycle.ShutdownHandler;
import org.axonframework.lifecycle.StartHandler;
import org.axonframework.messaging.StreamableMessageSource;
import org.axonframework.messaging.unitofwork.BatchingUnitOfWork;
import org.axonframework.messaging.unitofwork.RollbackConfiguration;
import org.axonframework.messaging.unitofwork.RollbackConfigurationType;
import org.axonframework.monitoring.MessageMonitor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TrackingEventProcessor
extends AbstractEventProcessor
implements StreamingEventProcessor {
    private static final Logger logger = LoggerFactory.getLogger(TrackingEventProcessor.class);
    private final StreamableMessageSource<TrackedEventMessage<?>> messageSource;
    private final TokenStore tokenStore;
    private final Function<StreamableMessageSource<TrackedEventMessage<?>>, TrackingToken> initialTrackingTokenBuilder;
    private final TransactionManager transactionManager;
    private final int batchSize;
    private final int segmentsSize;
    private final ThreadFactory threadFactory;
    private final Map<String, Thread> workerThreads = new ConcurrentSkipListMap<String, Thread>();
    private final long workerTerminationTimeout;
    private final AtomicReference<State> state = new AtomicReference<State>(State.NOT_STARTED);
    private final AtomicBoolean workLauncherRunning = new AtomicBoolean(false);
    private final ConcurrentMap<Integer, TrackerStatus> activeSegments = new ConcurrentSkipListMap<Integer, TrackerStatus>();
    private final ConcurrentMap<Integer, Long> segmentReleaseDeadlines = new ConcurrentSkipListMap<Integer, Long>();
    private final String segmentIdResourceKey;
    private final String lastTokenResourceKey;
    private final AtomicInteger availableThreads;
    private final long tokenClaimInterval;
    private final AtomicReference<String> tokenStoreIdentifier = new AtomicReference();
    private final ConcurrentMap<Integer, List<Instruction>> instructions = new ConcurrentHashMap<Integer, List<Instruction>>();
    private final boolean storeTokenBeforeProcessing;
    private final int eventAvailabilityTimeout;
    private final EventTrackerStatusChangeListener trackerStatusChangeListener;

    public static Builder builder() {
        return new Builder();
    }

    protected TrackingEventProcessor(Builder builder) {
        super(builder);
        TrackingEventProcessorConfiguration config = builder.trackingEventProcessorConfiguration;
        this.tokenClaimInterval = config.getTokenClaimInterval();
        this.eventAvailabilityTimeout = config.getEventAvailabilityTimeout();
        this.storeTokenBeforeProcessing = builder.storeTokenBeforeProcessing;
        this.batchSize = config.getBatchSize();
        this.messageSource = builder.messageSource;
        this.tokenStore = builder.tokenStore;
        this.segmentsSize = config.getInitialSegmentsCount();
        this.transactionManager = builder.transactionManager;
        this.availableThreads = new AtomicInteger(config.getMaxThreadCount());
        this.threadFactory = config.getThreadFactory(builder.name);
        this.workerTerminationTimeout = config.getWorkerTerminationTimeout();
        this.segmentIdResourceKey = "Processor[" + builder.name + "]/SegmentId";
        this.lastTokenResourceKey = "Processor[" + builder.name + "]/Token";
        this.initialTrackingTokenBuilder = config.getInitialTrackingToken();
        this.trackerStatusChangeListener = config.getEventTrackerStatusChangeListener();
        this.registerHandlerInterceptor((unitOfWork, interceptorChain) -> {
            if (!(unitOfWork instanceof BatchingUnitOfWork) || ((BatchingUnitOfWork)unitOfWork).isFirstMessage()) {
                Instant startTime = this.now();
                TrackingToken lastToken = (TrackingToken)unitOfWork.getResource(this.lastTokenResourceKey);
                if (this.storeTokenBeforeProcessing) {
                    this.tokenStore.storeToken(lastToken, builder.name, (Integer)unitOfWork.getResource(this.segmentIdResourceKey));
                } else {
                    this.tokenStore.extendClaim(this.getName(), (Integer)unitOfWork.getResource(this.segmentIdResourceKey));
                }
                unitOfWork.onPrepareCommit(uow -> {
                    if (!this.storeTokenBeforeProcessing) {
                        this.tokenStore.storeToken(lastToken, builder.name, (Integer)unitOfWork.getResource(this.segmentIdResourceKey));
                    } else if (this.now().isAfter(startTime.plusMillis(this.eventAvailabilityTimeout))) {
                        this.tokenStore.extendClaim(this.getName(), (Integer)unitOfWork.getResource(this.segmentIdResourceKey));
                    }
                });
            }
            return interceptorChain.proceed();
        });
    }

    private Instant now() {
        return GenericEventMessage.clock.instant();
    }

    @Override
    @StartHandler(phase=0x3FFFFFFF)
    public void start() {
        if (this.activeProcessorThreads() > 0 || this.workLauncherRunning.get()) {
            if (this.state.get().isRunning()) {
                return;
            }
            throw new IllegalStateException("Cannot start this processor. It is pending shutdown...");
        }
        State previousState = this.state.getAndSet(State.STARTED);
        if (!previousState.isRunning()) {
            this.startSegmentWorkers();
        }
    }

    @Override
    public CompletableFuture<Boolean> splitSegment(int segmentId) {
        CompletableFuture<Boolean> result = new CompletableFuture<Boolean>();
        if (!this.tokenStore.requiresExplicitSegmentInitialization()) {
            result.completeExceptionally(new UnsupportedOperationException("TokenStore must require explicit initialization to safely split tokens"));
            return result;
        }
        if (!this.activeSegments.containsKey(segmentId)) {
            return CompletableFuture.completedFuture(false);
        }
        this.instructions.computeIfAbsent(segmentId, i -> new CopyOnWriteArrayList()).add(new SplitSegmentInstruction(result, segmentId));
        return result;
    }

    @Override
    public String getTokenStoreIdentifier() {
        return this.tokenStoreIdentifier.updateAndGet(i -> i != null ? i : this.calculateIdentifier());
    }

    private String calculateIdentifier() {
        return this.transactionManager.fetchInTransaction(() -> this.tokenStore.retrieveStorageIdentifier().orElse("--unknown--"));
    }

    @Override
    public CompletableFuture<Boolean> mergeSegment(int segmentId) {
        CompletableFuture<Boolean> result = new CompletableFuture<Boolean>();
        if (!this.tokenStore.requiresExplicitSegmentInitialization()) {
            result.completeExceptionally(new UnsupportedOperationException("TokenStore must require explicit initialization to safely merge tokens"));
            return result;
        }
        TrackerStatus segmentStatus = (TrackerStatus)this.activeSegments.get(segmentId);
        if (segmentStatus == null) {
            return CompletableFuture.completedFuture(false);
        }
        if (segmentId == segmentStatus.getSegment().mergeableSegmentId()) {
            logger.info("A merge request can only be fulfilled if there is more than one segment");
            return CompletableFuture.completedFuture(false);
        }
        int segmentToMerge = segmentStatus.getSegment().mergeableSegmentId();
        this.instructions.computeIfAbsent(segmentId, i -> new CopyOnWriteArrayList()).add(new MergeSegmentInstruction(result, segmentId, segmentToMerge));
        return result;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    protected void processingLoop(Segment segment) {
        BlockingStream<TrackedEventMessage<?>> eventStream = null;
        long errorWaitTime = 1L;
        try {
            while (this.state.get().isRunning() && !this.processInstructions(segment.getSegmentId()) && this.canClaimSegment(segment.getSegmentId())) {
                try {
                    eventStream = this.ensureEventStreamOpened(eventStream, segment);
                    this.processBatch(segment, eventStream);
                    errorWaitTime = 1L;
                    TrackerStatus trackerStatus = (TrackerStatus)this.activeSegments.get(segment.getSegmentId());
                    if (!trackerStatus.isErrorState()) continue;
                    TrackerStatus validStatus = this.activeSegments.computeIfPresent(segment.getSegmentId(), (k, v) -> v.unmarkError());
                    this.trackerStatusChangeListener.onEventTrackerStatusChange(Collections.singletonMap(segment.getSegmentId(), validStatus));
                }
                catch (UnableToClaimTokenException e) {
                    logger.info("Segment is owned by another node. Releasing thread to process another segment...");
                    this.releaseSegment(segment.getSegmentId());
                }
                catch (Exception e) {
                    if (errorWaitTime == 1L) {
                        logger.warn("Error occurred. Starting retry mode.", (Throwable)e);
                    }
                    logger.warn("Releasing claim on token and preparing for retry in {}s", (Object)errorWaitTime);
                    TrackerStatus trackerStatus = (TrackerStatus)this.activeSegments.get(segment.getSegmentId());
                    if (!trackerStatus.isErrorState()) {
                        TrackerStatus errorStatus = this.activeSegments.computeIfPresent(segment.getSegmentId(), (k, v) -> v.markError(e));
                        this.trackerStatusChangeListener.onEventTrackerStatusChange(Collections.singletonMap(segment.getSegmentId(), errorStatus));
                    }
                    this.releaseToken(segment);
                    IOUtils.closeQuietly(eventStream);
                    eventStream = null;
                    this.doSleepFor(TimeUnit.SECONDS.toMillis(errorWaitTime));
                    errorWaitTime = Math.min(errorWaitTime * 2L, 60L);
                }
            }
            return;
        }
        finally {
            IOUtils.closeQuietly(eventStream);
            this.releaseToken(segment);
        }
    }

    private boolean processInstructions(int segmentId) {
        List toExecute = this.instructions.getOrDefault(segmentId, Collections.emptyList());
        boolean instructionsPresent = !toExecute.isEmpty();
        for (Instruction instruction : toExecute) {
            toExecute.remove(instruction);
            instruction.run();
        }
        return instructionsPresent;
    }

    private void releaseToken(Segment segment) {
        try {
            this.transactionManager.executeInTransaction(() -> this.tokenStore.releaseClaim(this.getName(), segment.getSegmentId()));
            logger.info("Released claim");
        }
        catch (Exception e) {
            logger.info("Release claim failed", (Throwable)e);
        }
    }

    protected Set<Segment> processingSegments(TrackingToken token, Segment segment) {
        Optional<MergedTrackingToken> mergedToken = WrappedToken.unwrap(token, MergedTrackingToken.class);
        if (mergedToken.isPresent()) {
            Segment[] splitSegments = segment.split();
            TreeSet<Segment> segments = new TreeSet<Segment>();
            if (mergedToken.get().isLowerSegmentAdvanced()) {
                segments.addAll(this.processingSegments(mergedToken.get().lowerSegmentToken(), splitSegments[0]));
            }
            if (mergedToken.get().isUpperSegmentAdvanced()) {
                segments.addAll(this.processingSegments(mergedToken.get().upperSegmentToken(), splitSegments[1]));
            }
            return segments;
        }
        return Collections.singleton(segment);
    }

    private void processBatch(Segment segment, BlockingStream<TrackedEventMessage<?>> eventStream) throws Exception {
        ArrayList batch = new ArrayList();
        try {
            TrackingToken finalLastToken;
            TrackerStatus updatedStatus;
            TrackerStatus previousStatus;
            TrackingToken lastToken = null;
            Set<Segment> processingSegments = Collections.emptySet();
            long processingDeadline = this.now().toEpochMilli() + (long)this.eventAvailabilityTimeout;
            long processingTime = this.eventAvailabilityTimeout;
            while (batch.isEmpty() && processingTime > 0L && eventStream.hasNextAvailable((int)processingTime, TimeUnit.MILLISECONDS)) {
                processingTime = processingDeadline - this.now().toEpochMilli();
                TrackedEventMessage<?> firstMessage = eventStream.nextAvailable();
                if (this.canHandle(firstMessage, processingSegments = this.processingSegments(lastToken = firstMessage.trackingToken(), segment))) {
                    batch.add(firstMessage);
                } else {
                    this.ignoreEvent(eventStream, firstMessage);
                }
                for (int i = 0; this.isRegularProcessing(segment, processingSegments) && i < this.batchSize * 10 && batch.size() < this.batchSize && eventStream.peek().map(m -> this.isRegularProcessing(segment, (TrackedEventMessage<?>)m)).orElse(false).booleanValue(); ++i) {
                    TrackedEventMessage<?> trackedEventMessage = eventStream.nextAvailable();
                    lastToken = trackedEventMessage.trackingToken();
                    if (this.canHandle(trackedEventMessage, processingSegments)) {
                        batch.add(trackedEventMessage);
                        continue;
                    }
                    this.ignoreEvent(eventStream, trackedEventMessage);
                }
                if (!batch.isEmpty()) continue;
                TrackingToken finalLastToken2 = lastToken;
                previousStatus = (TrackerStatus)this.activeSegments.get(segment.getSegmentId());
                if (!previousStatus.isDifferent(updatedStatus = this.activeSegments.computeIfPresent(segment.getSegmentId(), (k, v) -> v.advancedTo(finalLastToken2)), this.trackerStatusChangeListener.validatePositions())) continue;
                this.trackerStatusChangeListener.onEventTrackerStatusChange(Collections.singletonMap(segment.getSegmentId(), updatedStatus));
            }
            if (lastToken == null) {
                this.checkSegmentCaughtUp(segment, eventStream);
                this.transactionManager.executeInTransaction(() -> this.tokenStore.extendClaim(this.getName(), segment.getSegmentId()));
                return;
            }
            if (batch.isEmpty()) {
                finalLastToken = lastToken;
                this.transactionManager.executeInTransaction(() -> this.tokenStore.storeToken(finalLastToken, this.getName(), segment.getSegmentId()));
                return;
            }
            finalLastToken = lastToken;
            while (eventStream.peek().filter(event -> finalLastToken.equals(event.trackingToken())).isPresent()) {
                TrackedEventMessage<?> trackedEventMessage = eventStream.nextAvailable();
                if (this.canHandle(trackedEventMessage, processingSegments)) {
                    batch.add(trackedEventMessage);
                    continue;
                }
                this.ignoreEvent(eventStream, trackedEventMessage);
            }
            BatchingUnitOfWork unitOfWork = new BatchingUnitOfWork(batch);
            unitOfWork.attachTransaction(this.transactionManager);
            unitOfWork.resources().put(this.segmentIdResourceKey, segment.getSegmentId());
            unitOfWork.resources().put(this.lastTokenResourceKey, finalLastToken);
            this.processInUnitOfWork(batch, unitOfWork, processingSegments);
            previousStatus = (TrackerStatus)this.activeSegments.get(segment.getSegmentId());
            updatedStatus = this.activeSegments.computeIfPresent(segment.getSegmentId(), (k, v) -> v.advancedTo(finalLastToken));
            if (previousStatus.isDifferent(updatedStatus, this.trackerStatusChangeListener.validatePositions())) {
                this.trackerStatusChangeListener.onEventTrackerStatusChange(Collections.singletonMap(segment.getSegmentId(), updatedStatus));
            }
            this.checkSegmentCaughtUp(segment, eventStream);
        }
        catch (InterruptedException e) {
            if (this.isRunning()) {
                logger.error(String.format("Event processor [%s] was interrupted. Shutting down.", this.getName()), (Throwable)e);
                this.setShutdownState();
            }
            Thread.currentThread().interrupt();
        }
    }

    private void ignoreEvent(BlockingStream<TrackedEventMessage<?>> eventStream, TrackedEventMessage<?> trackedEventMessage) {
        if (!this.canHandleType(trackedEventMessage.getPayloadType())) {
            eventStream.skipMessagesWithPayloadTypeOf(trackedEventMessage);
        }
        this.reportIgnored(trackedEventMessage);
    }

    protected boolean canHandle(EventMessage<?> eventMessage, Collection<Segment> segments) throws Exception {
        for (Segment segment : segments) {
            if (!this.canHandle(eventMessage, segment)) continue;
            return true;
        }
        return false;
    }

    private boolean isRegularProcessing(Segment segment, TrackedEventMessage<?> nextMessage) {
        return nextMessage != null && this.isRegularProcessing(segment, this.processingSegments(nextMessage.trackingToken(), segment));
    }

    private boolean isRegularProcessing(Segment segment, Collection<Segment> processingSegments) {
        return processingSegments.size() == 1 && Objects.equals(processingSegments.iterator().next(), segment);
    }

    private void checkSegmentCaughtUp(Segment segment, BlockingStream<TrackedEventMessage<?>> eventStream) {
        TrackerStatus previousStatus;
        if (!eventStream.hasNextAvailable() && !(previousStatus = (TrackerStatus)this.activeSegments.get(segment.getSegmentId())).isCaughtUp()) {
            TrackerStatus updatedStates = this.activeSegments.computeIfPresent(segment.getSegmentId(), (k, v) -> v.caughtUp());
            this.trackerStatusChangeListener.onEventTrackerStatusChange(Collections.singletonMap(segment.getSegmentId(), updatedStates));
        }
    }

    private BlockingStream<TrackedEventMessage<?>> ensureEventStreamOpened(BlockingStream<TrackedEventMessage<?>> eventStreamIn, Segment segment) {
        BlockingStream eventStream = eventStreamIn;
        if (eventStream == null && this.state.get().isRunning()) {
            TrackingToken trackingToken = this.transactionManager.fetchInTransaction(() -> this.tokenStore.fetchToken(this.getName(), segment.getSegmentId()));
            logger.info("Fetched token: {} for segment: {}", (Object)trackingToken, (Object)segment);
            eventStream = this.transactionManager.fetchInTransaction(() -> this.doOpenStream(trackingToken));
        }
        return eventStream;
    }

    private BlockingStream<TrackedEventMessage<?>> doOpenStream(TrackingToken trackingToken) {
        if (trackingToken instanceof WrappedToken) {
            return new WrappedMessageStream((WrappedToken)trackingToken, this.messageSource.openStream(WrappedToken.unwrapLowerBound(trackingToken)));
        }
        return this.messageSource.openStream(WrappedToken.unwrapLowerBound(trackingToken));
    }

    @Override
    public void releaseSegment(int segmentId) {
        this.releaseSegment(segmentId, this.tokenClaimInterval * 2L, TimeUnit.MILLISECONDS);
    }

    @Override
    public void releaseSegment(int segmentId, long releaseDuration, TimeUnit unit) {
        this.segmentReleaseDeadlines.put(segmentId, System.currentTimeMillis() + unit.toMillis(releaseDuration));
    }

    private boolean canClaimSegment(int segmentId) {
        return this.segmentReleaseDeadlines.getOrDefault(segmentId, Long.MIN_VALUE) < System.currentTimeMillis();
    }

    @Override
    public void resetTokens() {
        this.resetTokens(this.initialTrackingTokenBuilder);
    }

    @Override
    public <R> void resetTokens(R resetContext) {
        this.resetTokens(this.initialTrackingTokenBuilder, resetContext);
    }

    @Override
    public void resetTokens(Function<StreamableMessageSource<TrackedEventMessage<?>>, TrackingToken> initialTrackingTokenSupplier) {
        this.resetTokens(initialTrackingTokenSupplier.apply(this.messageSource));
    }

    @Override
    public <R> void resetTokens(Function<StreamableMessageSource<TrackedEventMessage<?>>, TrackingToken> initialTrackingTokenSupplier, R resetContext) {
        this.resetTokens(initialTrackingTokenSupplier.apply(this.messageSource), resetContext);
    }

    @Override
    public void resetTokens(TrackingToken startPosition) {
        this.resetTokens(startPosition, null);
    }

    @Override
    public <R> void resetTokens(TrackingToken startPosition, R resetContext) {
        Assert.state(this.supportsReset(), () -> "The handlers assigned to this Processor do not support a reset");
        Assert.state(!this.isRunning() && this.activeProcessorThreads() == 0 && !this.workLauncherRunning.get(), () -> "TrackingProcessor must be shut down before triggering a reset");
        this.transactionManager.executeInTransaction(() -> {
            int i;
            int[] segments = this.tokenStore.fetchSegments(this.getName());
            TrackingToken[] tokens = new TrackingToken[segments.length];
            for (i = 0; i < segments.length; ++i) {
                tokens[i] = this.tokenStore.fetchToken(this.getName(), segments[i]);
            }
            this.eventHandlerInvoker().performReset(resetContext);
            for (i = 0; i < tokens.length; ++i) {
                this.tokenStore.storeToken(ReplayToken.createReplayToken(tokens[i], startPosition), this.getName(), segments[i]);
            }
        });
    }

    @Override
    public boolean supportsReset() {
        return this.eventHandlerInvoker().supportsReset();
    }

    @Override
    public boolean isRunning() {
        return this.state.get().isRunning();
    }

    @Override
    public boolean isError() {
        return this.state.get() == State.PAUSED_ERROR;
    }

    public StreamableMessageSource<? extends TrackedEventMessage<?>> getMessageSource() {
        return this.messageSource;
    }

    @Override
    public void shutDown() {
        this.setShutdownState();
        this.awaitTermination().join();
    }

    @Override
    @ShutdownHandler(phase=0x3FFFFFFF)
    public CompletableFuture<Void> shutdownAsync() {
        this.setShutdownState();
        return this.awaitTermination();
    }

    private void setShutdownState() {
        if (this.state.getAndSet(State.SHUT_DOWN).isRunning()) {
            logger.info("Shutdown state set for Processor '{}'.", (Object)this.getName());
        }
    }

    private CompletableFuture<Void> awaitTermination() {
        if (this.activeProcessorThreads() <= 0 && !this.workLauncherRunning.get()) {
            return CompletableFuture.completedFuture(null);
        }
        logger.info("Processor '{}' awaiting termination...", (Object)this.getName());
        return this.workerThreads.entrySet().stream().map(worker -> CompletableFuture.runAsync(() -> {
            try {
                Thread workerThread = (Thread)worker.getValue();
                workerThread.join(this.workerTerminationTimeout);
                if (workerThread.isAlive()) {
                    workerThread.interrupt();
                    workerThread.join(this.workerTerminationTimeout);
                }
            }
            catch (InterruptedException e) {
                logger.info("Thread was interrupted waiting for TrackingProcessor Worker '{}' shutdown.", worker.getKey());
                Thread.currentThread().interrupt();
            }
        })).reduce((xva$0, xva$1) -> CompletableFuture.allOf(xva$0, xva$1)).orElse(CompletableFuture.completedFuture(null));
    }

    public int availableProcessorThreads() {
        return this.availableThreads.get();
    }

    @Override
    public int maxCapacity() {
        return this.availableProcessorThreads();
    }

    public int activeProcessorThreads() {
        return this.activeSegments.size();
    }

    @Override
    public Map<Integer, EventTrackerStatus> processingStatus() {
        return Collections.unmodifiableMap(this.activeSegments);
    }

    protected State getState() {
        return this.state.get();
    }

    protected void startSegmentWorkers() {
        this.spawnWorkerThread(new WorkerLauncher()).start();
    }

    private Thread spawnWorkerThread(Worker worker) {
        Thread workerThread = this.threadFactory.newThread(worker);
        this.workerThreads.put(worker.name(), workerThread);
        return workerThread;
    }

    protected void doSleepFor(long millisToSleep) {
        long deadline = System.currentTimeMillis() + millisToSleep;
        try {
            long timeLeft;
            while (this.getState().isRunning() && (timeLeft = deadline - System.currentTimeMillis()) > 0L) {
                Thread.sleep(Math.min(timeLeft, 100L));
            }
        }
        catch (InterruptedException e) {
            logger.warn("Thread interrupted. Preparing to shut down event processor");
            this.shutDown();
            Thread.currentThread().interrupt();
        }
    }

    private static interface Worker
    extends Runnable {
        public String name();

        public void cleanUp();
    }

    private class MergeSegmentInstruction
    extends Instruction {
        private final int segmentId;
        private final int otherSegment;

        public MergeSegmentInstruction(CompletableFuture<Boolean> result, int ownedSegment, int otherSegment) {
            super(result);
            this.segmentId = ownedSegment;
            this.otherSegment = otherSegment;
        }

        @Override
        protected boolean runSafe() {
            logger.info("Processing merge instruction for segments [{}] and [{}] in processor [{}]", new Object[]{this.segmentId, this.otherSegment, TrackingEventProcessor.this.getName()});
            TrackingEventProcessor.this.releaseSegment(this.otherSegment);
            while (TrackingEventProcessor.this.activeSegments.containsKey(this.otherSegment)) {
                Thread.yield();
            }
            TrackerStatus status = (TrackerStatus)TrackingEventProcessor.this.activeSegments.get(this.segmentId);
            TrackingToken otherToken = TrackingEventProcessor.this.tokenStore.fetchToken(TrackingEventProcessor.this.getName(), this.otherSegment);
            Segment segmentToMergeWith = Segment.computeSegment(this.otherSegment, TrackingEventProcessor.this.tokenStore.fetchSegments(TrackingEventProcessor.this.getName()));
            if (!status.getSegment().isMergeableWith(segmentToMergeWith)) {
                TrackingEventProcessor.this.tokenStore.releaseClaim(TrackingEventProcessor.this.getName(), this.otherSegment);
                return false;
            }
            Segment newSegment = status.getSegment().mergedWith(segmentToMergeWith);
            int tokenToDelete = newSegment.getSegmentId() == this.segmentId ? this.otherSegment : this.segmentId;
            TrackingEventProcessor.this.tokenStore.deleteToken(TrackingEventProcessor.this.getName(), tokenToDelete);
            MergedTrackingToken mergedToken = this.otherSegment < this.segmentId ? new MergedTrackingToken(otherToken, status.getInternalTrackingToken()) : new MergedTrackingToken(status.getInternalTrackingToken(), otherToken);
            TrackingEventProcessor.this.tokenStore.storeToken(mergedToken, TrackingEventProcessor.this.getName(), newSegment.getSegmentId());
            return true;
        }
    }

    private class SplitSegmentInstruction
    extends Instruction {
        private final int segmentId;

        public SplitSegmentInstruction(CompletableFuture<Boolean> result, int segmentId) {
            super(result);
            this.segmentId = segmentId;
        }

        @Override
        protected boolean runSafe() {
            logger.info("Processing split instruction for segment [{}] in processor [{}]", (Object)this.segmentId, (Object)TrackingEventProcessor.this.getName());
            TrackerStatus status = (TrackerStatus)TrackingEventProcessor.this.activeSegments.get(this.segmentId);
            TrackerStatus[] newStatus = status.split();
            int newSegmentId = newStatus[1].getSegment().getSegmentId();
            TrackingEventProcessor.this.tokenStore.initializeSegment(newStatus[1].getTrackingToken(), TrackingEventProcessor.this.getName(), newSegmentId);
            TrackingEventProcessor.this.activeSegments.put(this.segmentId, newStatus[0]);
            return true;
        }
    }

    private static class WrappedMessageStream
    implements BlockingStream<TrackedEventMessage<?>> {
        private final BlockingStream<TrackedEventMessage<?>> delegate;
        private WrappedToken lastToken;

        public WrappedMessageStream(WrappedToken token, BlockingStream<TrackedEventMessage<?>> delegate) {
            this.delegate = delegate;
            this.lastToken = token;
        }

        @Override
        public Optional<TrackedEventMessage<?>> peek() {
            return this.delegate.peek();
        }

        @Override
        public boolean hasNextAvailable(int timeout, TimeUnit unit) throws InterruptedException {
            return this.delegate.hasNextAvailable(timeout, unit);
        }

        @Override
        public TrackedEventMessage<?> nextAvailable() throws InterruptedException {
            TrackedEventMessage<?> trackedEventMessage = this.alterToken(this.delegate.nextAvailable());
            this.lastToken = trackedEventMessage.trackingToken() instanceof WrappedToken ? (WrappedToken)trackedEventMessage.trackingToken() : null;
            return trackedEventMessage;
        }

        @Override
        public void close() {
            this.delegate.close();
        }

        @Override
        public boolean hasNextAvailable() {
            return this.delegate.hasNextAvailable();
        }

        public <T> TrackedEventMessage<T> alterToken(TrackedEventMessage<T> message) {
            if (this.lastToken == null) {
                return message;
            }
            return message.withTrackingToken(this.lastToken.advancedTo(message.trackingToken()));
        }
    }

    private class WorkerLauncher
    implements Worker {
        private WorkerLauncher() {
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            try {
                int waitTime = 1;
                String processorName = TrackingEventProcessor.this.getName();
                while (TrackingEventProcessor.this.getState().isRunning()) {
                    int[] tokenStoreCurrentSegments;
                    TrackingEventProcessor.this.workLauncherRunning.set(true);
                    try {
                        tokenStoreCurrentSegments = TrackingEventProcessor.this.transactionManager.fetchInTransaction(() -> TrackingEventProcessor.this.tokenStore.fetchSegments(processorName));
                        if (tokenStoreCurrentSegments.length == 0 && TrackingEventProcessor.this.segmentsSize > 0) {
                            tokenStoreCurrentSegments = TrackingEventProcessor.this.transactionManager.fetchInTransaction(() -> {
                                TrackingToken initialToken = (TrackingToken)TrackingEventProcessor.this.initialTrackingTokenBuilder.apply(TrackingEventProcessor.this.messageSource);
                                TrackingEventProcessor.this.tokenStore.initializeTokenSegments(processorName, TrackingEventProcessor.this.segmentsSize, initialToken);
                                return TrackingEventProcessor.this.tokenStore.fetchSegments(processorName);
                            });
                        }
                        waitTime = 1;
                    }
                    catch (Exception e) {
                        if (waitTime == 1) {
                            logger.warn("Fetch Segments for Processor '{}' failed: {}. Preparing for retry in {}s", new Object[]{processorName, e.getMessage(), waitTime, e});
                        } else {
                            logger.info("Fetching Segments for Processor '{}' still failing: {}. Preparing for retry in {}s", new Object[]{processorName, e.getMessage(), waitTime});
                        }
                        TrackingEventProcessor.this.doSleepFor(TimeUnit.SECONDS.toMillis(waitTime));
                        waitTime = Math.min(waitTime * 2, 60);
                        continue;
                    }
                    TrackingSegmentWorker workingInCurrentThread = null;
                    for (int i = 0; i < tokenStoreCurrentSegments.length && TrackingEventProcessor.this.availableThreads.get() > 0; ++i) {
                        TrackerStatus removedStatus;
                        int segmentId = tokenStoreCurrentSegments[i];
                        if (TrackingEventProcessor.this.activeSegments.containsKey(segmentId) || !TrackingEventProcessor.this.canClaimSegment(segmentId)) continue;
                        try {
                            TrackingEventProcessor.this.transactionManager.executeInTransaction(() -> {
                                TrackingToken token = TrackingEventProcessor.this.tokenStore.fetchToken(processorName, segmentId);
                                int[] segmentIds = TrackingEventProcessor.this.tokenStore.fetchSegments(processorName);
                                Segment segment = Segment.computeSegment(segmentId, segmentIds);
                                logger.info("Worker assigned to segment {} for processing", (Object)segment);
                                TrackerStatus newStatus = new TrackerStatus(segment, token);
                                TrackerStatus previousStatus = TrackingEventProcessor.this.activeSegments.putIfAbsent(segmentId, newStatus);
                                if (previousStatus == null) {
                                    TrackingEventProcessor.this.trackerStatusChangeListener.onEventTrackerStatusChange(Collections.singletonMap(segmentId, new AddedTrackerStatus(newStatus)));
                                }
                            });
                        }
                        catch (UnableToClaimTokenException ucte) {
                            logger.debug("Unable to claim the token for segment: {}. It is owned by another process", (Object)segmentId);
                            removedStatus = (TrackerStatus)TrackingEventProcessor.this.activeSegments.remove(segmentId);
                            if (removedStatus == null) continue;
                            TrackingEventProcessor.this.trackerStatusChangeListener.onEventTrackerStatusChange(Collections.singletonMap(segmentId, new RemovedTrackerStatus(removedStatus)));
                            continue;
                        }
                        catch (Exception e) {
                            removedStatus = (TrackerStatus)TrackingEventProcessor.this.activeSegments.remove(segmentId);
                            if (removedStatus != null) {
                                TrackingEventProcessor.this.trackerStatusChangeListener.onEventTrackerStatusChange(Collections.singletonMap(segmentId, new RemovedTrackerStatus(removedStatus)));
                            }
                            if (AxonNonTransientException.isCauseOf(e)) {
                                logger.error("An unrecoverable error has occurred wile attempting to claim a token for segment: {}. Shutting down processor [{}].", new Object[]{segmentId, TrackingEventProcessor.this.getName(), e});
                                TrackingEventProcessor.this.state.set(State.PAUSED_ERROR);
                                break;
                            }
                            logger.info("An error occurred while attempting to claim a token for segment: {}. Will retry later...", (Object)segmentId, (Object)e);
                            break;
                        }
                        TrackingSegmentWorker trackingSegmentWorker = new TrackingSegmentWorker(((TrackerStatus)TrackingEventProcessor.this.activeSegments.get(segmentId)).getSegment());
                        if (TrackingEventProcessor.this.availableThreads.decrementAndGet() > 0) {
                            logger.info("Dispatching new tracking segment worker: {}", (Object)trackingSegmentWorker);
                            TrackingEventProcessor.this.spawnWorkerThread(trackingSegmentWorker).start();
                            continue;
                        }
                        workingInCurrentThread = trackingSegmentWorker;
                        break;
                    }
                    if (Objects.nonNull(workingInCurrentThread)) {
                        logger.info("Using current Thread for last segment worker: {}", workingInCurrentThread);
                        TrackingEventProcessor.this.workLauncherRunning.set(false);
                        workingInCurrentThread.run();
                        return;
                    }
                    TrackingEventProcessor.this.doSleepFor(TrackingEventProcessor.this.tokenClaimInterval);
                }
                TrackingEventProcessor.this.workLauncherRunning.set(false);
            }
            finally {
                this.cleanUp();
            }
        }

        @Override
        public String name() {
            return WorkerLauncher.class.getSimpleName();
        }

        @Override
        public void cleanUp() {
            TrackingEventProcessor.this.workerThreads.remove(this.name());
        }
    }

    private class TrackingSegmentWorker
    implements Worker {
        private final Segment segment;

        public TrackingSegmentWorker(Segment segment) {
            this.segment = segment;
        }

        @Override
        public void run() {
            try {
                TrackingEventProcessor.this.processingLoop(this.segment);
            }
            catch (Throwable e) {
                logger.error("Processing loop ended due to uncaught exception. Pausing processor in Error State.", e);
                TrackingEventProcessor.this.state.set(State.PAUSED_ERROR);
                throw e;
            }
            finally {
                this.cleanUp();
            }
        }

        public String toString() {
            return "TrackingSegmentWorker{processor=" + TrackingEventProcessor.this.getName() + ", segment=" + this.segment + '}';
        }

        @Override
        public String name() {
            return TrackingSegmentWorker.class.getSimpleName() + this.segment.getSegmentId();
        }

        @Override
        public void cleanUp() {
            TrackerStatus removedStatus = (TrackerStatus)TrackingEventProcessor.this.activeSegments.remove(this.segment.getSegmentId());
            if (removedStatus != null) {
                TrackingEventProcessor.this.trackerStatusChangeListener.onEventTrackerStatusChange(Collections.singletonMap(this.segment.getSegmentId(), new RemovedTrackerStatus(removedStatus)));
            }
            TrackingEventProcessor.this.workerThreads.remove(this.name());
            logger.info("Worker for segment {} stopped.", (Object)this.segment);
            if (!TrackingEventProcessor.this.workLauncherRunning.get() && TrackingEventProcessor.this.availableThreads.getAndIncrement() == 0 && TrackingEventProcessor.this.getState().isRunning()) {
                logger.info("No Worker Launcher active. Using current thread to assign segments.");
                new WorkerLauncher().run();
            }
        }
    }

    private abstract class Instruction
    implements Runnable {
        private final CompletableFuture<Boolean> result;

        public Instruction(CompletableFuture<Boolean> result) {
            this.result = result;
        }

        @Override
        public void run() {
            try {
                ProcessUtils.executeWithRetry(() -> TrackingEventProcessor.this.transactionManager.executeInTransaction(() -> this.result.complete(this.runSafe())), re -> ExceptionUtils.findException((Throwable)re, UnableToClaimTokenException.class).isPresent(), TrackingEventProcessor.this.tokenClaimInterval, TimeUnit.MILLISECONDS, 10L);
            }
            catch (Exception e) {
                this.result.completeExceptionally(e);
            }
        }

        protected abstract boolean runSafe();
    }

    public static class Builder
    extends AbstractEventProcessor.Builder {
        private StreamableMessageSource<TrackedEventMessage<?>> messageSource;
        private TokenStore tokenStore;
        private TransactionManager transactionManager;
        private TrackingEventProcessorConfiguration trackingEventProcessorConfiguration = TrackingEventProcessorConfiguration.forSingleThreadedProcessing();
        private Boolean storeTokenBeforeProcessing;

        public Builder() {
            super.rollbackConfiguration(RollbackConfigurationType.ANY_THROWABLE);
        }

        @Override
        public Builder name(String name) {
            super.name(name);
            return this;
        }

        @Override
        public Builder eventHandlerInvoker(EventHandlerInvoker eventHandlerInvoker) {
            super.eventHandlerInvoker(eventHandlerInvoker);
            return this;
        }

        @Override
        public Builder rollbackConfiguration(RollbackConfiguration rollbackConfiguration) {
            super.rollbackConfiguration(rollbackConfiguration);
            return this;
        }

        @Override
        public Builder errorHandler(ErrorHandler errorHandler) {
            super.errorHandler(errorHandler);
            return this;
        }

        @Override
        public Builder messageMonitor(MessageMonitor<? super EventMessage<?>> messageMonitor) {
            super.messageMonitor(messageMonitor);
            return this;
        }

        public Builder messageSource(StreamableMessageSource<TrackedEventMessage<?>> messageSource) {
            BuilderUtils.assertNonNull(messageSource, "StreamableMessageSource may not be null");
            this.messageSource = messageSource;
            return this;
        }

        public Builder tokenStore(TokenStore tokenStore) {
            BuilderUtils.assertNonNull(tokenStore, "TokenStore may not be null");
            this.tokenStore = tokenStore;
            return this;
        }

        public Builder transactionManager(TransactionManager transactionManager) {
            BuilderUtils.assertNonNull(transactionManager, "TransactionManager may not be null");
            this.transactionManager = transactionManager;
            if (this.storeTokenBeforeProcessing == null) {
                this.storeTokenBeforeProcessing = transactionManager != NoTransactionManager.instance();
            }
            return this;
        }

        public Builder trackingEventProcessorConfiguration(TrackingEventProcessorConfiguration trackingEventProcessorConfiguration) {
            BuilderUtils.assertNonNull(trackingEventProcessorConfiguration, "TrackingEventProcessorConfiguration may not be null");
            this.trackingEventProcessorConfiguration = trackingEventProcessorConfiguration;
            return this;
        }

        public Builder storingTokensAfterProcessing() {
            this.storeTokenBeforeProcessing = false;
            return this;
        }

        public TrackingEventProcessor build() {
            return new TrackingEventProcessor(this);
        }

        @Override
        protected void validate() throws AxonConfigurationException {
            super.validate();
            if (this.storeTokenBeforeProcessing == null) {
                this.storeTokenBeforeProcessing = false;
            }
            BuilderUtils.assertNonNull(this.messageSource, "The StreamableMessageSource is a hard requirement and should be provided");
            BuilderUtils.assertNonNull(this.tokenStore, "The TokenStore is a hard requirement and should be provided");
            BuilderUtils.assertNonNull(this.transactionManager, "The TransactionManager is a hard requirement and should be provided");
        }
    }

    protected static enum State {
        NOT_STARTED(false),
        STARTED(true),
        PAUSED(false),
        SHUT_DOWN(false),
        PAUSED_ERROR(false);

        private final boolean allowProcessing;

        private State(boolean allowProcessing) {
            this.allowProcessing = allowProcessing;
        }

        boolean isRunning() {
            return this.allowProcessing;
        }
    }
}

