/*
 * Decompiled with CFR 0.152.
 */
package com.oracle.coherence.concurrent.executor;

import com.oracle.coherence.common.base.Logger;
import com.oracle.coherence.concurrent.executor.ClusteredAssignment;
import com.oracle.coherence.concurrent.executor.ClusteredProperties;
import com.oracle.coherence.concurrent.executor.ComposableContinuation;
import com.oracle.coherence.concurrent.executor.ExecutionPlan;
import com.oracle.coherence.concurrent.executor.ExecutionStrategy;
import com.oracle.coherence.concurrent.executor.PortableAbstractProcessor;
import com.oracle.coherence.concurrent.executor.Result;
import com.oracle.coherence.concurrent.executor.Task;
import com.oracle.coherence.concurrent.executor.TaskExecutorService;
import com.oracle.coherence.concurrent.executor.internal.Cause;
import com.oracle.coherence.concurrent.executor.internal.ExecutorTrace;
import com.oracle.coherence.concurrent.executor.internal.LiveObject;
import com.oracle.coherence.concurrent.executor.options.Debugging;
import com.oracle.coherence.concurrent.executor.processors.LocalOnlyProcessor;
import com.oracle.coherence.concurrent.executor.util.FilteringIterable;
import com.oracle.coherence.concurrent.executor.util.OptionsByType;
import com.tangosol.io.pof.PofReader;
import com.tangosol.io.pof.PofWriter;
import com.tangosol.io.pof.PortableObject;
import com.tangosol.net.CacheService;
import com.tangosol.net.NamedCache;
import com.tangosol.util.Base;
import com.tangosol.util.Filter;
import com.tangosol.util.InvocableMap;
import com.tangosol.util.filter.EqualsFilter;
import com.tangosol.util.filter.KeyAssociatedFilter;
import com.tangosol.util.filter.PresentFilter;
import com.tangosol.util.function.Remote;
import com.tangosol.util.processor.ConditionalRemove;
import java.io.IOException;
import java.io.Serializable;
import java.time.Duration;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.function.BiConsumer;

