/*
 * Decompiled with CFR 0.152.
 */
package org.elasticsearch.xpack.ml.action;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionListenerResponseHandler;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.TaskOperationFailure;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.tasks.BaseTasksRequest;
import org.elasticsearch.action.support.tasks.BaseTasksResponse;
import org.elasticsearch.action.support.tasks.TransportTasksAction;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.discovery.MasterNotDiscoveredException;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.ml.MlMetadata;
import org.elasticsearch.xpack.ml.action.StartDatafeedAction;
import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.ml.datafeed.DatafeedState;
import org.elasticsearch.xpack.ml.job.messages.Messages;
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.xpack.persistent.PersistentTasksService;

public class StopDatafeedAction
extends Action<Request, Response, RequestBuilder> {
    public static final StopDatafeedAction INSTANCE = new StopDatafeedAction();
    public static final String NAME = "cluster:admin/xpack/ml/datafeed/stop";
    public static final ParseField TIMEOUT = new ParseField("timeout", new String[0]);
    public static final ParseField FORCE = new ParseField("force", new String[0]);
    public static final TimeValue DEFAULT_TIMEOUT = TimeValue.timeValueMinutes((long)5L);

    private StopDatafeedAction() {
        super(NAME);
    }

    public RequestBuilder newRequestBuilder(ElasticsearchClient client) {
        return new RequestBuilder(client, this);
    }

    public Response newResponse() {
        return new Response();
    }

    static List<String> resolve(String datafeedId, MlMetadata mlMetadata, PersistentTasksCustomMetaData tasks) {
        if (!"_all".equals(datafeedId)) {
            return Collections.singletonList(datafeedId);
        }
        if (mlMetadata.getDatafeeds().isEmpty()) {
            return Collections.emptyList();
        }
        ArrayList<String> matched_datafeeds = new ArrayList<String>();
        for (Map.Entry<String, DatafeedConfig> datafeedEntry : mlMetadata.getDatafeeds().entrySet()) {
            String resolvedDatafeedId = datafeedEntry.getKey();
            DatafeedConfig datafeed = datafeedEntry.getValue();
            DatafeedState datafeedState = MlMetadata.getDatafeedState(((DatafeedConfig)datafeed.get()).getId(), tasks);
            if (datafeedState == DatafeedState.STOPPED) continue;
            matched_datafeeds.add(resolvedDatafeedId);
        }
        return matched_datafeeds;
    }

    static PersistentTasksCustomMetaData.PersistentTask<?> validateAndReturnDatafeedTask(String datafeedId, MlMetadata mlMetadata, PersistentTasksCustomMetaData tasks) {
        DatafeedConfig datafeed = mlMetadata.getDatafeed(datafeedId);
        if (datafeed == null) {
            throw new ResourceNotFoundException(Messages.getMessage("No datafeed with id [{0}] exists", datafeedId), new Object[0]);
        }
        PersistentTasksCustomMetaData.PersistentTask<?> task = MlMetadata.getDatafeedTask(datafeedId, tasks);
        if (task == null) {
            throw org.elasticsearch.xpack.ml.utils.ExceptionsHelper.conflictStatusException("Cannot stop datafeed [" + datafeedId + "] because it has already been stopped", new Object[0]);
        }
        return task;
    }

    public static class TransportAction
    extends TransportTasksAction<StartDatafeedAction.DatafeedTask, Request, Response, Response> {
        private final PersistentTasksService persistentTasksService;

        @Inject
        public TransportAction(Settings settings, TransportService transportService, ThreadPool threadPool, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, ClusterService clusterService, PersistentTasksService persistentTasksService) {
            super(settings, StopDatafeedAction.NAME, threadPool, clusterService, transportService, actionFilters, indexNameExpressionResolver, Request::new, Response::new, "ml_utility");
            this.persistentTasksService = persistentTasksService;
        }

        protected void doExecute(Task task, final Request request, final ActionListener<Response> listener) {
            ClusterState state = this.clusterService.state();
            DiscoveryNodes nodes = state.nodes();
            if (!nodes.isLocalNodeElectedMaster()) {
                if (nodes.getMasterNode() == null) {
                    listener.onFailure((Exception)new MasterNotDiscoveredException("no known master node"));
                } else {
                    this.transportService.sendRequest(nodes.getMasterNode(), this.actionName, (TransportRequest)request, (TransportResponseHandler)new ActionListenerResponseHandler(listener, Response::new));
                }
            } else {
                MlMetadata mlMetadata = (MlMetadata)state.getMetaData().custom("ml");
                PersistentTasksCustomMetaData tasks = (PersistentTasksCustomMetaData)state.getMetaData().custom("persistent_tasks");
                final List<String> resolvedDatafeeds = StopDatafeedAction.resolve(request.getDatafeedId(), mlMetadata, tasks);
                if (resolvedDatafeeds.isEmpty()) {
                    listener.onResponse((Object)new Response(true));
                    return;
                }
                request.setResolvedDatafeedIds(resolvedDatafeeds.toArray(new String[resolvedDatafeeds.size()]));
                if (request.force) {
                    final AtomicInteger counter = new AtomicInteger();
                    final AtomicArray failures = new AtomicArray(resolvedDatafeeds.size());
                    for (String datafeedId : resolvedDatafeeds) {
                        PersistentTasksCustomMetaData.PersistentTask<?> datafeedTask = MlMetadata.getDatafeedTask(datafeedId, tasks);
                        if (datafeedTask != null) {
                            this.persistentTasksService.cancelPersistentTask(datafeedTask.getId(), new ActionListener<PersistentTasksCustomMetaData.PersistentTask<?>>(){

                                public void onResponse(PersistentTasksCustomMetaData.PersistentTask<?> persistentTask) {
                                    if (counter.incrementAndGet() == resolvedDatafeeds.size()) {
                                        this.sendResponseOrFailure(request.getDatafeedId(), (ActionListener<Response>)listener, (AtomicArray<Exception>)failures);
                                    }
                                }

                                public void onFailure(Exception e) {
                                    int slot = counter.incrementAndGet();
                                    failures.set(slot - 1, (Object)e);
                                    if (slot == resolvedDatafeeds.size()) {
                                        this.sendResponseOrFailure(request.getDatafeedId(), (ActionListener<Response>)listener, (AtomicArray<Exception>)failures);
                                    }
                                }
                            });
                            continue;
                        }
                        String msg = "Requested datafeed [" + request.getDatafeedId() + "] be force-stopped, but datafeed's task could not be found.";
                        this.logger.warn(msg);
                        int slot = counter.incrementAndGet();
                        failures.set(slot - 1, (Object)new RuntimeException(msg));
                        if (slot != resolvedDatafeeds.size()) continue;
                        this.sendResponseOrFailure(request.getDatafeedId(), listener, (AtomicArray<Exception>)failures);
                    }
                } else {
                    HashSet<String> executorNodes = new HashSet<String>();
                    HashMap<String, String> datafeedIdToPersistentTaskId = new HashMap<String, String>();
                    for (String datafeedId : resolvedDatafeeds) {
                        PersistentTasksCustomMetaData.PersistentTask<?> datafeedTask = StopDatafeedAction.validateAndReturnDatafeedTask(datafeedId, mlMetadata, tasks);
                        executorNodes.add(datafeedTask.getExecutorNode());
                        datafeedIdToPersistentTaskId.put(datafeedId, datafeedTask.getId());
                    }
                    ActionListener finalListener = ActionListener.wrap(r -> this.waitForDatafeedStopped((Map<String, String>)datafeedIdToPersistentTaskId, request, (Response)((Object)r), listener), arg_0 -> listener.onFailure(arg_0));
                    request.setNodes(executorNodes.toArray(new String[executorNodes.size()]));
                    super.doExecute(task, (BaseTasksRequest)request, finalListener);
                }
            }
        }

        private void sendResponseOrFailure(String datafeedId, ActionListener<Response> listener, AtomicArray<Exception> failures) {
            List catchedExceptions = failures.asList();
            if (catchedExceptions.size() == 0) {
                listener.onResponse((Object)new Response(true));
                return;
            }
            String msg = "Failed to stop datafeed [" + datafeedId + "] with [" + catchedExceptions.size() + "] failures, rethrowing last, all Exceptions: [" + catchedExceptions.stream().map(Throwable::getMessage).collect(Collectors.joining(", ")) + "]";
            ElasticsearchException e = new ElasticsearchException(msg, (Throwable)catchedExceptions.get(0), new Object[0]);
            listener.onFailure((Exception)e);
        }

        void waitForDatafeedStopped(Map<String, String> datafeedIdToPersistentTaskId, Request request, final Response response, final ActionListener<Response> listener) {
            this.persistentTasksService.waitForPersistentTasksStatus(persistentTasksCustomMetaData -> {
                for (Map.Entry entry : datafeedIdToPersistentTaskId.entrySet()) {
                    String persistentTaskId = (String)entry.getValue();
                    if (persistentTasksCustomMetaData.getTask(persistentTaskId) == null) continue;
                    return false;
                }
                return true;
            }, request.getTimeout(), new ActionListener<Boolean>(){

                public void onResponse(Boolean result) {
                    listener.onResponse((Object)response);
                }

                public void onFailure(Exception e) {
                    listener.onFailure(e);
                }
            });
        }

        protected Response newResponse(Request request, List<Response> tasks, List<TaskOperationFailure> taskOperationFailures, List<FailedNodeException> failedNodeExceptions) {
            if (request.getResolvedDatafeedIds().length != tasks.size()) {
                if (!taskOperationFailures.isEmpty()) {
                    throw ExceptionsHelper.convertToElastic((Exception)taskOperationFailures.get(0).getCause());
                }
                if (!failedNodeExceptions.isEmpty()) {
                    throw ExceptionsHelper.convertToElastic((Exception)((Exception)failedNodeExceptions.get(0)));
                }
                return new Response(true);
            }
            return new Response(tasks.stream().allMatch(Response::isStopped));
        }

        protected Response readTaskResponse(StreamInput in) throws IOException {
            return new Response(in);
        }

        protected void taskOperation(Request request, StartDatafeedAction.DatafeedTask task, ActionListener<Response> listener) {
            task.stop("stop_datafeed (api)", request.getStopTimeout());
            listener.onResponse((Object)new Response(true));
        }

        protected boolean accumulateExceptions() {
            return true;
        }
    }

    static class RequestBuilder
    extends ActionRequestBuilder<Request, Response, RequestBuilder> {
        RequestBuilder(ElasticsearchClient client, StopDatafeedAction action) {
            super(client, (Action)action, (ActionRequest)new Request());
        }
    }

    public static class Response
    extends BaseTasksResponse
    implements Writeable {
        private boolean stopped;

        public Response(boolean stopped) {
            super(null, null);
            this.stopped = stopped;
        }

        public Response(StreamInput in) throws IOException {
            this.readFrom(in);
        }

        public Response() {
        }

        public boolean isStopped() {
            return this.stopped;
        }

        public void readFrom(StreamInput in) throws IOException {
            super.readFrom(in);
            this.stopped = in.readBoolean();
        }

        public void writeTo(StreamOutput out) throws IOException {
            super.writeTo(out);
            out.writeBoolean(this.stopped);
        }
    }

    public static class Request
    extends BaseTasksRequest<Request>
    implements ToXContent {
        public static ObjectParser<Request, Void> PARSER = new ObjectParser("cluster:admin/xpack/ml/datafeed/stop", Request::new);
        private String datafeedId;
        private String[] resolvedDatafeedIds;
        private TimeValue stopTimeout = DEFAULT_TIMEOUT;
        private boolean force = false;

        public static Request fromXContent(XContentParser parser) {
            return Request.parseRequest(null, parser);
        }

        public static Request parseRequest(String datafeedId, XContentParser parser) {
            Request request = (Request)((Object)PARSER.apply(parser, null));
            if (datafeedId != null) {
                request.datafeedId = datafeedId;
            }
            return request;
        }

        public Request(String datafeedId) {
            this.datafeedId = org.elasticsearch.xpack.ml.utils.ExceptionsHelper.requireNonNull(datafeedId, DatafeedConfig.ID.getPreferredName());
            this.resolvedDatafeedIds = new String[]{datafeedId};
        }

        Request() {
        }

        private String getDatafeedId() {
            return this.datafeedId;
        }

        private String[] getResolvedDatafeedIds() {
            return this.resolvedDatafeedIds;
        }

        private void setResolvedDatafeedIds(String[] resolvedDatafeedIds) {
            this.resolvedDatafeedIds = resolvedDatafeedIds;
        }

        public TimeValue getStopTimeout() {
            return this.stopTimeout;
        }

        public void setStopTimeout(TimeValue stopTimeout) {
            this.stopTimeout = org.elasticsearch.xpack.ml.utils.ExceptionsHelper.requireNonNull(stopTimeout, TIMEOUT.getPreferredName());
        }

        public boolean isForce() {
            return this.force;
        }

        public void setForce(boolean force) {
            this.force = force;
        }

        public boolean match(Task task) {
            for (String id : this.resolvedDatafeedIds) {
                String expectedDescription = "datafeed-" + id;
                if (!(task instanceof StartDatafeedAction.DatafeedTask) || !expectedDescription.equals(task.getDescription())) continue;
                return true;
            }
            return false;
        }

        public ActionRequestValidationException validate() {
            return null;
        }

        public void readFrom(StreamInput in) throws IOException {
            super.readFrom(in);
            this.datafeedId = in.readString();
            this.resolvedDatafeedIds = in.readStringArray();
            this.stopTimeout = new TimeValue(in);
            this.force = in.readBoolean();
        }

        public void writeTo(StreamOutput out) throws IOException {
            super.writeTo(out);
            out.writeString(this.datafeedId);
            out.writeStringArray(this.resolvedDatafeedIds);
            this.stopTimeout.writeTo(out);
            out.writeBoolean(this.force);
        }

        public int hashCode() {
            return Objects.hash(this.datafeedId, this.stopTimeout, this.force);
        }

        public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException {
            builder.startObject();
            builder.field(DatafeedConfig.ID.getPreferredName(), this.datafeedId);
            builder.field(TIMEOUT.getPreferredName(), this.stopTimeout.getStringRep());
            builder.field(FORCE.getPreferredName(), this.force);
            builder.endObject();
            return builder;
        }

        public boolean equals(Object obj) {
            if (obj == null) {
                return false;
            }
            if (((Object)((Object)this)).getClass() != obj.getClass()) {
                return false;
            }
            Request other = (Request)((Object)obj);
            return Objects.equals(this.datafeedId, other.datafeedId) && Objects.equals(this.stopTimeout, other.stopTimeout) && Objects.equals(this.force, other.force);
        }

        static {
            PARSER.declareString((request, datafeedId) -> {
                request.datafeedId = datafeedId;
            }, DatafeedConfig.ID);
            PARSER.declareString((request, val) -> request.setStopTimeout(TimeValue.parseTimeValue((String)val, (String)TIMEOUT.getPreferredName())), TIMEOUT);
            PARSER.declareBoolean(Request::setForce, FORCE);
        }
    }
}

