/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.concurrent;

import akka.dispatch.OnComplete;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.runtime.util.ExecutorThreadFactory;
import org.apache.flink.util.Preconditions;
import scala.Function1;
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;

public class FutureUtils {
    /**
     * Runs the given operation and retries it when it fails.
     *
     * <p>The returned future is handed to {@code retryOperation}, which performs the first attempt
     * immediately and re-invokes {@code operation} for every retry.
     *
     * @param operation supplier that produces a new future for every attempt
     * @param retries number of retries allowed after a failed attempt
     * @param executor executor on which the completion callback of each attempt is run
     * @param <T> result type of the operation
     * @return future completed with the result of the first successful attempt, or exceptionally
     *     with a {@link RetryException} once an attempt is cancelled or the retries are used up
     */
    public static <T> CompletableFuture<T> retry(Supplier<CompletableFuture<T>> operation, int retries, Executor executor) {
        final CompletableFuture<T> retryResult = new CompletableFuture<>();
        retryOperation(retryResult, operation, retries, executor);
        return retryResult;
    }

    /**
     * Performs one attempt of {@code operation} and wires its outcome into {@code resultFuture}.
     *
     * <p>Nothing happens when {@code resultFuture} is already done. Otherwise a new attempt future
     * is obtained from {@code operation}; on success its value completes {@code resultFuture}, on
     * cancellation {@code resultFuture} fails with a {@link RetryException}, and on any other
     * failure the method recurses with {@code retries - 1} until no retries are left.
     *
     * <p>NOTE(review): a retry calls {@code operation.get()} from inside the completion callback.
     * If the supplier itself throws there, nothing in this method completes {@code resultFuture} —
     * confirm that suppliers passed in never throw.
     *
     * @param resultFuture future that receives the final outcome
     * @param operation supplier that produces a new future for every attempt
     * @param retries number of retries still allowed after this attempt
     * @param executor executor on which the completion callback is run
     * @param <T> result type of the operation
     */
    private static <T> void retryOperation(CompletableFuture<T> resultFuture, Supplier<CompletableFuture<T>> operation, int retries, Executor executor) {
        if (resultFuture.isDone()) {
            return;
        }

        // Keep the attempt future fully typed so the callback receives a T and a Throwable.
        final CompletableFuture<T> operationFuture = operation.get();

        operationFuture.whenCompleteAsync((value, failure) -> {
            if (failure == null) {
                resultFuture.complete(value);
            } else if (failure instanceof CancellationException) {
                resultFuture.completeExceptionally(new RetryException("Operation future was cancelled.", failure));
            } else if (retries > 0) {
                retryOperation(resultFuture, operation, retries - 1, executor);
            } else {
                resultFuture.completeExceptionally(new RetryException("Could not complete the operation. Number of retries has been exhausted.", failure));
            }
        }, executor);

        // Once the overall result is known, the current attempt is no longer needed.
        resultFuture.whenComplete((ignoredValue, ignoredFailure) -> operationFuture.cancel(false));
    }

    /**
     * Runs the given operation and retries it after a fixed delay when it fails.
     *
     * <p>The returned future is handed to {@code retryOperationWithDelay}, which performs the
     * first attempt immediately and schedules every retry {@code retryDelay} later.
     *
     * @param operation supplier that produces a new future for every attempt
     * @param retries number of retries allowed after a failed attempt
     * @param retryDelay delay between a failed attempt and the next retry
     * @param scheduledExecutor executor used for completion callbacks and for scheduling retries
     * @param <T> result type of the operation
     * @return future completed with the result of the first successful attempt, or exceptionally
     *     with a {@link RetryException} once an attempt is cancelled or the retries are used up
     */
    public static <T> CompletableFuture<T> retryWithDelay(Supplier<CompletableFuture<T>> operation, int retries, Time retryDelay, ScheduledExecutor scheduledExecutor) {
        final CompletableFuture<T> retryResult = new CompletableFuture<>();
        retryOperationWithDelay(retryResult, operation, retries, retryDelay, scheduledExecutor);
        return retryResult;
    }

