/*
 * Decompiled with CFR 0.152.
 */
package org.axonframework.messaging.eventhandling.processing.streaming.pooled;

import java.lang.invoke.MethodHandles;
import java.time.Clock;
import java.time.Instant;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import org.axonframework.common.FutureUtils;
import org.axonframework.messaging.core.unitofwork.ProcessingContext;
import org.axonframework.messaging.core.unitofwork.UnitOfWorkFactory;
import org.axonframework.messaging.eventhandling.processing.streaming.pooled.CoordinatorTask;
import org.axonframework.messaging.eventhandling.processing.streaming.pooled.WorkPackage;
import org.axonframework.messaging.eventhandling.processing.streaming.segmenting.Segment;
import org.axonframework.messaging.eventhandling.processing.streaming.token.MergedTrackingToken;
import org.axonframework.messaging.eventhandling.processing.streaming.token.TrackingToken;
import org.axonframework.messaging.eventhandling.processing.streaming.token.store.TokenStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class MergeTask
extends CoordinatorTask {
    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
    private final String name;
    private final int segmentId;
    private final Map<Integer, WorkPackage> workPackages;
    private final TokenStore tokenStore;
    private final UnitOfWorkFactory unitOfWorkFactory;
    private final Map<Integer, Instant> releasesDeadlines;
    private final Clock clock;

    MergeTask(CompletableFuture<Boolean> result, String name, int segmentId, Map<Integer, WorkPackage> workPackages, Map<Integer, Instant> releasesDeadlines, TokenStore tokenStore, UnitOfWorkFactory unitOfWorkFactory, Clock clock) {
        super(result, name);
        this.name = name;
        this.segmentId = segmentId;
        this.workPackages = workPackages;
        this.releasesDeadlines = releasesDeadlines;
        this.unitOfWorkFactory = unitOfWorkFactory;
        this.tokenStore = tokenStore;
        this.clock = clock;
    }

    @Override
    protected CompletableFuture<Boolean> task() {
        logger.debug("Processor [{}] will perform merge instruction for segment {}.", (Object)this.name, (Object)this.segmentId);
        Segment thisSegment = (Segment)FutureUtils.joinAndUnwrap(this.unitOfWorkFactory.create().executeWithResult(context -> this.tokenStore.fetchSegment(this.name, this.segmentId, (ProcessingContext)context)));
        if (this.segmentId == thisSegment.mergeableSegmentId()) {
            logger.debug("Processor [{}] cannot merge segment {}. A merge request can only be fulfilled if there is more than one segment.", (Object)this.name, (Object)this.segmentId);
            return CompletableFuture.completedFuture(false);
        }
        Segment thatSegment = (Segment)FutureUtils.joinAndUnwrap(this.unitOfWorkFactory.create().executeWithResult(context -> this.tokenStore.fetchSegment(this.name, thisSegment.mergeableSegmentId(), (ProcessingContext)context)));
        CompletableFuture<TrackingToken> thisTokenFuture = this.tokenFor(thisSegment.getSegmentId());
        CompletableFuture<TrackingToken> thatTokenFuture = this.tokenFor(thatSegment.getSegmentId());
        return thisTokenFuture.thenCombine(thatTokenFuture, (thisToken, thatToken) -> this.mergeSegments(thisSegment, (TrackingToken)thisToken, thatSegment, (TrackingToken)thatToken));
    }

    private CompletableFuture<TrackingToken> tokenFor(int segmentId) {
        this.releasesDeadlines.put(segmentId, this.clock.instant().plusSeconds(60L));
        return this.workPackages.containsKey(segmentId) ? this.workPackages.remove(segmentId).abort(null).thenCompose(e -> this.fetchTokenInUnitOfWork(segmentId)) : this.fetchTokenInUnitOfWork(segmentId);
    }

    private CompletableFuture<TrackingToken> fetchTokenInUnitOfWork(int segmentId) {
        return this.unitOfWorkFactory.create().executeWithResult(context -> this.tokenStore.fetchToken(this.name, segmentId, (ProcessingContext)context));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private Boolean mergeSegments(Segment thisSegment, TrackingToken thisToken, Segment thatSegment, TrackingToken thatToken) {
        try {
            Segment mergedSegment = thisSegment.mergedWith(thatSegment);
            TrackingToken mergedToken = thatSegment.getSegmentId() < thisSegment.getSegmentId() ? MergedTrackingToken.merged(thatToken, thisToken) : MergedTrackingToken.merged(thisToken, thatToken);
            FutureUtils.joinAndUnwrap(this.unitOfWorkFactory.create().executeWithResult(context -> ((CompletableFuture)((CompletableFuture)this.tokenStore.deleteToken(this.name, thisSegment.getSegmentId(), (ProcessingContext)context).thenCompose(result -> this.tokenStore.deleteToken(this.name, thatSegment.getSegmentId(), (ProcessingContext)context))).thenCompose(result -> this.tokenStore.initializeSegment(mergedToken, this.name, mergedSegment, (ProcessingContext)context))).thenCompose(result -> this.tokenStore.releaseClaim(this.name, mergedSegment.getSegmentId(), (ProcessingContext)context))));
            logger.info("Processor [{}] successfully merged {} with {} into {}.", new Object[]{this.name, thisSegment, thatSegment, mergedSegment});
        }
        finally {
            this.releasesDeadlines.remove(thisSegment.getSegmentId());
            this.releasesDeadlines.remove(thatSegment.getSegmentId());
        }
        return true;
    }

    @Override
    String getDescription() {
        return "Merge Task for segment " + this.segmentId;
    }
}

