/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.asyncprocessing;

import java.util.LinkedList;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import javax.annotation.Nullable;
import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.core.asyncprocessing.AsyncFutureImpl;
import org.apache.flink.core.asyncprocessing.InternalAsyncFuture;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.asyncprocessing.AsyncExecutionController;
import org.apache.flink.runtime.asyncprocessing.AsyncExecutor;
import org.apache.flink.runtime.asyncprocessing.AsyncRequest;
import org.apache.flink.runtime.asyncprocessing.AsyncRequestContainer;
import org.apache.flink.runtime.asyncprocessing.EpochManager;
import org.apache.flink.runtime.asyncprocessing.RecordContext;
import org.apache.flink.runtime.asyncprocessing.declare.DeclarationManager;
import org.apache.flink.util.function.CheckedSupplier;

public class SimpleAsyncExecutionController<K>
extends AsyncExecutionController<K, RunnableTask<K, ?>> {
    public SimpleAsyncExecutionController(MailboxExecutor mailboxExecutor, AsyncFutureImpl.AsyncFrameworkExceptionHandler exceptionHandler, ExecutorService asyncThreadPool, DeclarationManager declarationManager, EpochManager.ParallelMode epochParallelMode, int maxParallelism, int batchSize, long bufferTimeout, int maxInFlightRecords, @Nullable AsyncExecutionController.SwitchContextListener<K> switchContextListener, @Nullable MetricGroup metricGroup) {
        super(mailboxExecutor, exceptionHandler, new TaskExecutor(asyncThreadPool), declarationManager, epochParallelMode, maxParallelism, batchSize, bufferTimeout, maxInFlightRecords, switchContextListener, metricGroup);
    }

    public <R> InternalAsyncFuture<R> handleRequest(CheckedSupplier<R> checkedSupplier, boolean allowOverdraft) {
        InternalAsyncFuture asyncFuture = this.asyncFutureFactory.create(this.currentContext);
        RunnableTask request = new RunnableTask(this.currentContext, false, asyncFuture, checkedSupplier);
        this.handleRequest(request, allowOverdraft);
        return asyncFuture;
    }

    static class TaskExecutor<K>
    implements AsyncExecutor<RunnableTask<K, ?>> {
        private final ExecutorService taskExecutorService;
        private final boolean managedExecutor;

        public TaskExecutor(ExecutorService taskExecutorService) {
            this.taskExecutorService = taskExecutorService;
            this.managedExecutor = false;
        }

        @Override
        public CompletableFuture<Void> executeBatchRequests(AsyncRequestContainer<RunnableTask<K, ?>> asyncRequestContainer) {
            if (asyncRequestContainer.isEmpty()) {
                return CompletableFuture.completedFuture(null);
            }
            RunnableContainer container = (RunnableContainer)asyncRequestContainer;
            LinkedList requests = container.requests;
            while (!requests.isEmpty()) {
                RunnableTask request = requests.poll();
                if (request == null) continue;
                this.taskExecutorService.submit(() -> {
                    try {
                        request.run();
                    }
                    catch (Exception e) {
                        request.getFuture().completeExceptionally("Async task failed.", (Throwable)e);
                    }
                });
            }
            return CompletableFuture.completedFuture(null);
        }

        @Override
        public AsyncRequestContainer<RunnableTask<K, ?>> createRequestContainer() {
            return new RunnableContainer();
        }

        @Override
        public void executeRequestSync(RunnableTask<K, ?> asyncRequest) {
            try {
                asyncRequest.run();
            }
            catch (Exception e) {
                asyncRequest.getFuture().completeExceptionally("Task failed.", (Throwable)e);
            }
        }

        @Override
        public boolean fullyLoaded() {
            return false;
        }

        @Override
        public void shutdown() {
            if (this.managedExecutor) {
                this.taskExecutorService.shutdown();
            }
        }

        static class RunnableContainer<K>
        implements AsyncRequestContainer<RunnableTask<K, ?>> {
            LinkedList<RunnableTask<K, ?>> requests = new LinkedList();

            RunnableContainer() {
            }

            @Override
            public void offer(RunnableTask<K, ?> stateRequest) {
                this.requests.add(stateRequest);
            }

            @Override
            public boolean isEmpty() {
                return this.requests.isEmpty();
            }
        }
    }

    public static class RunnableTask<K, RET>
    extends AsyncRequest<K> {
        final CheckedSupplier<RET> runnable;

        public RunnableTask(RecordContext<K> context, boolean sync, InternalAsyncFuture<RET> asyncFuture, CheckedSupplier<RET> runnable) {
            super(context, sync, asyncFuture);
            this.runnable = runnable;
        }

        private void run() throws Exception {
            this.asyncFuture.complete(this.runnable.get());
        }
    }
}

