/*
 * Decompiled with CFR 0.152.
 */
package com.azure.core.implementation;

import com.azure.core.http.policy.RetryOptions;
import com.azure.core.http.policy.RetryStrategy;
import com.azure.core.implementation.ImplUtils;
import com.azure.core.util.logging.ClientLogger;
import com.azure.core.util.logging.LogLevel;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.function.BiFunction;
import java.util.function.Supplier;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.Flux;

public final class RetriableDownloadFlux
extends Flux<ByteBuffer> {
    private static final ClientLogger LOGGER = new ClientLogger(RetriableDownloadFlux.class);
    private final Supplier<Flux<ByteBuffer>> downloadSupplier;
    private final BiFunction<Throwable, Long, Flux<ByteBuffer>> onDownloadErrorResume;
    private final RetryStrategy retryStrategy;
    private final int maxRetries;
    private final long position;
    private final int retryCount;

    public RetriableDownloadFlux(Supplier<Flux<ByteBuffer>> downloadSupplier, BiFunction<Throwable, Long, Flux<ByteBuffer>> onDownloadErrorResume, RetryOptions retryOptions, long position) {
        this(downloadSupplier, onDownloadErrorResume, ImplUtils.getRetryStrategyFromOptions(retryOptions), position, 0);
    }

    private RetriableDownloadFlux(Supplier<Flux<ByteBuffer>> downloadSupplier, BiFunction<Throwable, Long, Flux<ByteBuffer>> onDownloadErrorResume, RetryStrategy retryStrategy, long position, int retryCount) {
        this.downloadSupplier = downloadSupplier;
        this.onDownloadErrorResume = onDownloadErrorResume;
        this.retryStrategy = retryStrategy;
        this.maxRetries = retryStrategy.getMaxRetries();
        this.position = position;
        this.retryCount = retryCount;
    }

    @Override
    public void subscribe(CoreSubscriber<? super ByteBuffer> actual) {
        long[] currentPosition = new long[]{this.position};
        this.downloadSupplier.get().map(buffer -> {
            currentPosition[0] = currentPosition[0] + (long)buffer.remaining();
            return buffer;
        }).onErrorResume(Exception.class, (? super E exception) -> {
            int updatedRetryCount = this.retryCount + 1;
            if (updatedRetryCount > this.maxRetries) {
                LOGGER.log(LogLevel.ERROR, () -> "Exhausted all retry attempts while downloading, " + this.maxRetries + " of " + this.maxRetries + ".", (Throwable)exception);
                return Flux.error(exception);
            }
            LOGGER.log(LogLevel.INFORMATIONAL, () -> "Using retry attempt " + updatedRetryCount + " of " + this.maxRetries + " while downloading.", (Throwable)exception);
            Duration backoff = this.retryStrategy.calculateRetryDelay(updatedRetryCount);
            RetriableDownloadFlux retryDownload = new RetriableDownloadFlux(() -> this.onDownloadErrorResume.apply((Throwable)exception, currentPosition[0]), this.onDownloadErrorResume, this.retryStrategy, currentPosition[0], updatedRetryCount);
            if (backoff != null && !backoff.isNegative() && !backoff.isZero()) {
                return retryDownload.delaySubscription(backoff);
            }
            return retryDownload;
        }).subscribe(actual);
    }
}