public class ClusteredTaskManager<T, A, R>
implements Serializable,
LiveObject,
PortableObject {
    /** Name of the cache that holds the task manager entries (used by the {@code ensureCache} calls in this class). */
    public static String CACHE_NAME = "executor-tasks";
    /** Identifier of the task; also used as the cache key when processors are invoked against {@link #CACHE_NAME}. */
    protected String m_sTaskId;
    /** Task sequence value; only reachable through its getter/setter here and not written by {@code writeExternal}. */
    protected long m_lTaskSequence;
    /** Partition id value; only reachable through its getter/setter here and not written by {@code writeExternal}. */
    protected int m_nPartitionId;
    /** The task being managed. */
    protected Task<T> m_task;
    /** Strategy whose {@code analyze} call produces the execution plan. */
    protected ExecutionStrategy m_executionStrategy;
    /** Optional collector for contributed results; when {@code null} the latest contributed result is used directly. */
    protected Task.Collector<T, A, R> m_collector;
    /** Predicate tested against the collected result when deciding whether the task has completed. */
    protected Remote.Predicate<? super R> m_completionPredicate;
    /** Runnable exposed through {@code getCompletionRunnable()}; may be {@code null}. */
    protected Task.CompletionRunnable<? super R> m_completionRunnable;
    /** Initially {@code true} when a completion runnable was supplied; not serialized, recomputed by {@code readExternal}. */
    protected boolean m_fRunCompletionRunnable;
    /** Retention used by {@code cleanup}: {@code null} removes the entry at once; serialized in whole seconds. */
    protected Duration m_retainDuration;
    /** Debugging option supplying the log level used for trace output. */
    protected Debugging m_debugging;
    /** Current execution plan; {@code null} until the execution strategy has been evaluated. */
    protected ExecutionPlan m_executionPlan;
    /** Starts at 1; a value above zero causes the execution strategy to be (re)evaluated. */
    protected int m_cPendingExecutionStrategyUpdateCount;
    /** A value above zero causes assignments to be registered and the plan to be optimized. */
    protected int m_cPendingExecutionPlanOptimizationCount;
    /** Most recent result; initialised to {@code Result.none()} by the main constructor. */
    protected Result<R> m_lastResult;
    /** Incremented each time {@link #m_lastResult} is replaced. */
    protected int m_nResultVersion;
    /** Results contributed by executors; only allocated when a collector is present. */
    protected List<Result<T>> m_listResults;
    /** Incremented for every contributed result (see {@code setResult}). */
    protected long m_lCurrentResultGeneration;
    /** Generation up to which results have been processed (set by {@code UpdateCollectedResultProcessor}). */
    protected long m_lProcessedResultGeneration;
    /** Whether the task has completed. */
    protected boolean m_fCompleted;
    /** Whether the task has been cancelled. */
    protected boolean m_fCancelled;
    /** Lifecycle state of this manager. */
    protected State m_state;

    /**
     * No-argument constructor; every field keeps its default value until it is
     * populated elsewhere (for example by {@code readExternal}).
     */
    public ClusteredTaskManager() {
    }

    /**
     * Creates a manager for a newly submitted task in the {@code ORCHESTRATED} state.
     *
     * @param sTaskId             the task identifier
     * @param task                the task to manage
     * @param executionStrategy   the strategy used to build the execution plan
     * @param collector           optional collector for contributed results (may be {@code null})
     * @param completionPredicate predicate tested against collected results
     * @param completionRunnable  optional runnable exposed via {@code getCompletionRunnable()}
     * @param retainDuration      retention applied during cleanup ({@code null} removes immediately)
     * @param optionsByType       task options; the {@code Debugging} option is read from it
     */
    @SuppressWarnings("unchecked")
    public ClusteredTaskManager(String sTaskId, Task<T> task, ExecutionStrategy executionStrategy, Task.Collector<? super T, A, R> collector, Remote.Predicate<? super R> completionPredicate, Task.CompletionRunnable<? super R> completionRunnable, Duration retainDuration, OptionsByType<Task.Option> optionsByType) {
        // identity and task definition
        m_sTaskId = sTaskId;
        m_task = task;
        m_executionStrategy = executionStrategy;
        // unchecked narrowing of the wildcard; erased at runtime
        m_collector = (Task.Collector<T, A, R>) collector;
        m_completionPredicate = completionPredicate;
        m_completionRunnable = completionRunnable;
        m_fRunCompletionRunnable = completionRunnable != null;
        m_retainDuration = retainDuration;
        m_debugging = optionsByType.get(Debugging.class, Debugging.of(7));

        // result tracking
        m_lastResult = Result.none();
        m_nResultVersion = 0;

        // no plan yet; one pending strategy update forces the first evaluation
        m_executionPlan = null;
        m_cPendingExecutionStrategyUpdateCount = 1;
        m_cPendingExecutionPlanOptimizationCount = 0;

        // contributed results are only kept when there is a collector to fold them
        if (collector != null) {
            m_listResults = new ArrayList<Result<T>>();
        }
        m_lCurrentResultGeneration = 0L;
        m_lProcessedResultGeneration = 0L;

        // lifecycle
        m_fCancelled = false;
        m_fCompleted = false;
        m_state = State.ORCHESTRATED;
    }

    /**
     * Chooses the continuation to run for an insert or update of this entry,
     * based on the current state.
     *
     * @param service the cache service owning the entry
     * @param entry   the entry holding this manager; its key is used as the task key
     * @param cause   the cause of the event
     *
     * @return a continuation for {@code ORCHESTRATED} or {@code TERMINATING}
     *         state, otherwise {@code null}
     */
    public ComposableContinuation onProcess(CacheService service, InvocableMap.Entry entry, Cause cause) {
        switch (m_state) {
            case ORCHESTRATED:
                return new AsyncProcessChangesContinuation(service, (String) entry.getKey(), cause);
            case TERMINATING:
                return new CleanupContinuation(service, (String) entry.getKey());
            default:
                return null;
        }
    }

    /**
     * Tells whether the given executor currently owns this task according to
     * the execution plan.
     *
     * @param executorId the executor identifier
     *
     * @return {@code true} when the plan has an action for the executor and
     *         that action is effectively assigned
     */
    public boolean isOwner(String executorId) {
        ExecutionPlan.Action assigned = m_executionPlan.getAction(executorId);
        if (assigned == null) {
            return false;
        }
        return assigned.isEffectivelyAssigned();
    }

    /**
     * @return the task identifier
     */
    public String getTaskId() {
        return m_sTaskId;
    }

    /**
     * @return the current lifecycle state
     */
    public State getState() {
        return m_state;
    }

    /**
     * Replaces the lifecycle state.
     *
     * @param state the new state
     */
    public void setState(State state) {
        m_state = state;
    }

    /**
     * @return whether the completion runnable flag is set
     */
    public boolean getRunCompletionRunnable() {
        return m_fRunCompletionRunnable;
    }

    /**
     * Sets the completion runnable flag.
     *
     * @param value the new flag value
     */
    public void setRunCompletionRunnable(boolean value) {
        m_fRunCompletionRunnable = value;
    }

    /**
     * @return the completion runnable, possibly {@code null}
     */
    public Task.CompletionRunnable<? super R> getCompletionRunnable() {
        return m_completionRunnable;
    }

    /**
     * @return the retention duration, possibly {@code null}
     */
    public Duration getRetainDuration() {
        return m_retainDuration;
    }

    /**
     * @return the debugging option of this task
     */
    public Debugging getDebugging() {
        return m_debugging;
    }

    /**
     * Processes the changes accumulated on this task: traces the current
     * state, re-evaluates the result and the execution strategy when needed,
     * publishes the outcome back to the task cache and finally registers
     * assignments and optimizes the execution plan.
     *
     * @param service the cache service used to reach the task cache
     * @param key     the cache key of the task (used for tracing)
     * @param cause   the cause of the event that triggered processing
     */
    public void asyncProcessChanges(CacheService service, String key, Cause cause) {
        // number of contributed results that have not yet been processed
        long newResultCount = this.m_lCurrentResultGeneration - this.m_lProcessedResultGeneration;
        // a default Debugging is substituted when the configured level is below 7
        Debugging debug = this.m_debugging.getLogLevel() < 7 ? new Debugging() : this.m_debugging;
        ExecutorTrace.log(() -> "------------------------------------", debug);
        ExecutorTrace.log(() -> String.format("Task                               : %s", key), debug);
        ExecutorTrace.log(() -> String.format("State                              : %s", this.m_state), debug);
        ExecutorTrace.log(() -> String.format("Completed ?                        : %s", this.m_fCompleted), debug);
        // fixed: this line previously reported the completed flag instead of the cancelled flag
        ExecutorTrace.log(() -> String.format("Cancelled ?                        : %s", this.m_fCancelled), debug);
        ExecutorTrace.log(() -> String.format("Last Result                        : %s", this.m_lastResult), debug);
        ExecutorTrace.log(() -> String.format("Result Version                     : %s", this.m_nResultVersion), debug);
        ExecutorTrace.log(() -> String.format("Pending Results from Executors     : %s", newResultCount), debug);
        ExecutorTrace.log(() -> String.format("Total Results from Executors       : %s", this.m_lCurrentResultGeneration), debug);
        ExecutorTrace.log(() -> String.format("Pending Execution Strategy Updates : %s", this.m_cPendingExecutionStrategyUpdateCount), debug);
        ExecutorTrace.log(() -> String.format("Pending Execution Plan Updates     : %s", this.m_cPendingExecutionPlanOptimizationCount), debug);
        ExecutorTrace.log(() -> String.format("Execution Plan                     : %s", this.m_executionPlan), debug);
        ExecutorTrace.log(() -> "------------------------------------", debug);
        ExecutorTrace.log("Acquiring Updated Executor Information", debug);
        boolean resultChanged = false;
        Result<R> originalResult = this.m_lastResult;
        boolean executionPlanChanged = false;
        // captured now because asyncEvaluateExecutionStrategy resets the field to zero
        int originalPendingExecutionStrategyUpdateCount = this.m_cPendingExecutionStrategyUpdateCount;
        ExecutorTrace.log("Building Rationales For Updating the Result", debug);
        EnumSet<ExecutionStrategy.EvaluationRationale> rationales = EnumSet.noneOf(ExecutionStrategy.EvaluationRationale.class);
        if (this.m_executionPlan == null) {
            rationales.add(ExecutionStrategy.EvaluationRationale.TASK_CREATED);
        }
        if (this.m_cPendingExecutionStrategyUpdateCount > 0) {
            rationales.add(ExecutionStrategy.EvaluationRationale.EXECUTOR_SERVICES_CHANGED);
        }
        if (newResultCount > 0L) {
            rationales.add(ExecutionStrategy.EvaluationRationale.TASK_RESULT_PROVIDED);
        }
        if (cause == Cause.PARTITIONING) {
            rationales.add(ExecutionStrategy.EvaluationRationale.TASK_RECOVERED);
        }
        ExecutorTrace.log(() -> String.format("Evaluation Rationales              : %s", rationales), debug);
        // only evaluate while orchestrated, not finished, and with something new to look at
        if (!(this.m_state != State.ORCHESTRATED || this.m_fCompleted || this.m_fCancelled || newResultCount <= 0L && this.m_cPendingExecutionStrategyUpdateCount <= 0)) {
            if (newResultCount > 0L) {
                ExecutorTrace.log("(re) Evaluating Task Result", debug);
                resultChanged = this.asyncEvaluateResult(originalResult);
            }
            // result evaluation may have completed the task, so the flags are checked again
            if (!(this.m_fCompleted || this.m_fCancelled || newResultCount <= 0L && originalPendingExecutionStrategyUpdateCount <= 0)) {
                ExecutorTrace.log("(re) Evaluating Task Execution Strategy", debug);
                executionPlanChanged = this.asyncEvaluateExecutionStrategy(service, rationales);
            }
        }
        if (Logger.isEnabled((int)debug.getLogLevel())) {
            ExecutorTrace.log(String.format("Result Changed?                    : %s", resultChanged), debug);
            ExecutorTrace.log(String.format("Execution Plan Changed?            : %s", executionPlanChanged), debug);
        }
        // collect the updates to apply to the cached entry in a single invocation
        ChainedProcessor chain = ChainedProcessor.empty();
        if (resultChanged || newResultCount > 0L) {
            chain.andThen((InvocableMap.EntryProcessor)new UpdateCollectedResultProcessor<R>(this.m_lastResult, this.m_lCurrentResultGeneration, this.m_fCompleted));
        }
        if (executionPlanChanged) {
            ExecutorTrace.log(() -> String.format("Updated Execution Plan             : %s", this.m_executionPlan), debug);
            chain.andThen((InvocableMap.EntryProcessor)new UpdateExecutionPlanProcessor(this.m_executionPlan, originalPendingExecutionStrategyUpdateCount));
        }
        boolean isLocal = true;
        if (!chain.isEmpty()) {
            ExecutorTrace.log(() -> String.format("Updating Task [%s]", this.m_sTaskId), debug);
            Result result = (Result)service.ensureCache(CACHE_NAME, null).invoke((Object)this.m_sTaskId, (InvocableMap.EntryProcessor)LocalOnlyProcessor.of((InvocableMap.EntryProcessor)chain));
            // the presence of the returned result is taken as the "still local" signal
            isLocal = result.isPresent();
        }
        if (isLocal || cause == Cause.PARTITIONING) {
            if (!this.m_fCompleted && !this.m_fCancelled && this.m_cPendingExecutionPlanOptimizationCount > 0) {
                ExecutorTrace.log(() -> String.format("Commenced Assigning and Optimizing Execution Plan for Task [%s] using [%s]", this.m_sTaskId, this.m_executionPlan), debug);
                ClusteredAssignment.registerAssignments(this.m_sTaskId, this.m_executionPlan, service);
                ExecutorTrace.log(() -> String.format("Optimizing Execution Plan for Task [%s]", this.m_sTaskId), debug);
                executionPlanChanged = this.m_executionPlan.optimize();
                if (Logger.isEnabled((int)debug.getLogLevel())) {
                    ExecutorTrace.log(String.format("Execution Plan for Task [%s] %s", this.m_sTaskId, executionPlanChanged ? "was optimized" : "did not require optimization"), debug);
                }
                service.ensureCache(CACHE_NAME, null).invoke((Object)this.m_sTaskId, (InvocableMap.EntryProcessor)LocalOnlyProcessor.of((InvocableMap.EntryProcessor)new OptimizeExecutionPlanProcessor(this.m_executionPlan, this.m_cPendingExecutionPlanOptimizationCount)));
                ExecutorTrace.log(() -> String.format("Completed Assigning and Optimizing Execution Plan for Task [%s]", this.m_sTaskId), debug);
            } else if (Logger.isEnabled((int)debug.getLogLevel())) {
                String sMsg = "Skipping Optimization of Execution Plan for Task [%s] as it is completed, cancelled or has no pending optimizations to perform";
                ExecutorTrace.log(String.format(sMsg, this.m_sTaskId), debug);
            }
        } else {
            ExecutorTrace.log(() -> String.format("Abandoned Performing Updates as the Task [%s] as it is no longer local", this.m_sTaskId), debug);
        }
    }

    /**
     * @return the managed task
     */
    public Task<T> getTask() {
        return m_task;
    }

    /**
     * @return {@code true} when the task has completed
     */
    public boolean isCompleted() {
        return m_fCompleted;
    }

    /**
     * @return {@code true} when the task has been cancelled
     */
    public boolean isCancelled() {
        return m_fCancelled;
    }

    /**
     * @return {@code true} when the task is completed, cancelled or terminating
     */
    public boolean isDone() {
        if (m_fCompleted || m_fCancelled) {
            return true;
        }
        return m_state == State.TERMINATING;
    }

    /**
     * @return the most recent result
     */
    public Result<R> getLastResult() {
        return m_lastResult;
    }

    /**
     * @return the version counter of the most recent result
     */
    public int getResultVersion() {
        return m_nResultVersion;
    }

    /**
     * Records a result contributed by an executor and advances the current
     * result generation.
     *
     * @param sExecutorId the contributing executor (not used by this method)
     * @param result      the contributed result
     */
    @SuppressWarnings("unchecked")
    public void setResult(String sExecutorId, Result<T> result) {
        if (m_collector != null) {
            // with a collector, results are stored and folded later
            m_listResults.add(result);
        } else {
            // without a collector, the contributed result becomes the task result
            m_nResultVersion++;
            m_lastResult = (Result<R>) result;
        }
        m_lCurrentResultGeneration++;
    }

    /**
     * Replaces the execution plan.
     *
     * @param executionPlan the new plan
     */
    public void setExecutionPlan(ExecutionPlan executionPlan) {
        m_executionPlan = executionPlan;
    }

    /**
     * @return the partition id
     */
    public int getPartitionId() {
        return m_nPartitionId;
    }

    /**
     * Sets the partition id.
     *
     * @param nPartitionId the partition id
     */
    public void setPartitionId(int nPartitionId) {
        m_nPartitionId = nPartitionId;
    }

    /**
     * @return the task sequence
     */
    public long getTaskSequence() {
        return m_lTaskSequence;
    }

    /**
     * Sets the task sequence.
     *
     * @param sequence the task sequence
     */
    public void setTaskSequence(long sequence) {
        m_lTaskSequence = sequence;
    }

    /**
     * Restores this manager from a POF stream. Properties are read in
     * ascending index order (0 to 18), mirroring {@code writeExternal}.
     *
     * @param in the POF reader
     *
     * @throws IOException if reading fails
     */
    public void readExternal(PofReader in) throws IOException {
        m_sTaskId = in.readString(0);
        m_task = (Task) in.readObject(1);
        m_executionStrategy = (ExecutionStrategy) in.readObject(2);
        m_collector = (Task.Collector) in.readObject(3);
        m_completionPredicate = (Remote.Predicate) in.readObject(4);
        m_completionRunnable = (Task.CompletionRunnable) in.readObject(5);
        // the flag itself is not part of the stream; it is derived from the runnable
        m_fRunCompletionRunnable = m_completionRunnable != null;

        // -1 is the marker for "no retain duration"
        long cRetainSeconds = in.readLong(6);
        if (cRetainSeconds == -1L) {
            m_retainDuration = null;
        } else {
            m_retainDuration = Duration.ofSeconds(cRetainSeconds);
        }

        m_debugging = (Debugging) in.readObject(7);
        m_lastResult = (Result) in.readObject(8);
        m_nResultVersion = in.readInt(9);
        m_executionPlan = (ExecutionPlan) in.readObject(10);
        m_cPendingExecutionStrategyUpdateCount = in.readInt(11);
        m_cPendingExecutionPlanOptimizationCount = in.readInt(12);

        // index 13 is only read when a collector is present
        if (m_collector != null) {
            m_listResults = new ArrayList<Result<T>>();
            m_listResults = (List) in.readCollection(13, m_listResults);
        }

        m_lCurrentResultGeneration = in.readLong(14);
        m_lProcessedResultGeneration = in.readLong(15);
        m_fCancelled = in.readBoolean(16);
        m_fCompleted = in.readBoolean(17);
        m_state = (State) in.readObject(18);
    }

    /**
     * Writes this manager to a POF stream using property indexes 0 to 18 in
     * ascending order.
     *
     * @param out the POF writer
     *
     * @throws IOException if writing fails
     */
    public void writeExternal(PofWriter out) throws IOException {
        out.writeString(0, m_sTaskId);
        out.writeObject(1, m_task);
        out.writeObject(2, m_executionStrategy);
        out.writeObject(3, m_collector);
        out.writeObject(4, m_completionPredicate);
        out.writeObject(5, m_completionRunnable);

        // the duration is stored as whole seconds, with -1 standing for null
        long cRetainSeconds;
        if (m_retainDuration == null) {
            cRetainSeconds = -1L;
        } else {
            cRetainSeconds = m_retainDuration.getSeconds();
        }
        out.writeLong(6, cRetainSeconds);

        out.writeObject(7, m_debugging);
        out.writeObject(8, m_lastResult);
        out.writeInt(9, m_nResultVersion);
        out.writeObject(10, m_executionPlan);
        out.writeInt(11, m_cPendingExecutionStrategyUpdateCount);
        out.writeInt(12, m_cPendingExecutionPlanOptimizationCount);
        out.writeCollection(13, m_listResults);
        out.writeLong(14, m_lCurrentResultGeneration);
        out.writeLong(15, m_lProcessedResultGeneration);
        out.writeBoolean(16, m_fCancelled);
        out.writeBoolean(17, m_fCompleted);
        out.writeObject(18, m_state);
    }

    /**
     * Re-evaluates the task result from the results contributed so far,
     * updating {@code m_lastResult} and {@code m_fCompleted} on this instance.
     *
     * @param originalResult the result held before evaluation; used to detect
     *                       whether the collected result differs
     *
     * @return {@code true} when the result is considered changed
     */
    protected boolean asyncEvaluateResult(Result<R> originalResult) {
        // a default Debugging is substituted when the configured level is below 7
        Debugging debug = this.m_debugging.getLogLevel() < 7 ? new Debugging() : this.m_debugging;
        boolean resultChanged = false;
        if (this.m_fCompleted) {
            ExecutorTrace.log(() -> String.format("Skipping result collection for Task [%s] as it is completed", this.m_sTaskId), debug);
        } else if (this.m_collector == null) {
            // no collector: m_lastResult already holds the latest contributed result
            if (this.m_lastResult.isThrowable()) {
                // a throwable result completes the task (resultChanged stays false here)
                this.m_fCompleted = true;
            } else {
                resultChanged = true;
            }
            if (!this.m_fCompleted && this.m_executionPlan.isSatisfied()) {
                // the task is complete only when every executor in the plan has the COMPLETED action
                boolean fCompleted = true;
                Iterator<String> iter = this.m_executionPlan.getIds();
                while (iter.hasNext() && fCompleted) {
                    String executorId = iter.next();
                    if (this.m_executionPlan.getAction(executorId) == ExecutionPlan.Action.COMPLETED) continue;
                    fCompleted = false;
                }
                this.m_fCompleted = fCompleted;
                if (this.m_fCompleted) {
                    ExecutorTrace.log(() -> String.format("Task [%s] has completed on all assigned Executors", this.m_sTaskId), debug);
                }
            }
        } else {
            ExecutorTrace.log(() -> String.format("Collecting result for Task [%s] using collector [%s]", this.m_sTaskId, this.m_collector), debug);
            // a fresh container is built on every evaluation and all stored results are folded again
            A container = this.m_collector.supplier().get();
            Remote.Predicate<A> finishable = this.m_collector.finishable();
            BiConsumer<A, A> accumulator = this.m_collector.accumulator();
            try {
                Iterator<Result<T>> iterator = this.m_listResults.iterator();
                // accumulation stops early once the collector reports the container as finishable
                while (iterator.hasNext() && !finishable.test(container)) {
                    Result<T> result = iterator.next();
                    // results without a value are skipped
                    if (!result.isPresent()) continue;
                    accumulator.accept(container, result.get());
                }
                Result<R> result = Result.of(this.m_collector.finisher().apply(container));
                ExecutorTrace.log(() -> String.format("Collected result [%s] for Task [%s]", result, this.m_sTaskId), debug);
                // completion requires both a satisfied plan and the completion predicate accepting the value
                if (this.m_executionPlan.isSatisfied() && this.m_completionPredicate.test(result.get()) && !this.m_fCompleted) {
                    this.m_fCompleted = true;
                    this.m_lastResult = result;
                    resultChanged = true;
                }
                if (originalResult == null || !originalResult.equals(result)) {
                    this.m_lastResult = result;
                    resultChanged = true;
                } else {
                    ExecutorTrace.log(() -> String.format("Collected result [%s] for Task [%s] hasn't changed (no changes to publish)", result, this.m_sTaskId), debug);
                }
            }
            catch (Throwable t) {
                // any failure while collecting completes the task with the throwable as its result
                // NOTE(review): resultChanged is not set on this path; confirm callers do not rely on it
                this.m_fCompleted = true;
                this.m_lastResult = Result.throwable(t);
                ExecutorTrace.log(() -> String.format("Task [%s] failed due to [%s]", this.m_sTaskId, t), debug);
            }
        }
        return resultChanged;
    }

    /**
     * Asks the execution strategy for an execution plan based on the executors
     * that are currently joining or running, and adopts it when it differs
     * from the current plan. The pending strategy update count is reset to
     * zero afterwards.
     *
     * @param service    the cache service used to reach the executor cache
     * @param rationales the reasons for this evaluation
     *
     * @return {@code true} when the execution plan was replaced
     */
    protected boolean asyncEvaluateExecutionStrategy(CacheService service, EnumSet<ExecutionStrategy.EvaluationRationale> rationales) {
        // a default Debugging is substituted when the configured level is below 7
        Debugging debug = m_debugging.getLogLevel() < 7 ? new Debugging() : m_debugging;
        ExecutorTrace.log(() -> String.format("Evaluating the Execution Plan for Task [%s] due to [%s]", this.m_sTaskId, rationales), debug);

        boolean fPlanReplaced;
        try {
            // snapshot the candidate executors, keyed by their id
            NamedCache executorInfoCache = service.ensureCache("executor-executors", null);
            HashMap<String, TaskExecutorService.ExecutorInfo> executorInfoMap = new HashMap<String, TaskExecutorService.ExecutorInfo>();
            FilteringIterable iterable = new FilteringIterable(executorInfoCache.values(), (Remote.Predicate & Serializable)executorInfo -> executorInfo.getState() == TaskExecutorService.ExecutorInfo.State.JOINING || executorInfo.getState() == TaskExecutorService.ExecutorInfo.State.RUNNING);
            for (TaskExecutorService.ExecutorInfo info : iterable) {
                executorInfoMap.put(info.getId(), info);
            }

            ExecutionPlan proposedPlan = m_executionStrategy.analyze(m_executionPlan, executorInfoMap, rationales);

            // the plan counts as replaced when exactly one side is null or the two are unequal
            if (proposedPlan == null) {
                fPlanReplaced = m_executionPlan != null;
            } else {
                fPlanReplaced = m_executionPlan == null || !proposedPlan.equals(m_executionPlan);
            }

            if (!fPlanReplaced) {
                ExecutorTrace.log(() -> String.format("Execution Plan for Task [%s] was not changed.  Will not be updated.", this.m_sTaskId), debug);
            } else {
                m_executionPlan = proposedPlan;
                ExecutorTrace.log(() -> String.format("Execution Plan for Task [%s] has changed.  Will be updated.", this.m_sTaskId), debug);
                m_cPendingExecutionPlanOptimizationCount++;
            }
        }
        catch (Exception e) {
            throw Base.ensureRuntimeException((Throwable)e);
        }

        m_cPendingExecutionStrategyUpdateCount = 0;
        return fPlanReplaced;
    }

    /**
     * Cleans up after a terminated task: removes its assignments, then either
     * removes the task entry, re-inserts it with an expiry equal to the retain
     * duration, or leaves it alone, and finally removes the task's properties.
     *
     * @param service the cache service used to reach the caches
     * @param sKey    the cache key of the task entry
     */
    protected void cleanup(CacheService service, String sKey) {
        String sTaskId = this.getTaskId();
        Duration retain = this.m_retainDuration;
        ClusteredAssignment.removeAssignments(sTaskId, service);
        if (retain == null) {
            // nothing to retain: drop the entry right away
            service.ensureCache(CACHE_NAME, null).remove((Object)sKey);
        } else if (!retain.isZero()) {
            // Duration is a value-based class, so zero is tested by value and
            // not by reference identity with Duration.ZERO
            long cRetainMillis = retain.toMillis();
            ClusteredTaskManager value = this;
            // the stored copy carries a zero duration, so a later cleanup of it takes neither branch
            this.m_retainDuration = Duration.ZERO;
            service.ensureCache(CACHE_NAME, null).put((Object)sKey, (Object)value, cRetainMillis);
        }
        this.cleanProperties(service);
    }

    /**
     * Removes every entry of the properties cache whose {@code getTaskId}
     * value equals this task's id, using a filter associated with the task id.
     *
     * @param service the cache service used to reach the properties cache
     */
    protected void cleanProperties(CacheService service) {
        final String sTaskId = getTaskId();
        Filter filterTaskId = new EqualsFilter("getTaskId", (Object)sTaskId);
        KeyAssociatedFilter filterAssociated = new KeyAssociatedFilter(filterTaskId, (Object)sTaskId);
        NamedCache cacheProperties = service.ensureCache(ClusteredProperties.CACHE_NAME, null);
        cacheProperties.invokeAll((Filter)filterAssociated, (InvocableMap.EntryProcessor)new ConditionalRemove((Filter)PresentFilter.INSTANCE, false));
    }

    /**
     * Handles insertion of this entry by delegating to
     * {@link #onProcess(CacheService, InvocableMap.Entry, Cause)}.
     *
     * @param service the cache service owning the entry
     * @param entry   the inserted entry
     * @param cause   the cause of the event
     *
     * @return the continuation chosen by {@code onProcess}, possibly {@code null}
     */
    @Override
    public ComposableContinuation onInserted(CacheService service, InvocableMap.Entry entry, Cause cause) {
        return onProcess(service, entry, cause);
    }

    /**
     * Handles an update of this entry by delegating to
     * {@link #onProcess(CacheService, InvocableMap.Entry, Cause)}.
     *
     * @param service the cache service owning the entry
     * @param entry   the updated entry
     * @param cause   the cause of the event
     *
     * @return the continuation chosen by {@code onProcess}, possibly {@code null}
     */
    @Override
    public ComposableContinuation onUpdated(CacheService service, InvocableMap.Entry entry, Cause cause) {
        return onProcess(service, entry, cause);
    }

    /**
     * Handles deletion of this entry; no continuation is produced.
     *
     * @param service the cache service owning the entry
     * @param entry   the deleted entry
     * @param cause   the cause of the event
     *
     * @return always {@code null}
     */
    @Override
    public ComposableContinuation onDeleted(CacheService service, InvocableMap.Entry entry, Cause cause) {
        return null;
    }

    /**
     * Lifecycle states of a {@code ClusteredTaskManager}.
     */
    public static enum State {
        /** Not referenced by the visible code of this class. NOTE(review): presumably the state before orchestration; confirm. */
        PENDING,
        /** State assigned by the main constructor; changes are processed while in this state. */
        ORCHESTRATED,
        /** State entered on completion or termination; it triggers the cleanup continuation. */
        TERMINATING;

    }

    /**
     * Entry processor that installs a new execution plan on a task manager
     * entry, unless the task has already completed.
     */
    public static class UpdateExecutionPlanProcessor
    extends PortableAbstractProcessor {
        /** The execution plan to install. */
        protected ExecutionPlan m_executionPlan;
        /** Number of pending strategy updates that this update accounts for. */
        protected int m_cPendingExecutionStrategyUpdateCount;

        /**
         * No-argument constructor; fields are populated by {@code readExternal}.
         */
        public UpdateExecutionPlanProcessor() {
        }

        /**
         * @param executionPlan                        the plan to install
         * @param cPendingExecutionStrategyUpdateCount the number of pending strategy
         *                                             updates to subtract from the entry
         */
        public UpdateExecutionPlanProcessor(ExecutionPlan executionPlan, int cPendingExecutionStrategyUpdateCount) {
            this.m_executionPlan = executionPlan;
            this.m_cPendingExecutionStrategyUpdateCount = cPendingExecutionStrategyUpdateCount;
        }

        /**
         * Applies the plan to the entry's task manager.
         *
         * @param entry the task manager entry
         *
         * @return always {@code null}
         */
        public Object process(InvocableMap.Entry entry) {
            if (!entry.isPresent()) {
                return null;
            }

            ClusteredTaskManager manager = (ClusteredTaskManager)entry.getValue();
            // a default Debugging is substituted when the configured level is below 7
            Debugging debug = manager.m_debugging.getLogLevel() < 7 ? new Debugging() : manager.m_debugging;

            if (manager.isCompleted()) {
                ExecutorTrace.log(() -> String.format("Skipping Execution Plan Update for Task [%s] as it is completed", manager.getTaskId()), debug);
                return null;
            }

            ExecutorTrace.log(() -> String.format("Updating Execution Plan for Task [%s]", manager.getTaskId()), debug);
            manager.m_executionPlan = this.m_executionPlan;
            // never let the pending count drop below zero
            int cRemaining = manager.m_cPendingExecutionStrategyUpdateCount - this.m_cPendingExecutionStrategyUpdateCount;
            manager.m_cPendingExecutionStrategyUpdateCount = Math.max(0, cRemaining);
            manager.m_cPendingExecutionPlanOptimizationCount++;
            entry.setValue((Object)manager);
            return null;
        }

        @Override
        public void readExternal(PofReader in) throws IOException {
            m_executionPlan = (ExecutionPlan)in.readObject(0);
            m_cPendingExecutionStrategyUpdateCount = in.readInt(1);
        }

        @Override
        public void writeExternal(PofWriter out) throws IOException {
            out.writeObject(0, (Object)m_executionPlan);
            out.writeInt(1, m_cPendingExecutionStrategyUpdateCount);
        }
    }

    /**
     * Entry processor through which an executor contributes a result to a
     * task. The result is accepted only when the task is present and the
     * executor still owns it.
     */
    public static class UpdateContributedResultProcessor
    extends PortableAbstractProcessor {
        /** Identifier of the contributing executor. */
        protected String m_sExecutorId;
        /** The contributed result. */
        protected Result m_result;

        /**
         * No-argument constructor; fields are populated by {@code readExternal}.
         */
        public UpdateContributedResultProcessor() {
        }

        /**
         * @param sExecutorId the contributing executor
         * @param result      the contributed result
         */
        public UpdateContributedResultProcessor(String sExecutorId, Result result) {
            this.m_sExecutorId = sExecutorId;
            this.m_result = result;
        }

        /**
         * Contributes the result to the entry's task manager.
         *
         * @param entry the task manager entry
         *
         * @return {@code true} when the result was accepted, otherwise {@code false}
         */
        public Object process(InvocableMap.Entry entry) {
            if (!entry.isPresent()) {
                if (Logger.isEnabled(5)) {
                    Logger.fine(String.format("Ignoring result contributed for Task [%s] as the Task is no longer present. Executor [%s]: %s", entry.getKey(), this.m_sExecutorId, this.m_result));
                }
                return false;
            }

            ClusteredTaskManager taskManager = (ClusteredTaskManager)entry.getValue();
            // a default Debugging is substituted when the configured level is below 7
            Debugging debug = taskManager.m_debugging.getLogLevel() < 7 ? new Debugging() : taskManager.m_debugging;

            if (!taskManager.isOwner(this.m_sExecutorId)) {
                if (Logger.isEnabled((int)debug.getLogLevel())) {
                    String sMsg = "Ignoring result contributed for Task [%s] as the Task is no longer assigned to Executor [%s]: %s";
                    ExecutorTrace.log(String.format(sMsg, taskManager.getTaskId(), this.m_sExecutorId, this.m_result), debug);
                }
                return false;
            }

            taskManager.setResult(this.m_sExecutorId, this.m_result);
            entry.setValue((Object)taskManager);
            ExecutorTrace.log(() -> String.format("Result[%s] contributed for Task [%s] by Executor [%s]: %s", taskManager.m_lCurrentResultGeneration, taskManager.getTaskId(), this.m_sExecutorId, this.m_result), debug);
            return true;
        }

        @Override
        public void readExternal(PofReader in) throws IOException {
            m_sExecutorId = in.readString(0);
            m_result = (Result)in.readObject(1);
        }

        @Override
        public void writeExternal(PofWriter out) throws IOException {
            out.writeString(0, m_sExecutorId);
            out.writeObject(1, (Object)m_result);
        }
    }

    /**
     * Entry processor that publishes an evaluated result to a task manager
     * entry: it updates the collected result (when the task has a collector),
     * records the processed result generation and, when the task has newly
     * completed, moves it to the {@code TERMINATING} state.
     *
     * @param <T> the type of the result value
     */
    public static class UpdateCollectedResultProcessor<T>
    extends PortableAbstractProcessor {
        /** The newly evaluated result. */
        protected Result<T> m_newResult;
        /** The result generation that the evaluation covered. */
        protected long m_lProcessedResultMapGeneration;
        /** Whether the evaluation found the task to be completed. */
        protected boolean m_fCompleted;

        /**
         * No-argument constructor; fields are populated by {@code readExternal}.
         */
        public UpdateCollectedResultProcessor() {
        }

        /**
         * @param newResult                     the newly evaluated result
         * @param lProcessedResultMapGeneration the result generation covered by the evaluation
         * @param fCompleted                    whether the task is now completed
         */
        public UpdateCollectedResultProcessor(Result<T> newResult, long lProcessedResultMapGeneration, boolean fCompleted) {
            this.m_lProcessedResultMapGeneration = lProcessedResultMapGeneration;
            this.m_newResult = newResult;
            this.m_fCompleted = fCompleted;
        }

        /**
         * Applies the evaluated result to the entry's task manager.
         *
         * @param entry the task manager entry
         *
         * @return always {@code null}
         */
        public Object process(InvocableMap.Entry entry) {
            if (entry.isPresent()) {
                ClusteredTaskManager manager = (ClusteredTaskManager)entry.getValue();
                // a default Debugging is substituted when the configured level is below 7
                Debugging debug = manager.m_debugging.getLogLevel() < 7 ? new Debugging() : manager.m_debugging;
                if (manager.m_collector != null) {
                    if (manager.m_lastResult == null) {
                        ExecutorTrace.log(() -> String.format("Task [%s] has a newly collected result [%s]", manager.getTaskId(), this.m_newResult), debug);
                    } else {
                        ExecutorTrace.log(() -> String.format("Task [%s] collected result will be updated from [%s] to [%s]", manager.getTaskId(), manager.getLastResult(), this.m_newResult), debug);
                    }
                    // null-safe comparison: the previous expression dereferenced
                    // manager.m_lastResult when both results were null
                    boolean fResultDiffers = manager.m_lastResult == null
                            ? this.m_newResult != null
                            : !manager.m_lastResult.equals(this.m_newResult);
                    if (fResultDiffers) {
                        manager.m_lastResult = this.m_newResult;
                        ++manager.m_nResultVersion;
                    } else {
                        ExecutorTrace.log(() -> String.format("Task [%s] result [%s] has not changed.  No update will be performed", manager.getTaskId(), manager.getLastResult()), debug);
                    }
                }
                manager.m_lProcessedResultGeneration = this.m_lProcessedResultMapGeneration;
                // completion is only ever switched on here, never off
                if (!manager.m_fCompleted && this.m_fCompleted) {
                    manager.m_fCompleted = this.m_fCompleted;
                    manager.m_state = State.TERMINATING;
                }
                ExecutorTrace.log(() -> String.format("Task [%s] (completed=[%s], cancelled=[%s], state=[%s], resultVersion[%s]", manager.getTaskId(), manager.isCompleted(), manager.isCancelled(), manager.m_state, manager.m_nResultVersion), debug);
                entry.setValue((Object)manager);
            } else {
                Logger.fine(() -> String.format("Ignoring request to update Task [%s] as it is no longer present", entry.getKey()));
            }
            return null;
        }

        @Override
        public void readExternal(PofReader in) throws IOException {
            this.m_newResult = (Result)in.readObject(0);
            this.m_lProcessedResultMapGeneration = in.readLong(1);
            this.m_fCompleted = in.readBoolean(2);
        }

        @Override
        public void writeExternal(PofWriter out) throws IOException {
            out.writeObject(0, this.m_newResult);
            out.writeLong(1, this.m_lProcessedResultMapGeneration);
            out.writeBoolean(2, this.m_fCompleted);
        }
    }

    /**
     * Entry processor that moves an orchestrated task into the
     * {@code TERMINATING} state, recording whether it was cancelled.
     */
    public static class TerminateProcessor
    extends PortableAbstractProcessor {
        /** Value assigned to the task manager's cancelled flag on termination. */
        private boolean m_fCancelled;

        /**
         * No-argument constructor; the flag is populated by {@code readExternal}.
         */
        public TerminateProcessor() {
        }

        /**
         * @param fCancelled the value to assign to the task's cancelled flag
         */
        public TerminateProcessor(boolean fCancelled) {
            this.m_fCancelled = fCancelled;
        }

        /**
         * Terminates the task held by the entry when it is present and still
         * in the {@code ORCHESTRATED} state.
         *
         * @param entry the task manager entry
         *
         * @return {@code true} when the task was moved to {@code TERMINATING},
         *         otherwise {@code false}
         */
        public Object process(InvocableMap.Entry entry) {
            if (!entry.isPresent()) {
                return false;
            }

            ClusteredTaskManager manager = (ClusteredTaskManager)entry.getValue();
            if (manager.m_state != State.ORCHESTRATED) {
                return false;
            }

            manager.m_fCancelled = this.m_fCancelled;
            manager.m_state = State.TERMINATING;
            entry.setValue((Object)manager);
            return true;
        }

        @Override
        public void readExternal(PofReader in) throws IOException {
            m_fCancelled = in.readBoolean(0);
        }

        @Override
        public void writeExternal(PofWriter out) throws IOException {
            out.writeBoolean(0, m_fCancelled);
        }
    }

    /**
     * Entry processor that changes the {@link ExecutionPlan.Action} assigned
     * to one executor in a {@link ClusteredTaskManager}'s execution plan,
     * optionally only when the current action is one of a set of expected
     * previous actions.
     */
    public static class SetActionProcessor
    extends PortableAbstractProcessor {
        /** Identity of the executor whose action is to be changed. */
        protected String m_sExecutorId;

        /**
         * Actions the executor must currently have for the change to be
         * applied; {@code null} or empty means the change is unconditional.
         */
        protected EnumSet<ExecutionPlan.Action> m_previous;

        /** The action to assign to the executor. */
        protected ExecutionPlan.Action m_desired;

        /**
         * Creates an uninitialized processor (presumably the constructor used
         * for POF deserialization; confirm).
         */
        public SetActionProcessor() {
        }

        /**
         * Creates a processor that sets the action unconditionally.
         *
         * @param sExecutorId  the executor identity
         * @param desired      the action to assign
         */
        public SetActionProcessor(String sExecutorId, ExecutionPlan.Action desired) {
            this.m_sExecutorId = sExecutorId;
            this.m_previous = null;
            this.m_desired = desired;
        }

        /**
         * Creates a processor that sets the action only when the current
         * action is contained in {@code previous}.
         *
         * @param sExecutorId  the executor identity
         * @param previous     the acceptable current actions; {@code null} or
         *                     empty makes the change unconditional
         * @param desired      the action to assign
         */
        public SetActionProcessor(String sExecutorId, EnumSet<ExecutionPlan.Action> previous, ExecutionPlan.Action desired) {
            this.m_sExecutorId = sExecutorId;
            this.m_previous = previous;
            this.m_desired = desired;
        }

        /**
         * Creates a processor that sets the action only when the current
         * action equals {@code previous}.
         *
         * @param sExecutorId  the executor identity
         * @param previous     the required current action
         * @param desired      the action to assign
         */
        public SetActionProcessor(String sExecutorId, ExecutionPlan.Action previous, ExecutionPlan.Action desired) {
            this.m_sExecutorId = sExecutorId;
            this.m_previous = EnumSet.of(previous);
            this.m_desired = desired;
        }

        /**
         * Attempts to change the executor's action in the manager's plan.
         * The entry is written back only when the plan reports that the
         * action was set.
         *
         * @param entry  the entry holding the {@link ClusteredTaskManager}
         *
         * @return the value returned by {@code ExecutionPlan.setAction} when
         *         the precondition held, otherwise {@code false}
         */
        public Object process(InvocableMap.Entry entry) {
            boolean result = false;
            if (entry.isPresent()) {
                ClusteredTaskManager manager = (ClusteredTaskManager)entry.getValue();
                ExecutionPlan.Action existing = manager.m_executionPlan.getAction(this.m_sExecutorId);
                // proceed when no precondition was given, or when the current action is an expected one
                if (existing != null && this.m_previous != null && this.m_previous.contains((Object)existing) || this.m_previous == null || this.m_previous.isEmpty()) {
                    // a manager log level below 7 is replaced by a default Debugging option for this trace
                    Debugging debug = manager.m_debugging.getLogLevel() < 7 ? new Debugging() : manager.m_debugging;
                    ExecutorTrace.log(() -> String.format("Changing Executor [%s] action from [%s] to [%s]", new Object[]{this.m_sExecutorId, existing, this.m_desired}), debug);
                    boolean isStateSet = manager.m_executionPlan.setAction(this.m_sExecutorId, this.m_desired);
                    if (isStateSet) {
                        entry.setValue((Object)manager);
                    }
                    result = isStateSet;
                }
            }
            return result;
        }

        /**
         * Restores the processor from a POF stream: index 0 = executor id,
         * index 1 = a single previous action (or {@code null}), index 2 =
         * desired action.
         *
         * @param in  the POF reader
         *
         * @throws IOException  if reading from the stream fails
         */
        @Override
        public void readExternal(PofReader in) throws IOException {
            this.m_sExecutorId = in.readString(0);
            ExecutionPlan.Action action = (ExecutionPlan.Action)((Object)in.readObject(1));
            if (action != null) {
                this.m_previous = EnumSet.of(action);
            }
            this.m_desired = (ExecutionPlan.Action)((Object)in.readObject(2));
        }

        /**
         * Writes the processor to a POF stream using the layout read by
         * {@link #readExternal(PofReader)}.
         * <p>
         * A {@code null} or empty previous-action set is written as
         * {@code null}; {@link #process(InvocableMap.Entry)} treats both as
         * "no precondition", so the meaning is preserved.
         * <p>
         * NOTE(review): only the first element of a multi-element set is
         * written, so such a set is reduced to one action after a round trip.
         * Changing this would alter the wire format.
         *
         * @param out  the POF writer
         *
         * @throws IOException  if writing to the stream fails
         */
        @Override
        public void writeExternal(PofWriter out) throws IOException {
            out.writeString(0, this.m_sExecutorId);
            // guard against an empty set: iterator().next() would throw NoSuchElementException
            ExecutionPlan.Action previous = this.m_previous == null || this.m_previous.isEmpty()
                    ? null
                    : this.m_previous.iterator().next();
            out.writeObject(1, (Object)previous);
            out.writeObject(2, (Object)this.m_desired);
        }
    }

    /**
     * Entry processor that replaces a {@link ClusteredTaskManager}'s
     * execution plan with a new plan. Executors already marked
     * {@code COMPLETED} in the current plan stay {@code COMPLETED} in the
     * new one.
     */
    public static class OptimizeExecutionPlanProcessor
    extends PortableAbstractProcessor {
        /** The replacement execution plan. */
        protected ExecutionPlan m_executionPlan;

        /** Amount subtracted from the manager's pending-optimization counter. */
        protected int m_cPendingExecutionPlanOptimizationCount;

        /**
         * Creates an uninitialized processor (presumably the constructor used
         * for POF deserialization; confirm).
         */
        public OptimizeExecutionPlanProcessor() {
        }

        /**
         * Creates a processor for the given plan.
         *
         * @param executionPlan                          the replacement plan
         * @param cPendingExecutionPlanOptimizationCount  the amount to subtract
         *                                               from the manager's
         *                                               pending-optimization
         *                                               counter
         */
        public OptimizeExecutionPlanProcessor(ExecutionPlan executionPlan, int cPendingExecutionPlanOptimizationCount) {
            m_executionPlan = executionPlan;
            m_cPendingExecutionPlanOptimizationCount = cPendingExecutionPlanOptimizationCount;
        }

        /**
         * Installs the replacement plan on the entry's manager unless the
         * entry is absent or the task is already completed.
         *
         * @param entry  the entry holding the {@link ClusteredTaskManager}
         *
         * @return always {@code null}
         */
        public Object process(InvocableMap.Entry entry) {
            if (!entry.isPresent()) {
                return null;
            }

            ClusteredTaskManager manager = (ClusteredTaskManager)entry.getValue();
            // a manager log level below 7 is replaced by a default Debugging option for tracing
            Debugging debug = manager.m_debugging.getLogLevel() < 7 ? new Debugging() : manager.m_debugging;

            if (manager.isCompleted()) {
                ExecutorTrace.log(() -> String.format("Skipping Execution Plan Optimization for Task [%s] as the Task is completed", manager.getTaskId()), debug);
                return null;
            }

            ExecutionPlan existingPlan = manager.m_executionPlan;
            if (existingPlan != null) {
                // carry COMPLETED markers from the current plan into the new plan
                for (Iterator<String> ids = m_executionPlan.getIds(); ids.hasNext(); ) {
                    String sExecutorId = ids.next();
                    if (existingPlan.getAction(sExecutorId) == ExecutionPlan.Action.COMPLETED) {
                        m_executionPlan.setAction(sExecutorId, ExecutionPlan.Action.COMPLETED);
                    }
                }
            }

            ExecutorTrace.log(() -> String.format("Optimized Execution Plan for Task [%s].  Now [%s]", manager.m_sTaskId, this.m_executionPlan), debug);

            manager.m_executionPlan = m_executionPlan;
            // never let the pending counter drop below zero
            int cRemaining = manager.m_cPendingExecutionPlanOptimizationCount - m_cPendingExecutionPlanOptimizationCount;
            manager.m_cPendingExecutionPlanOptimizationCount = Math.max(0, cRemaining);
            entry.setValue((Object)manager);
            return null;
        }

        /**
         * Restores the processor from a POF stream: index 0 = execution plan,
         * index 1 = pending-optimization count.
         *
         * @param in  the POF reader
         *
         * @throws IOException  if reading from the stream fails
         */
        @Override
        public void readExternal(PofReader in) throws IOException {
            m_executionPlan = (ExecutionPlan)in.readObject(0);
            m_cPendingExecutionPlanOptimizationCount = in.readInt(1);
        }

        /**
         * Writes the processor to a POF stream: index 0 = execution plan,
         * index 1 = pending-optimization count.
         *
         * @param out  the POF writer
         *
         * @throws IOException  if writing to the stream fails
         */
        @Override
        public void writeExternal(PofWriter out) throws IOException {
            out.writeObject(0, (Object)m_executionPlan);
            out.writeInt(1, m_cPendingExecutionPlanOptimizationCount);
        }
    }

    /**
     * Entry processor that increments the pending execution-strategy update
     * counter of a {@link ClusteredTaskManager} that has not completed.
     */
    public static class NotifyExecutionStrategyProcessor
    extends PortableAbstractProcessor {
        /**
         * Increments the manager's pending-update counter and writes the
         * manager back to the entry.
         *
         * @param entry  the entry holding the {@link ClusteredTaskManager}
         *
         * @return {@code true} if the counter was incremented; {@code false}
         *         if the entry is absent or the task is already completed
         */
        public Object process(InvocableMap.Entry entry) {
            if (!entry.isPresent()) {
                return false;
            }

            ClusteredTaskManager taskManager = (ClusteredTaskManager)entry.getValue();
            if (taskManager.isCompleted()) {
                return false;
            }

            taskManager.m_cPendingExecutionStrategyUpdateCount++;
            entry.setValue((Object)taskManager);
            return true;
        }
    }

    /**
     * A {@link ComposableContinuation} that invokes the enclosing manager's
     * {@code cleanup} for a task.
     */
    public class CleanupContinuation
    implements ComposableContinuation {
        /** The cache service passed to {@code cleanup}. */
        protected final CacheService f_cacheService;

        /** The identity of the task to clean up. */
        protected final String f_sTaskId;

        /**
         * Creates a continuation for the given service and task.
         *
         * @param cacheService  the cache service passed to {@code cleanup}
         * @param sTaskId       the task identity
         */
        public CleanupContinuation(CacheService cacheService, String sTaskId) {
            f_cacheService = cacheService;
            f_sTaskId = sTaskId;
        }

        /**
         * Runs the cleanup on the enclosing {@link ClusteredTaskManager}.
         *
         * @param o  ignored
         */
        public void proceed(Object o) {
            ClusteredTaskManager.this.cleanup(f_cacheService, f_sTaskId);
        }

        /**
         * Always keeps this continuation; the supplied one is discarded.
         *
         * @param continuation  the continuation offered for composition
         *
         * @return this continuation
         */
        @Override
        public ComposableContinuation compose(ComposableContinuation continuation) {
            return this;
        }

        /**
         * Returns a description containing the task identity.
         */
        public String toString() {
            final String sPrefix = "CleanupContinuation{taskId='";
            final String sSuffix = "'}";
            return sPrefix + f_sTaskId + sSuffix;
        }
    }

    /**
     * Entry processor that runs a list of entry processors, in insertion
     * order, against the same entry. The individual results are discarded.
     */
    public static class ChainedProcessor
    extends PortableAbstractProcessor {
        /** The processors to run, in the order they were added. */
        protected ArrayList<InvocableMap.EntryProcessor> m_listProcessors = new ArrayList();

        /**
         * Appends a processor to the chain.
         *
         * @param processor  the processor to add
         *
         * @return this processor, to allow call chaining
         */
        public ChainedProcessor andThen(InvocableMap.EntryProcessor processor) {
            m_listProcessors.add(processor);
            return this;
        }

        /**
         * Creates a chain with no processors.
         *
         * @return a new, empty {@code ChainedProcessor}
         */
        public static ChainedProcessor empty() {
            return new ChainedProcessor();
        }

        /**
         * Reports whether the chain holds no processors.
         *
         * @return {@code true} if no processor has been added
         */
        public boolean isEmpty() {
            return m_listProcessors.isEmpty();
        }

        /**
         * Runs every processor in the chain against the entry.
         *
         * @param entry  the entry to process
         *
         * @return always {@code null}
         */
        public Object process(InvocableMap.Entry entry) {
            for (InvocableMap.EntryProcessor step : m_listProcessors) {
                step.process(entry);
            }
            return null;
        }

        /**
         * Reads the processors from POF property index 0 into the existing list.
         *
         * @param in  the POF reader
         *
         * @throws IOException  if reading from the stream fails
         */
        @Override
        public void readExternal(PofReader in) throws IOException {
            ArrayList<InvocableMap.EntryProcessor> listTarget = m_listProcessors;
            m_listProcessors = (ArrayList)in.readCollection(0, listTarget);
        }

        /**
         * Writes the processors to POF property index 0.
         *
         * @param out  the POF writer
         *
         * @throws IOException  if writing to the stream fails
         */
        @Override
        public void writeExternal(PofWriter out) throws IOException {
            out.writeCollection(0, m_listProcessors);
        }
    }

    /**
     * A {@link ComposableContinuation} that invokes the enclosing manager's
     * {@code asyncProcessChanges} for a task.
     */
    public class AsyncProcessChangesContinuation
    implements ComposableContinuation {
        /** The cache service passed to {@code asyncProcessChanges}. */
        protected final CacheService f_cacheService;

        /** The identity of the task whose changes are processed. */
        protected final String f_sTaskId;

        /** The cause passed to {@code asyncProcessChanges}. */
        protected final Cause f_cause;

        /**
         * Creates a continuation for the given service, task and cause.
         *
         * @param cacheService  the cache service
         * @param sTaskId       the task identity
         * @param cause         the cause passed to {@code asyncProcessChanges}
         */
        public AsyncProcessChangesContinuation(CacheService cacheService, String sTaskId, Cause cause) {
            f_cacheService = cacheService;
            f_sTaskId = sTaskId;
            f_cause = cause;
        }

        /**
         * Runs {@code asyncProcessChanges} on the enclosing
         * {@link ClusteredTaskManager}.
         *
         * @param o  ignored
         */
        public void proceed(Object o) {
            ClusteredTaskManager.this.asyncProcessChanges(f_cacheService, f_sTaskId, f_cause);
        }

        /**
         * Always yields to the supplied continuation, which replaces this one.
         *
         * @param continuation  the continuation offered for composition
         *
         * @return the supplied continuation
         */
        @Override
        public ComposableContinuation compose(ComposableContinuation continuation) {
            return continuation;
        }

        /**
         * Returns a description containing the task identity and cause.
         */
        public String toString() {
            final String sTaskPart = "AsyncProcessChangesContinuation{taskId='" + f_sTaskId;
            return sTaskPart + "', cause=" + f_cause + "}";
        }
    }
}

