/*
 * Decompiled with CFR 0.152.
 */
package com.oracle.coherence.concurrent.executor;

import com.oracle.coherence.common.base.Logger;
import com.oracle.coherence.concurrent.executor.ClusteredAssignment;
import com.oracle.coherence.concurrent.executor.ComposableContinuation;
import com.oracle.coherence.concurrent.executor.ExecutionPlan;
import com.oracle.coherence.concurrent.executor.ExecutionStrategy;
import com.oracle.coherence.concurrent.executor.PortableAbstractProcessor;
import com.oracle.coherence.concurrent.executor.Result;
import com.oracle.coherence.concurrent.executor.Task;
import com.oracle.coherence.concurrent.executor.TaskExecutorService;
import com.oracle.coherence.concurrent.executor.internal.Cause;
import com.oracle.coherence.concurrent.executor.internal.ExecutorTrace;
import com.oracle.coherence.concurrent.executor.internal.LiveObject;
import com.oracle.coherence.concurrent.executor.options.Debugging;
import com.oracle.coherence.concurrent.executor.processors.LocalOnlyProcessor;
import com.oracle.coherence.concurrent.executor.util.Caches;
import com.oracle.coherence.concurrent.executor.util.OptionsByType;
import com.tangosol.internal.tracing.Span;
import com.tangosol.internal.tracing.SpanContext;
import com.tangosol.internal.tracing.TracingHelper;
import com.tangosol.io.ExternalizableLite;
import com.tangosol.io.pof.PofReader;
import com.tangosol.io.pof.PofWriter;
import com.tangosol.io.pof.PortableObject;
import com.tangosol.net.CacheService;
import com.tangosol.net.NamedCache;
import com.tangosol.util.Base;
import com.tangosol.util.ExternalizableHelper;
import com.tangosol.util.Filter;
import com.tangosol.util.Filters;
import com.tangosol.util.InvocableMap;
import com.tangosol.util.filter.EqualsFilter;
import com.tangosol.util.filter.KeyAssociatedFilter;
import com.tangosol.util.filter.PresentFilter;
import com.tangosol.util.function.Remote;
import com.tangosol.util.processor.ConditionalRemove;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.BiConsumer;

