/*
 * Decompiled with CFR 0.152.
 */
package org.axonframework.eventhandling.processors.streaming.pooled;

import java.lang.invoke.MethodHandles;
import java.time.Clock;
import java.time.Instant;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.UnaryOperator;
import org.axonframework.common.BuilderUtils;
import org.axonframework.common.FutureUtils;
import org.axonframework.common.ProcessUtils;
import org.axonframework.eventhandling.EventMessage;
import org.axonframework.eventhandling.GenericEventMessage;
import org.axonframework.eventhandling.processors.streaming.pooled.ClaimTask;
import org.axonframework.eventhandling.processors.streaming.pooled.CoordinatorTask;
import org.axonframework.eventhandling.processors.streaming.pooled.MaxSegmentProvider;
import org.axonframework.eventhandling.processors.streaming.pooled.MergeTask;
import org.axonframework.eventhandling.processors.streaming.pooled.SplitTask;
import org.axonframework.eventhandling.processors.streaming.pooled.WorkPackage;
import org.axonframework.eventhandling.processors.streaming.segmenting.Segment;
import org.axonframework.eventhandling.processors.streaming.segmenting.TrackerStatus;
import org.axonframework.eventhandling.processors.streaming.token.TrackingToken;
import org.axonframework.eventhandling.processors.streaming.token.WrappedToken;
import org.axonframework.eventhandling.processors.streaming.token.store.TokenStore;
import org.axonframework.eventhandling.processors.streaming.token.store.UnableToClaimTokenException;
import org.axonframework.eventstreaming.EventCriteria;
import org.axonframework.eventstreaming.StreamableEventSource;
import org.axonframework.eventstreaming.StreamingCondition;
import org.axonframework.eventstreaming.TrackingTokenSource;
import org.axonframework.messaging.MessageStream;
import org.axonframework.messaging.unitofwork.ProcessingContext;
import org.axonframework.messaging.unitofwork.UnitOfWorkFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class Coordinator {
    // Class-wide SLF4J logger; the owning class is resolved through MethodHandles.lookup().
    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
    // Processor name: appears in log statements and is passed to the token store and the max-segment provider.
    private final String name;
    // Event source the coordination task asks for a first token and opens its event stream on.
    private final StreamableEventSource eventSource;
    // Store used to initialise, list and fetch (claim) the tokens of this processor's segments.
    private final TokenStore tokenStore;
    // Creates the units of work that wrap every token store interaction in this class.
    private final UnitOfWorkFactory unitOfWorkFactory;
    // Executor the coordination task is submitted to.
    private final ScheduledExecutorService executorService;
    // Builds the WorkPackage for a newly claimed segment and its token.
    private final BiFunction<Segment, TrackingToken, WorkPackage> workPackageFactory;
    // Invoked with an event's message when none of the work packages scheduled that event.
    private final Consumer<? super EventMessage> ignoredMessageHandler;
    // Applies a status transformation for the given segment id.
    private final BiConsumer<Integer, UnaryOperator<TrackerStatus>> processingStatusUpdater;
    // Milliseconds between attempts to claim segments; also the release period applied to surplus segments.
    private final long tokenClaimInterval;
    // Combined with tokenClaimInterval (minimum of the two) to delay the next coordination pass.
    private final long claimExtensionThreshold;
    // Time source for claim-validation thresholds and release-deadline checks.
    private final Clock clock;
    // Supplies the maximum number of segments this processor may hold at once.
    private final MaxSegmentProvider maxSegmentProvider;
    // Number of segments created when the token store holds none for this processor.
    private final int initialSegmentCount;
    // Produces the token the initial segments are created with.
    private final Function<TrackingTokenSource, CompletableFuture<TrackingToken>> initialToken;
    // When true, each coordination pass asks busy work packages to extend their claim.
    private final boolean coordinatorExtendsClaims;
    // NOTE(review): not used in the visible part of this class; presumably called when a segment is released - confirm.
    private final Consumer<Segment> segmentReleasedAction;
    // Criteria passed along when opening the event stream.
    private final EventCriteria eventCriteria;
    // Active work packages, keyed by segment id.
    private final Map<Integer, WorkPackage> workPackages = new ConcurrentHashMap<Integer, WorkPackage>();
    // Current lifecycle state; replaced atomically by start() and stop().
    private final AtomicReference<RunState> runState;
    // Per segment id: the instant until which this processor must not (re)claim that segment.
    private final Map<Integer, Instant> releasesDeadlines = new ConcurrentHashMap<Integer, Instant>();
    // Per segment id back-off bookkeeping; the entry is removed once the segment processed a batch.
    // NOTE(review): the stored unit is presumably seconds, as the name suggests - confirm.
    private final Map<Integer, Integer> releasesLastBackOffSeconds = new ConcurrentHashMap<Integer, Integer>();
    // Pending split/merge/claim instructions; a coordination pass takes at most one of them.
    private final Queue<CoordinatorTask> coordinatorTasks = new ConcurrentLinkedQueue<CoordinatorTask>();
    // The coordination task currently in charge; cleared by stop().
    private final AtomicReference<CoordinationTask> coordinationTask = new AtomicReference();
    // Starts at 500 and is reset to 500 after a successful coordination pass; isError() reports values above 500.
    private int errorWaitBackOff = 500;

    /**
     * Creates a fresh {@link Builder} with which a {@code Coordinator} can be configured and built.
     *
     * @return a new, unconfigured builder
     */
    protected static Builder builder() {
        return new Builder();
    }

    /**
     * Instantiates a coordinator from the settings collected by the given {@code builder}.
     * <p>
     * Every builder value is copied as-is; the lifecycle state starts as "not running" and carries the builder's
     * shutdown action.
     *
     * @param builder the builder holding the configuration of this coordinator
     */
    private Coordinator(Builder builder) {
        // Identity and collaborators.
        name = builder.name;
        eventSource = builder.eventSource;
        tokenStore = builder.tokenStore;
        unitOfWorkFactory = builder.unitOfWorkFactory;
        executorService = builder.executorService;
        clock = builder.clock;

        // Callbacks and factories.
        workPackageFactory = builder.workPackageFactory;
        ignoredMessageHandler = builder.ignoredMessageHandler;
        processingStatusUpdater = builder.processingStatusUpdater;
        segmentReleasedAction = builder.segmentReleasedAction;
        initialToken = builder.initialToken;

        // Claiming and segment settings.
        tokenClaimInterval = builder.tokenClaimInterval;
        claimExtensionThreshold = builder.claimExtensionThreshold;
        coordinatorExtendsClaims = builder.coordinatorExtendsClaims;
        maxSegmentProvider = builder.maxSegmentProvider;
        initialSegmentCount = builder.initialSegmentCount;
        eventCriteria = builder.eventCriteria;

        // Lifecycle bookkeeping.
        runState = new AtomicReference<>(RunState.initial(builder.shutdownAction));
    }

    /**
     * Starts this coordinator.
     * <p>
     * When this call is the one that flips the state to running, the token store is initialised (retried through
     * {@code ProcessUtils.executeUntilTrue}) and a new coordination task is submitted to the executor. Calling this
     * method on an already running coordinator does nothing.
     *
     * @throws IllegalStateException when a shutdown is still in progress
     */
    public void start() {
        RunState updated = runState.updateAndGet(RunState::attemptStart);
        if (!updated.wasStarted()) {
            // Either already running (nothing to do) or still shutting down (refuse).
            if (!updated.isRunning()) {
                throw new IllegalStateException("Cannot start a processor while it's in process of shutting down.");
            }
            return;
        }

        logger.debug("Starting Coordinator for Processor [{}].", name);
        try {
            ProcessUtils.executeUntilTrue(this::initializeTokenStore, 100L, 30L);
            CoordinationTask firstTask = new CoordinationTask();
            executorService.submit(firstTask);
            coordinationTask.set(firstTask);
        } catch (Exception e) {
            // Roll the state back to stopped and complete the shutdown handle before propagating the failure.
            runState.updateAndGet(RunState::attemptStop).shutdownHandle().complete(null);
            throw e;
        }
    }

    /**
     * Requests this coordinator to stop.
     * <p>
     * The state is switched to "not running" and the active coordination task, if any, is detached and scheduled
     * for an immediate pass.
     *
     * @return the shutdown handle of the resulting state
     */
    public CompletableFuture<Void> stop() {
        logger.debug("Stopping Coordinator for Processor [{}].", name);
        RunState afterStop = runState.updateAndGet(RunState::attemptStop);
        CoordinationTask activeTask = coordinationTask.getAndSet(null);
        if (activeTask != null) {
            activeTask.scheduleImmediateCoordinationTask();
        }
        return afterStop.shutdownHandle();
    }

    /**
     * Tells whether this coordinator is currently in the running state.
     *
     * @return {@code true} when the current run state is marked as running
     */
    public boolean isRunning() {
        RunState current = runState.get();
        return current.isRunning();
    }

    /**
     * Triggers an immediate coordination pass on the active coordination task; a no-op when there is none.
     */
    private void scheduleCoordinator() {
        CoordinationTask activeTask = coordinationTask.get();
        if (activeTask == null) {
            return;
        }
        activeTask.scheduleImmediateCoordinationTask();
    }

    /**
     * Tells whether this coordinator is in an error state, which is the case while the error back-off is above its
     * base value of 500.
     *
     * @return {@code true} when the error back-off exceeds 500
     */
    public boolean isError() {
        boolean backedOff = errorWaitBackOff > 500;
        return backedOff;
    }

    /**
     * Records that the given segment must be released and stay unclaimed by this processor until the given
     * instant, then triggers a coordination pass.
     *
     * @param segmentId       the id of the segment to release
     * @param releaseDuration the instant until which the segment is blocked from being claimed
     */
    public void releaseUntil(int segmentId, Instant releaseDuration) {
        logger.debug("Processor [{}] will release segment {} for processing until {}.",
                     name, segmentId, releaseDuration);
        releasesDeadlines.put(segmentId, releaseDuration);
        scheduleCoordinator();
    }

    /**
     * Queues a split instruction for the given segment and triggers a coordination pass.
     *
     * @param segmentId the id of the segment to split
     * @return the future handed to the queued {@code SplitTask}
     */
    public CompletableFuture<Boolean> splitSegment(int segmentId) {
        CompletableFuture<Boolean> outcome = new CompletableFuture<>();
        SplitTask splitTask = new SplitTask(outcome, name, segmentId, workPackages, tokenStore, unitOfWorkFactory);
        coordinatorTasks.add(splitTask);
        scheduleCoordinator();
        return outcome;
    }

    /**
     * Queues a merge instruction for the given segment and triggers a coordination pass.
     *
     * @param segmentId the id of the segment to merge
     * @return the future handed to the queued {@code MergeTask}
     */
    public CompletableFuture<Boolean> mergeSegment(int segmentId) {
        CompletableFuture<Boolean> outcome = new CompletableFuture<>();
        MergeTask mergeTask = new MergeTask(outcome, name, segmentId, workPackages, tokenStore, unitOfWorkFactory);
        coordinatorTasks.add(mergeTask);
        scheduleCoordinator();
        return outcome;
    }

    /**
     * Queues a claim instruction for the given segment and triggers a coordination pass.
     *
     * @param segmentId the id of the segment to claim
     * @return the future handed to the queued {@code ClaimTask}
     */
    public CompletableFuture<Boolean> claimSegment(int segmentId) {
        CompletableFuture<Boolean> outcome = new CompletableFuture<>();
        ClaimTask claimTask = new ClaimTask(
                outcome, name, segmentId, workPackages, releasesDeadlines, tokenStore, unitOfWorkFactory
        );
        coordinatorTasks.add(claimTask);
        scheduleCoordinator();
        return outcome;
    }

    /**
     * Makes sure the token store holds segments for this processor.
     * <p>
     * Inside a single unit of work the existing segments are fetched; when there are none,
     * {@code initialSegmentCount} segments are created, starting at the token produced by {@code initialToken}.
     * Any exception is logged at info level and reported as "not initialised" rather than propagated.
     *
     * @return {@code true} when the unit of work ran to the end of its body, {@code false} when an exception occurred
     */
    private boolean initializeTokenStore() {
        AtomicBoolean initialized = new AtomicBoolean(false);
        try {
            FutureUtils.joinAndUnwrap(unitOfWorkFactory.create().executeWithResult(context -> {
                List<Segment> existing =
                        FutureUtils.joinAndUnwrap(tokenStore.fetchSegments(name, (ProcessingContext) context));
                if (existing.isEmpty()) {
                    logger.info("Initializing segments for processor [{}] ({} segments)", name, initialSegmentCount);
                    TrackingToken startToken = FutureUtils.joinAndUnwrap(initialToken.apply(eventSource));
                    FutureUtils.joinAndUnwrap(tokenStore.initializeTokenSegments(
                            name, initialSegmentCount, startToken, (ProcessingContext) context
                    ));
                }
                initialized.set(true);
                return FutureUtils.emptyCompletedFuture();
            }));
        } catch (Exception e) {
            logger.info("Error while initializing the Token Store. This may simply indicate concurrent attempts to initialize.", e);
        }
        return initialized.get();
    }

    /**
     * Fluent builder collecting the configuration of a {@code Coordinator}.
     * <p>
     * Defaults: a no-op ignored-message handler, a token claim interval and claim extension threshold of
     * {@code 5000}, {@code GenericEventMessage.clock} as the clock, {@code 16} initial segments, a no-op shutdown
     * action, no coordinator-driven claim extension, a no-op segment-released action and
     * {@code EventCriteria.havingAnyTag()} as the criteria. All other settings start out as {@code null}.
     */
    static class Builder {
        private String name;
        private StreamableEventSource eventSource;
        private TokenStore tokenStore;
        private UnitOfWorkFactory unitOfWorkFactory;
        private ScheduledExecutorService executorService;
        private BiFunction<Segment, TrackingToken, WorkPackage> workPackageFactory;
        private Consumer<? super EventMessage> ignoredMessageHandler = i -> {};
        private BiConsumer<Integer, UnaryOperator<TrackerStatus>> processingStatusUpdater;
        private long tokenClaimInterval = 5000L;
        private long claimExtensionThreshold = 5000L;
        private Clock clock = GenericEventMessage.clock;
        private MaxSegmentProvider maxSegmentProvider;
        private int initialSegmentCount = 16;
        private Function<TrackingTokenSource, CompletableFuture<TrackingToken>> initialToken;
        private Runnable shutdownAction = () -> {};
        private boolean coordinatorExtendsClaims = false;
        private Consumer<Segment> segmentReleasedAction = segment -> {};
        private EventCriteria eventCriteria = EventCriteria.havingAnyTag();

        Builder() {
        }

        /**
         * Sets the name of the processor this coordinator belongs to.
         *
         * @param name the processor name
         * @return this builder, for chaining
         */
        Builder name(String name) {
            this.name = name;
            return this;
        }

        /**
         * Sets the source to open the event stream on.
         *
         * @param eventSource the event source
         * @return this builder, for chaining
         */
        Builder eventSource(StreamableEventSource eventSource) {
            this.eventSource = eventSource;
            return this;
        }

        /**
         * Sets the store holding the tokens of the processor's segments.
         *
         * @param tokenStore the token store
         * @return this builder, for chaining
         */
        Builder tokenStore(TokenStore tokenStore) {
            this.tokenStore = tokenStore;
            return this;
        }

        /**
         * Sets the factory creating the units of work that wrap token store interactions.
         *
         * @param unitOfWorkFactory the unit of work factory
         * @return this builder, for chaining
         */
        Builder unitOfWorkFactory(UnitOfWorkFactory unitOfWorkFactory) {
            this.unitOfWorkFactory = unitOfWorkFactory;
            return this;
        }

        /**
         * Sets the executor the coordination task is submitted to.
         *
         * @param executorService the executor
         * @return this builder, for chaining
         */
        Builder executorService(ScheduledExecutorService executorService) {
            this.executorService = executorService;
            return this;
        }

        /**
         * Sets the factory that builds a work package for a claimed segment and its token.
         *
         * @param workPackageFactory the work package factory
         * @return this builder, for chaining
         */
        Builder workPackageFactory(BiFunction<Segment, TrackingToken, WorkPackage> workPackageFactory) {
            this.workPackageFactory = workPackageFactory;
            return this;
        }

        /**
         * Sets the handler invoked for events that no work package scheduled. Defaults to a no-op.
         *
         * @param ignoredMessageHandler the handler receiving ignored event messages
         * @return this builder, for chaining
         */
        Builder onMessageIgnored(Consumer<? super EventMessage> ignoredMessageHandler) {
            this.ignoredMessageHandler = ignoredMessageHandler;
            return this;
        }

        /**
         * Sets the callback through which the status of a segment (by id) is transformed.
         *
         * @param processingStatusUpdater the status updater
         * @return this builder, for chaining
         */
        Builder processingStatusUpdater(BiConsumer<Integer, UnaryOperator<TrackerStatus>> processingStatusUpdater) {
            this.processingStatusUpdater = processingStatusUpdater;
            return this;
        }

        /**
         * Sets the interval, in milliseconds, between attempts to claim segments. Defaults to {@code 5000}.
         *
         * @param tokenClaimInterval the claim interval in milliseconds
         * @return this builder, for chaining
         */
        Builder tokenClaimInterval(long tokenClaimInterval) {
            this.tokenClaimInterval = tokenClaimInterval;
            return this;
        }

        /**
         * Sets the claim extension threshold. Defaults to {@code 5000}.
         *
         * @param claimExtensionThreshold the threshold (presumably in milliseconds - confirm against callers)
         * @return this builder, for chaining
         */
        Builder claimExtensionThreshold(long claimExtensionThreshold) {
            this.claimExtensionThreshold = claimExtensionThreshold;
            return this;
        }

        /**
         * Sets the clock used as the coordinator's time source. Defaults to {@code GenericEventMessage.clock}.
         *
         * @param clock the clock
         * @return this builder, for chaining
         */
        Builder clock(Clock clock) {
            this.clock = clock;
            return this;
        }

        /**
         * Sets the provider of the maximum number of segments this processor may claim.
         * <p>
         * The provider is asked for its value right away, using the name configured on this builder at the time
         * of this call, and that value must be strictly positive. The provider is only stored after it passed this
         * check, so a rejected provider never lingers on the builder.
         *
         * @param maxSegmentProvider the provider to use
         * @return this builder, for chaining
         */
        Builder maxSegmentProvider(MaxSegmentProvider maxSegmentProvider) {
            // Validate first: assigning before the assertion would retain an invalid provider when it throws.
            BuilderUtils.assertStrictPositive(maxSegmentProvider.getMaxSegments(this.name), "Max claimed segments should be a higher valuer than zero");
            this.maxSegmentProvider = maxSegmentProvider;
            return this;
        }

        /**
         * Sets the number of segments to create when the token store holds none. Defaults to {@code 16}.
         *
         * @param initialSegmentCount the initial number of segments
         * @return this builder, for chaining
         */
        Builder initialSegmentCount(int initialSegmentCount) {
            this.initialSegmentCount = initialSegmentCount;
            return this;
        }

        /**
         * Sets the function producing the token that initial segments are created with.
         *
         * @param initialToken the initial token function
         * @return this builder, for chaining
         */
        Builder initialToken(Function<TrackingTokenSource, CompletableFuture<TrackingToken>> initialToken) {
            this.initialToken = initialToken;
            return this;
        }

        /**
         * Sets the action to run when the coordinator's shutdown handle completes. Defaults to a no-op.
         *
         * @param shutdownAction the shutdown action
         * @return this builder, for chaining
         */
        Builder onShutdown(Runnable shutdownAction) {
            this.shutdownAction = shutdownAction;
            return this;
        }

        /**
         * Sets whether the coordinator itself extends the claims of busy work packages. Defaults to {@code false}.
         *
         * @param coordinatorExtendsClaims {@code true} to let the coordinator extend claims
         * @return this builder, for chaining
         */
        Builder coordinatorClaimExtension(boolean coordinatorExtendsClaims) {
            this.coordinatorExtendsClaims = coordinatorExtendsClaims;
            return this;
        }

        /**
         * Sets the action associated with released segments. Defaults to a no-op.
         *
         * @param segmentReleasedAction the action receiving the segment
         * @return this builder, for chaining
         */
        Builder segmentReleasedAction(Consumer<Segment> segmentReleasedAction) {
            this.segmentReleasedAction = segmentReleasedAction;
            return this;
        }

        /**
         * Sets the criteria used when opening the event stream. Defaults to {@code EventCriteria.havingAnyTag()}.
         *
         * @param eventCriteria the event criteria
         * @return this builder, for chaining
         */
        Builder eventCriteria(EventCriteria eventCriteria) {
            this.eventCriteria = eventCriteria;
            return this;
        }

        /**
         * Builds a {@code Coordinator} from the settings collected so far.
         *
         * @return the configured coordinator
         */
        Coordinator build() {
            return new Coordinator(this);
        }
    }

    /**
     * Immutable snapshot of the coordinator's lifecycle.
     * <p>
     * Three shapes occur: the initial/stopped state (not running, completed shutdown handle), the running state
     * (running, {@code null} shutdown handle) and the stopping state (not running, pending shutdown handle).
     */
    private static class RunState {
        private final boolean isRunning;
        private final boolean wasStarted;
        private final CompletableFuture<Void> shutdownHandle;
        private final Runnable shutdownAction;

        private RunState(boolean isRunning, boolean wasStarted, CompletableFuture<Void> shutdownHandle, Runnable shutdownAction) {
            this.isRunning = isRunning;
            this.wasStarted = wasStarted;
            this.shutdownHandle = shutdownHandle;
            this.shutdownAction = shutdownAction;
        }

        /**
         * Creates the initial state: not running, with an already completed shutdown handle.
         *
         * @param shutdownAction the action to run whenever a later shutdown handle completes
         * @return the initial state
         */
        public static RunState initial(Runnable shutdownAction) {
            CompletableFuture<Void> alreadyShutDown = FutureUtils.emptyCompletedFuture();
            return new RunState(false, false, alreadyShutDown, shutdownAction);
        }

        /**
         * Computes the state following a start request.
         *
         * @return this state when a shutdown is still pending; otherwise a running state whose
         * {@link #wasStarted()} flag is {@code true} only when this request performed the transition
         */
        public RunState attemptStart() {
            // The handle is only consulted when not running; a running state carries a null handle.
            if (!isRunning && !shutdownHandle.isDone()) {
                return this;
            }
            boolean startedByThisRequest = !isRunning;
            return new RunState(true, startedByThisRequest, null, shutdownAction);
        }

        /**
         * Computes the state following a stop request.
         *
         * @return this state when it is not running or already has a shutdown handle; otherwise a non-running state
         * with a fresh shutdown handle that runs the shutdown action upon completion
         */
        public RunState attemptStop() {
            if (isRunning && shutdownHandle == null) {
                CompletableFuture<Void> pendingShutdown = new CompletableFuture<>();
                pendingShutdown.whenComplete((result, failure) -> shutdownAction.run());
                return new RunState(false, false, pendingShutdown, shutdownAction);
            }
            return this;
        }

        /**
         * @return {@code true} when this state represents a running coordinator
         */
        public boolean isRunning() {
            return isRunning;
        }

        /**
         * @return {@code true} when the start request that produced this state performed the actual start
         */
        public boolean wasStarted() {
            return wasStarted;
        }

        /**
         * @return the shutdown handle of this state; {@code null} for a running state
         */
        public CompletableFuture<Void> shutdownHandle() {
            return shutdownHandle;
        }
    }

    private class CoordinationTask
    implements Runnable {
        // Set while a coordination pass is executing; run() returns immediately when it is already set.
        private final AtomicBoolean processingGate = new AtomicBoolean();
        // Claimed by scheduleCoordinationTask before it schedules a pass.
        private final AtomicBoolean scheduledGate = new AtomicBoolean();
        // NOTE(review): not used in the visible part of this class; presumably guards delayed scheduling - confirm.
        private final AtomicBoolean interruptibleScheduledGate = new AtomicBoolean();
        // The currently open event stream, or null when none is open.
        private MessageStream<? extends EventMessage> eventStream;
        // Token of the last event handed to the work packages; NoToken.INSTANCE while nothing has been scheduled.
        private TrackingToken lastScheduledToken = NoToken.INSTANCE;
        // Set to true once a stream has been opened and an availability callback was registered on it.
        private boolean availabilityCallbackSupported;
        // Epoch milliseconds after which the next pass validates claims and tries to claim segments again.
        private long unclaimedSegmentValidationThreshold;

        /**
         * Creates a coordination task with all gates open and no event stream.
         */
        private CoordinationTask() {
        }

        /**
         * Executes a single coordination pass. In order, a pass:
         * <ol>
         * <li>returns immediately when another pass is in progress;</li>
         * <li>when the coordinator is no longer running, aborts all work packages, completes the shutdown handle
         * and closes the stream;</li>
         * <li>aborts work packages of segments that are blocked from being claimed;</li>
         * <li>optionally extends the claims of busy work packages;</li>
         * <li>runs at most one queued coordinator task (split/merge/claim) and reschedules itself afterwards;</li>
         * <li>when due, releases surplus segments or claims new ones, and makes sure a stream is open;</li>
         * <li>hands available events to the work packages and schedules the next pass.</li>
         * </ol>
         */
        @Override
        public void run() {
            // Only one pass at a time: a concurrent invocation backs off.
            if (!this.processingGate.compareAndSet(false, true)) {
                return;
            }
            // Shutdown requested: abort everything, then complete the shutdown handle. The gate stays set.
            if (!Coordinator.this.runState.get().isRunning()) {
                logger.debug("Stopped processing. Runnable flag is false.\nReleasing claims and closing the event stream for Processor [{}].", (Object)Coordinator.this.name);
                this.abortWorkPackages(null).thenRun(() -> Coordinator.this.runState.get().shutdownHandle().complete(null));
                this.closeStreamQuietly();
                return;
            }
            // Give up work packages whose segment currently has an active release deadline.
            Coordinator.this.workPackages.entrySet().stream().filter(entry -> this.isSegmentBlockedFromClaim((Integer)entry.getKey())).map(Map.Entry::getValue).forEach(workPackage -> {
                logger.info("Processor [{}] was requested and will comply with releasing claim for segment {}.", (Object)Coordinator.this.name, (Object)workPackage.segment().getSegmentId());
                this.abortWorkPackage((WorkPackage)workPackage, null);
            });
            // Optionally extend claims on behalf of work packages that are busy and not being aborted.
            if (Coordinator.this.coordinatorExtendsClaims) {
                logger.debug("Processor [{}] will extend the claim of work packages that are busy processing events and have met the claim threshold.", (Object)Coordinator.this.name);
                Coordinator.this.workPackages.values().stream().filter(workPackage -> !workPackage.isAbortTriggered()).filter(WorkPackage::isProcessingEvents).forEach(workPackage -> {
                    try {
                        workPackage.extendClaimIfThresholdIsMet();
                    }
                    catch (Exception e) {
                        logger.warn("Error while extending claim for Work Package [{}]-[{}]. Aborting Work Package...", new Object[]{workPackage.segment().getSegmentId(), Coordinator.this.name, e});
                        workPackage.abort(e);
                    }
                });
            }
            // Run a single queued instruction per pass; the gate is reopened and a new pass scheduled when it finishes.
            if (!Coordinator.this.coordinatorTasks.isEmpty()) {
                CoordinatorTask task = Coordinator.this.coordinatorTasks.remove();
                logger.debug("Processor [{}] found task [{}] to run.", (Object)Coordinator.this.name, (Object)task.getDescription());
                ((CompletableFuture)task.run().thenRun(() -> {
                    // Zeroing the threshold makes the next pass re-validate claims right away.
                    this.unclaimedSegmentValidationThreshold = 0L;
                })).whenComplete((result, exception) -> {
                    this.processingGate.set(false);
                    this.scheduleImmediateCoordinationTask();
                });
                return;
            }
            // Claim validation is due when no stream is open or the validation threshold has passed.
            if (this.eventStream == null || this.unclaimedSegmentValidationThreshold <= Coordinator.this.clock.instant().toEpochMilli()) {
                this.unclaimedSegmentValidationThreshold = Coordinator.this.clock.instant().toEpochMilli() + Coordinator.this.tokenClaimInterval;
                try {
                    TrackingToken streamStartPosition = this.lastScheduledToken;
                    if (!this.releaseSegmentsIfTooManyClaimed()) {
                        logger.debug("Processor [{}] will try to claim new segments.", (Object)Coordinator.this.name);
                        Map<Segment, TrackingToken> newSegments = this.claimNewSegments();
                        for (Map.Entry<Segment, TrackingToken> entry2 : newSegments.entrySet()) {
                            Segment segment = entry2.getKey();
                            TrackingToken token = entry2.getValue();
                            TrackingToken otherUnwrapped = WrappedToken.unwrapLowerBound(token);
                            // A null position on either side keeps the start position null; otherwise take the lower bound.
                            streamStartPosition = streamStartPosition == null || otherUnwrapped == null ? null : streamStartPosition.lowerBound(otherUnwrapped);
                            Coordinator.this.workPackages.computeIfAbsent(segment.getSegmentId(), wp -> this.createWorkPackage(segment, token));
                        }
                        if (logger.isInfoEnabled() && !newSegments.isEmpty()) {
                            logger.info("Processor [{}] claimed {} new segments for processing", (Object)Coordinator.this.name, (Object)newSegments.size());
                        }
                    }
                    this.ensureOpenStream(streamStartPosition);
                }
                catch (Exception e) {
                    logger.warn("Exception occurred while Processor [{}] started work packages and opened the event stream.", (Object)Coordinator.this.name, (Object)e);
                    // The gate is left set on this path; recovery is delegated to abortAndScheduleRetry.
                    this.abortAndScheduleRetry(e);
                    return;
                }
            }
            // Nothing claimed: drop the stream and try again after the claim interval.
            if (Coordinator.this.workPackages.isEmpty()) {
                logger.debug("No segments claimed. Will retry in {} milliseconds.", (Object)Coordinator.this.tokenClaimInterval);
                this.lastScheduledToken = NoToken.INSTANCE;
                this.closeStreamQuietly();
                this.eventStream = null;
                this.processingGate.set(false);
                this.scheduleDelayedCoordinationTask(Coordinator.this.tokenClaimInterval);
                return;
            }
            try {
                // No event available and every work package done: mark all segments as caught up.
                if (!this.hasNextEvent() && this.isDone()) {
                    Coordinator.this.workPackages.keySet().forEach(i -> Coordinator.this.processingStatusUpdater.accept((Integer)i, TrackerStatus::caughtUp));
                }
                this.coordinateWorkPackages();
                // A successful pass resets the error back-off to its base value.
                Coordinator.this.errorWaitBackOff = 500;
                this.processingGate.set(false);
                // Pick the delay of the next pass based on capacity and event availability.
                if (this.isSpaceAvailable() && this.hasNextEvent()) {
                    this.scheduleImmediateCoordinationTask();
                } else if (this.isSpaceAvailable()) {
                    if (!this.availabilityCallbackSupported) {
                        this.scheduleCoordinationTask(500L);
                    } else {
                        this.scheduleDelayedCoordinationTask(Math.min(Coordinator.this.claimExtensionThreshold, Coordinator.this.tokenClaimInterval));
                    }
                } else {
                    this.scheduleCoordinationTask(100L);
                }
            }
            catch (Exception e) {
                logger.warn("Exception occurred while Processor [{}] was coordinating the work packages.", (Object)Coordinator.this.name, (Object)e);
                if (e instanceof InterruptedException) {
                    logger.error("Processor [{}] was interrupted. Shutting down.", (Object)Coordinator.this.name, (Object)e);
                    Coordinator.this.stop();
                    Thread.currentThread().interrupt();
                }
                // NOTE(review): this also runs after the interrupt branch called stop(); confirm that is intended.
                this.abortAndScheduleRetry(e);
            }
        }

        /**
         * Closes the current event stream, if any, logging (at debug level) rather than propagating any exception.
         * The {@code eventStream} field itself is left untouched.
         */
        private void closeStreamQuietly() {
            if (eventStream == null) {
                return;
            }
            try {
                eventStream.close();
            } catch (Exception e) {
                logger.debug("Exception occurred while closing event stream for Processor [{}].", name, e);
            }
        }

        /**
         * Builds a work package for the given segment and token through the configured factory and registers a
         * batch-processed callback on it that resets the segment's release back-off.
         *
         * @param segment the claimed segment
         * @param token   the token the segment was claimed with
         * @return the newly created work package
         */
        private WorkPackage createWorkPackage(Segment segment, TrackingToken token) {
            WorkPackage created = workPackageFactory.apply(segment, token);
            created.onBatchProcessed(() -> resetRetryExponentialBackoff(segment.getSegmentId()));
            return created;
        }

        /**
         * Forgets the back-off recorded for the given segment.
         *
         * @param segmentId the id of the segment whose back-off entry is removed
         */
        private void resetRetryExponentialBackoff(int segmentId) {
            // Removing the entry has the same effect as computing a null value for it.
            releasesLastBackOffSeconds.remove(Integer.valueOf(segmentId));
            logger.debug("Processor [{}] reset release deadline backoff for Segment [#{}].", name, segmentId);
        }

        /**
         * Aborts every current work package with the given cause and clears the work package map once all aborts
         * have completed.
         *
         * @param cause the reason passed to each abort; callers in this class also pass {@code null}
         * @return a future that completes after the aborts finished and the map was cleared
         */
        private CompletableFuture<Void> abortWorkPackages(Exception cause) {
            // The individual abort futures are folded pairwise with allOf; with no work packages an already
            // completed future is used instead, so the clear step still runs.
            return Coordinator.this.workPackages.values().stream().map(wp -> this.abortWorkPackage((WorkPackage)wp, cause)).reduce((xva$0, xva$1) -> CompletableFuture.allOf(xva$0, xva$1)).orElse(FutureUtils.emptyCompletedFuture()).thenRun(Coordinator.this.workPackages::clear);
        }

        /**
         * Releases surplus segments when this processor holds more work packages than the max-segment provider
         * allows.
         * <p>
         * Each surplus segment is released until "now plus the token claim interval". The deadline is taken from
         * the coordinator's configured clock, the same clock {@code isSegmentBlockedFromClaim} checks deadlines
         * against, so both sides agree on the current time even when a custom clock is configured.
         *
         * @return {@code true} when too many segments were claimed (and releases were requested), {@code false}
         * otherwise
         */
        private boolean releaseSegmentsIfTooManyClaimed() {
            int maxSegmentsPerNode = Coordinator.this.maxSegmentProvider.apply(Coordinator.this.name);
            int claimedSegments = Coordinator.this.workPackages.size();
            boolean tooManySegmentsClaimed = claimedSegments > maxSegmentsPerNode;
            if (tooManySegmentsClaimed) {
                logger.info("Total segments [{}] for processor [{}] is above maxSegmentsPerNode = [{}], going to release surplus claimed segments.", claimedSegments, Coordinator.this.name, maxSegmentsPerNode);
                // Use the configured clock, not the global one, to stay consistent with the deadline check.
                Instant releaseDeadline = Coordinator.this.clock.instant().plusMillis(Coordinator.this.tokenClaimInterval);
                Coordinator.this.workPackages.values()
                                             .stream()
                                             .limit(claimedSegments - maxSegmentsPerNode)
                                             .forEach(workPackage -> Coordinator.this.releaseUntil(workPackage.segment().getSegmentId(), releaseDeadline));
            }
            return tooManySegmentsClaimed;
        }

        /**
         * Tries to claim segments that are available in the token store and not yet held by this processor.
         * <p>
         * Segments with an active release deadline are skipped and their status is cleared. At most
         * "max segments minus currently held work packages" segments are claimed. A segment whose token cannot be
         * claimed is skipped and its status cleared as well.
         *
         * @return the newly claimed segments mapped to their tokens; empty when nothing was claimed
         */
        private Map<Segment, TrackingToken> claimNewSegments() {
            Map<Segment, TrackingToken> newClaims = new HashMap<>();
            // Typed as List<Segment>: a raw list would erase the element type the stream below relies on.
            List<Segment> segments = FutureUtils.joinAndUnwrap(Coordinator.this.unitOfWorkFactory.create().executeWithResult(context -> Coordinator.this.tokenStore.fetchAvailableSegments(Coordinator.this.name, (ProcessingContext)context)));
            if (segments == null) {
                return newClaims;
            }
            List<Segment> unClaimedSegments = segments.stream()
                                                      .filter(segment -> !Coordinator.this.workPackages.containsKey(segment.getSegmentId()))
                                                      .toList();
            int maxSegmentsToClaim = Coordinator.this.maxSegmentProvider.apply(Coordinator.this.name) - Coordinator.this.workPackages.size();
            for (Segment segment : unClaimedSegments) {
                int segmentId = segment.getSegmentId();
                if (this.isSegmentBlockedFromClaim(segmentId)) {
                    logger.debug("Segment {} is still marked to not be claimed by Processor [{}] till [{}].", segmentId, Coordinator.this.name, Coordinator.this.releasesDeadlines.get(segmentId));
                    Coordinator.this.processingStatusUpdater.accept(segmentId, u -> null);
                    continue;
                }
                // Keep iterating once the limit is reached so blocked segments further on still get their status cleared.
                if (newClaims.size() >= maxSegmentsToClaim) {
                    continue;
                }
                try {
                    TrackingToken token = (TrackingToken)FutureUtils.joinAndUnwrap(Coordinator.this.unitOfWorkFactory.create().executeWithResult(context -> Coordinator.this.tokenStore.fetchToken(Coordinator.this.name, segment, (ProcessingContext)context)));
                    newClaims.put(segment, token);
                    logger.info("Processor [{}] claimed the token for segment {}.", (Object)Coordinator.this.name, (Object)segmentId);
                }
                catch (UnableToClaimTokenException e) {
                    Coordinator.this.processingStatusUpdater.accept(segmentId, u -> null);
                    logger.debug("Processor [{}] is unable to claim the token for segment {}. It is owned by another process or has been split/merged concurrently.", (Object)Coordinator.this.name, (Object)segmentId);
                }
            }
            return newClaims;
        }

        /**
         * Tells whether the given segment still has an active release deadline. A deadline that has passed
         * according to the coordinator's clock is removed as part of the check.
         *
         * @param segmentId the id of the segment to check
         * @return {@code true} when a release deadline exists that the clock has not yet passed
         */
        private boolean isSegmentBlockedFromClaim(int segmentId) {
            Instant activeDeadline = releasesDeadlines.computeIfPresent(
                    segmentId,
                    (id, deadline) -> clock.instant().isAfter(deadline) ? null : deadline
            );
            return activeDeadline != null;
        }

        /**
         * Makes sure the event stream is open at the given position.
         * <p>
         * An open stream is closed when the requested position differs from the last scheduled token. A new stream
         * is then opened when none is open, at least one work package exists and the position is not a
         * {@code NoToken}. A {@code null} position means "start at the event source's first token"; the first
         * token is only requested from the event source in that case.
         *
         * @param trackingToken the position to stream from; may be {@code null}
         * @throws NullPointerException when the position is {@code null} and the event source yields no first token
         */
        private void ensureOpenStream(TrackingToken trackingToken) {
            if (this.eventStream != null && !Objects.equals(trackingToken, this.lastScheduledToken)) {
                logger.debug("Processor [{}] will close the current stream.", (Object)Coordinator.this.name);
                this.closeStreamQuietly();
                this.eventStream = null;
                this.lastScheduledToken = NoToken.INSTANCE;
            }
            if (this.eventStream == null && !Coordinator.this.workPackages.isEmpty() && !(trackingToken instanceof NoToken)) {
                TrackingToken startStreamingFrom = trackingToken;
                if (startStreamingFrom == null) {
                    // Resolve the first token lazily: no call to the event source when a position is already known.
                    startStreamingFrom = Objects.requireNonNull(FutureUtils.joinAndUnwrap(Coordinator.this.eventSource.firstToken(null)), "defaultObj");
                }
                this.eventStream = Coordinator.this.eventSource.open(StreamingCondition.conditionFor(startStreamingFrom, Coordinator.this.eventCriteria), null);
                logger.debug("Processor [{}] opened stream with tracking token [{}] and criteria [{}].", new Object[]{Coordinator.this.name, trackingToken, Coordinator.this.eventCriteria});
                this.availabilityCallbackSupported = true;
                this.eventStream.onAvailable(this::scheduleImmediateCoordinationTask);
                // Remember the requested position (possibly null), not the resolved first token.
                this.lastScheduledToken = trackingToken;
            }
        }

        /**
         * Tells whether every work package can still accept events.
         *
         * @return {@code true} when all work packages report remaining capacity (also when there are none)
         */
        private boolean isSpaceAvailable() {
            for (WorkPackage candidate : workPackages.values()) {
                if (!candidate.hasRemainingCapacity()) {
                    return false;
                }
            }
            return true;
        }

        /**
         * Tells whether every work package reports itself as done.
         *
         * @return {@code true} when all work packages are done (also when there are none)
         */
        private boolean isDone() {
            for (WorkPackage candidate : workPackages.values()) {
                if (!candidate.isDone()) {
                    return false;
                }
            }
            return true;
        }

        /**
         * Takes the next entry from the event stream.
         *
         * @return the next entry, or {@code null} when no stream is open or the stream has no entry to hand out
         */
        private MessageStream.Entry<? extends EventMessage> nextEventOrNull() {
            return eventStream == null ? null : eventStream.next().orElse(null);
        }

        /**
         * Tells whether the event stream has an entry available right now.
         *
         * @return {@code true} when a stream is open and reports an available entry
         */
        private boolean hasNextEvent() {
            if (eventStream == null) {
                return false;
            }
            return eventStream.hasNextAvailable();
        }

        /**
         * Peeks at the upcoming stream entry and tells whether it carries the same tracking token as the given one.
         *
         * @param lastScheduledToken the token to compare the upcoming entry's token with
         * @return {@code true} when an upcoming entry exists, the given token is not {@code null} and both tokens
         * are equal
         */
        private boolean eventsEqualingLastScheduledToken(TrackingToken lastScheduledToken) {
            MessageStream.Entry<? extends EventMessage> upcoming = this.eventStream.peek().orElse(null);
            boolean comparable = upcoming != null && lastScheduledToken != null;
            if (!comparable) {
                return false;
            }
            TrackingToken upcomingToken = TrackingToken.fromContext(upcoming).orElse(null);
            return Objects.equals(lastScheduledToken, upcomingToken);
        }

        /**
         * Distributes available events over the work packages.
         * <p>
         * Up to 1024 loop iterations are performed while every work package has capacity and the stream has events
         * available. Consecutive entries sharing one tracking token are offered together as a batch. Afterwards,
         * work packages that triggered an abort get their release deadline advanced and are aborted, and a worker
         * is scheduled for every work package.
         *
         * @throws RuntimeException when the event stream reports an error
         */
        private void coordinateWorkPackages() {
            logger.debug("Processor [{}] is coordinating work to all its work packages.", name);
            if (this.eventStream != null) {
                Optional<Throwable> streamError = this.eventStream.error();
                if (streamError.isPresent()) {
                    throw new RuntimeException("Event stream has an error", streamError.get());
                }
            }

            int fetched = 0;
            while (fetched < 1024 && isSpaceAvailable() && hasNextEvent()) {
                MessageStream.Entry<? extends EventMessage> current = nextEventOrNull();
                if (current == null) {
                    break;
                }
                TrackingToken currentToken = (TrackingToken) TrackingToken.fromContext(current).orElse(null);
                this.lastScheduledToken = currentToken;

                if (!eventsEqualingLastScheduledToken(currentToken)) {
                    offerEventToWorkPackages(current);
                } else {
                    // Gather every following entry that shares this token, so they are offered as one batch.
                    List<MessageStream.Entry<? extends EventMessage>> sameTokenEntries = new ArrayList<>();
                    sameTokenEntries.add(current);
                    while (eventsEqualingLastScheduledToken(currentToken)) {
                        MessageStream.Entry<? extends EventMessage> following = nextEventOrNull();
                        if (following == null) {
                            break;
                        }
                        sameTokenEntries.add(following);
                    }
                    offerEventsToWorkPackages(sameTokenEntries);
                }
                fetched++;
            }

            workPackages.values()
                        .stream()
                        .filter(WorkPackage::isAbortTriggered)
                        .forEach(aborting -> {
                            this.advanceReleaseDeadlineFor(aborting.segment().getSegmentId());
                            this.abortWorkPackage(aborting, null);
                        });
            workPackages.values().forEach(WorkPackage::scheduleWorker);
        }

        /**
         * Offers a single event entry to every work package. When none of them schedules it, the entry's message is
         * passed to the ignored-message handler.
         *
         * @param eventEntry the entry to offer to all work packages
         */
        private void offerEventToWorkPackages(MessageStream.Entry<? extends EventMessage> eventEntry) {
            boolean accepted = false;
            for (WorkPackage candidate : Coordinator.this.workPackages.values()) {
                // Non-short-circuit OR: every work package is offered the entry, even after one accepted it.
                accepted |= candidate.scheduleEvent(eventEntry);
            }
            if (accepted) {
                return;
            }
            Coordinator.this.ignoredMessageHandler.accept(eventEntry.message());
        }

        /**
         * Offers a list of event entries to every work package. When none of them schedules the list, each entry's
         * message is passed to the ignored-message handler, in list order.
         *
         * @param eventEntries the entries to offer to all work packages
         */
        private void offerEventsToWorkPackages(List<MessageStream.Entry<? extends EventMessage>> eventEntries) {
            boolean accepted = false;
            for (WorkPackage candidate : Coordinator.this.workPackages.values()) {
                // Non-short-circuit OR: every work package is offered the entries, even after one accepted them.
                accepted |= candidate.scheduleEvents(eventEntries);
            }
            if (accepted) {
                return;
            }
            for (MessageStream.Entry<? extends EventMessage> ignoredEntry : eventEntries) {
                Coordinator.this.ignoredMessageHandler.accept(ignoredEntry.message());
            }
        }

        /**
         * Requests a coordination run without delay by delegating to {@link #scheduleCoordinationTask(long)} with a
         * delay of zero milliseconds.
         */
        private void scheduleImmediateCoordinationTask() {
            final long noDelay = 0L;
            this.scheduleCoordinationTask(noDelay);
        }

        /**
         * Schedules a run of this task on the coordinator's executor, unless the scheduled gate is already set.
         * The gate is cleared by the scheduled task itself immediately before it invokes {@code run()}.
         *
         * @param delay the delay in milliseconds before the task should run
         */
        private void scheduleCoordinationTask(long delay) {
            boolean gateAcquired = this.scheduledGate.compareAndSet(false, true);
            if (!gateAcquired) {
                return;
            }
            Runnable coordination = () -> {
                this.scheduledGate.set(false);
                this.run();
            };
            Coordinator.this.executorService.schedule(coordination, delay, TimeUnit.MILLISECONDS);
        }

        /**
         * Schedules a delayed run of this task on the coordinator's executor. Nothing is scheduled when the
         * scheduled gate is currently set, or when the interruptible gate is already set. The interruptible gate is
         * cleared by the scheduled task itself immediately before it invokes {@code run()}.
         *
         * @param delay the delay in milliseconds before the task should run
         */
        private void scheduleDelayedCoordinationTask(long delay) {
            if (this.scheduledGate.get()) {
                return;
            }
            if (!this.interruptibleScheduledGate.compareAndSet(false, true)) {
                return;
            }
            Runnable delayedCoordination = () -> {
                this.interruptibleScheduledGate.set(false);
                this.run();
            };
            Coordinator.this.executorService.schedule(delayedCoordination, delay, TimeUnit.MILLISECONDS);
        }

        /**
         * Aborts all work packages and, once that completes (successfully or not), schedules a fresh
         * {@code CoordinationTask} after the current error back-off. The event stream is closed quietly right after
         * the abort has been initiated.
         * <p>
         * The back-off is doubled on every invocation and capped at 60000 milliseconds.
         *
         * @param cause the exception that triggered the abort; handed to the work packages being aborted
         */
        private void abortAndScheduleRetry(Exception cause) {
            // Advance the back-off before logging, so the announced delay is the one the retry is scheduled with.
            Coordinator.this.errorWaitBackOff = Math.min(Coordinator.this.errorWaitBackOff * 2, 60000);
            logger.info("Processor [{}] is releasing claims and scheduling a new coordination task in {}ms", (Object)Coordinator.this.name, (Object)Coordinator.this.errorWaitBackOff);
            this.abortWorkPackages(cause).whenComplete((unused, throwable) -> {
                if (throwable != null) {
                    logger.warn("An exception occurred during work packages abort on processor [{}].", (Object)Coordinator.this.name, throwable);
                } else {
                    logger.debug("Work packages have aborted successfully.");
                }
                this.processingGate.set(false);
                logger.debug("Scheduling new coordination task to run in {}ms", (Object)Coordinator.this.errorWaitBackOff);
                // A new task instance is created for the retry rather than re-scheduling this one.
                CoordinationTask task = new CoordinationTask();
                Coordinator.this.executorService.schedule(task, (long)Coordinator.this.errorWaitBackOff, TimeUnit.MILLISECONDS);
                Coordinator.this.coordinationTask.set(task);
            });
            this.closeStreamQuietly();
        }

        /**
         * Aborts the given work package, removes it from the coordinator's work packages, and releases the token
         * claim for its segment inside a new unit of work. The segment-released action is invoked after the claim
         * is released.
         * <p>
         * Any exception raised along the way is logged at info level and swallowed: the returned future then
         * completes normally with {@code null}.
         *
         * @param work  the work package to abort
         * @param cause the reason for the abort, passed on to {@code work.abort}; may be {@code null}
         * @return a future that completes once the abort, removal and claim release have been attempted
         */
        private CompletableFuture<Void> abortWorkPackage(WorkPackage work, Exception cause) {
            int segmentId = work.segment().getSegmentId();

            Runnable removeFromCoordinator = () -> {
                if (Coordinator.this.workPackages.remove(segmentId, work)) {
                    logger.debug("Processor [{}] released claim on {}.", Coordinator.this.name, work.segment());
                }
            };
            Runnable releaseClaim = () -> FutureUtils.joinAndUnwrap(
                    Coordinator.this.unitOfWorkFactory.create().executeWithResult(
                            context -> Coordinator.this.tokenStore
                                    .releaseClaim(Coordinator.this.name, segmentId, (ProcessingContext)context)
                                    .thenRun(() -> Coordinator.this.segmentReleasedAction.accept(work.segment()))
                    )
            );

            CompletableFuture<Void> removed = work.abort(cause).thenRun(removeFromCoordinator);
            CompletableFuture<Void> released = removed.thenRun(releaseClaim);
            return released.exceptionally(throwable -> {
                logger.info("An exception occurred during the abort of work package [{}] on [{}] processor.", segmentId, Coordinator.this.name, throwable);
                return null;
            });
        }

        /**
         * Increases the back-off for the given segment and pushes its release deadline forward accordingly.
         * <p>
         * The back-off starts at 1 second, doubles on each call and is capped at 60 seconds. The stored deadline
         * becomes the later of the existing deadline and "now plus back-off", where "now" is read from the
         * coordinator's clock.
         *
         * @param segmentId the identifier of the segment whose release deadline is advanced
         */
        private void advanceReleaseDeadlineFor(int segmentId) {
            int backOffSeconds = Coordinator.this.releasesLastBackOffSeconds.compute(segmentId, (id, previous) -> {
                if (previous == null) {
                    return 1;
                }
                return Math.min(previous * 2, 60);
            });
            Coordinator.this.releasesDeadlines.compute(segmentId, (id, existing) -> {
                Instant candidate = Coordinator.this.clock.instant().plusSeconds(backOffSeconds);
                Instant releaseDeadline = candidate;
                if (existing != null && existing.isAfter(candidate)) {
                    // Never move an already later deadline backwards.
                    releaseDeadline = existing;
                }
                logger.debug("Processor [{}] set release deadline claim to [{}] for Segment [#{}] using backoff of [{}] seconds.", Coordinator.this.name, releaseDeadline, segmentId, backOffSeconds);
                return releaseDeadline;
            });
        }
    }

    /**
     * A {@link TrackingToken} implementation exposed only through the shared {@link #INSTANCE}. It covers no other
     * token, and both bound operations simply hand back the token they were given.
     */
    private static class NoToken
    implements TrackingToken {
        /** The single shared instance; the constructor is private. */
        public static final TrackingToken INSTANCE = new NoToken();

        private NoToken() {
            // Instances are obtained through INSTANCE only.
        }

        /**
         * @param other the token to test
         * @return always {@code false}
         */
        @Override
        public boolean covers(TrackingToken other) {
            return false;
        }

        /**
         * @param other the token to combine with
         * @return the given {@code other} token, unchanged
         */
        @Override
        public TrackingToken upperBound(TrackingToken other) {
            return other;
        }

        /**
         * @param other the token to combine with
         * @return the given {@code other} token, unchanged
         */
        @Override
        public TrackingToken lowerBound(TrackingToken other) {
            return other;
        }
    }
}

