package org.elasticsearch.xpack.ml.datafeed;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.CheckedConsumer;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.ClientHelper;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.MlMetadata;
import org.elasticsearch.xpack.ml.action.CloseJobAction;
import org.elasticsearch.xpack.ml.action.StartDatafeedAction;
import org.elasticsearch.xpack.ml.datafeed.DatafeedJob;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.config.JobState;
import org.elasticsearch.xpack.ml.job.messages.Messages;
import org.elasticsearch.xpack.ml.notifications.Auditor;
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.xpack.persistent.PersistentTasksService;

/* loaded from: input_file:lib/org.elasticsearch.plugin.xpack.api-6.1.3.jar:org/elasticsearch/xpack/ml/datafeed/DatafeedManager.class */
public class DatafeedManager extends AbstractComponent {
    private final Client client;
    private final ClusterService clusterService;
    private final PersistentTasksService persistentTasksService;
    private final ThreadPool threadPool;
    private final Supplier<Long> currentTimeSupplier;
    private final Auditor auditor;
    private final ConcurrentMap<Long, Holder> runningDatafeedsOnThisNode;
    private final DatafeedJobBuilder datafeedJobBuilder;
    private final TaskRunner taskRunner;
    private volatile boolean isolated;

    /* loaded from: input_file:lib/org.elasticsearch.plugin.xpack.api-6.1.3.jar:org/elasticsearch/xpack/ml/datafeed/DatafeedManager$Holder.class */
    public class Holder {
        private final String taskId;
        private final long allocationId;
        private final DatafeedConfig datafeed;
        private final ReentrantLock datafeedJobLock = new ReentrantLock(true);
        private final DatafeedJob datafeedJob;
        private final boolean autoCloseJob;
        private final ProblemTracker problemTracker;
        private final Consumer<Exception> handler;
        volatile Future<?> future;
        private volatile boolean isRelocating;

        Holder(String str, long j, DatafeedConfig datafeedConfig, DatafeedJob datafeedJob, boolean z, ProblemTracker problemTracker, Consumer<Exception> consumer) {
            this.taskId = str;
            this.allocationId = j;
            this.datafeed = datafeedConfig;
            this.datafeedJob = datafeedJob;
            this.autoCloseJob = z;
            this.problemTracker = problemTracker;
            this.handler = consumer;
        }

        String getJobId() {
            return this.datafeed.getJobId();
        }

        boolean isRunning() {
            return this.datafeedJob.isRunning();
        }

        boolean isIsolated() {
            return this.datafeedJob.isIsolated();
        }