public class ClusteredTaskManager<T, A, R>
implements ExternalizableLite,
LiveObject,
PortableObject {
    protected static final Filter<TaskExecutorService.ExecutorInfo> RUNNING_EXECUTOR_FILTER = Filters.equal(TaskExecutorService.ExecutorInfo::getState, (Object)((Object)TaskExecutorService.ExecutorInfo.State.JOINING)).or(Filters.equal(TaskExecutorService.ExecutorInfo::getState, (Object)((Object)TaskExecutorService.ExecutorInfo.State.RUNNING)));
    protected String m_sTaskId;
    protected volatile long m_lTaskSequence;
    protected int m_nPartitionId;
    protected Task<T> m_task;
    protected ExecutionStrategy m_executionStrategy;
    protected Task.Collector<T, A, R> m_collector;
    protected Remote.Predicate<? super R> m_completionPredicate;
    protected Task.CompletionRunnable<? super R> m_completionRunnable;
    protected boolean m_fRunCompletionRunnable;
    protected Duration m_retainDuration;
    protected Debugging m_debugging;
    protected ExecutionPlan m_executionPlan;
    protected int m_cPendingExecutionStrategyUpdateCount;
    protected int m_cPendingExecutionPlanOptimizationCount;
    protected volatile Result<R> m_lastResult;
    protected int m_nResultVersion;
    protected List<Result<T>> m_listResults;
    protected long m_lCurrentResultGeneration;
    protected long m_lProcessedResultGeneration;
    protected volatile boolean m_fCompleted;
    protected volatile boolean m_fCancelled;
    protected volatile State m_state;
    private SpanContext m_parentSpanContext = SpanContext.Noop.INSTANCE;

    public ClusteredTaskManager() {
    }

    public ClusteredTaskManager(String sTaskId, Task<T> task, ExecutionStrategy executionStrategy, Task.Collector<? super T, A, R> collector, Remote.Predicate<? super R> completionPredicate, Task.CompletionRunnable<? super R> completionRunnable, Duration retainDuration, OptionsByType<Task.Option> optionsByType) {
        this.m_sTaskId = sTaskId;
        this.m_task = task;
        this.m_executionStrategy = executionStrategy;
        this.m_collector = collector == null ? null : collector;
        this.m_completionPredicate = completionPredicate;
        this.m_completionRunnable = completionRunnable;
        this.m_fRunCompletionRunnable = completionRunnable != null;
        this.m_retainDuration = retainDuration;
        this.m_debugging = optionsByType.get(Debugging.class, Debugging.of(7));
        this.m_lastResult = Result.none();
        this.m_nResultVersion = 0;
        this.m_executionPlan = null;
        this.m_cPendingExecutionStrategyUpdateCount = 1;
        this.m_cPendingExecutionPlanOptimizationCount = 0;
        if (collector != null) {
            this.m_listResults = new ArrayList<Result<T>>();
        }
        this.m_lCurrentResultGeneration = 0L;
        this.m_lProcessedResultGeneration = 0L;
        this.m_fCancelled = false;
        this.m_fCompleted = false;
        this.m_state = State.ORCHESTRATED;
        Span parentSpan = TracingHelper.getActiveSpan();
        if (parentSpan != null) {
            this.m_parentSpanContext = parentSpan.getContext();
        }
    }

    public ComposableContinuation onProcess(CacheService service, InvocableMap.Entry entry, Cause cause) {
        String sTaskId = this.getTaskId();
        ExecutorTrace.entering(ClusteredTaskManager.class, "onProcess", new Object[]{service, sTaskId, entry, cause, this.m_state});
        ComposableContinuation continuation = null;
        switch (this.m_state.ordinal()) {
            case 1: {
                continuation = new AsyncProcessChangesContinuation(service, (String)entry.getKey(), cause);
                break;
            }
            case 2: {
                continuation = new CleanupContinuation(service, (String)entry.getKey());
            }
        }
        ExecutorTrace.exiting(ClusteredTaskManager.class, "onProcess", (Object)sTaskId, continuation);
        return continuation;
    }

    public boolean isOwner(String executorId) {
        ExecutionPlan.Action action = this.m_executionPlan.getAction(executorId);
        return action != null && action.isEffectivelyAssigned();
    }

    public String getTaskId() {
        return this.m_sTaskId;
    }

    public State getState() {
        return this.m_state;
    }

    public void setState(State state) {
        this.m_state = state;
    }

    public boolean getRunCompletionRunnable() {
        return this.m_fRunCompletionRunnable;
    }

    public void setRunCompletionRunnable(boolean value) {
        this.m_fRunCompletionRunnable = value;
    }

    public Task.CompletionRunnable<? super R> getCompletionRunnable() {
        return this.m_completionRunnable;
    }

    public Duration getRetainDuration() {
        return this.m_retainDuration;
    }

    public Debugging getDebugging() {
        return this.m_debugging;
    }

    public void asyncProcessChanges(CacheService service, String key, Cause cause) {
        ExecutorTrace.entering(ClusteredTaskManager.class, "asyncProcessChanges", new Object[]{service, key, cause});
        long newResultCount = this.m_lCurrentResultGeneration - this.m_lProcessedResultGeneration;
        Debugging debug = this.m_debugging.getLogLevel() < 7 ? new Debugging() : this.m_debugging;
        ExecutorTrace.log(() -> "------------------------------------", debug);
        ExecutorTrace.log(() -> String.format("Task                               : %s", key), debug);
        ExecutorTrace.log(() -> String.format("State                              : %s", new Object[]{this.m_state}), debug);
        ExecutorTrace.log(() -> String.format("Completed ?                        : %s", this.m_fCompleted), debug);
        ExecutorTrace.log(() -> String.format("Cancelled ?                        : %s", this.m_fCancelled), debug);
        ExecutorTrace.log(() -> String.format("Last Result                        : %s", this.m_lastResult), debug);
        ExecutorTrace.log(() -> String.format("Result Version                     : %s", this.m_nResultVersion), debug);
        ExecutorTrace.log(() -> String.format("Pending Results from Executors     : %s", newResultCount), debug);
        ExecutorTrace.log(() -> String.format("Total Results from Executors       : %s", this.m_lCurrentResultGeneration), debug);
        ExecutorTrace.log(() -> String.format("Pending Execution Strategy Updates : %s", this.m_cPendingExecutionStrategyUpdateCount), debug);
        ExecutorTrace.log(() -> String.format("Pending Execution Plan Updates     : %s", this.m_cPendingExecutionPlanOptimizationCount), debug);
        ExecutorTrace.log(() -> String.format("Execution Plan                     : %s", this.m_executionPlan), debug);
        ExecutorTrace.log(() -> "------------------------------------", debug);
        ExecutorTrace.log("Acquiring Updated Executor Information", debug);
        boolean resultChanged = false;
        Result<R> originalResult = this.m_lastResult;
        boolean executionPlanChanged = false;
        int originalPendingExecutionStrategyUpdateCount = this.m_cPendingExecutionStrategyUpdateCount;
        ExecutorTrace.log("Building Rationales For Updating the Result", debug);
        EnumSet<ExecutionStrategy.EvaluationRationale> rationales = EnumSet.noneOf(ExecutionStrategy.EvaluationRationale.class);
        if (this.m_executionPlan == null) {
            rationales.add(ExecutionStrategy.EvaluationRationale.TASK_CREATED);
        }
        if (this.m_cPendingExecutionStrategyUpdateCount > 0) {
            rationales.add(ExecutionStrategy.EvaluationRationale.EXECUTOR_SERVICES_CHANGED);
        }
        if (newResultCount > 0L) {
            rationales.add(ExecutionStrategy.EvaluationRationale.TASK_RESULT_PROVIDED);
        }
        if (cause == Cause.PARTITIONING) {
            rationales.add(ExecutionStrategy.EvaluationRationale.TASK_RECOVERED);
        }
        ExecutorTrace.log(() -> String.format("Evaluation Rationales              : %s", rationales), debug);
        if (!(this.m_state != State.ORCHESTRATED || this.m_fCompleted || this.m_fCancelled || newResultCount <= 0L && this.m_cPendingExecutionStrategyUpdateCount <= 0)) {
            if (newResultCount > 0L) {
                ExecutorTrace.log("(re) Evaluating Task Result", debug);
                resultChanged = this.asyncEvaluateResult(originalResult);
            }
            if (!(this.m_fCompleted || this.m_fCancelled || newResultCount <= 0L && originalPendingExecutionStrategyUpdateCount <= 0)) {
                ExecutorTrace.log("(re) Evaluating Task Execution Strategy", debug);
                executionPlanChanged = this.asyncEvaluateExecutionStrategy(service, rationales);
            }
        }
        if (Logger.isEnabled((int)debug.getLogLevel())) {
            ExecutorTrace.log(String.format("Result Changed?                    : %s", resultChanged), debug);
            ExecutorTrace.log(String.format("Execution Plan Changed?            : %s", executionPlanChanged), debug);
        }
        ChainedProcessor chain = ChainedProcessor.empty();
        if (resultChanged || newResultCount > 0L) {
            chain.andThen((InvocableMap.EntryProcessor<String, ClusteredTaskManager, ?>)new UpdateCollectedResultProcessor<R>(this.m_lastResult, this.m_lCurrentResultGeneration, this.m_fCompleted));
        }
        if (executionPlanChanged) {
            ExecutorTrace.log(() -> String.format("Updated Execution Plan             : %s", this.m_executionPlan), debug);
            chain.andThen((InvocableMap.EntryProcessor<String, ClusteredTaskManager, ?>)new UpdateExecutionPlanProcessor(this.m_executionPlan, originalPendingExecutionStrategyUpdateCount));
        }
        boolean isLocal = true;
        if (!chain.isEmpty()) {
            ExecutorTrace.log(() -> String.format("Updating Task [%s]", this.m_sTaskId), debug);
            Result result = (Result)Caches.tasks(service).invoke((Object)this.m_sTaskId, LocalOnlyProcessor.of(chain));
            isLocal = result.isPresent();
        }
        if (isLocal || cause == Cause.PARTITIONING) {
            if (!this.m_fCompleted && !this.m_fCancelled && this.m_cPendingExecutionPlanOptimizationCount > 0) {
                ExecutorTrace.log(() -> String.format("Commenced Assigning and Optimizing Execution Plan for Task [%s] using [%s]", this.m_sTaskId, this.m_executionPlan), debug);
                ClusteredAssignment.registerAssignments(this.m_sTaskId, this.m_executionPlan, service);
                ExecutorTrace.log(() -> String.format("Optimizing Execution Plan for Task [%s]", this.m_sTaskId), debug);
                executionPlanChanged = this.m_executionPlan.optimize();
                if (Logger.isEnabled((int)debug.getLogLevel())) {
                    ExecutorTrace.log(String.format("Execution Plan for Task [%s] %s", this.m_sTaskId, executionPlanChanged ? "was optimized" : "did not require optimization"), debug);
                }
                Caches.tasks(service).invoke((Object)this.m_sTaskId, LocalOnlyProcessor.of(new OptimizeExecutionPlanProcessor(this.m_executionPlan, this.m_cPendingExecutionPlanOptimizationCount)));
                ExecutorTrace.log(() -> String.format("Completed Assigning and Optimizing Execution Plan for Task [%s]", this.m_sTaskId), debug);
            } else if (Logger.isEnabled((int)debug.getLogLevel())) {
                String sMsg = "Skipping Optimization of Execution Plan for Task [%s] as it is completed, cancelled or has no pending optimizations to perform";
                ExecutorTrace.log(String.format(sMsg, this.m_sTaskId), debug);
            }
        } else {
            ExecutorTrace.log(() -> String.format("Abandoned Performing Updates as the Task [%s] as it is no longer local", this.m_sTaskId), debug);
        }
        ExecutorTrace.exiting(ClusteredTaskManager.class, "asyncProcessChanges");
    }

    public Task<T> getTask() {
        return this.m_task;
    }

    public boolean isCompleted() {
        return this.m_fCompleted;
    }

    public boolean isCancelled() {
        return this.m_fCancelled;
    }

    public boolean isDone() {
        return this.m_fCompleted || this.m_fCancelled || this.m_state == State.TERMINATING;
    }

    public Result<R> getLastResult() {
        return this.m_lastResult;
    }

    public int getResultVersion() {
        return this.m_nResultVersion;
    }

    public void setResult(Result<T> result) {
        if (this.m_collector == null) {
            ++this.m_nResultVersion;
            this.m_lastResult = result;
        } else {
            this.m_listResults.add(result);
        }
        ++this.m_lCurrentResultGeneration;
    }

    public int getPartitionId() {
        return this.m_nPartitionId;
    }

    public void setPartitionId(int nPartitionId) {
        this.m_nPartitionId = nPartitionId;
    }

    public long getTaskSequence() {
        return this.m_lTaskSequence;
    }

    public void setTaskSequence(long sequence) {
        this.m_lTaskSequence = sequence;
    }

    public void readExternal(DataInput in) throws IOException {
        this.m_sTaskId = ExternalizableHelper.readUTF((DataInput)in);
        this.m_task = (Task)ExternalizableHelper.readObject((DataInput)in);
        this.m_executionStrategy = (ExecutionStrategy)ExternalizableHelper.readObject((DataInput)in);
        this.m_collector = (Task.Collector)ExternalizableHelper.readObject((DataInput)in);
        this.m_completionPredicate = (Remote.Predicate)ExternalizableHelper.readObject((DataInput)in);
        this.m_completionRunnable = (Task.CompletionRunnable)ExternalizableHelper.readObject((DataInput)in);
        this.m_fRunCompletionRunnable = this.m_completionRunnable != null;
        long retainSeconds = ExternalizableHelper.readLong((DataInput)in);
        this.m_retainDuration = retainSeconds == -1L ? null : Duration.ofSeconds(retainSeconds);
        this.m_debugging = (Debugging)ExternalizableHelper.readObject((DataInput)in);
        this.m_lastResult = (Result)ExternalizableHelper.readObject((DataInput)in);
        this.m_nResultVersion = ExternalizableHelper.readInt((DataInput)in);
        this.m_executionPlan = (ExecutionPlan)ExternalizableHelper.readObject((DataInput)in);
        this.m_cPendingExecutionStrategyUpdateCount = ExternalizableHelper.readInt((DataInput)in);
        this.m_cPendingExecutionPlanOptimizationCount = ExternalizableHelper.readInt((DataInput)in);
        if (this.m_collector != null) {
            this.m_listResults = new ArrayList<Result<T>>();
            ExternalizableHelper.readCollection((DataInput)in, this.m_listResults, null);
        }
        this.m_lCurrentResultGeneration = ExternalizableHelper.readLong((DataInput)in);
        this.m_lProcessedResultGeneration = ExternalizableHelper.readLong((DataInput)in);
        this.m_fCancelled = in.readBoolean();
        this.m_fCompleted = in.readBoolean();
        this.m_state = (State)((Object)ExternalizableHelper.readObject((DataInput)in));
        LinkedHashMap wireTracingContext = new LinkedHashMap();
        ExternalizableHelper.readMap((DataInput)in, wireTracingContext, (ClassLoader)Base.getContextClassLoader());
        this.m_parentSpanContext = wireTracingContext.isEmpty() ? SpanContext.Noop.INSTANCE : TracingHelper.getTracer().extract(wireTracingContext);
    }

    public void writeExternal(DataOutput out) throws IOException {
        ExternalizableHelper.writeUTF((DataOutput)out, (String)this.m_sTaskId);
        ExternalizableHelper.writeObject((DataOutput)out, this.m_task);
        ExternalizableHelper.writeObject((DataOutput)out, (Object)this.m_executionStrategy);
        ExternalizableHelper.writeObject((DataOutput)out, this.m_collector);
        ExternalizableHelper.writeObject((DataOutput)out, this.m_completionPredicate);
        ExternalizableHelper.writeObject((DataOutput)out, this.m_completionRunnable);
        ExternalizableHelper.writeLong((DataOutput)out, (long)(this.m_retainDuration == null ? -1L : this.m_retainDuration.getSeconds()));
        ExternalizableHelper.writeObject((DataOutput)out, (Object)this.m_debugging);
        ExternalizableHelper.writeObject((DataOutput)out, this.m_lastResult);
        ExternalizableHelper.writeInt((DataOutput)out, (int)this.m_nResultVersion);
        ExternalizableHelper.writeObject((DataOutput)out, (Object)this.m_executionPlan);
        ExternalizableHelper.writeInt((DataOutput)out, (int)this.m_cPendingExecutionStrategyUpdateCount);
        ExternalizableHelper.writeInt((DataOutput)out, (int)this.m_cPendingExecutionPlanOptimizationCount);
        if (this.m_collector != null) {
            ExternalizableHelper.writeCollection((DataOutput)out, this.m_listResults);
        }
        ExternalizableHelper.writeLong((DataOutput)out, (long)this.m_lCurrentResultGeneration);
        ExternalizableHelper.writeLong((DataOutput)out, (long)this.m_lProcessedResultGeneration);
        out.writeBoolean(this.m_fCancelled);
        out.writeBoolean(this.m_fCompleted);
        ExternalizableHelper.writeObject((DataOutput)out, (Object)((Object)this.m_state));
        Map injectMap = TracingHelper.getTracer().inject(this.m_parentSpanContext);
        ExternalizableHelper.writeMap((DataOutput)out, (Map)(injectMap == null ? Collections.emptyMap() : injectMap));
    }

    public void readExternal(PofReader in) throws IOException {
        this.m_sTaskId = in.readString(0);
        this.m_task = (Task)in.readObject(1);
        this.m_executionStrategy = (ExecutionStrategy)in.readObject(2);
        this.m_collector = (Task.Collector)in.readObject(3);
        this.m_completionPredicate = (Remote.Predicate)in.readObject(4);
        this.m_completionRunnable = (Task.CompletionRunnable)in.readObject(5);
        this.m_fRunCompletionRunnable = this.m_completionRunnable != null;
        long retainSeconds = in.readLong(6);
        this.m_retainDuration = retainSeconds == -1L ? null : Duration.ofSeconds(retainSeconds);
        this.m_debugging = (Debugging)in.readObject(7);
        this.m_lastResult = (Result)in.readObject(8);
        this.m_nResultVersion = in.readInt(9);
        this.m_executionPlan = (ExecutionPlan)in.readObject(10);
        this.m_cPendingExecutionStrategyUpdateCount = in.readInt(11);
        this.m_cPendingExecutionPlanOptimizationCount = in.readInt(12);
        if (this.m_collector != null) {
            this.m_listResults = new ArrayList<Result<T>>();
            this.m_listResults = (List)in.readCollection(13, this.m_listResults);
        }
        this.m_lCurrentResultGeneration = in.readLong(14);
        this.m_lProcessedResultGeneration = in.readLong(15);
        this.m_fCancelled = in.readBoolean(16);
        this.m_fCompleted = in.readBoolean(17);
        this.m_state = (State)((Object)in.readObject(18));
        LinkedHashMap wireTracingContext = new LinkedHashMap();
        in.readMap(19, wireTracingContext);
        this.m_parentSpanContext = TracingHelper.getTracer().extract(wireTracingContext);
    }

    public void writeExternal(PofWriter out) throws IOException {
        out.writeString(0, this.m_sTaskId);
        out.writeObject(1, this.m_task);
        out.writeObject(2, (Object)this.m_executionStrategy);
        out.writeObject(3, this.m_collector);
        out.writeObject(4, this.m_completionPredicate);
        out.writeObject(5, this.m_completionRunnable);
        out.writeLong(6, this.m_retainDuration == null ? -1L : this.m_retainDuration.getSeconds());
        out.writeObject(7, (Object)this.m_debugging);
        out.writeObject(8, this.m_lastResult);
        out.writeInt(9, this.m_nResultVersion);
        out.writeObject(10, (Object)this.m_executionPlan);
        out.writeInt(11, this.m_cPendingExecutionStrategyUpdateCount);
        out.writeInt(12, this.m_cPendingExecutionPlanOptimizationCount);
        out.writeCollection(13, this.m_listResults);
        out.writeLong(14, this.m_lCurrentResultGeneration);
        out.writeLong(15, this.m_lProcessedResultGeneration);
        out.writeBoolean(16, this.m_fCancelled);
        out.writeBoolean(17, this.m_fCompleted);
        out.writeObject(18, (Object)this.m_state);
        out.writeObject(19, (Object)TracingHelper.getTracer().inject(this.m_parentSpanContext));
    }

    public String toString() {
        return "ClusteredTaskManager{taskId='" + this.m_sTaskId + "', resultVersion=" + this.m_nResultVersion + ", currentResultGeneration=" + this.m_lCurrentResultGeneration + ", processedResultGeneration=" + this.m_lProcessedResultGeneration + ", completed=" + this.m_fCompleted + ", cancelled=" + this.m_fCancelled + ", state=" + String.valueOf((Object)this.m_state) + "}";
    }

    protected SpanContext getParentSpanContext() {
        return this.m_parentSpanContext;
    }

    protected boolean asyncEvaluateResult(Result<R> originalResult) {
        String sTaskId = this.getTaskId();
        ExecutorTrace.entering(ClusteredTaskManager.class, "asyncEvaluateResult", sTaskId, originalResult);
        Debugging debug = this.m_debugging.getLogLevel() < 7 ? new Debugging() : this.m_debugging;
        boolean resultChanged = false;
        if (this.m_fCompleted) {
            ExecutorTrace.log(() -> String.format("Skipping result collection for Task [%s] as it is completed", this.m_sTaskId), debug);
        } else if (this.m_collector == null) {
            if (this.m_lastResult.isThrowable()) {
                this.m_fCompleted = true;
            } else {
                resultChanged = true;
            }
            if (!this.m_fCompleted && this.m_executionPlan.isSatisfied()) {
                boolean fCompleted = true;
                Iterator<String> iter = this.m_executionPlan.getIds();
                while (iter.hasNext() && fCompleted) {
                    String executorId = iter.next();
                    if (this.m_executionPlan.getAction(executorId) == ExecutionPlan.Action.COMPLETED) continue;
                    fCompleted = false;
                }
                this.m_fCompleted = fCompleted;
                if (this.m_fCompleted) {
                    ExecutorTrace.log(() -> String.format("Task [%s] has completed on all assigned Executors", this.m_sTaskId), debug);
                }
            }
        } else {
            ExecutorTrace.log(() -> String.format("Collecting result for Task [%s] using collector [%s]", this.m_sTaskId, this.m_collector), debug);
            A container = this.m_collector.supplier().get();
            Remote.Predicate<A> finishable = this.m_collector.finishable();
            BiConsumer<A, A> accumulator = this.m_collector.accumulator();
            try {
                Iterator<Result<T>> iterator = this.m_listResults.iterator();
                while (iterator.hasNext() && !finishable.test(container)) {
                    Result<T> result = iterator.next();
                    if (!result.isPresent()) continue;
                    accumulator.accept(container, result.get());
                }
                Result<R> result = Result.of(this.m_collector.finisher().apply(container));
                ExecutorTrace.log(() -> String.format("Collected result [%s] for Task [%s]", result, this.m_sTaskId), debug);
                if (this.m_executionPlan.isSatisfied() && this.m_completionPredicate.test(result.get()) && !this.m_fCompleted) {
                    this.m_fCompleted = true;
                    this.m_lastResult = result;
                    resultChanged = true;
                }
                if (originalResult == null || !originalResult.equals(result)) {
                    this.m_lastResult = result;
                    resultChanged = true;
                } else {
                    ExecutorTrace.log(() -> String.format("Collected result [%s] for Task [%s] hasn't changed (no changes to publish)", result, this.m_sTaskId), debug);
                }
            }
            catch (Throwable t) {
                this.m_fCompleted = true;
                this.m_lastResult = Result.throwable(t);
                ExecutorTrace.log(() -> String.format("Task [%s] failed due to [%s]", this.m_sTaskId, t), debug);
            }
        }
        ExecutorTrace.exiting(ClusteredTaskManager.class, "asyncEvaluateResult", (Object)sTaskId, resultChanged);
        return resultChanged;
    }

    protected boolean asyncEvaluateExecutionStrategy(CacheService service, EnumSet<ExecutionStrategy.EvaluationRationale> rationales) {
        boolean fExecutionPlanUpdated;
        Debugging debug = this.m_debugging.getLogLevel() < 7 ? new Debugging() : this.m_debugging;
        ExecutorTrace.entering(ClusteredTaskManager.class, "asyncEvaluateExecutionStrategy", service, this.m_sTaskId, rationales);
        ExecutorTrace.log(() -> String.format("Evaluating the Execution Plan for Task [%s] due to [%s]", this.m_sTaskId, rationales), debug);
        try {
            NamedCache executorInfoCache = Caches.executors(service);
            Map executors = executorInfoCache.invokeAll(RUNNING_EXECUTOR_FILTER, InvocableMap.Entry::getValue);
            ExecutionPlan executionPlan = this.m_executionStrategy.analyze(this.m_executionPlan, executors, rationales);
            boolean bl = fExecutionPlanUpdated = executionPlan == null && this.m_executionPlan != null || executionPlan != null && this.m_executionPlan == null || executionPlan != null && !executionPlan.equals(this.m_executionPlan);
            if (ExecutorTrace.isEnabled()) {
                ExecutorTrace.log(String.format("Current execution plan [%s]", this.m_executionPlan));
                ExecutorTrace.log(String.format("Updated(?) execution plan [%s]", executionPlan));
            }
            if (fExecutionPlanUpdated) {
                this.m_executionPlan = executionPlan;
                ExecutorTrace.log(() -> String.format("Execution Plan for Task [%s] has changed.  Will be updated.", this.m_sTaskId), debug);
                ++this.m_cPendingExecutionPlanOptimizationCount;
            } else {
                ExecutorTrace.log(() -> String.format("Execution Plan for Task [%s] was not changed.  Will not be updated.", this.m_sTaskId), debug);
            }
        }
        catch (Exception e) {
            ExecutorTrace.throwing(ClusteredTaskManager.class, "asyncEvaluateExecutionStrategy", e, new Object[0]);
            throw Base.ensureRuntimeException((Throwable)e);
        }
        this.m_cPendingExecutionStrategyUpdateCount = 0;
        ExecutorTrace.exiting(ClusteredTaskManager.class, "asyncEvaluateExecutionStrategy", (Object)this.m_sTaskId, fExecutionPlanUpdated);
        return fExecutionPlanUpdated;
    }

    protected void cleanup(CacheService service, String sKey) {
        String sTaskId = this.getTaskId();
        Duration retain = this.m_retainDuration;
        ExecutorTrace.entering(ClusteredTaskManager.class, "cleanup", service, sKey, sTaskId, retain);
        ClusteredAssignment.removeAssignments(sTaskId, service);
        if (retain == null) {
            Caches.tasks(service).remove((Object)sKey);
        } else if (retain != Duration.ZERO) {
            long cRetainMillis = this.m_retainDuration.toMillis();
            ClusteredTaskManager value = this;
            this.m_retainDuration = Duration.ZERO;
            Caches.tasks(service).put((Object)sKey, (Object)value, cRetainMillis);
        }
        this.cleanProperties(service);
        ExecutorTrace.exiting(ClusteredTaskManager.class, "cleanup", (Object)sTaskId, new Object[0]);
    }

    protected void cleanProperties(CacheService service) {
        String sTaskId = this.getTaskId();
        KeyAssociatedFilter filterAsc = new KeyAssociatedFilter((Filter)new EqualsFilter("getTaskId", (Object)sTaskId), (Object)sTaskId);
        ExecutorTrace.entering(ClusteredTaskManager.class, "cleanProperties", sTaskId);
        Caches.properties(service).invokeAll((Filter)filterAsc, (InvocableMap.EntryProcessor)new ConditionalRemove((Filter)PresentFilter.INSTANCE, false));
        ExecutorTrace.exiting(ClusteredTaskManager.class, "cleanProperties", (Object)sTaskId, new Object[0]);
    }

    @Override
    public ComposableContinuation onInserted(CacheService service, InvocableMap.Entry entry, Cause cause) {
        return this.onProcess(service, entry, cause);
    }

    @Override
    public ComposableContinuation onUpdated(CacheService service, InvocableMap.Entry entry, Cause cause) {
        return this.onProcess(service, entry, cause);
    }

    @Override
    public ComposableContinuation onDeleted(CacheService service, InvocableMap.Entry entry, Cause cause) {
        return null;
    }

    public static enum State {
        PENDING,
        ORCHESTRATED,
        TERMINATING;

    }

    public class AsyncProcessChangesContinuation
    implements ComposableContinuation {
        protected final CacheService f_cacheService;
        protected final String f_sTaskId;
        protected final Cause f_cause;
        protected final String f_sId;

        public AsyncProcessChangesContinuation(CacheService cacheService, String sTaskId, Cause cause) {
            this.f_cacheService = cacheService;
            this.f_sTaskId = sTaskId;
            this.f_cause = cause;
            this.f_sId = Integer.toHexString(this.hashCode());
        }

        public void proceed(Object o) {
            ClusteredTaskManager.this.asyncProcessChanges(this.f_cacheService, this.f_sTaskId, this.f_cause);
        }

        @Override
        public ComposableContinuation compose(ComposableContinuation continuation) {
            return continuation;
        }

        public String toString() {
            return "AsyncProcessChangesContinuation{id=" + this.f_sId + ", taskId=" + this.f_sTaskId + ", cause=" + String.valueOf((Object)this.f_cause) + "}";
        }
    }

    public class CleanupContinuation
    implements ComposableContinuation {
        protected final CacheService f_cacheService;
        protected final String f_sTaskId;

        public CleanupContinuation(CacheService cacheService, String sTaskId) {
            this.f_cacheService = cacheService;
            this.f_sTaskId = sTaskId;
        }

        public void proceed(Object o) {
            ClusteredTaskManager.this.cleanup(this.f_cacheService, this.f_sTaskId);
        }

        @Override
        public ComposableContinuation compose(ComposableContinuation continuation) {
            return this;
        }

        public String toString() {
            return "CleanupContinuation{taskId='" + this.f_sTaskId + "'}";
        }
    }

    public static class ChainedProcessor
    extends PortableAbstractProcessor<String, ClusteredTaskManager, Void> {
        protected ArrayList<InvocableMap.EntryProcessor> m_listProcessors = new ArrayList();

        public ChainedProcessor andThen(InvocableMap.EntryProcessor<String, ClusteredTaskManager, ?> processor) {
            this.m_listProcessors.add(processor);
            return this;
        }

        public static ChainedProcessor empty() {
            return new ChainedProcessor();
        }

        public boolean isEmpty() {
            return this.m_listProcessors.isEmpty();
        }

        public Void process(InvocableMap.Entry<String, ClusteredTaskManager> entry) {
            for (InvocableMap.EntryProcessor processor : this.m_listProcessors) {
                processor.process(entry);
            }
            return null;
        }

        @Override
        public void readExternal(PofReader in) throws IOException {
            this.m_listProcessors = (ArrayList)in.readCollection(0, this.m_listProcessors);
        }

        @Override
        public void writeExternal(PofWriter out) throws IOException {
            out.writeCollection(0, this.m_listProcessors);
        }
    }

    public static class UpdateCollectedResultProcessor<T>
    extends PortableAbstractProcessor<String, ClusteredTaskManager, Void> {
        protected Result<T> m_newResult;
        protected long m_lProcessedResultMapGeneration;
        protected boolean m_fCompleted;

        public UpdateCollectedResultProcessor() {
        }

        public UpdateCollectedResultProcessor(Result<T> newResult, long lProcessedResultMapGeneration, boolean fCompleted) {
            this.m_lProcessedResultMapGeneration = lProcessedResultMapGeneration;
            this.m_newResult = newResult;
            this.m_fCompleted = fCompleted;
        }

        public Void process(InvocableMap.Entry<String, ClusteredTaskManager> entry) {
            ExecutorTrace.entering(UpdateCollectedResultProcessor.class, "process", () -> String.format("key=%s, value=%s", entry.getKey(), entry.getValue()));
            if (entry.isPresent()) {
                Debugging debug;
                ClusteredTaskManager manager = (ClusteredTaskManager)entry.getValue();
                Debugging debugging = debug = manager.m_debugging.getLogLevel() < 7 ? new Debugging() : manager.m_debugging;
                if (manager.m_collector != null) {
                    if (manager.m_lastResult == null) {
                        ExecutorTrace.log(() -> String.format("Task [%s] has a newly collected result [%s]", manager.getTaskId(), this.m_newResult), debug);
                    } else {
                        ExecutorTrace.log(() -> String.format("Task [%s] collected result will be updated from [%s] to [%s]", manager.getTaskId(), manager.getLastResult(), this.m_newResult), debug);
                    }
                    if (manager.m_lastResult == null && this.m_newResult != null || !Objects.equals(manager.m_lastResult, this.m_newResult)) {
                        manager.m_lastResult = this.m_newResult;
                        ++manager.m_nResultVersion;
                    } else {
                        ExecutorTrace.log(() -> String.format("Task [%s] result [%s] has not changed.  No update will be performed", manager.getTaskId(), manager.getLastResult()), debug);
                    }
                }
                manager.m_lProcessedResultGeneration = this.m_lProcessedResultMapGeneration;
                if (!manager.m_fCompleted && this.m_fCompleted) {
                    manager.m_fCompleted = true;
                    manager.m_state = State.TERMINATING;
                }
                ExecutorTrace.log(() -> String.format("Task [%s] (completed=[%s], cancelled=[%s], state=[%s], resultVersion[%s]", new Object[]{manager.getTaskId(), manager.isCompleted(), manager.isCancelled(), manager.m_state, manager.m_nResultVersion}), debug);
                entry.setValue((Object)manager);
            } else {
                Logger.fine(() -> String.format("Ignoring request to update Task [%s] as it is no longer present", entry.getKey()));
            }
            ExecutorTrace.exiting(UpdateCollectedResultProcessor.class, "process");
            return null;
        }

        @Override
        public void readExternal(PofReader in) throws IOException {
            this.m_newResult = (Result)in.readObject(0);
            this.m_lProcessedResultMapGeneration = in.readLong(1);
            this.m_fCompleted = in.readBoolean(2);
        }

        @Override
        public void writeExternal(PofWriter out) throws IOException {
            out.writeObject(0, this.m_newResult);
            out.writeLong(1, this.m_lProcessedResultMapGeneration);
            out.writeBoolean(2, this.m_fCompleted);
        }
    }

    public static class UpdateExecutionPlanProcessor
    extends PortableAbstractProcessor<String, ClusteredTaskManager, Void> {
        protected ExecutionPlan m_executionPlan;
        protected int m_cPendingExecutionStrategyUpdateCount;

        public UpdateExecutionPlanProcessor() {
        }

        public UpdateExecutionPlanProcessor(ExecutionPlan executionPlan, int cPendingExecutionStrategyUpdateCount) {
            this.m_executionPlan = executionPlan;
            this.m_cPendingExecutionStrategyUpdateCount = cPendingExecutionStrategyUpdateCount;
        }

        public Void process(InvocableMap.Entry<String, ClusteredTaskManager> entry) {
            ExecutorTrace.entering(UpdateExecutionPlanProcessor.class, "process", () -> String.format("key=%s, value=%s", entry.getKey(), entry.getValue()));
            if (entry.isPresent()) {
                Debugging debug;
                ClusteredTaskManager manager = (ClusteredTaskManager)entry.getValue();
                Debugging debugging = debug = manager.m_debugging.getLogLevel() < 7 ? new Debugging() : manager.m_debugging;
                if (manager.isCompleted()) {
                    ExecutorTrace.log(() -> String.format("Skipping Execution Plan Update for Task [%s] as it is completed", manager.getTaskId()), debug);
                } else {
                    ExecutorTrace.log(() -> String.format("Updating Execution Plan for Task [%s]", manager.getTaskId()), debug);
                    manager.m_executionPlan = this.m_executionPlan;
                    manager.m_cPendingExecutionStrategyUpdateCount = Math.max(0, manager.m_cPendingExecutionStrategyUpdateCount - this.m_cPendingExecutionStrategyUpdateCount);
                    ++manager.m_cPendingExecutionPlanOptimizationCount;
                    entry.setValue((Object)manager);
                }
            }
            ExecutorTrace.exiting(UpdateExecutionPlanProcessor.class, "process");
            return null;
        }

        @Override
        public void readExternal(PofReader in) throws IOException {
            this.m_executionPlan = (ExecutionPlan)in.readObject(0);
            this.m_cPendingExecutionStrategyUpdateCount = in.readInt(1);
        }

        @Override
        public void writeExternal(PofWriter out) throws IOException {
            out.writeObject(0, (Object)this.m_executionPlan);
            out.writeInt(1, this.m_cPendingExecutionStrategyUpdateCount);
        }
    }

    public static class OptimizeExecutionPlanProcessor
    extends PortableAbstractProcessor<String, ClusteredTaskManager, Void> {
        protected ExecutionPlan m_executionPlan;
        protected int m_cPendingExecutionPlanOptimizationCount;

        public OptimizeExecutionPlanProcessor() {
        }

        public OptimizeExecutionPlanProcessor(ExecutionPlan executionPlan, int cPendingExecutionPlanOptimizationCount) {
            this.m_executionPlan = executionPlan;
            this.m_cPendingExecutionPlanOptimizationCount = cPendingExecutionPlanOptimizationCount;
        }

        public Void process(InvocableMap.Entry<String, ClusteredTaskManager> entry) {
            ExecutorTrace.entering(OptimizeExecutionPlanProcessor.class, "process", () -> String.format("key=%s, value=%s", entry.getKey(), entry.getValue()));
            if (entry.isPresent()) {
                Debugging debug;
                ClusteredTaskManager manager = (ClusteredTaskManager)entry.getValue();
                Debugging debugging = debug = manager.m_debugging.getLogLevel() < 7 ? new Debugging() : manager.m_debugging;
                if (manager.isCompleted()) {
                    ExecutorTrace.log(() -> String.format("Skipping Execution Plan Optimization for Task [%s] as the Task is completed", manager.getTaskId()), debug);
                } else {
                    ExecutionPlan currentPlan = manager.m_executionPlan;
                    if (currentPlan != null) {
                        Iterator<String> iter = this.m_executionPlan.getIds();
                        while (iter.hasNext()) {
                            String executorId = iter.next();
                            ExecutionPlan.Action currentAction = currentPlan.getAction(executorId);
                            if (currentAction != ExecutionPlan.Action.COMPLETED) continue;
                            this.m_executionPlan.setAction(executorId, ExecutionPlan.Action.COMPLETED);
                        }
                    }
                    ExecutorTrace.log(() -> String.format("Optimized Execution Plan for Task [%s].  Now [%s]", manager.m_sTaskId, this.m_executionPlan), debug);
                    manager.m_executionPlan = this.m_executionPlan;
                    manager.m_cPendingExecutionPlanOptimizationCount = Math.max(0, manager.m_cPendingExecutionPlanOptimizationCount - this.m_cPendingExecutionPlanOptimizationCount);
                    entry.setValue((Object)manager);
                }
            }
            ExecutorTrace.exiting(OptimizeExecutionPlanProcessor.class, "process");
            return null;
        }

        @Override
        public void readExternal(PofReader in) throws IOException {
            this.m_executionPlan = (ExecutionPlan)in.readObject(0);
            this.m_cPendingExecutionPlanOptimizationCount = in.readInt(1);
        }

        @Override
        public void writeExternal(PofWriter out) throws IOException {
            out.writeObject(0, (Object)this.m_executionPlan);
            out.writeInt(1, this.m_cPendingExecutionPlanOptimizationCount);
        }
    }

    public static class UpdateContributedResultProcessor
    extends PortableAbstractProcessor<String, ClusteredTaskManager, Boolean> {
        protected String m_sExecutorId;
        protected Result m_result;

        public UpdateContributedResultProcessor() {
        }

        public UpdateContributedResultProcessor(String sExecutorId, Result result) {
            this.m_sExecutorId = sExecutorId;
            this.m_result = result;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public Boolean process(InvocableMap.Entry<String, ClusteredTaskManager> entry) {
            ExecutorTrace.entering(UpdateContributedResultProcessor.class, "process", () -> String.format("key=%s, value=%s", entry.getKey(), entry.getValue()));
            try {
                if (entry.isPresent()) {
                    Debugging debug;
                    ClusteredTaskManager taskManager = (ClusteredTaskManager)entry.getValue();
                    Debugging debugging = debug = taskManager.m_debugging.getLogLevel() < 7 ? new Debugging() : taskManager.m_debugging;
                    if (taskManager.isOwner(this.m_sExecutorId)) {
                        taskManager.setResult(this.m_result);
                        entry.setValue((Object)taskManager);
                        ExecutorTrace.log(() -> String.format("Result[%s] contributed for Task [%s] by Executor [%s]: %s", taskManager.m_lCurrentResultGeneration, taskManager.getTaskId(), this.m_sExecutorId, this.m_result), debug);
                        Boolean bl = true;
                        return bl;
                    }
                    if (Logger.isEnabled((int)debug.getLogLevel())) {
                        String sMsg = "Ignoring result contributed for Task [%s] as the Task is no longer assigned to Executor [%s]: %s";
                        ExecutorTrace.log(String.format(sMsg, taskManager.getTaskId(), this.m_sExecutorId, this.m_result), debug);
                    }
                    Boolean bl = false;
                    return bl;
                }
                if (Logger.isEnabled((int)5)) {
                    Logger.fine((String)String.format("Ignoring result contributed for Task [%s] as the Task is no longer present. Executor [%s]: %s", entry.getKey(), this.m_sExecutorId, this.m_result));
                }
                Boolean bl = false;
                return bl;
            }
            finally {
                ExecutorTrace.exiting(UpdateContributedResultProcessor.class, "process");
            }
        }

        @Override
        public void readExternal(PofReader in) throws IOException {
            this.m_sExecutorId = in.readString(0);
            this.m_result = (Result)in.readObject(1);
        }

        @Override
        public void writeExternal(PofWriter out) throws IOException {
            out.writeString(0, this.m_sExecutorId);
            out.writeObject(1, (Object)this.m_result);
        }
    }

    public static class CancellationProcessor
    extends PortableAbstractProcessor<String, ClusteredTaskManager, Boolean> {
        public Boolean process(InvocableMap.Entry<String, ClusteredTaskManager> entry) {
            if (entry.isPresent()) {
                ClusteredTaskManager manager = (ClusteredTaskManager)entry.getValue();
                if (manager.m_state == State.ORCHESTRATED) {
                    manager.m_fCancelled = true;
                    manager.m_state = State.TERMINATING;
                    entry.setValue((Object)manager);
                    return true;
                }
            }
            return false;
        }
    }

    public static class SetActionProcessor
    extends PortableAbstractProcessor<String, ClusteredTaskManager, Boolean> {
        protected String m_sExecutorId;
        protected EnumSet<ExecutionPlan.Action> m_previous;
        protected ExecutionPlan.Action m_desired;

        public SetActionProcessor() {
        }

        public SetActionProcessor(String sExecutorId, ExecutionPlan.Action desired) {
            this.m_sExecutorId = sExecutorId;
            this.m_previous = null;
            this.m_desired = desired;
        }

        public SetActionProcessor(String sExecutorId, EnumSet<ExecutionPlan.Action> previous, ExecutionPlan.Action desired) {
            this.m_sExecutorId = sExecutorId;
            this.m_previous = previous;
            this.m_desired = desired;
        }

        public SetActionProcessor(String sExecutorId, ExecutionPlan.Action previous, ExecutionPlan.Action desired) {
            this.m_sExecutorId = sExecutorId;
            this.m_previous = EnumSet.of(previous);
            this.m_desired = desired;
        }

        public Boolean process(InvocableMap.Entry<String, ClusteredTaskManager> entry) {
            boolean result = false;
            ExecutorTrace.entering(SetActionProcessor.class, "process", () -> String.format("key=%s, value=%s", entry.getKey(), entry.getValue()));
            if (entry.isPresent()) {
                ClusteredTaskManager manager = (ClusteredTaskManager)entry.getValue();
                ExecutionPlan.Action existing = manager.m_executionPlan.getAction(this.m_sExecutorId);
                if (existing != null && this.m_previous != null && this.m_previous.contains((Object)existing) || this.m_previous == null || this.m_previous.isEmpty()) {
                    Debugging debug = manager.m_debugging.getLogLevel() < 7 ? new Debugging() : manager.m_debugging;
                    ExecutorTrace.log(() -> String.format("Changing Executor [%s] action from [%s] to [%s]", new Object[]{this.m_sExecutorId, existing, this.m_desired}), debug);
                    boolean isStateSet = manager.m_executionPlan.setAction(this.m_sExecutorId, this.m_desired);
                    if (isStateSet) {
                        entry.setValue((Object)manager);
                    }
                    result = isStateSet;
                }
            }
            ExecutorTrace.exiting(SetActionProcessor.class, "process", (Object)result, new Object[0]);
            return result;
        }

        @Override
        public void readExternal(PofReader in) throws IOException {
            this.m_sExecutorId = in.readString(0);
            ExecutionPlan.Action action = (ExecutionPlan.Action)((Object)in.readObject(1));
            if (action != null) {
                this.m_previous = EnumSet.of(action);
            }
            this.m_desired = (ExecutionPlan.Action)((Object)in.readObject(2));
        }

        @Override
        public void writeExternal(PofWriter out) throws IOException {
            out.writeString(0, this.m_sExecutorId);
            out.writeObject(1, this.m_previous == null ? null : this.m_previous.iterator().next());
            out.writeObject(2, (Object)this.m_desired);
        }
    }

    public static class NotifyExecutionStrategyProcessor
    extends PortableAbstractProcessor<String, ClusteredTaskManager, Boolean> {
        public Boolean process(InvocableMap.Entry<String, ClusteredTaskManager> entry) {
            if (entry.isPresent()) {
                ClusteredTaskManager manager = (ClusteredTaskManager)entry.getValue();
                if (!manager.isCompleted()) {
                    ++manager.m_cPendingExecutionStrategyUpdateCount;
                    entry.setValue((Object)manager);
                    return true;
                }
                return false;
            }
            return false;
        }
    }
}

