/*
 * Decompiled with CFR 0.152.
 */
package com.vmware.xenon.common;

import com.vmware.xenon.common.FactoryService;
import com.vmware.xenon.common.NodeSelectorService;
import com.vmware.xenon.common.NodeSelectorState;
import com.vmware.xenon.common.Operation;
import com.vmware.xenon.common.Service;
import com.vmware.xenon.common.ServiceDocument;
import com.vmware.xenon.common.ServiceDocumentDescription;
import com.vmware.xenon.common.ServiceDocumentQueryResult;
import com.vmware.xenon.common.ServiceErrorResponse;
import com.vmware.xenon.common.ServiceStats;
import com.vmware.xenon.common.TaskState;
import com.vmware.xenon.common.UriUtils;
import com.vmware.xenon.common.Utils;
import com.vmware.xenon.common.config.XenonConfiguration;
import com.vmware.xenon.services.common.CheckpointService;
import com.vmware.xenon.services.common.NodeGroupBroadcastResponse;
import com.vmware.xenon.services.common.QueryTask;
import com.vmware.xenon.services.common.ServiceUriPaths;
import com.vmware.xenon.services.common.TaskService;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;

public class SynchronizationTaskService
extends TaskService<State> {
    /** Path this service POSTs to when it schedules a follow-up synchronization task. */
    public static final String FACTORY_LINK = "/core/synch-tasks";

    /** System property that turns on the additional informational log lines of this service. */
    public static final String PROPERTY_NAME_SYNCHRONIZATION_LOGGING = "xenon.SynchronizationTaskService.isDetailedLoggingEnabled";

    /** Stat name used as the per-page retry counter; reset to zero when a run starts and when a page succeeds. */
    public static final String STAT_NAME_CHILD_SYNCH_RETRY_COUNT = "childSynchRetryCount";

    /** Stat name incremented every time a retry of failed child services is scheduled. */
    public static final String STAT_NAME_SYNCH_RETRY_COUNT = "synchRetryCount";

    /** System property that overrides {@link #MAX_CHILD_SYNCH_RETRY_COUNT}. */
    public static final String PROPERTY_NAME_MAX_CHILD_SYNCH_RETRY_COUNT = "xenon.SynchronizationTaskService.MAX_CHILD_SYNCH_RETRY_COUNT";

    /** Upper bound on retries of failed child services within one query page (8 unless overridden). */
    public static final int MAX_CHILD_SYNCH_RETRY_COUNT = Integer.getInteger(PROPERTY_NAME_MAX_CHILD_SYNCH_RETRY_COUNT, 8);

    /** Creates template instances of the child service; assigned by {@link #create(Supplier)}. */
    private Supplier<Service> childServiceInstantiator;

    /** Factory being synchronized; assigned through {@link #setParentService(FactoryService)}, may be null. */
    private FactoryService parent;

    /** Read once at construction from {@link #PROPERTY_NAME_SYNCHRONIZATION_LOGGING}. */
    private final boolean isDetailedLoggingEnabled = Boolean.getBoolean(PROPERTY_NAME_SYNCHRONIZATION_LOGGING);

    /** Configuration switch for checkpoint support; the last argument (true) is presumably the default. */
    private final boolean isCheckpointEnabled = XenonConfiguration.bool(SynchronizationTaskService.class, "isCheckpointEnabled", true);

    /** Delay, in seconds, before a follow-up synchronization task is posted after checkpoints are created. */
    private final long schedulePeriodSeconds = XenonConfiguration.number(SynchronizationTaskService.class, "schedulePeriodSeconds", TimeUnit.MINUTES.toSeconds(30L));

    /**
     * Builds a synchronization task service bound to the given child service supplier.
     *
     * @param childServiceInstantiator supplier of child service instances; it is invoked once
     *        here to verify that it does not produce {@code null}
     * @return a new service instance holding the supplier
     * @throws IllegalArgumentException if the supplier returns {@code null}
     */
    public static SynchronizationTaskService create(Supplier<Service> childServiceInstantiator) {
        Service probe = childServiceInstantiator.get();
        if (probe == null) {
            throw new IllegalArgumentException("childServiceInstantiator created null child service");
        }
        SynchronizationTaskService service = new SynchronizationTaskService();
        service.childServiceInstantiator = childServiceInstantiator;
        return service;
    }

    /**
     * Creates the service for {@link State} documents and turns on the
     * IDEMPOTENT_POST and INSTRUMENTATION service options.
     */
    public SynchronizationTaskService() {
        super(State.class);
        toggleOption(Service.ServiceOption.IDEMPOTENT_POST, true);
        toggleOption(Service.ServiceOption.INSTRUMENTATION, true);
    }

    /**
     * Validates the start POST, fills in the service-controlled fields of the initial
     * state and completes the operation with status 202 and the initial state as body.
     * When validation fails the operation has already been failed and nothing else is done.
     */
    @Override
    public void handleStart(Operation post) {
        State initialState = validateStartPost(post);
        if (initialState != null) {
            initializeState(initialState, post);
            if (isDetailedLoggingEnabled) {
                logInfo("Creating synchronization-task for factory %s", initialState.factorySelfLink);
            }
            post.setBody(initialState).setStatusCode(202).complete();
        }
    }

    /**
     * Populates the fields a client is not allowed to supply: the task starts in the
     * CREATED stage, copies options and index path from a freshly created child service
     * template, never expires, and starts with a checkpoint of zero.
     */
    @Override
    protected void initializeState(State initialState, Operation post) {
        Service template = childServiceInstantiator.get();

        TaskState info = new TaskState();
        info.stage = TaskState.TaskStage.CREATED;
        initialState.taskInfo = info;

        initialState.childOptions = template.getOptions();
        initialState.childDocumentIndexLink = template.getDocumentIndexPath();
        initialState.documentExpirationTimeMicros = Long.MAX_VALUE;
        initialState.checkpoint = 0L;
    }

    /**
     * Checks the body of the start POST. The checks run in a fixed order and the first
     * violation fails the operation with an {@link IllegalArgumentException}.
     *
     * @return the validated state, or {@code null} when the base validation or one of the
     *         checks below rejected the request (the operation is already failed then)
     */
    @Override
    protected State validateStartPost(Operation post) {
        State task = (State) super.validateStartPost(post);
        if (task == null) {
            return null;
        }

        // Determine the first violated rule; the order matches the order clients observe.
        String violation = null;
        if (this.childServiceInstantiator == null) {
            violation = "childServiceInstantiator must be set.";
        } else if (task.factorySelfLink == null) {
            violation = "factorySelfLink must be set.";
        } else if (task.factoryStateKind == null) {
            violation = "factoryStateKind must be set.";
        } else if (task.nodeSelectorLink == null) {
            violation = "nodeSelectorLink must be set.";
        } else if (task.queryResultLimit <= 0) {
            violation = "queryResultLimit must be set.";
        } else if (task.taskInfo != null && task.taskInfo.stage != TaskState.TaskStage.CREATED) {
            violation = "taskInfo.stage must be set to CREATED.";
        } else if (task.childOptions != null) {
            violation = "childOptions must not be set.";
        } else if (task.membershipUpdateTimeMicros != null) {
            violation = "membershipUpdateTimeMicros must not be set.";
        } else if (task.subStage != null) {
            violation = "subStage must not be set.";
        } else if (task.queryPageReference != null) {
            violation = "queryPageReference must not be set.";
        }

        if (violation != null) {
            post.fail(new IllegalArgumentException(violation));
            return null;
        }
        return task;
    }

    /**
     * Handles a PUT carrying the "xn-post-to-put" pragma; any other PUT is rejected.
     *
     * A task that is currently STARTED is only flagged with the RESTART sub stage and the
     * PUT completes immediately. A task in any other known stage is (re)started: it moves to
     * STARTED, picks its first sub stage, resets its counters and stats, marks the factory
     * unavailable and then runs the first sub stage.
     */
    @Override
    public void handlePut(Operation put) {
        if (!put.hasPragmaDirective("xn-post-to-put")) {
            put.fail(new IllegalStateException("PUT not supported for SynchronizationTaskService"));
            return;
        }

        State task = (State) getState(put);
        TaskState.TaskStage previousStage = task.taskInfo.stage;
        SubStage previousSubStage = task.subStage;

        State request = validatePutRequest(task, put);
        if (request == null) {
            // validatePutRequest has already failed the operation
            return;
        }

        boolean launch = false;
        switch (task.taskInfo.stage) {
            case STARTED:
                // A run is in flight; the next self patch picks up the RESTART marker.
                logInfo("Restarting SynchronizationTask");
                task.subStage = SubStage.RESTART;
                break;
            case CREATED:
            case FAILED:
            case CANCELLED:
            case FINISHED:
                launch = true;
                break;
        }

        task.membershipUpdateTimeMicros = request.membershipUpdateTimeMicros;
        task.queryResultLimit = request.queryResultLimit;

        if (launch) {
            task.taskInfo.stage = TaskState.TaskStage.STARTED;
            boolean useCheckpoints = this.parent != null
                    && this.parent.hasChildOption(Service.ServiceOption.PERSISTENCE)
                    && checkpointEnabled(task);
            if (useCheckpoints) {
                task.subStage = SubStage.GET_CHECKPOINTS;
            } else {
                task.subStage = SubStage.QUERY;
                task.checkpoint = 0L;
            }
        }

        if (isDetailedLoggingEnabled) {
            logInfo("Transitioning task from %s-%s to %s-%s. Time %d",
                    previousStage, previousSubStage, task.taskInfo.stage, task.subStage,
                    task.membershipUpdateTimeMicros);
        }

        if (!launch) {
            put.complete();
            return;
        }

        task.startTimeMicros = Utils.getNowMicrosUtc();
        task.synchCompletionCount = 0;
        setStat(STAT_NAME_CHILD_SYNCH_RETRY_COUNT, 0.0);
        setStat("childSynchFailureCount", 0.0);
        // The PUT is completed by setFactoryAvailability; the state machine starts afterwards.
        setFactoryAvailability(task, false, o -> handleSubStage(task), put);
    }

    /**
     * Validates the body of a PUT against the current task state.
     *
     * @param currentTask state currently held by the service
     * @param put the incoming operation; failed here when validation does not pass
     * @return the PUT body, or {@code null} when the request was rejected
     */
    public State validatePutRequest(State currentTask, Operation put) {
        State putTask = (State) getBody(put);
        if (putTask == null) {
            put.fail(new IllegalArgumentException("Request contains empty body"));
            return null;
        }
        if (putTask.queryResultLimit <= 0) {
            put.fail(new IllegalArgumentException("queryResultLimit must be set."));
            return null;
        }

        // The membership time must be present exactly when the child service is replicated.
        boolean timeProvided = putTask.membershipUpdateTimeMicros != null;
        boolean replicated = currentTask.childOptions.contains(Service.ServiceOption.REPLICATION);
        if (timeProvided != replicated) {
            put.fail(new IllegalArgumentException("membershipUpdateTimeMicros not set correctly: " + putTask.membershipUpdateTimeMicros));
            return null;
        }

        Long currentTime = currentTask.membershipUpdateTimeMicros;
        if (currentTime == null || currentTime <= putTask.membershipUpdateTimeMicros) {
            return putTask;
        }

        // The request refers to an older membership view than the one already recorded.
        String msg = String.format("Passed membershipUpdateTimeMicros is outdated. Passed %d, Current %d", putTask.membershipUpdateTimeMicros, currentTask.membershipUpdateTimeMicros);
        IllegalArgumentException outdated = new IllegalArgumentException(msg);
        ServiceErrorResponse errorBody = Utils.toServiceErrorResponse(outdated);
        errorBody.setInternalErrorCode(-2147483647);
        if (!TaskState.isFinished(currentTask.taskInfo)) {
            put.fail(400, outdated, errorBody);
        } else {
            // A finished task marks the factory available before the request is rejected.
            setFactoryAvailability(currentTask, true, o -> put.fail(400, outdated, errorBody), null);
        }
        return null;
    }

    /**
     * Adds one rule on top of the base validation: a patch is rejected when neither the
     * current task nor the patch body is in progress.
     *
     * @return {@code true} when the patch may be applied; {@code false} after failing it
     */
    @Override
    protected boolean validateTransition(Operation patch, State currentTask, State patchBody) {
        if (!super.validateTransition(patch, currentTask, patchBody)) {
            return false;
        }
        boolean neitherInProgress = !(TaskState.isInProgress(currentTask.taskInfo)
                || TaskState.isInProgress(patchBody.taskInfo));
        if (neitherInProgress) {
            patch.fail(new IllegalArgumentException("Task stage cannot transitioned to same stopped state"));
            return false;
        }
        return true;
    }

    /**
     * Applies a self patch and drives the next step of the task.
     *
     * When the stored state carries the RESTART marker the patch body is ignored and the
     * run is reset to its first sub stage; otherwise the body is merged into the state.
     * A finished task marks the factory available (which also completes the patch); any
     * other outcome completes the patch directly. A STARTED task then runs its sub stage.
     */
    @Override
    public void handlePatch(Operation patch) {
        State task = (State) getState(patch);
        State body = (State) getBody(patch);
        if (!validateTransition(patch, task, body)) {
            return;
        }

        TaskState.TaskStage previousStage = task.taskInfo.stage;
        SubStage previousSubStage = task.subStage;

        if (previousSubStage != SubStage.RESTART) {
            updateState(task, body);
        } else {
            // A PUT asked for a restart while the task was running: start over.
            task.taskInfo.stage = TaskState.TaskStage.STARTED;
            boolean useCheckpoints = this.parent != null
                    && this.parent.hasChildOption(Service.ServiceOption.PERSISTENCE)
                    && checkpointEnabled(task);
            if (useCheckpoints) {
                task.subStage = SubStage.GET_CHECKPOINTS;
            } else {
                task.subStage = SubStage.QUERY;
                task.checkpoint = 0L;
            }
            task.synchCompletionCount = 0;
            setStat(STAT_NAME_CHILD_SYNCH_RETRY_COUNT, 0.0);
            setStat("childSynchFailureCount", 0.0);
        }

        logInfo("Transitioning task from %s-%s to %s-%s, Services synchronized: %d",
                previousStage, previousSubStage, task.taskInfo.stage, task.subStage,
                task.synchCompletionCount);

        if (TaskState.isFinished(task.taskInfo)) {
            setFactoryAvailability(task, true, null, patch);
        } else {
            patch.complete();
        }

        switch (task.taskInfo.stage) {
            case STARTED:
                handleSubStage(task);
                break;
            case FAILED:
                String reason = task.failureMessage;
                if (reason == null) {
                    reason = "No reason given";
                }
                logWarning("Task failed: %s", reason);
                break;
            case CANCELLED:
                logInfo("Task canceled: not implemented, ignoring");
                break;
            case FINISHED:
                break;
        }
    }

    /**
     * Dispatches to the handler of the task's current sub stage. Sub stages without a
     * dedicated handler (RESTART among them) only produce a warning.
     */
    public void handleSubStage(State task) {
        switch (task.subStage) {
            case GET_CHECKPOINTS:
                handleCheckpointStage(task);
                return;
            case QUERY:
                handleQueryStage(task);
                return;
            case SYNCHRONIZE:
                // ownership is verified before a page is processed
                handleSynchronizeStage(task, true);
                return;
            case CHECK_NG_AVAILABILITY:
                handleCheckNodeGroupAvailabilityStage(task);
                return;
            default:
                logWarning("Unexpected sub stage: %s", task.subStage);
        }
    }

    /**
     * Broadcasts a GET for the factory's checkpoint document and moves the task to the
     * QUERY sub stage with the resulting starting timestamp.
     *
     * The minimum of the returned timestamps becomes {@code task.checkpoint}. If the
     * broadcast fails, or any node reports a failure, the checkpoint is set to 0. Failures
     * whose status code is 404 are not logged individually.
     */
    private void handleCheckpointStage(State task) {
        String checkPointServiceLink = UriUtils.buildUriPath("/core/checkpoints", UriUtils.convertPathCharsFromLink(this.parent.getSelfLink()));
        Operation get = Operation.createGet(UriUtils.buildUri(this.getHost(), checkPointServiceLink)).setReferer(this.getUri()).setCompletion((o, e) -> {
            if (e != null) {
                this.logInfo("broadcast get checkpoints failed %s, starting synchronization from timestamp 0", e.toString());
                task.checkpoint = 0L;
                this.sendSelfPatch(task, TaskState.TaskStage.STARTED, this.subStageSetter(SubStage.QUERY));
                return;
            }
            NodeGroupBroadcastResponse rsp = o.getBody(NodeGroupBroadcastResponse.class);
            if (!rsp.failures.isEmpty()) {
                for (Map.Entry<URI, ServiceErrorResponse> failure : rsp.failures.entrySet()) {
                    if (failure.getValue().statusCode == 404) {
                        continue;
                    }
                    // Log the status code, the value the message names and the filter above uses.
                    this.logInfo("get checkpoint failed with status %d from %s", failure.getValue().statusCode, failure.getKey());
                }
                this.logInfo("starting synchronization from timestamp 0");
                task.checkpoint = 0L;
                this.sendSelfPatch(task, TaskState.TaskStage.STARTED, this.subStageSetter(SubStage.QUERY));
                return;
            }
            List<Long> checkPoints = rsp.jsonResponses.values().stream().map(s -> {
                CheckpointService.CheckpointState checkpointState = Utils.fromJson(s, CheckpointService.CheckpointState.class);
                return checkpointState.timestamp;
            }).collect(Collectors.toList());
            task.checkpoint = this.findMinimumCheckpoint(checkPoints);
            // Remember which nodes answered; they receive the new checkpoint later.
            task.checkpointNodes = new HashSet<String>(rsp.selectedNodes.keySet());
            if (task.checkpoint > 0L) {
                this.logInfo("synch %s from check point %d", task.factorySelfLink, task.checkpoint);
            }
            this.sendSelfPatch(task, TaskState.TaskStage.STARTED, this.subStageSetter(SubStage.QUERY));
        });
        this.getHost().broadcastRequest(task.nodeSelectorLink, checkPointServiceLink, false, get);
    }

    /**
     * Returns the smallest checkpoint timestamp in the list.
     *
     * An empty (or null) list and a null entry both yield 0. Without this, an empty list
     * would produce {@code Long.MAX_VALUE}, which as the lower bound of the child query
     * matches no documents, and a null entry would fail on unboxing.
     *
     * @param checkpoints timestamps reported by the peers; may be empty or contain nulls
     * @return the minimum timestamp, or 0 when no usable minimum exists
     */
    private long findMinimumCheckpoint(List<Long> checkpoints) {
        if (checkpoints == null || checkpoints.isEmpty()) {
            return 0L;
        }
        long minimumCheckpoint = Long.MAX_VALUE;
        for (Long checkpoint : checkpoints) {
            if (checkpoint == null) {
                // Unknown checkpoint on one peer: the only safe starting point is the beginning.
                return 0L;
            }
            minimumCheckpoint = Math.min(minimumCheckpoint, checkpoint);
        }
        return minimumCheckpoint;
    }

    /**
     * Posts the child-document query to the local query task factory and advances the task
     * based on the outcome: cancellation when the host is stopping, failure on error,
     * CHECK_NG_AVAILABILITY when there is no result page, SYNCHRONIZE otherwise.
     */
    private void handleQueryStage(State task) {
        QueryTask childQuery = buildChildQueryTask(task);

        Operation.CompletionHandler onQueryDone = (o, e) -> {
            if (getHost().isStopping()) {
                sendSelfCancellationPatch(task, "host is stopping");
                return;
            }
            if (e != null) {
                if (!getHost().isStopping()) {
                    logWarning("Query failed with %s", e.toString());
                }
                sendSelfFailurePatch(task, e.getMessage());
                return;
            }

            ServiceDocumentQueryResult results = o.getBody(QueryTask.class).results;
            boolean hasPage = results != null && results.nextPageLink != null;
            if (!hasPage) {
                sendSelfPatch(task, TaskState.TaskStage.STARTED, subStageSetter(SubStage.CHECK_NG_AVAILABILITY));
                return;
            }

            // Remember the first result page so the SYNCHRONIZE stage can fetch it.
            URI queryFactoryUri = UriUtils.buildUri(getHost(), ServiceUriPaths.CORE_LOCAL_QUERY_TASKS);
            task.queryPageReference = UriUtils.buildUri(queryFactoryUri, results.nextPageLink);
            sendSelfPatch(task, TaskState.TaskStage.STARTED, subStageSetter(SubStage.SYNCHRONIZE));
        };

        Operation queryPost = Operation.createPost(this, ServiceUriPaths.CORE_LOCAL_QUERY_TASKS)
                .setBody(childQuery)
                .setConnectionSharing(true)
                .setCompletion(onQueryDone);
        sendRequest(queryPost);
    }

    /**
     * Builds the direct, broadcast, forward-only query that selects the child documents of
     * the factory: self link under the factory path, matching document kind and, when
     * checkpoints apply, an update time after the task's checkpoint.
     *
     * @return the query task, paged by {@code task.queryResultLimit}
     */
    private QueryTask buildChildQueryTask(State task) {
        QueryTask result = new QueryTask();
        result.querySpec = new QueryTask.QuerySpecification();
        result.indexLink = task.childDocumentIndexLink;
        result.taskInfo.isDirect = true;

        QueryTask.Query underFactory = new QueryTask.Query()
                .setTermPropertyName("documentSelfLink")
                .setTermMatchType(QueryTask.QueryTerm.MatchType.WILDCARD)
                .setTermMatchValue(task.factorySelfLink + "/*");
        result.querySpec.query.addBooleanClause(underFactory);

        QueryTask.Query ofKind = new QueryTask.Query()
                .setTermPropertyName("documentKind")
                .setTermMatchValue(task.factoryStateKind);
        result.querySpec.query.addBooleanClause(ofKind);

        boolean useCheckpoint = this.parent != null
                && this.parent.hasChildOption(Service.ServiceOption.PERSISTENCE)
                && checkpointEnabled(task);
        if (useCheckpoint) {
            if (isDetailedLoggingEnabled) {
                logInfo("query %s from checkpoint %d", task.factorySelfLink, task.checkpoint);
            }
            // Range arguments: lower bound, upper bound, then the two inclusiveness flags.
            QueryTask.NumericRange<Long> sinceCheckpoint = QueryTask.NumericRange.createLongRange(task.checkpoint, Long.MAX_VALUE, false, true);
            QueryTask.Query updatedSince = new QueryTask.Query()
                    .setTermPropertyName("documentUpdateTimeMicros")
                    .setNumericRange(sinceCheckpoint);
            result.querySpec.query.addBooleanClause(updatedSince);
        }

        // The query lives for the larger of the synchronization limit and the operation timeout.
        long syncLimitMicros = TimeUnit.SECONDS.toMicros(getHost().getPeerSynchronizationTimeLimitSeconds());
        long lifetimeMicros = Math.max(syncLimitMicros, getHost().getOperationTimeoutMicros());
        result.documentExpirationTimeMicros = Utils.fromNowMicrosUtc(lifetimeMicros);

        result.querySpec.options = EnumSet.of(QueryTask.QuerySpecification.QueryOption.BROADCAST, QueryTask.QuerySpecification.QueryOption.FORWARD_ONLY);
        result.nodeSelectorLink = task.nodeSelectorLink;
        result.querySpec.resultLimit = task.queryResultLimit;
        return result;
    }

    /**
     * Fetches the current query result page and synchronizes the child services it lists.
     *
     * With {@code verifyOwnership} set and a replicated child service, ownership of the
     * factory is checked first; the method re-enters itself with the flag cleared when the
     * local node is still the owner, and cancels the task when it is not.
     *
     * A page without results (including a response whose {@code results} is null, which
     * the query stage treats the same way) moves the task to CHECK_NG_AVAILABILITY.
     *
     * @param task the task state being processed
     * @param verifyOwnership whether to verify factory ownership before fetching the page
     */
    private void handleSynchronizeStage(State task, boolean verifyOwnership) {
        if (task.queryPageReference == null) {
            this.sendSelfPatch(task, TaskState.TaskStage.STARTED, this.subStageSetter(SubStage.CHECK_NG_AVAILABILITY));
            return;
        }
        if (this.getHost().isStopping()) {
            this.sendSelfCancellationPatch(task, "host is stopping");
            return;
        }
        if (verifyOwnership && task.childOptions.contains(Service.ServiceOption.REPLICATION)) {
            Consumer<NodeSelectorService.SelectOwnerResponse> ownerHandler = selectOwnerResponse -> this.handleSynchronizeStage(task, false);
            Consumer<NodeSelectorService.SelectOwnerResponse> nonOwnerHandler = selectOwnerResponse -> {
                this.logWarning("Current node %s is no longer owner for the factory %s. Cancelling synchronization", this.getHost().getId(), task.factorySelfLink);
                this.sendSelfCancellationPatch(task, "Local node is no longer owner for this factory.");
            };
            Consumer<Throwable> failureHandler = e -> this.sendSelfFailurePatch(task, e.getMessage());
            this.verifySynchronizationOwnership(task, ownerHandler, nonOwnerHandler, failureHandler);
            return;
        }

        // Capture the page being processed: task.queryPageReference is advanced to the next
        // page later, possibly before the asynchronous delete below reports back.
        URI pageReference = task.queryPageReference;
        Operation.CompletionHandler c = (o, e) -> {
            if (e != null) {
                if (!this.getHost().isStopping()) {
                    this.logWarning("Failure retrieving query results from %s: %s", pageReference, e.toString());
                }
                this.sendSelfFailurePatch(task, "failure retrieving query page results");
                return;
            }
            ServiceDocumentQueryResult rsp = o.getBody(QueryTask.class).results;
            // The page has been read; delete it. A failure here is logged only.
            Operation.createDelete(pageReference).setConnectionTag("xn-cnx-tag-synch").setCompletion((op, ex) -> {
                if (ex != null) {
                    this.logWarning("Failed to delete query result page %s: %s", pageReference, Utils.toString(ex));
                }
            }).sendWith(this);
            if (rsp == null || rsp.documentCount == 0L || rsp.documentLinks == null || rsp.documentLinks.isEmpty()) {
                this.sendSelfPatch(task, TaskState.TaskStage.STARTED, this.subStageSetter(SubStage.CHECK_NG_AVAILABILITY));
                return;
            }
            List<String> list = new ArrayList<String>(rsp.documentLinks);
            this.synchronizeChildrenInQueryPage(task, rsp, list, 0, list.size());
        };
        this.sendRequest(Operation.createGet(pageReference).setConnectionSharing(true).setConnectionTag("xn-cnx-tag-synch").setRetryCount(3).setCompletion(c));
    }

    /**
     * Issues one synchronization request per link and, once all have completed, decides
     * how the task proceeds.
     *
     * Requests that fail with a status of 500 or above, or 408, are collected. When at most
     * half of {@code task.queryResultLimit} failed, they are retried with back-off up to
     * {@link #MAX_CHILD_SYNCH_RETRY_COUNT} times; beyond that, or when more than half failed,
     * the task is failed. With no collected failures the task advances to the next page, or
     * to CHECK_NG_AVAILABILITY when there is none.
     *
     * @param task the task state being processed
     * @param rsp the query page the links came from; supplies the next page link
     * @param documentLinks links to synchronize in this attempt
     * @param retryCount number of retries already performed for this page
     * @param totalServiceCount number of links in the page, used for the completion counter
     */
    private void synchronizeChildrenInQueryPage(State task, ServiceDocumentQueryResult rsp, List<String> documentLinks, int retryCount, int totalServiceCount) {
        if (this.getHost().isStopping()) {
            this.sendSelfCancellationPatch(task, "host is stopping");
            return;
        }
        // Typed list (was a raw ArrayList); additions happen under the service monitor.
        List<String> failedServices = new ArrayList<>();
        AtomicInteger pendingStarts = new AtomicInteger(documentLinks.size());
        Operation.CompletionHandler c = (o, e) -> {
            if (e != null && !this.getHost().isStopping()) {
                this.logWarning("Synchronization failed for service %s with status code %d, message %s", o.getUri().getPath(), o.getStatusCode(), e.getMessage());
                if (o.getStatusCode() >= 500 || o.getStatusCode() == 408) {
                    synchronized (this) {
                        failedServices.add(o.getUri().getPath());
                    }
                }
            }
            int r = pendingStarts.decrementAndGet();
            if (this.getHost().isStopping()) {
                this.sendSelfCancellationPatch(task, "host is stopping");
                return;
            }
            if (r != 0) {
                // other requests of this attempt are still outstanding
                return;
            }
            if (!failedServices.isEmpty()) {
                if (failedServices.size() <= task.queryResultLimit / 2) {
                    if (retryCount < MAX_CHILD_SYNCH_RETRY_COUNT) {
                        synchronized (this) {
                            if (!this.getHost().isStopping()) {
                                this.logWarning("Retrying synchronization for %d failed services", failedServices.size());
                                this.scheduleRetry(() -> this.synchronizeChildrenInQueryPage(task, rsp, failedServices, retryCount + 1, totalServiceCount), STAT_NAME_CHILD_SYNCH_RETRY_COUNT);
                                this.adjustStat(STAT_NAME_SYNCH_RETRY_COUNT, 1.0);
                            }
                            return;
                        }
                    }
                    if (!this.getHost().isStopping()) {
                        this.logSevere("Synchronization failed for %d services", failedServices.size());
                    }
                    this.adjustStat("childSynchFailureCount", (double) failedServices.size());
                    task.synchCompletionCount += totalServiceCount - failedServices.size();
                    this.sendSelfFailurePatch(task, "Too many retries in synchronizing child services");
                    return;
                }
                this.adjustStat("childSynchFailureCount", (double) failedServices.size());
                task.synchCompletionCount += totalServiceCount - failedServices.size();
                this.sendSelfFailurePatch(task, "Too many failures in synchronizing child services");
                return;
            }
            // Whole page succeeded: clear the retry counter and move to the next page, if any.
            this.setStat(STAT_NAME_CHILD_SYNCH_RETRY_COUNT, 0.0);
            task.queryPageReference = rsp.nextPageLink != null ? UriUtils.buildUri(task.queryPageReference, rsp.nextPageLink) : null;
            task.synchCompletionCount += totalServiceCount;
            if (task.queryPageReference == null) {
                this.sendSelfPatch(task, TaskState.TaskStage.STARTED, this.subStageSetter(SubStage.CHECK_NG_AVAILABILITY));
                return;
            }
            this.sendSelfPatch(task, TaskState.TaskStage.STARTED, this.subStageSetter(SubStage.SYNCHRONIZE));
        };
        for (String link : documentLinks) {
            if (this.getHost().isStopping()) {
                this.sendSelfCancellationPatch(task, "host is stopping");
                return;
            }
            this.synchronizeService(task, link, c);
        }
    }

    /**
     * Increments the named retry counter and schedules the runnable on the host after the
     * back-off delay derived from that counter.
     *
     * @param task work to run after the delay
     * @param statNameRetryCount name of the stat holding the retry counter
     */
    private void scheduleRetry(Runnable task, String statNameRetryCount) {
        adjustStat(statNameRetryCount, 1.0);

        ServiceStats.ServiceStat counter = getStat(statNameRetryCount);
        long attempt = counter == null ? 0L : (long) counter.latestValue;
        long delayMicros = getExponentialDelay(statNameRetryCount);

        logWarning("%s: Scheduling retry #%d of task (counter:%s) in %d microseconds", getSelfLink(), attempt, statNameRetryCount, delayMicros);
        getHost().scheduleCore(task, delayMicros, TimeUnit.MICROSECONDS);
    }

    /**
     * Computes the retry delay as the host maintenance interval multiplied by two to the
     * power of the named retry counter.
     *
     * The exponent is capped at 30 and the multiplication saturates at
     * {@code Long.MAX_VALUE}. The previous int shift turned negative at a counter of 31 and
     * wrapped around above that, which is reachable because the retry limit is configurable.
     * Results for counters up to 30 are unchanged.
     *
     * @param statNameRetryCount name of the stat holding the retry counter
     * @return the delay in microseconds; the plain maintenance interval when the stat is
     *         missing or not positive
     */
    private long getExponentialDelay(String statNameRetryCount) {
        final int maxExponent = 30;
        long delay = this.getHost().getMaintenanceIntervalMicros();
        ServiceStats.ServiceStat stat = this.getStat(statNameRetryCount);
        if (stat == null || !(stat.latestValue > 0.0)) {
            return delay;
        }
        int exponent = (int) Math.min(stat.latestValue, (double) maxExponent);
        long multiplier = 1L << exponent;
        if (delay > Long.MAX_VALUE / multiplier) {
            // the product would overflow; use the largest representable delay instead
            return Long.MAX_VALUE;
        }
        return multiplier * delay;
    }

    /**
     * Asks the host to select the owner for the factory link and invokes exactly one of the
     * callbacks with the outcome. Any callback may be {@code null}, in which case that
     * outcome is ignored.
     *
     * @param task supplies the node selector link, factory link and operation expiration
     * @param ownerHandler invoked when the local host is the owner
     * @param nonOwnerHandler invoked when another host is the owner
     * @param failureHandler invoked when owner selection fails
     */
    private void verifySynchronizationOwnership(State task, Consumer<NodeSelectorService.SelectOwnerResponse> ownerHandler, Consumer<NodeSelectorService.SelectOwnerResponse> nonOwnerHandler, Consumer<Throwable> failureHandler) {
        Operation.CompletionHandler onSelected = (o, e) -> {
            if (e != null) {
                if (failureHandler != null) {
                    failureHandler.accept(e);
                }
                return;
            }
            NodeSelectorService.SelectOwnerResponse selection = o.getBody(NodeSelectorService.SelectOwnerResponse.class);
            Consumer<NodeSelectorService.SelectOwnerResponse> callback =
                    selection.isLocalHostOwner ? ownerHandler : nonOwnerHandler;
            if (callback != null) {
                callback.accept(selection);
            }
        };

        Operation selectOp = Operation.createPost(null)
                .setExpiration(task.documentExpirationTimeMicros)
                .setCompletion(onSelected);
        getHost().selectOwner(task.nodeSelectorLink, task.factorySelfLink, selectOp);
    }

    /**
     * Sends the synchronization POST for one child service to its factory. The body only
     * carries the last path segment of the link as self link; the request is tagged with the
     * "xn-synch-owner" pragma and is not retried by the operation itself.
     *
     * @param task supplies the factory link
     * @param link self link of the child service
     * @param c completion invoked with the result, also when sending throws
     */
    private void synchronizeService(State task, String link, Operation.CompletionHandler c) {
        ServiceDocument body = new ServiceDocument();
        body.documentSelfLink = UriUtils.getLastPathSegment(link);

        Operation synchRequest = Operation.createPost(this, task.factorySelfLink)
                .setBody(body)
                .setCompletion(c)
                .setReferer(getUri())
                .setConnectionSharing(true)
                .setConnectionTag("xn-cnx-tag-synch")
                .addPragmaDirective("xn-synch-owner")
                .setRetryCount(0);
        try {
            sendRequest(synchRequest);
        } catch (Exception e) {
            // Route the failure through the completion so the pending counter still drops.
            logSevere(e);
            synchRequest.fail(e);
        }
    }

    /**
     * Final stage: reads the node selector state and fails the task when it cannot be read
     * or the node group is not available. Otherwise the task finishes; when checkpoints
     * apply and the local node owns the factory with more than one available node, new
     * checkpoints are created before finishing.
     */
    private void handleCheckNodeGroupAvailabilityStage(State task) {
        Operation.CompletionHandler onSelectorState = (o, e) -> {
            if (e != null || !o.hasBody()) {
                sendSelfFailurePatch(task, "failed to get node selector state");
                return;
            }
            NodeSelectorState selectorState = o.getBody(NodeSelectorState.class);
            if (!NodeSelectorState.isAvailable(selectorState)) {
                sendSelfFailurePatch(task, "node group is not available");
                return;
            }

            boolean useCheckpoints = this.parent != null
                    && this.parent.hasChildOption(Service.ServiceOption.PERSISTENCE)
                    && checkpointEnabled(task);
            if (!useCheckpoints) {
                sendSelfFinishedPatch(task);
                return;
            }

            // Non-owners and ownership lookup failures simply finish the task.
            verifySynchronizationOwnership(task,
                    owner -> {
                        if (owner.availableNodeCount > 1) {
                            createCheckpointsAndReschedule(task);
                        } else {
                            sendSelfFinishedPatch(task);
                        }
                    },
                    nonOwner -> sendSelfFinishedPatch(task),
                    failure -> sendSelfFinishedPatch(task));
        };

        sendRequest(Operation.createGet(getHost(), task.nodeSelectorLink).setCompletion(onSelectorState));
    }

    /**
     * Broadcasts a checkpoint carrying the start time of this run to the nodes recorded in
     * {@code task.checkpointNodes}, schedules a follow-up synchronization task after
     * {@code schedulePeriodSeconds}, and finishes the current task.
     *
     * Checkpoint creation is best effort: a broadcast failure is logged (it was silently
     * dropped before) and does not change rescheduling or the finished transition.
     */
    private void createCheckpointsAndReschedule(State task) {
        CheckpointService.CheckpointState s = new CheckpointService.CheckpointState();
        s.timestamp = task.startTimeMicros;
        s.factoryLink = this.parent.getSelfLink();
        this.logInfo("Creating checkpoints for factory %s with timestamp: %d", s.factoryLink, s.timestamp);
        Operation post = Operation.createPost(UriUtils.buildUri(this.getHost(), "/core/checkpoints")).setBody(s).setReferer(this.getUri()).setCompletion((op, ex) -> {
            if (ex != null) {
                this.logWarning("Creating checkpoints for factory %s failed: %s", s.factoryLink, Utils.toString(ex));
            }
            this.getHost().scheduleCore(() -> {
                State scheduleTask = this.parent.createSynchronizationTaskState(task.membershipUpdateTimeMicros);
                Operation.createPost(this, FACTORY_LINK).setBody(scheduleTask).sendWith(this);
            }, this.schedulePeriodSeconds, TimeUnit.SECONDS);
            this.sendSelfFinishedPatch(task);
        });
        this.getHost().broadcastRequest(this.parent.getPeerNodeSelectorPath(), "/core/checkpoints", false, post, task.checkpointNodes);
    }

    /**
     * PUTs the "isAvailable" stat (1.0 or 0.0) to the factory's availability URI.
     *
     * @param task supplies the factory link; receives a failure patch when the PUT fails
     * @param isAvailable value to publish
     * @param action optional callback run after a successful PUT
     * @param parentOp optional operation that is completed when the PUT returns, whether it
     *        succeeded or not
     */
    private void setFactoryAvailability(State task, boolean isAvailable, Consumer<Operation> action, Operation parentOp) {
        ServiceStats.ServiceStat availability = new ServiceStats.ServiceStat();
        availability.name = "isAvailable";
        if (isAvailable) {
            availability.latestValue = 1.0;
        } else {
            availability.latestValue = 0.0;
        }

        Operation.CompletionHandler onDone = (o, e) -> {
            if (parentOp != null) {
                parentOp.complete();
            }
            if (e != null) {
                logSevere("Setting factory availability failed with error %s", e.getMessage());
                sendSelfFailurePatch(task, "Failed to set Factory Availability");
                return;
            }
            if (action != null) {
                action.accept(o);
            }
        };

        Operation put = Operation.createPut(UriUtils.buildAvailableUri(getHost(), task.factorySelfLink))
                .setBody(availability)
                .setConnectionSharing(true)
                .setConnectionTag("xn-cnx-tag-synch")
                .setCompletion(onDone);
        sendRequest(put);
    }

    /**
     * Records the factory whose child services this task synchronizes.
     *
     * @param factoryService the owning factory; stored as-is
     */
    public void setParentService(FactoryService factoryService) {
        parent = factoryService;
    }

    /**
     * Resets the failure message to the empty string before delegating to the base
     * implementation.
     */
    @Override
    protected void sendSelfPatch(State taskState, TaskState.TaskStage stage, Consumer<State> updateTaskState) {
        // set on the state object itself, before the base class sends the patch
        taskState.failureMessage = "";
        super.sendSelfPatch(taskState, stage, updateTaskState);
    }

    /**
     * Returns a state mutator that assigns the given sub stage.
     *
     * @param subStage sub stage to assign when the mutator runs
     */
    private Consumer<State> subStageSetter(SubStage subStage) {
        return taskState -> taskState.subStage = subStage;
    }

    /**
     * Tells whether checkpoints may be used for this task: the feature must be switched on
     * and the child documents must not be indexed under "/core/in-memory-document-index".
     */
    private boolean checkpointEnabled(State task) {
        if (!isCheckpointEnabled) {
            return false;
        }
        return !"/core/in-memory-document-index".equals(task.childDocumentIndexLink);
    }

    /**
     * Document state of a synchronization task. Fields the start POST must supply and
     * fields it must leave unset are enforced by {@code validateStartPost}.
     */
    public static class State
    extends TaskService.TaskServiceState {
        /** Self link of the factory being synchronized; required on start. */
        public String factorySelfLink;
        /** Document kind of the factory's child state, used as a query clause; required on start. */
        public String factoryStateKind;
        /** Node selector used for broadcasts and owner selection; required on start. */
        public String nodeSelectorLink;
        /** Options of the child service, copied from a child template on start; must not be supplied by the client. */
        public EnumSet<Service.ServiceOption> childOptions;
        /** Document index path of the child service, copied from a child template on start. */
        public String childDocumentIndexLink;
        /** Page size of the child query; must be greater than zero. */
        public int queryResultLimit;
        /** Membership update time supplied by PUT requests; required exactly when the child service is replicated. */
        public Long membershipUpdateTimeMicros;
        /** Time the current run was started by a PUT; used as the timestamp of new checkpoints. */
        public Long startTimeMicros;
        /** Lower bound (exclusive) on document update time for the child query; 0 when no checkpoint is used. */
        @ServiceDocument.UsageOption(option=ServiceDocumentDescription.PropertyUsageOption.AUTO_MERGE_IF_NOT_NULL)
        public Long checkpoint;
        /** Nodes that answered the checkpoint broadcast; new checkpoints are broadcast to them. */
        @ServiceDocument.UsageOption(option=ServiceDocumentDescription.PropertyUsageOption.AUTO_MERGE_IF_NOT_NULL)
        public Collection<String> checkpointNodes;
        /** Current step of the state machine while the task is STARTED. */
        @ServiceDocument.UsageOption(option=ServiceDocumentDescription.PropertyUsageOption.AUTO_MERGE_IF_NOT_NULL)
        public SubStage subStage;
        /** URI of the query result page to process next; null when no page is pending. */
        @ServiceDocument.UsageOption(option=ServiceDocumentDescription.PropertyUsageOption.AUTO_MERGE_IF_NOT_NULL)
        public URI queryPageReference;
        /** Running count of child services synchronized in the current run. */
        // NOTE(review): a primitive int can never be null, so the merge option's effect here is unclear — confirm.
        @ServiceDocument.UsageOption(option=ServiceDocumentDescription.PropertyUsageOption.AUTO_MERGE_IF_NOT_NULL)
        public int synchCompletionCount;
    }

    /** Steps of the synchronization state machine, dispatched by {@code handleSubStage}. */
    public static enum SubStage {
        // Fetch the peers' checkpoints to determine the starting timestamp.
        GET_CHECKPOINTS,
        // Run the query that lists the child documents to synchronize.
        QUERY,
        // Fetch one query result page and synchronize the child services it lists.
        SYNCHRONIZE,
        // Marker set by a PUT that arrives while the task is STARTED; the next patch resets the run.
        RESTART,
        // Verify node group availability, then finish (optionally creating checkpoints).
        CHECK_NG_AVAILABILITY;

    }
}