    /**
     * Performs one attempt of {@code operation} and, on failure, schedules the next attempt after
     * {@code retryDelay}.
     *
     * <p>Nothing happens when {@code resultFuture} is already done. On success the attempt's value
     * completes {@code resultFuture}; on cancellation {@code resultFuture} fails with a
     * {@link RetryException}; on any other failure a retry with {@code retries - 1} is scheduled
     * until no retries are left. A pending retry is cancelled as soon as {@code resultFuture}
     * completes.
     *
     * @param resultFuture future that receives the final outcome
     * @param operation supplier that produces a new future for every attempt
     * @param retries number of retries still allowed after this attempt
     * @param retryDelay delay before the next retry is started
     * @param scheduledExecutor executor used for completion callbacks and for scheduling retries
     * @param <T> result type of the operation
     */
    private static <T> void retryOperationWithDelay(CompletableFuture<T> resultFuture, Supplier<CompletableFuture<T>> operation, int retries, Time retryDelay, ScheduledExecutor scheduledExecutor) {
        if (resultFuture.isDone()) {
            return;
        }

        // Keep the attempt future fully typed so the callback receives a T and a Throwable.
        final CompletableFuture<T> operationResultFuture = operation.get();

        operationResultFuture.whenCompleteAsync((value, failure) -> {
            if (failure == null) {
                resultFuture.complete(value);
            } else if (failure instanceof CancellationException) {
                resultFuture.completeExceptionally(new RetryException("Operation future was cancelled.", failure));
            } else if (retries > 0) {
                final ScheduledFuture<?> scheduledRetry = scheduledExecutor.schedule(
                    () -> retryOperationWithDelay(resultFuture, operation, retries - 1, retryDelay, scheduledExecutor),
                    retryDelay.toMilliseconds(),
                    TimeUnit.MILLISECONDS);
                // Do not start another attempt if the result gets completed while waiting.
                resultFuture.whenComplete((ignoredValue, ignoredFailure) -> scheduledRetry.cancel(false));
            } else {
                resultFuture.completeExceptionally(new RetryException("Could not complete the operation. Number of retries has been exhausted.", failure));
            }
        }, (Executor) scheduledExecutor);

        // Once the overall result is known, the current attempt is no longer needed.
        resultFuture.whenComplete((ignoredValue, ignoredFailure) -> operationResultFuture.cancel(false));
    }

    /**
     * Creates a future that completes once all given futures have completed successfully and
     * that carries their results.
     *
     * <p>If any of the futures fails, the returned future is completed exceptionally with that
     * failure. An empty input yields a future that is already completed with an empty list.
     * Results are collected in the order in which the futures complete, which is not necessarily
     * the iteration order of {@code futures}.
     *
     * @param futures futures to combine; must not be null
     * @param <T> common result type of the futures
     * @return conjunction of the given futures holding their results
     */
    public static <T> ConjunctFuture<Collection<T>> combineAll(Collection<? extends CompletableFuture<? extends T>> futures) {
        Preconditions.checkNotNull(futures, "futures");

        final ResultConjunctFuture<T> conjunct = new ResultConjunctFuture<>(futures.size());
        if (futures.isEmpty()) {
            conjunct.complete(Collections.emptyList());
            return conjunct;
        }

        // The element type must keep the wildcard of the parameter to be assignable.
        for (CompletableFuture<? extends T> future : futures) {
            future.whenComplete(conjunct::handleCompletedFuture);
        }
        return conjunct;
    }

    /**
     * Creates a future that completes once all given futures have completed successfully,
     * without exposing their results.
     *
     * @param futures futures to wait for; must not be null
     * @return conjunction that completes with {@code null} after all futures succeeded, or
     *     exceptionally as soon as one of them fails
     */
    public static ConjunctFuture<Void> waitForAll(Collection<? extends CompletableFuture<?>> futures) {
        Preconditions.checkNotNull(futures, "futures");
        final ConjunctFuture<Void> allDone = new WaitingConjunctFuture(futures);
        return allDone;
    }

    /**
     * Returns a future that is already completed exceptionally with the given cause.
     *
     * @param cause throwable the returned future is completed with
     * @param <T> nominal result type of the returned future
     * @return exceptionally completed future
     */
    public static <T> CompletableFuture<T> completedExceptionally(Throwable cause) {
        final CompletableFuture<T> failed = new CompletableFuture<>();
        failed.completeExceptionally(cause);
        return failed;
    }

    /**
     * Converts a {@link Time} into a Scala {@link FiniteDuration} with millisecond resolution.
     *
     * @param time time to convert
     * @return finite duration built from {@code time.toMilliseconds()}
     */
    public static FiniteDuration toFiniteDuration(Time time) {
        final long millis = time.toMilliseconds();
        return new FiniteDuration(millis, TimeUnit.MILLISECONDS);
    }

    /**
     * Converts a Scala {@link FiniteDuration} into a {@link Time} with millisecond resolution.
     *
     * @param finiteDuration duration to convert
     * @return time built from {@code finiteDuration.toMillis()}
     */
    public static Time toTime(FiniteDuration finiteDuration) {
        final long millis = finiteDuration.toMillis();
        return Time.milliseconds(millis);
    }