        public void stop(String str, TimeValue timeValue, Exception exc) {
            if (this.isRelocating) {
                return;
            }
            DatafeedManager.this.logger.info("[{}] attempt to stop datafeed [{}] for job [{}]", str, this.datafeed.getId(), this.datafeed.getJobId());
            if (!this.datafeedJob.stop()) {
                DatafeedManager.this.logger.info("[{}] datafeed [{}] for job [{}] was already stopped", str, this.datafeed.getId(), this.datafeed.getJobId());
                return;
            }
            boolean z = false;
            try {
                try {
                    DatafeedManager.this.logger.info("[{}] try lock [{}] to stop datafeed [{}] for job [{}]...", str, timeValue, this.datafeed.getId(), this.datafeed.getJobId());
                    z = this.datafeedJobLock.tryLock(timeValue.millis(), TimeUnit.MILLISECONDS);
                    DatafeedManager.this.logger.info("[{}] stopping datafeed [{}] for job [{}], acquired [{}]...", str, this.datafeed.getId(), this.datafeed.getJobId(), Boolean.valueOf(z));
                    DatafeedManager.this.runningDatafeedsOnThisNode.remove(Long.valueOf(this.allocationId));
                    FutureUtils.cancel(this.future);
                    DatafeedManager.this.auditor.info(this.datafeed.getJobId(), Messages.getMessage(Messages.JOB_AUDIT_DATAFEED_STOPPED));
                    this.handler.accept(exc);
                    DatafeedManager.this.logger.info("[{}] datafeed [{}] for job [{}] has been stopped{}", str, this.datafeed.getId(), this.datafeed.getJobId(), z ? "" : ", but there may be pending tasks as the timeout [" + timeValue.getStringRep() + "] expired");
                    if (this.autoCloseJob) {
                        closeJob();
                    }
                    if (z) {
                        this.datafeedJobLock.unlock();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    DatafeedManager.this.logger.info("[{}] stopping datafeed [{}] for job [{}], acquired [{}]...", str, this.datafeed.getId(), this.datafeed.getJobId(), Boolean.valueOf(z));
                    DatafeedManager.this.runningDatafeedsOnThisNode.remove(Long.valueOf(this.allocationId));
                    FutureUtils.cancel(this.future);
                    DatafeedManager.this.auditor.info(this.datafeed.getJobId(), Messages.getMessage(Messages.JOB_AUDIT_DATAFEED_STOPPED));
                    this.handler.accept(exc);
                    DatafeedManager.this.logger.info("[{}] datafeed [{}] for job [{}] has been stopped{}", str, this.datafeed.getId(), this.datafeed.getJobId(), z ? "" : ", but there may be pending tasks as the timeout [" + timeValue.getStringRep() + "] expired");
                    if (this.autoCloseJob) {
                        closeJob();
                    }
                    if (z) {
                        this.datafeedJobLock.unlock();
                    }
                }
            } catch (Throwable th) {
                DatafeedManager.this.logger.info("[{}] stopping datafeed [{}] for job [{}], acquired [{}]...", str, this.datafeed.getId(), this.datafeed.getJobId(), Boolean.valueOf(z));
                DatafeedManager.this.runningDatafeedsOnThisNode.remove(Long.valueOf(this.allocationId));
                FutureUtils.cancel(this.future);
                DatafeedManager.this.auditor.info(this.datafeed.getJobId(), Messages.getMessage(Messages.JOB_AUDIT_DATAFEED_STOPPED));
                this.handler.accept(exc);
                DatafeedManager.this.logger.info("[{}] datafeed [{}] for job [{}] has been stopped{}", str, this.datafeed.getId(), this.datafeed.getJobId(), z ? "" : ", but there may be pending tasks as the timeout [" + timeValue.getStringRep() + "] expired");
                if (this.autoCloseJob) {
                    closeJob();
                }
                if (z) {
                    this.datafeedJobLock.unlock();
                }
                throw th;
            }
        }

        public void isolateDatafeed() {
            this.datafeedJob.isolate();
        }

        public void setRelocating() {
            this.isRelocating = true;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public Long executeLoopBack(long j, Long l) throws Exception {
            this.datafeedJobLock.lock();
            try {
                if (!isRunning() || isIsolated()) {
                    return null;
                }
                Long runLookBack = this.datafeedJob.runLookBack(j, l);
                this.datafeedJobLock.unlock();
                return runLookBack;
            } finally {
                this.datafeedJobLock.unlock();
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        public long executeRealTime() throws Exception {
            this.datafeedJobLock.lock();
            try {
                if (!isRunning() || isIsolated()) {
                    return -1L;
                }
                return this.datafeedJob.runRealtime();
            } finally {
                this.datafeedJobLock.unlock();
            }
        }

        private void closeJob() {
            JobState jobState = MlMetadata.getJobState(getJobId(), (PersistentTasksCustomMetaData) DatafeedManager.this.clusterService.state().getMetaData().custom("persistent_tasks"));
            if (jobState != JobState.OPENED) {
                DatafeedManager.this.logger.debug("[{}] No need to auto-close job as job state is [{}]", getJobId(), jobState);
            } else {
                DatafeedManager.this.persistentTasksService.waitForPersistentTaskStatus(this.taskId, (v0) -> {
                    return Objects.isNull(v0);
                }, TimeValue.timeValueSeconds(20L), new PersistentTasksService.WaitForPersistentTaskStatusListener<StartDatafeedAction.DatafeedParams>() { // from class: org.elasticsearch.xpack.ml.datafeed.DatafeedManager.Holder.1
                    @Override // org.elasticsearch.action.ActionListener
                    public void onResponse(PersistentTasksCustomMetaData.PersistentTask<StartDatafeedAction.DatafeedParams> persistentTask) {
                        CloseJobAction.Request request = new CloseJobAction.Request(Holder.this.getJobId());
                        request.setLocal(true);
                        ClientHelper.executeAsyncWithOrigin(DatafeedManager.this.client, "ml", CloseJobAction.INSTANCE, request, new ActionListener<CloseJobAction.Response>() { // from class: org.elasticsearch.xpack.ml.datafeed.DatafeedManager.Holder.1.1
                            @Override // org.elasticsearch.action.ActionListener
                            public void onResponse(CloseJobAction.Response response) {
                                if (response.isClosed()) {
                                    return;
                                }
                                DatafeedManager.this.logger.error("[{}] job close action was not acknowledged", Holder.this.getJobId());
                            }

                            @Override // org.elasticsearch.action.ActionListener
                            public void onFailure(Exception exc) {
                                if ((exc instanceof ElasticsearchStatusException) && ((ElasticsearchStatusException) exc).status() == RestStatus.CONFLICT) {
                                    DatafeedManager.this.logger.debug("[{}] {}", Holder.this.getJobId(), exc.getMessage());
                                } else {
                                    DatafeedManager.this.logger.error("[" + Holder.this.getJobId() + "] failed to auto-close job", (Throwable) exc);
                                }
                            }
                        });
                    }

                    @Override // org.elasticsearch.action.ActionListener
                    public void onFailure(Exception exc) {
                        DatafeedManager.this.logger.error("Failed to remove datafeed persistent task - will not auto close job [" + Holder.this.getJobId() + "]", (Throwable) exc);
                    }
                });
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:lib/org.elasticsearch.plugin.xpack.api-6.1.3.jar:org/elasticsearch/xpack/ml/datafeed/DatafeedManager$TaskRunner.class */
    public class TaskRunner implements ClusterStateListener {
        private final List<StartDatafeedAction.DatafeedTask> tasksToRun;

        private TaskRunner() {
            this.tasksToRun = new CopyOnWriteArrayList();
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void runWhenJobIsOpened(StartDatafeedAction.DatafeedTask datafeedTask) {
            if (DatafeedManager.this.getJobState((PersistentTasksCustomMetaData) DatafeedManager.this.clusterService.state().getMetaData().custom("persistent_tasks"), datafeedTask) == JobState.OPENED) {
                runTask(datafeedTask);
            } else {
                DatafeedManager.this.logger.info("Datafeed [{}] is waiting for job [{}] to be opened", datafeedTask.getDatafeedId(), DatafeedManager.this.getJobId(datafeedTask));
                this.tasksToRun.add(datafeedTask);
            }
        }

        private void runTask(StartDatafeedAction.DatafeedTask datafeedTask) {
            DatafeedManager.this.innerRun((Holder) DatafeedManager.this.runningDatafeedsOnThisNode.get(Long.valueOf(datafeedTask.getAllocationId())), datafeedTask.getDatafeedStartTime(), datafeedTask.getEndTime());
        }

        @Override // org.elasticsearch.cluster.ClusterStateListener
        public void clusterChanged(ClusterChangedEvent clusterChangedEvent) {
            if (this.tasksToRun.isEmpty() || !clusterChangedEvent.metaDataChanged()) {
                return;
            }
            PersistentTasksCustomMetaData persistentTasksCustomMetaData = (PersistentTasksCustomMetaData) clusterChangedEvent.previousState().getMetaData().custom("persistent_tasks");
            PersistentTasksCustomMetaData persistentTasksCustomMetaData2 = (PersistentTasksCustomMetaData) clusterChangedEvent.state().getMetaData().custom("persistent_tasks");
            if (Objects.equals(persistentTasksCustomMetaData, persistentTasksCustomMetaData2)) {
                return;
            }
            ArrayList arrayList = new ArrayList();
            for (StartDatafeedAction.DatafeedTask datafeedTask : this.tasksToRun) {
                if (DatafeedManager.this.runningDatafeedsOnThisNode.containsKey(Long.valueOf(datafeedTask.getAllocationId()))) {
                    JobState jobState = DatafeedManager.this.getJobState(persistentTasksCustomMetaData2, datafeedTask);
                    if (jobState == JobState.OPENED) {
                        runTask(datafeedTask);
                    } else if (jobState == JobState.OPENING) {
                        arrayList.add(datafeedTask);
                    } else {
                        DatafeedManager.this.logger.warn("Datafeed [{}] is stopping because job [{}] state is [{}]", datafeedTask.getDatafeedId(), DatafeedManager.this.getJobId(datafeedTask), jobState);
                        datafeedTask.stop("job_never_opened", TimeValue.timeValueSeconds(20L));
                    }
                }
            }
            this.tasksToRun.retainAll(arrayList);
        }
    }

    public DatafeedManager(ThreadPool threadPool, Client client, ClusterService clusterService, DatafeedJobBuilder datafeedJobBuilder, Supplier<Long> supplier, Auditor auditor, PersistentTasksService persistentTasksService) {
        super(Settings.EMPTY);
        this.runningDatafeedsOnThisNode = new ConcurrentHashMap();
        this.taskRunner = new TaskRunner();
        this.client = (Client) Objects.requireNonNull(client);
        this.clusterService = (ClusterService) Objects.requireNonNull(clusterService);
        this.threadPool = threadPool;
        this.currentTimeSupplier = (Supplier) Objects.requireNonNull(supplier);
        this.auditor = (Auditor) Objects.requireNonNull(auditor);
        this.persistentTasksService = (PersistentTasksService) Objects.requireNonNull(persistentTasksService);
        this.datafeedJobBuilder = (DatafeedJobBuilder) Objects.requireNonNull(datafeedJobBuilder);
        clusterService.addListener(this.taskRunner);
    }

    public void run(StartDatafeedAction.DatafeedTask datafeedTask, Consumer<Exception> consumer) {
        String datafeedId = datafeedTask.getDatafeedId();
        MlMetadata mlMetadata = (MlMetadata) this.clusterService.state().metaData().custom("ml");
        if (mlMetadata == null) {
            mlMetadata = MlMetadata.EMPTY_METADATA;
        }
        DatafeedConfig datafeed = mlMetadata.getDatafeed(datafeedId);
        Job job = mlMetadata.getJobs().get(datafeed.getJobId());
        CheckedConsumer checkedConsumer = datafeedJob -> {
            this.runningDatafeedsOnThisNode.put(Long.valueOf(datafeedTask.getAllocationId()), new Holder(datafeedTask.getPersistentTaskId(), datafeedTask.getAllocationId(), datafeed, datafeedJob, datafeedTask.isLookbackOnly(), new ProblemTracker(this.auditor, job.getId()), consumer));
            datafeedTask.updatePersistentStatus(DatafeedState.STARTED, new ActionListener<PersistentTasksCustomMetaData.PersistentTask<?>>() { // from class: org.elasticsearch.xpack.ml.datafeed.DatafeedManager.1
                @Override // org.elasticsearch.action.ActionListener
                public void onResponse(PersistentTasksCustomMetaData.PersistentTask<?> persistentTask) {
                    DatafeedManager.this.taskRunner.runWhenJobIsOpened(datafeedTask);
                }

                @Override // org.elasticsearch.action.ActionListener
                public void onFailure(Exception exc) {
                    consumer.accept(exc);
                }
            });
        };
        consumer.getClass();
        this.datafeedJobBuilder.build(job, datafeed, ActionListener.wrap(checkedConsumer, (v1) -> {
            r1.accept(v1);
        }));
    }

    public void stopDatafeed(StartDatafeedAction.DatafeedTask datafeedTask, String str, TimeValue timeValue) {
        this.logger.info("[{}] attempt to stop datafeed [{}] [{}]", str, datafeedTask.getDatafeedId(), Long.valueOf(datafeedTask.getAllocationId()));
        Holder remove = this.runningDatafeedsOnThisNode.remove(Long.valueOf(datafeedTask.getAllocationId()));
        if (remove != null) {
            remove.stop(str, timeValue, null);
        }
    }

    public void stopAllDatafeedsOnThisNode(String str) {
        int size = this.runningDatafeedsOnThisNode.size();
        if (size != 0) {
            this.logger.info("Closing [{}] datafeeds, because [{}]", Integer.valueOf(size), str);
            Iterator<Holder> it = this.runningDatafeedsOnThisNode.values().iterator();
            while (it.hasNext()) {
                it.next().stop(str, TimeValue.timeValueSeconds(20L), null);
            }
        }
    }

    public void isolateAllDatafeedsOnThisNode() {
        this.isolated = true;
        Iterator<Holder> it = this.runningDatafeedsOnThisNode.values().iterator();
        while (it.hasNext()) {
            Holder next = it.next();
            next.isolateDatafeed();
            next.setRelocating();
            it.remove();
        }
    }

    public void isolateDatafeed(long j) {
        Holder holder = this.runningDatafeedsOnThisNode.get(Long.valueOf(j));
        if (holder != null) {
            holder.isolateDatafeed();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void innerRun(final Holder holder, final long j, final Long l) {
        holder.future = this.threadPool.executor(MachineLearning.DATAFEED_THREAD_POOL_NAME).submit(new AbstractRunnable() { // from class: org.elasticsearch.xpack.ml.datafeed.DatafeedManager.2
            @Override // org.elasticsearch.common.util.concurrent.AbstractRunnable
            public void onFailure(Exception exc) {
                DatafeedManager.this.logger.error("Failed lookback import for job [" + holder.datafeed.getJobId() + "]", (Throwable) exc);
                holder.stop("general_lookback_failure", TimeValue.timeValueSeconds(20L), exc);
            }

            /* JADX INFO: Access modifiers changed from: protected */
            @Override // org.elasticsearch.common.util.concurrent.AbstractRunnable
            public void doRun() throws Exception {
                Long l2 = null;
                try {
                    l2 = holder.executeLoopBack(j, l);
                } catch (DatafeedJob.AnalysisProblemException e) {
                    if (l == null) {
                        l2 = Long.valueOf(e.nextDelayInMsSinceEpoch);
                    }
                    holder.problemTracker.reportAnalysisProblem(e.getCause().getMessage());
                    if (e.shouldStop) {
                        holder.stop("lookback_analysis_error", TimeValue.timeValueSeconds(20L), e);
                        return;
                    }
                } catch (DatafeedJob.EmptyDataCountException e2) {
                    if (l == null) {
                        holder.problemTracker.reportEmptyDataCount();
                        l2 = Long.valueOf(e2.nextDelayInMsSinceEpoch);
                    } else {
                        String message = Messages.getMessage(Messages.JOB_AUDIT_DATAFEED_LOOKBACK_NO_DATA);
                        DatafeedManager.this.logger.warn("[{}] {}", holder.datafeed.getJobId(), message);
                        DatafeedManager.this.auditor.warning(holder.datafeed.getJobId(), message);
                    }
                } catch (DatafeedJob.ExtractionProblemException e3) {
                    if (l == null) {
                        l2 = Long.valueOf(e3.nextDelayInMsSinceEpoch);
                    }
                    holder.problemTracker.reportExtractionProblem(e3.getCause().getMessage());
                } catch (Exception e4) {
                    DatafeedManager.this.logger.error("Failed lookback import for job [" + holder.datafeed.getJobId() + "]", (Throwable) e4);
                    holder.stop("general_lookback_failure", TimeValue.timeValueSeconds(20L), e4);
                    return;
                }
                if (DatafeedManager.this.isolated) {
                    return;
                }
                if (l2 != null) {
                    DatafeedManager.this.doDatafeedRealtime(l2.longValue(), holder.datafeed.getJobId(), holder);
                } else {
                    holder.stop("no_realtime", TimeValue.timeValueSeconds(20L), null);
                    holder.problemTracker.finishReport();
                }
            }
        });
    }

    void doDatafeedRealtime(long j, final String str, final Holder holder) {
        if (!holder.isRunning() || holder.isIsolated()) {
            return;
        }
        TimeValue computeNextDelay = computeNextDelay(j);
        this.logger.debug("Waiting [{}] before executing next realtime import for job [{}]", computeNextDelay, str);
        holder.future = this.threadPool.schedule(computeNextDelay, MachineLearning.DATAFEED_THREAD_POOL_NAME, new AbstractRunnable() { // from class: org.elasticsearch.xpack.ml.datafeed.DatafeedManager.3
            @Override // org.elasticsearch.common.util.concurrent.AbstractRunnable
            public void onFailure(Exception exc) {
                DatafeedManager.this.logger.error("Unexpected datafeed failure for job [" + str + "] stopping...", (Throwable) exc);
                holder.stop("general_realtime_error", TimeValue.timeValueSeconds(20L), exc);
            }

            /* JADX INFO: Access modifiers changed from: protected */
            @Override // org.elasticsearch.common.util.concurrent.AbstractRunnable
            public void doRun() throws Exception {
                long j2;
                try {
                    j2 = holder.executeRealTime();
                    holder.problemTracker.reportNoneEmptyCount();
                } catch (DatafeedJob.AnalysisProblemException e) {
                    j2 = e.nextDelayInMsSinceEpoch;
                    holder.problemTracker.reportAnalysisProblem(e.getCause().getMessage());
                    if (e.shouldStop) {
                        holder.stop("realtime_analysis_error", TimeValue.timeValueSeconds(20L), e);
                        return;
                    }
                } catch (DatafeedJob.EmptyDataCountException e2) {
                    j2 = e2.nextDelayInMsSinceEpoch;
                    holder.problemTracker.reportEmptyDataCount();
                } catch (DatafeedJob.ExtractionProblemException e3) {
                    j2 = e3.nextDelayInMsSinceEpoch;
                    holder.problemTracker.reportExtractionProblem(e3.getCause().getMessage());
                } catch (Exception e4) {
                    DatafeedManager.this.logger.error("Unexpected datafeed failure for job [" + str + "] stopping...", (Throwable) e4);
                    holder.stop("general_realtime_error", TimeValue.timeValueSeconds(20L), e4);
                    return;
                }
                holder.problemTracker.finishReport();
                if (j2 >= 0) {
                    DatafeedManager.this.doDatafeedRealtime(j2, str, holder);
                }
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    public String getJobId(StartDatafeedAction.DatafeedTask datafeedTask) {
        return this.runningDatafeedsOnThisNode.get(Long.valueOf(datafeedTask.getAllocationId())).getJobId();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public JobState getJobState(PersistentTasksCustomMetaData persistentTasksCustomMetaData, StartDatafeedAction.DatafeedTask datafeedTask) {
        return MlMetadata.getJobState(getJobId(datafeedTask), persistentTasksCustomMetaData);
    }

    private TimeValue computeNextDelay(long j) {
        return new TimeValue(Math.max(1L, j - this.currentTimeSupplier.get().longValue()));
    }

    boolean isRunning(long j) {
        return this.runningDatafeedsOnThisNode.containsKey(Long.valueOf(j));
    }
}
