/*
 * Decompiled with CFR 0.152.
 */
package org.axonframework.eventhandling.pooled;

import java.lang.invoke.MethodHandles;
import java.time.Clock;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;
import org.axonframework.common.ProcessUtils;
import org.axonframework.common.io.IOUtils;
import org.axonframework.common.stream.BlockingStream;
import org.axonframework.common.transaction.TransactionManager;
import org.axonframework.eventhandling.GenericEventMessage;
import org.axonframework.eventhandling.Segment;
import org.axonframework.eventhandling.TrackedEventMessage;
import org.axonframework.eventhandling.TrackerStatus;
import org.axonframework.eventhandling.TrackingToken;
import org.axonframework.eventhandling.WrappedToken;
import org.axonframework.eventhandling.pooled.ClaimTask;
import org.axonframework.eventhandling.pooled.CoordinatorTask;
import org.axonframework.eventhandling.pooled.MaxSegmentProvider;
import org.axonframework.eventhandling.pooled.MergeTask;
import org.axonframework.eventhandling.pooled.SplitTask;
import org.axonframework.eventhandling.pooled.WorkPackage;
import org.axonframework.eventhandling.tokenstore.TokenStore;
import org.axonframework.eventhandling.tokenstore.UnableToClaimTokenException;
import org.axonframework.messaging.StreamableMessageSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class Coordinator {
    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
    private final String name;
    private final StreamableMessageSource<TrackedEventMessage<?>> messageSource;
    private final TokenStore tokenStore;
    private final TransactionManager transactionManager;
    private final ScheduledExecutorService executorService;
    private final BiFunction<Segment, TrackingToken, WorkPackage> workPackageFactory;
    private final EventFilter eventFilter;
    private final Consumer<? super TrackedEventMessage<?>> ignoredMessageHandler;
    private final BiConsumer<Integer, UnaryOperator<TrackerStatus>> processingStatusUpdater;
    private final long tokenClaimInterval;
    private final long claimExtensionThreshold;
    private final Clock clock;
    private final MaxSegmentProvider maxSegmentProvider;
    private final int initialSegmentCount;
    private final Function<StreamableMessageSource<TrackedEventMessage<?>>, TrackingToken> initialToken;
    private final boolean coordinatorExtendsClaims;
    private final Consumer<Segment> segmentReleasedAction;
    private final Map<Integer, WorkPackage> workPackages = new ConcurrentHashMap<Integer, WorkPackage>();
    private final AtomicReference<RunState> runState;
    private final Map<Integer, Instant> releasesDeadlines = new ConcurrentHashMap<Integer, Instant>();
    private final Map<Integer, Integer> releasesLastBackOffSeconds = new ConcurrentHashMap<Integer, Integer>();
    private int errorWaitBackOff = 500;
    private final Queue<CoordinatorTask> coordinatorTasks = new ConcurrentLinkedQueue<CoordinatorTask>();
    private final AtomicReference<CoordinationTask> coordinationTask = new AtomicReference();
    private final AtomicLong coordinationTaskGeneration = new AtomicLong(-1L);

    protected static Builder builder() {
        return new Builder();
    }

    private Coordinator(Builder builder) {
        this.name = builder.name;
        this.messageSource = builder.messageSource;
        this.tokenStore = builder.tokenStore;
        this.transactionManager = builder.transactionManager;
        this.executorService = builder.executorService;
        this.workPackageFactory = builder.workPackageFactory;
        this.eventFilter = builder.eventFilter;
        this.ignoredMessageHandler = builder.ignoredMessageHandler;
        this.processingStatusUpdater = builder.processingStatusUpdater;
        this.tokenClaimInterval = builder.tokenClaimInterval;
        this.claimExtensionThreshold = builder.claimExtensionThreshold;
        this.clock = builder.clock;
        this.maxSegmentProvider = builder.maxSegmentProvider;
        this.initialSegmentCount = builder.initialSegmentCount;
        this.initialToken = builder.initialToken;
        this.runState = new AtomicReference<RunState>(RunState.initial(builder.shutdownAction));
        this.coordinatorExtendsClaims = builder.coordinatorExtendsClaims;
        this.segmentReleasedAction = builder.segmentReleasedAction;
    }

    public void start() {
        RunState newState = this.runState.updateAndGet(RunState::attemptStart);
        if (newState.wasStarted()) {
            logger.debug("Processor [{}]. Starting Coordinator...", (Object)this.name);
            try {
                ProcessUtils.executeUntilTrue(this::initializeTokenStore, 100L, 30L);
                CoordinationTask task = new CoordinationTask();
                this.executorService.submit(task);
                this.coordinationTask.set(task);
            }
            catch (Exception e) {
                this.runState.updateAndGet(RunState::attemptStop).shutdownHandle().complete(null);
                throw e;
            }
        } else if (!newState.isRunning) {
            throw new IllegalStateException(String.format("Cannot start a processor [%s] while it's in process of shutting down.", this.name));
        }
    }

    public CompletableFuture<Void> stop() {
        logger.debug("Processor [{}]. Stopping Coordinator...", (Object)this.name);
        CompletableFuture<Void> handle = this.runState.updateAndGet(RunState::attemptStop).shutdownHandle();
        CoordinationTask task = this.coordinationTask.getAndSet(null);
        if (task != null) {
            logger.info("Processor [{}]. Stop requested. Scheduling immediate coordination task.", (Object)this.name);
            task.scheduleImmediateCoordinationTask();
        }
        return handle;
    }

    public boolean isRunning() {
        return this.runState.get().isRunning();
    }

    private void scheduleCoordinator() {
        CoordinationTask coordinator = this.coordinationTask.get();
        if (coordinator != null) {
            logger.debug("Processor [{}]. Segment operation queued. Scheduling immediate coordination task.", (Object)this.name);
            coordinator.scheduleImmediateCoordinationTask();
        }
    }

    public boolean isError() {
        return this.errorWaitBackOff > 500;
    }

    public void releaseUntil(int segmentId, Instant releaseDuration) {
        logger.debug("Processor [{}] will release segment {} for processing until {}.", new Object[]{this.name, segmentId, releaseDuration});
        this.releasesDeadlines.put(segmentId, releaseDuration);
        this.scheduleCoordinator();
    }

    public CompletableFuture<Boolean> splitSegment(int segmentId) {
        CompletableFuture<Boolean> result = new CompletableFuture<Boolean>();
        this.coordinatorTasks.add(new SplitTask(result, this.name, segmentId, this.workPackages, this.releasesDeadlines, this.tokenStore, this.transactionManager, this.clock));
        this.scheduleCoordinator();
        return result;
    }

    public CompletableFuture<Boolean> mergeSegment(int segmentId) {
        CompletableFuture<Boolean> result = new CompletableFuture<Boolean>();
        this.coordinatorTasks.add(new MergeTask(result, this.name, segmentId, this.workPackages, this.releasesDeadlines, this.tokenStore, this.transactionManager, this.clock));
        this.scheduleCoordinator();
        return result;
    }

    public CompletableFuture<Boolean> claimSegment(int segmentId) {
        CompletableFuture<Boolean> result = new CompletableFuture<Boolean>();
        this.coordinatorTasks.add(new ClaimTask(result, this.name, segmentId, this.workPackages, this.releasesDeadlines, this.tokenStore, this.transactionManager));
        this.scheduleCoordinator();
        return result;
    }

    private boolean initializeTokenStore() {
        AtomicBoolean tokenStoreInitialized = new AtomicBoolean(false);
        this.transactionManager.executeInTransaction(() -> {
            int[] segments = this.tokenStore.fetchSegments(this.name);
            try {
                if (segments == null || segments.length == 0) {
                    logger.info("Processor [{}]. Initializing ({}) segments", (Object)this.name, (Object)this.initialSegmentCount);
                    this.tokenStore.initializeTokenSegments(this.name, this.initialSegmentCount, this.initialToken.apply(this.messageSource));
                }
                tokenStoreInitialized.set(true);
            }
            catch (Exception e) {
                logger.info("Error while initializing the Token Store. This may simply indicate concurrent attempts to initialize.", (Throwable)e);
            }
        });
        return tokenStoreInitialized.get();
    }

    private class CoordinationTask
    implements Runnable {
        private final AtomicBoolean processingGate = new AtomicBoolean();
        private final AtomicBoolean scheduledGate = new AtomicBoolean();
        private final AtomicBoolean interruptibleScheduledGate = new AtomicBoolean();
        private BlockingStream<TrackedEventMessage<?>> eventStream;
        private TrackingToken lastScheduledToken = NoToken.INSTANCE;
        private boolean availabilityCallbackSupported;
        private long unclaimedSegmentValidationThreshold;
        private final long generation = Coordinator.access$2200(Coordinator.this).incrementAndGet();

        private CoordinationTask() {
        }

        @Override
        public void run() {
            if (!this.processingGate.compareAndSet(false, true)) {
                return;
            }
            long currentTaskGeneration = Coordinator.this.coordinationTaskGeneration.get();
            if (this.generation != currentTaskGeneration) {
                logger.debug("Processor [{}] coordination task is stale (generation {} vs current {}). Exiting.", new Object[]{Coordinator.this.name, this.generation, currentTaskGeneration});
                return;
            }
            if (!((RunState)Coordinator.this.runState.get()).isRunning()) {
                logger.debug("Processor [{}] (Coordination Task [{}]). Stopped processing. Runnable flag is false. Releasing claims and closing the event stream.", (Object)Coordinator.this.name, (Object)this.generation);
                this.abortWorkPackages(null).thenRun(() -> ((RunState)Coordinator.this.runState.get()).shutdownHandle().complete(null));
                IOUtils.closeQuietly(this.eventStream);
                return;
            }
            Coordinator.this.workPackages.entrySet().stream().filter(entry -> this.isSegmentBlockedFromClaim((Integer)entry.getKey())).map(Map.Entry::getValue).forEach(workPackage -> {
                logger.info("Processor [{}] (Coordination Task [{}]) was requested and will comply with releasing claim for segment {}.", new Object[]{Coordinator.this.name, this.generation, workPackage.segment().getSegmentId()});
                this.abortWorkPackage((WorkPackage)workPackage, null);
            });
            if (Coordinator.this.coordinatorExtendsClaims) {
                logger.debug("Processor [{}] (Coordination Task [{}]) will extend the claim of work packages that are busy processing events and have met the claim threshold.", (Object)Coordinator.this.name, (Object)this.generation);
                Coordinator.this.workPackages.values().stream().filter(workPackage -> !workPackage.isAbortTriggered()).filter(WorkPackage::isProcessingEvents).forEach(workPackage -> {
                    try {
                        workPackage.extendClaimIfThresholdIsMet();
                    }
                    catch (Exception e) {
                        logger.warn("Processor [{}] (Coordination Task [{}]). Error while extending claim for Work Package [{}]. Aborting Work Package...", new Object[]{Coordinator.this.name, this.generation, workPackage.segment().getSegmentId(), e});
                        workPackage.abort(e);
                    }
                });
            }
            if (!Coordinator.this.coordinatorTasks.isEmpty()) {
                CoordinatorTask task = (CoordinatorTask)Coordinator.this.coordinatorTasks.remove();
                logger.debug("Processor [{}] (Coordination Task [{}]) found task [{}] to run.", new Object[]{Coordinator.this.name, this.generation, task.getDescription()});
                ((CompletableFuture)task.run().thenRun(() -> {
                    this.unclaimedSegmentValidationThreshold = 0L;
                })).whenComplete((result, exception) -> {
                    this.processingGate.set(false);
                    if (exception != null) {
                        logger.warn("Processor [{}] (Coordination Task [{}]). Task [{}] completed with error. Scheduling immediate coordination task (itself).", new Object[]{Coordinator.this.name, this.generation, task.getDescription(), exception});
                    } else {
                        logger.debug("Processor [{}] (Coordination Task [{}]). Task [{}] completed successfully. Scheduling immediate coordination task (itself).", new Object[]{Coordinator.this.name, this.generation, task.getDescription()});
                    }
                    this.scheduleImmediateCoordinationTask();
                });
                return;
            }
            if (this.eventStream == null || this.unclaimedSegmentValidationThreshold <= Coordinator.this.clock.instant().toEpochMilli()) {
                this.unclaimedSegmentValidationThreshold = Coordinator.this.clock.instant().toEpochMilli() + Coordinator.this.tokenClaimInterval;
                try {
                    TrackingToken streamStartPosition = this.lastScheduledToken;
                    if (!this.releaseSegmentsIfTooManyClaimed()) {
                        logger.debug("Processor [{}] (Coordination Task [{}]) will try to claim new segments.", (Object)Coordinator.this.name, (Object)this.generation);
                        Map<Segment, TrackingToken> newSegments = this.claimNewSegments();
                        for (Map.Entry<Segment, TrackingToken> entry2 : newSegments.entrySet()) {
                            Segment segment = entry2.getKey();
                            TrackingToken token = entry2.getValue();
                            TrackingToken otherUnwrapped = WrappedToken.unwrapLowerBound(token);
                            streamStartPosition = streamStartPosition == null || otherUnwrapped == null ? null : streamStartPosition.lowerBound(otherUnwrapped);
                            Coordinator.this.workPackages.computeIfAbsent(segment.getSegmentId(), wp -> this.createWorkPackage(segment, token));
                        }
                        if (logger.isInfoEnabled() && !newSegments.isEmpty()) {
                            logger.info("Processor [{}] (Coordination Task [{}]) claimed {} new segments for processing.", new Object[]{Coordinator.this.name, this.generation, newSegments.size()});
                        }
                    }
                    this.ensureOpenStream(streamStartPosition);
                }
                catch (Exception e) {
                    logger.warn("Processor [{}] (Coordination Task [{}]). Exception occurred while starting work packages and opening the event stream.", new Object[]{Coordinator.this.name, this.generation, e});
                    this.abortAndScheduleRetry(e);
                    return;
                }
            }
            if (Coordinator.this.workPackages.isEmpty()) {
                logger.debug("Processor [{}] (Coordination Task [{}]). No segments claimed. Scheduling delayed coordination task (itself) with delay of {}ms.", new Object[]{Coordinator.this.name, this.generation, Coordinator.this.tokenClaimInterval});
                this.lastScheduledToken = NoToken.INSTANCE;
                IOUtils.closeQuietly(this.eventStream);
                this.eventStream = null;
                this.processingGate.set(false);
                this.scheduleDelayedCoordinationTask(Coordinator.this.tokenClaimInterval);
                return;
            }
            if (this.eventStream == null) {
                logger.warn("Processor [{}] (Coordination Task [{}]) has [{}] work packages and last scheduled token [{}] but no event stream. Aborting work packages and scheduling retry.", new Object[]{Coordinator.this.name, this.generation, Coordinator.this.workPackages.size(), this.lastScheduledToken});
                this.abortAndScheduleRetry(new IllegalStateException("Event stream is null with [" + Coordinator.this.workPackages.size() + "] active work packages and last scheduled token of [" + this.lastScheduledToken + "]"));
                return;
            }
            try {
                if (!this.eventStream.hasNextAvailable() && this.isDone()) {
                    Coordinator.this.workPackages.keySet().forEach(i -> Coordinator.this.processingStatusUpdater.accept(i, TrackerStatus::caughtUp));
                }
                this.coordinateWorkPackages();
                Coordinator.this.errorWaitBackOff = 500;
                this.processingGate.set(false);
                if (this.isSpaceAvailable() && this.eventStream.hasNextAvailable()) {
                    logger.debug("Processor [{}] (Coordination Task [{}]). Space available and events pending. Scheduling immediate coordination task (itself).", (Object)Coordinator.this.name, (Object)this.generation);
                    this.scheduleImmediateCoordinationTask();
                } else if (this.isSpaceAvailable()) {
                    if (!this.availabilityCallbackSupported) {
                        logger.trace("Processor [{}] (Coordination Task [{}]). Space available, no events pending, no availability callback. Scheduling coordination task (itself) with delay of 500ms.", (Object)Coordinator.this.name, (Object)this.generation);
                        this.scheduleCoordinationTask(500L);
                    } else {
                        long delay = Math.min(Coordinator.this.claimExtensionThreshold, Coordinator.this.tokenClaimInterval);
                        logger.trace("Processor [{}] (Coordination Task [{}]). Space available, no events pending. Scheduling delayed coordination task (itself) with delay of {}ms.", new Object[]{Coordinator.this.name, this.generation, delay});
                        this.scheduleDelayedCoordinationTask(delay);
                    }
                } else {
                    logger.trace("Processor [{}] (Coordination Task [{}]). No space available in work packages. Scheduling coordination task (itself) with delay of 100ms.", (Object)Coordinator.this.name, (Object)this.generation);
                    this.scheduleCoordinationTask(100L);
                }
            }
            catch (Exception e) {
                logger.warn("Processor [{}] (Coordination Task [{}]). Exception occurred while coordinating the work packages.", new Object[]{Coordinator.this.name, this.generation, e});
                if (e instanceof InterruptedException) {
                    logger.error("Processor [{}] (Coordination Task [{}]) was interrupted. Shutting down.", new Object[]{Coordinator.this.name, this.generation, e});
                    Coordinator.this.stop();
                    Thread.currentThread().interrupt();
                }
                this.abortAndScheduleRetry(e);
            }
        }

        private WorkPackage createWorkPackage(Segment segment, TrackingToken token) {
            WorkPackage workPackage = (WorkPackage)Coordinator.this.workPackageFactory.apply(segment, token);
            workPackage.onBatchProcessed(() -> this.resetRetryExponentialBackoff(segment.getSegmentId()));
            return workPackage;
        }

        private void resetRetryExponentialBackoff(int segmentId) {
            Coordinator.this.releasesLastBackOffSeconds.compute(segmentId, (s, b) -> null);
            logger.debug("Processor [{}] (Coordination Task [{}]) reset release deadline backoff for Segment [#{}].", new Object[]{Coordinator.this.name, this.generation, segmentId});
        }

        private CompletableFuture<Void> abortWorkPackages(Exception cause) {
            return Coordinator.this.workPackages.values().stream().map(wp -> this.abortWorkPackage((WorkPackage)wp, cause)).reduce((xva$0, xva$1) -> CompletableFuture.allOf(xva$0, xva$1)).orElse(CompletableFuture.completedFuture(null)).thenRun(Coordinator.this.workPackages::clear);
        }

        private boolean releaseSegmentsIfTooManyClaimed() {
            boolean tooManySegmentsClaimed;
            int maxSegmentsPerNode = Coordinator.this.maxSegmentProvider.apply(Coordinator.this.name);
            boolean bl = tooManySegmentsClaimed = Coordinator.this.workPackages.size() > maxSegmentsPerNode;
            if (tooManySegmentsClaimed) {
                logger.info("Processor [{}] (Coordination Task [{}]). Total segments [{}] is above maxSegmentsPerNode = [{}], going to release surplus claimed segments.", new Object[]{Coordinator.this.name, this.generation, Coordinator.this.workPackages.size(), maxSegmentsPerNode});
                Coordinator.this.workPackages.values().stream().limit(Coordinator.this.workPackages.size() - maxSegmentsPerNode).forEach(workPackage -> Coordinator.this.releaseUntil(workPackage.segment().getSegmentId(), GenericEventMessage.clock.instant().plusMillis(Coordinator.this.tokenClaimInterval)));
            }
            return tooManySegmentsClaimed;
        }

        private Map<Segment, TrackingToken> claimNewSegments() {
            HashMap<Segment, TrackingToken> newClaims = new HashMap<Segment, TrackingToken>();
            List segments = Coordinator.this.transactionManager.fetchInTransaction(() -> Coordinator.this.tokenStore.fetchAvailableSegments(Coordinator.this.name));
            List unClaimedSegments = segments.stream().filter(segment -> !Coordinator.this.workPackages.containsKey(segment.getSegmentId())).collect(Collectors.toList());
            int maxSegmentsToClaim = Coordinator.this.maxSegmentProvider.apply(Coordinator.this.name) - Coordinator.this.workPackages.size();
            for (Segment segment2 : unClaimedSegments) {
                int segmentId = segment2.getSegmentId();
                if (this.isSegmentBlockedFromClaim(segmentId)) {
                    logger.debug("Processor [{}] (Coordination Task [{}]). Segment {} is still marked to not be claimed till [{}].", new Object[]{Coordinator.this.name, this.generation, segmentId, Coordinator.this.releasesDeadlines.get(segmentId)});
                    Coordinator.this.processingStatusUpdater.accept(segmentId, u -> null);
                    continue;
                }
                if (newClaims.size() >= maxSegmentsToClaim) continue;
                try {
                    TrackingToken token = Coordinator.this.transactionManager.fetchInTransaction(() -> Coordinator.this.tokenStore.fetchToken(Coordinator.this.name, segment2));
                    newClaims.put(segment2, token);
                    logger.info("Processor [{}] (Coordination Task [{}]) claimed the token for segment {}.", new Object[]{Coordinator.this.name, this.generation, segmentId});
                }
                catch (UnableToClaimTokenException e) {
                    Coordinator.this.processingStatusUpdater.accept(segmentId, u -> null);
                    logger.debug("Processor [{}] (Coordination Task [{}]) is unable to claim the token for segment {}. It is owned by another process or has been split/merged concurrently.", new Object[]{Coordinator.this.name, this.generation, segmentId});
                }
            }
            return newClaims;
        }

        private boolean isSegmentBlockedFromClaim(int segmentId) {
            Instant releaseDeadline = Coordinator.this.releasesDeadlines.compute(segmentId, (i, current) -> current == null || Coordinator.this.clock.instant().isAfter((Instant)current) ? null : current);
            return releaseDeadline != null;
        }

        private void ensureOpenStream(TrackingToken trackingToken) {
            if (this.eventStream != null && !Objects.equals(trackingToken, this.lastScheduledToken)) {
                logger.debug("Processor [{}] (Coordination Task [{}]) will close the current stream.", (Object)Coordinator.this.name, (Object)this.generation);
                IOUtils.closeQuietly(this.eventStream);
                this.eventStream = null;
                this.lastScheduledToken = NoToken.INSTANCE;
            }
            if (this.eventStream == null && !Coordinator.this.workPackages.isEmpty() && !(trackingToken instanceof NoToken)) {
                this.eventStream = Coordinator.this.messageSource.openStream(trackingToken);
                logger.debug("Processor [{}] (Coordination Task [{}]) opened stream with tracking token [{}].", new Object[]{Coordinator.this.name, this.generation, trackingToken});
                this.availabilityCallbackSupported = this.eventStream.setOnAvailableCallback(() -> {
                    logger.trace("Processor [{}] (Coordination Task [{}]). Events became available (callback triggered). Scheduling immediate coordination task (itself).", (Object)Coordinator.this.name, (Object)this.generation);
                    this.scheduleImmediateCoordinationTask();
                });
                this.lastScheduledToken = trackingToken;
            }
        }

        private boolean isSpaceAvailable() {
            return Coordinator.this.workPackages.values().stream().allMatch(WorkPackage::hasRemainingCapacity);
        }

        private boolean isDone() {
            return Coordinator.this.workPackages.values().stream().allMatch(WorkPackage::isDone);
        }

        private void coordinateWorkPackages() throws InterruptedException {
            logger.debug("Processor [{}] (Coordination Task [{}]) is coordinating work to all its work packages.", (Object)Coordinator.this.name, (Object)this.generation);
            for (int fetched = 0; fetched < 1024 && this.isSpaceAvailable() && this.eventStream.hasNextAvailable(); ++fetched) {
                TrackedEventMessage<?> event = this.eventStream.nextAvailable();
                this.lastScheduledToken = event.trackingToken();
                if (this.eventsEqualingLastScheduledToken()) {
                    ArrayList events = new ArrayList();
                    events.add(event);
                    while (this.eventsEqualingLastScheduledToken()) {
                        events.add(this.eventStream.nextAvailable());
                    }
                    this.offerEventsToWorkPackages(events);
                    continue;
                }
                this.offerEventToWorkPackages(event);
            }
            Coordinator.this.workPackages.values().stream().filter(WorkPackage::isAbortTriggered).forEach(workPackage -> {
                this.advanceReleaseDeadlineFor(workPackage.segment().getSegmentId());
                this.abortWorkPackage((WorkPackage)workPackage, null);
            });
            Coordinator.this.workPackages.values().forEach(WorkPackage::scheduleWorker);
        }

        private boolean eventsEqualingLastScheduledToken() {
            return this.eventStream.peek().filter(e -> this.lastScheduledToken.equals(e.trackingToken())).isPresent();
        }

        private void offerEventToWorkPackages(TrackedEventMessage<?> event) {
            boolean anyScheduled = false;
            for (WorkPackage workPackage : Coordinator.this.workPackages.values()) {
                boolean scheduled = workPackage.scheduleEvent(event);
                anyScheduled = anyScheduled || scheduled;
            }
            if (!anyScheduled) {
                Coordinator.this.ignoredMessageHandler.accept(event);
                if (!Coordinator.this.eventFilter.canHandleTypeOf(event)) {
                    this.eventStream.skipMessagesWithPayloadTypeOf(event);
                }
            }
        }

        private void offerEventsToWorkPackages(List<TrackedEventMessage<?>> events) {
            boolean anyScheduled = false;
            for (WorkPackage workPackage : Coordinator.this.workPackages.values()) {
                boolean scheduled = workPackage.scheduleEvents(Collections.unmodifiableList(events));
                anyScheduled = anyScheduled || scheduled;
            }
            if (!anyScheduled) {
                events.forEach(event -> {
                    Coordinator.this.ignoredMessageHandler.accept(event);
                    if (!Coordinator.this.eventFilter.canHandleTypeOf((TrackedEventMessage<?>)event)) {
                        this.eventStream.skipMessagesWithPayloadTypeOf((TrackedEventMessage<?>)event);
                    }
                });
            }
        }

        private void scheduleImmediateCoordinationTask() {
            this.scheduleCoordinationTask(0L);
        }

        private void scheduleCoordinationTask(long delay) {
            if (this.scheduledGate.compareAndSet(false, true)) {
                logger.trace("Processor [{}] (Coordination Task [{}]). Scheduled coordination task (itself) with delay of {}ms.", new Object[]{Coordinator.this.name, this.generation, delay});
                Coordinator.this.executorService.schedule(() -> {
                    this.scheduledGate.set(false);
                    this.run();
                }, delay, TimeUnit.MILLISECONDS);
            } else {
                logger.trace("Processor [{}] (Coordination Task [{}]). Skipped scheduling coordination task (delay={}ms). scheduledGate already set.", new Object[]{Coordinator.this.name, this.generation, delay});
            }
        }

        private void scheduleDelayedCoordinationTask(long delay) {
            if (!this.scheduledGate.get() && this.interruptibleScheduledGate.compareAndSet(false, true)) {
                logger.trace("Processor [{}] (Coordination Task [{}]). Scheduled delayed coordination task (itself) with delay of {}ms.", new Object[]{Coordinator.this.name, this.generation, delay});
                Coordinator.this.executorService.schedule(() -> {
                    this.interruptibleScheduledGate.set(false);
                    this.run();
                }, delay, TimeUnit.MILLISECONDS);
            } else {
                logger.trace("Processor [{}] (Coordination Task [{}]). Skipped scheduling delayed coordination task (delay={}ms). scheduledGate={}, interruptibleScheduledGate={}.", new Object[]{Coordinator.this.name, this.generation, delay, this.scheduledGate.get(), this.interruptibleScheduledGate.get()});
            }
        }

        private void abortAndScheduleRetry(Exception cause) {
            Coordinator.this.errorWaitBackOff = Math.min(Coordinator.this.errorWaitBackOff * 2, 60000);
            logger.info("Processor [{}] (Coordination Task [{}]) is releasing claims and scheduling a new coordination task in {}ms.", new Object[]{Coordinator.this.name, this.generation, Coordinator.this.errorWaitBackOff});
            this.abortWorkPackages(cause).whenComplete((unused, throwable) -> {
                if (throwable != null) {
                    logger.warn("Processor [{}] (Coordination Task [{}]). An exception occurred during work packages abort.", new Object[]{Coordinator.this.name, this.generation, throwable});
                } else {
                    logger.debug("Processor [{}] (Coordination Task [{}]). Work packages have aborted successfully.", (Object)Coordinator.this.name, (Object)this.generation);
                }
                logger.debug("Processor [{}] (Coordination Task [{}]). Scheduling new coordination task to run in {}ms.", new Object[]{Coordinator.this.name, this.generation, Coordinator.this.errorWaitBackOff});
                CoordinationTask task = new CoordinationTask();
                Coordinator.this.executorService.schedule(task, (long)Coordinator.this.errorWaitBackOff, TimeUnit.MILLISECONDS);
                Coordinator.this.coordinationTask.set(task);
                this.processingGate.set(false);
            });
            IOUtils.closeQuietly(this.eventStream);
        }

        private CompletableFuture<Void> abortWorkPackage(WorkPackage work, Exception cause) {
            int segmentId = work.segment().getSegmentId();
            return ((CompletableFuture)((CompletableFuture)work.abort(cause).thenRun(() -> {
                if (Coordinator.this.workPackages.remove(segmentId, work)) {
                    logger.debug("Processor [{}] (Coordination Task [{}]) released claim on {}.", new Object[]{Coordinator.this.name, this.generation, work.segment()});
                }
            })).thenRun(() -> Coordinator.this.transactionManager.executeInTransaction(() -> {
                Coordinator.this.tokenStore.releaseClaim(Coordinator.this.name, segmentId);
                Coordinator.this.segmentReleasedAction.accept(work.segment());
            }))).exceptionally(throwable -> {
                logger.info("Processor [{}] (Coordination Task [{}]). An exception occurred during the abort of work package [{}].", new Object[]{Coordinator.this.name, this.generation, segmentId, throwable});
                return null;
            });
        }

        private void advanceReleaseDeadlineFor(int segmentId) {
            int errorWaitTime = Coordinator.this.releasesLastBackOffSeconds.compute(segmentId, (i, current) -> current == null ? 1 : Math.min(current * 2, 60));
            Coordinator.this.releasesDeadlines.compute(segmentId, (i, current) -> {
                Instant now = Coordinator.this.clock.instant();
                Instant nextBackOffRetry = now.plusSeconds(errorWaitTime);
                Instant releaseDeadline = current != null && current.isAfter(nextBackOffRetry) ? current : nextBackOffRetry;
                logger.debug("Processor [{}] (Coordination Task [{}]) set release deadline claim to [{}] for Segment [#{}] using backoff of [{}] seconds.", new Object[]{Coordinator.this.name, this.generation, releaseDeadline, segmentId, errorWaitTime});
                return releaseDeadline;
            });
        }
    }

    private static class NoToken
    implements TrackingToken {
        public static final TrackingToken INSTANCE = new NoToken();

        private NoToken() {
        }

        @Override
        public TrackingToken lowerBound(TrackingToken other) {
            return other;
        }

        @Override
        public TrackingToken upperBound(TrackingToken other) {
            return other;
        }

        @Override
        public boolean covers(TrackingToken other) {
            return false;
        }

        @Override
        public boolean samePositionAs(TrackingToken other) {
            return other instanceof NoToken;
        }
    }

    static class Builder {
        private String name;
        private StreamableMessageSource<TrackedEventMessage<?>> messageSource;
        private TokenStore tokenStore;
        private TransactionManager transactionManager;
        private ScheduledExecutorService executorService;
        private BiFunction<Segment, TrackingToken, WorkPackage> workPackageFactory;
        private EventFilter eventFilter;
        private Consumer<? super TrackedEventMessage<?>> ignoredMessageHandler = i -> {};
        private BiConsumer<Integer, UnaryOperator<TrackerStatus>> processingStatusUpdater;
        private long tokenClaimInterval = 5000L;
        private long claimExtensionThreshold = 5000L;
        private Clock clock = GenericEventMessage.clock;
        private MaxSegmentProvider maxSegmentProvider;
        private int initialSegmentCount = 16;
        private Function<StreamableMessageSource<TrackedEventMessage<?>>, TrackingToken> initialToken;
        private Runnable shutdownAction = () -> {};
        private boolean coordinatorExtendsClaims = false;
        private Consumer<Segment> segmentReleasedAction = segment -> {};

        Builder() {
        }

        Builder name(String name) {
            this.name = name;
            return this;
        }

        Builder messageSource(StreamableMessageSource<TrackedEventMessage<?>> messageSource) {
            this.messageSource = messageSource;
            return this;
        }

        Builder tokenStore(TokenStore tokenStore) {
            this.tokenStore = tokenStore;
            return this;
        }

        Builder transactionManager(TransactionManager transactionManager) {
            this.transactionManager = transactionManager;
            return this;
        }

        Builder executorService(ScheduledExecutorService executorService) {
            this.executorService = executorService;
            return this;
        }

        Builder workPackageFactory(BiFunction<Segment, TrackingToken, WorkPackage> workPackageFactory) {
            this.workPackageFactory = workPackageFactory;
            return this;
        }

        Builder eventFilter(EventFilter eventFilter) {
            this.eventFilter = eventFilter;
            return this;
        }

        Builder onMessageIgnored(Consumer<? super TrackedEventMessage<?>> ignoredMessageHandler) {
            this.ignoredMessageHandler = ignoredMessageHandler;
            return this;
        }

        Builder processingStatusUpdater(BiConsumer<Integer, UnaryOperator<TrackerStatus>> processingStatusUpdater) {
            this.processingStatusUpdater = processingStatusUpdater;
            return this;
        }

        Builder tokenClaimInterval(long tokenClaimInterval) {
            this.tokenClaimInterval = tokenClaimInterval;
            return this;
        }

        Builder claimExtensionThreshold(long claimExtensionThreshold) {
            this.claimExtensionThreshold = claimExtensionThreshold;
            return this;
        }

        Builder clock(Clock clock) {
            this.clock = clock;
            return this;
        }

        Builder maxSegmentProvider(MaxSegmentProvider maxSegmentProvider) {
            this.maxSegmentProvider = maxSegmentProvider;
            return this;
        }

        Builder initialSegmentCount(int initialSegmentCount) {
            this.initialSegmentCount = initialSegmentCount;
            return this;
        }

        Builder initialToken(Function<StreamableMessageSource<TrackedEventMessage<?>>, TrackingToken> initialToken) {
            this.initialToken = initialToken;
            return this;
        }

        Builder onShutdown(Runnable shutdownAction) {
            this.shutdownAction = shutdownAction;
            return this;
        }

        Builder coordinatorClaimExtension(boolean coordinatorExtendsClaims) {
            this.coordinatorExtendsClaims = coordinatorExtendsClaims;
            return this;
        }

        Builder segmentReleasedAction(Consumer<Segment> segmentReleasedAction) {
            this.segmentReleasedAction = segmentReleasedAction;
            return this;
        }

        Coordinator build() {
            return new Coordinator(this);
        }
    }

    @FunctionalInterface
    static interface EventFilter {
        public boolean canHandleTypeOf(TrackedEventMessage<?> var1);
    }

    private static class RunState {
        private final boolean isRunning;
        private final boolean wasStarted;
        private final CompletableFuture<Void> shutdownHandle;
        private final Runnable shutdownAction;

        private RunState(boolean isRunning, boolean wasStarted, CompletableFuture<Void> shutdownHandle, Runnable shutdownAction) {
            this.isRunning = isRunning;
            this.wasStarted = wasStarted;
            this.shutdownHandle = shutdownHandle;
            this.shutdownAction = shutdownAction;
        }

        public static RunState initial(Runnable shutdownAction) {
            return new RunState(false, false, CompletableFuture.completedFuture(null), shutdownAction);
        }

        public RunState attemptStart() {
            if (this.isRunning) {
                return new RunState(true, false, null, this.shutdownAction);
            }
            if (this.shutdownHandle.isDone()) {
                return new RunState(true, true, null, this.shutdownAction);
            }
            return this;
        }

        public RunState attemptStop() {
            if (!this.isRunning || this.shutdownHandle != null) {
                return this;
            }
            CompletableFuture<Void> newShutdownHandle = new CompletableFuture<Void>();
            newShutdownHandle.whenComplete((r, e) -> this.shutdownAction.run());
            return new RunState(false, false, newShutdownHandle, this.shutdownAction);
        }

        public boolean isRunning() {
            return this.isRunning;
        }

        public boolean wasStarted() {
            return this.wasStarted;
        }

        public CompletableFuture<Void> shutdownHandle() {
            return this.shutdownHandle;
        }
    }
}