    /**
     * Bridges a Scala {@link Future} to a Java {@link CompletableFuture}.
     *
     * <p>A completion callback is registered on the Scala future, using the execution context
     * returned by {@code Executors.directExecutionContext()}, and forwards either the value or
     * the failure to the returned future.
     *
     * @param scalaFuture Scala future to convert
     * @param <T> result type of the future
     * @return Java future mirroring the outcome of {@code scalaFuture}
     */
    public static <T> CompletableFuture<T> toJava(Future<T> scalaFuture) {
        final CompletableFuture<T> javaFuture = new CompletableFuture<>();
        scalaFuture.onComplete((Function1)new OnComplete<T>(){

            public void onComplete(Throwable failure, T success) throws Throwable {
                if (failure == null) {
                    javaFuture.complete(success);
                } else {
                    javaFuture.completeExceptionally(failure);
                }
            }
        }, Executors.directExecutionContext());
        return javaFuture;
    }

    /**
     * Arms a timeout on the given future: if it has not completed when the timeout elapses, it
     * is completed exceptionally with a {@link java.util.concurrent.TimeoutException}.
     *
     * <p>No timeout task is scheduled for a future that is already done. When the future
     * completes before the timeout fires, the pending timeout task is cancelled.
     *
     * @param future future to guard with a timeout
     * @param timeout timeout value
     * @param timeUnit unit of {@code timeout}
     * @param <T> result type of the future
     * @return the same {@code future} instance that was passed in
     */
    public static <T> CompletableFuture<T> orTimeout(CompletableFuture<T> future, long timeout, TimeUnit timeUnit) {
        // A completed future can no longer time out, so do not occupy the scheduler with it.
        if (!future.isDone()) {
            final ScheduledFuture<?> timeoutFuture = Delayer.delay(new Timeout(future), timeout, timeUnit);
            future.whenComplete((value, throwable) -> {
                if (!timeoutFuture.isDone()) {
                    timeoutFuture.cancel(false);
                }
            });
        }
        return future;
    }

    // Decompiler-lifted body of the retry task scheduled by retryOperationWithDelay: it starts
    // the next attempt with one retry fewer.
    private static /* synthetic */ void lambda$null$2(CompletableFuture resultFuture, Supplier operation, int retries, Time retryDelay, ScheduledExecutor scheduledExecutor) {
        final int remainingRetries = retries - 1;
        FutureUtils.retryOperationWithDelay(resultFuture, operation, remainingRetries, retryDelay, scheduledExecutor);
    }

    /**
     * Holder of the shared scheduler that runs delayed actions such as future timeouts.
     *
     * <p>The scheduler has a core pool size of one and takes its threads from an
     * {@link ExecutorThreadFactory} created with the name
     * {@code "FlinkCompletableFutureDelayScheduler"}.
     */
    private static final class Delayer {
        static final ScheduledThreadPoolExecutor delayer = new ScheduledThreadPoolExecutor(1, new ExecutorThreadFactory("FlinkCompletableFutureDelayScheduler"));

        static {
            // Timeouts are normally cancelled long before they fire. Without this policy a
            // cancelled task stays in the work queue, together with the future it references,
            // until its delay has elapsed.
            delayer.setRemoveOnCancelPolicy(true);
        }

        private Delayer() {
        }

        /**
         * Schedules {@code runnable} to run once after the given delay.
         *
         * @param runnable action to run; must not be null
         * @param delay delay before the action is run
         * @param timeUnit unit of {@code delay}; must not be null
         * @return handle that can be used to cancel the scheduled action
         */
        private static ScheduledFuture<?> delay(Runnable runnable, long delay, TimeUnit timeUnit) {
            Preconditions.checkNotNull(runnable);
            Preconditions.checkNotNull(timeUnit);
            return delayer.schedule(runnable, delay, timeUnit);
        }
    }

    /**
     * Runnable that fails the wrapped future with a {@link TimeoutException} when it is run.
     */
    private static final class Timeout
    implements Runnable {
        private final CompletableFuture<?> future;

        /**
         * @param future future to fail when this action runs; must not be null
         */
        private Timeout(CompletableFuture<?> future) {
            this.future = (CompletableFuture<?>) Preconditions.checkNotNull(future);
        }

        @Override
        public void run() {
            final TimeoutException timedOut = new TimeoutException();
            future.completeExceptionally(timedOut);
        }
    }

    /**
     * Conjunction that only waits for its futures and discards their results.
     *
     * <p>It completes with {@code null} once every future has completed successfully, and
     * exceptionally as soon as one of them reports a failure.
     */
    private static final class WaitingConjunctFuture
    extends ConjunctFuture<Void> {
        /** Number of futures that have completed successfully so far. */
        private final AtomicInteger numCompleted = new AtomicInteger(0);
        /** Number of futures in the conjunction. */
        private final int numTotal;

        /**
         * Registers a completion callback on every given future, or completes right away when
         * there is nothing to wait for.
         *
         * @param futures futures to wait for; must not be null
         */
        private WaitingConjunctFuture(Collection<? extends CompletableFuture<?>> futures) {
            Preconditions.checkNotNull(futures, "Futures must not be null.");
            this.numTotal = futures.size();
            if (futures.isEmpty()) {
                complete(null);
                return;
            }
            for (CompletableFuture<?> pending : futures) {
                pending.whenComplete(this::handleCompletedFuture);
            }
        }

        /** Completion callback shared by all futures of the conjunction. */
        private void handleCompletedFuture(Object ignored, Throwable throwable) {
            if (throwable != null) {
                completeExceptionally(throwable);
                return;
            }
            final int finished = numCompleted.incrementAndGet();
            if (finished == numTotal) {
                complete(null);
            }
        }

        @Override
        public int getNumFuturesTotal() {
            return numTotal;
        }

        @Override
        public int getNumFuturesCompleted() {
            return numCompleted.get();
        }
    }

    /**
     * Conjunction that collects the results of its futures.
     *
     * <p>Every successfully completed future stores its value in the next free slot of the
     * result array, so the results are ordered by completion, not by registration. When the last
     * future has completed, this future is completed with a list view of that array. The first
     * failure completes this future exceptionally.
     *
     * @param <T> result type of the combined futures
     */
    private static class ResultConjunctFuture<T>
    extends ConjunctFuture<Collection<T>> {
        /** Number of futures in the conjunction. */
        private final int numTotal;
        /** Next free slot in {@link #results}. */
        private final AtomicInteger nextIndex = new AtomicInteger(0);
        /** Number of futures that have completed successfully so far. */
        private final AtomicInteger numCompleted = new AtomicInteger(0);
        /** Collected results, filled in completion order. */
        private volatile T[] results;

        /**
         * @param numTotal number of futures whose results will be collected
         */
        @SuppressWarnings("unchecked")
        ResultConjunctFuture(int numTotal) {
            this.numTotal = numTotal;
            // Generic arrays cannot be created directly; the array is only accessed as T[]
            // from inside this class, which makes the unchecked cast safe.
            this.results = (T[]) new Object[numTotal];
        }

        /**
         * Completion callback for a single future of the conjunction.
         *
         * @param value result of the completed future, if it succeeded
         * @param throwable failure of the completed future, or null on success
         */
        final void handleCompletedFuture(T value, Throwable throwable) {
            if (throwable != null) {
                this.completeExceptionally(throwable);
            } else {
                int index = this.nextIndex.getAndIncrement();
                this.results[index] = value;
                if (this.numCompleted.incrementAndGet() == this.numTotal) {
                    this.complete(Arrays.asList(this.results));
                }
            }
        }

        @Override
        public int getNumFuturesTotal() {
            return this.numTotal;
        }

        @Override
        public int getNumFuturesCompleted() {
            return this.numCompleted.get();
        }
    }

    /**
     * A {@link CompletableFuture} that represents the conjunction of several futures and
     * reports how many of them exist and how many have completed.
     *
     * @param <T> type of the value this future is completed with
     */
    public static abstract class ConjunctFuture<T>
    extends CompletableFuture<T> {
        /**
         * @return number of futures in the conjunction that have completed
         */
        public abstract int getNumFuturesCompleted();

        /**
         * @return total number of futures in the conjunction
         */
        public abstract int getNumFuturesTotal();
    }

    /**
     * Exception with which a retried operation's result future is failed, for example when an
     * attempt was cancelled or the number of retries has been exhausted.
     */
    public static class RetryException
    extends Exception {
        private static final long serialVersionUID = 3613470781274141862L;

        /**
         * @param cause failure that caused the retry operation to fail
         */
        public RetryException(Throwable cause) {
            super(cause);
        }

        /**
         * @param message description of the failure
         * @param cause failure that caused the retry operation to fail
         */
        public RetryException(String message, Throwable cause) {
            super(message, cause);
        }

        /**
         * @param message description of the failure
         */
        public RetryException(String message) {
            super(message);
        }
    }
}

