/*
 * Decompiled with CFR 0.152.
 */
package io.gravitee.gateway.services.sync.process.repository;

import io.gravitee.common.service.AbstractService;
import io.gravitee.common.utils.RxHelper;
import io.gravitee.gateway.services.sync.SyncManager;
import io.gravitee.gateway.services.sync.process.distributed.DistributedSynchronizer;
import io.gravitee.gateway.services.sync.process.distributed.service.DistributedSyncService;
import io.gravitee.gateway.services.sync.process.repository.RepositorySynchronizer;
import io.gravitee.gateway.services.sync.process.repository.handler.SyncHandler;
import io.gravitee.node.api.Node;
import io.reactivex.rxjava3.core.Completable;
import io.reactivex.rxjava3.core.CompletableSource;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.Single;
import io.reactivex.rxjava3.core.SingleSource;
import io.reactivex.rxjava3.disposables.Disposable;
import io.vertx.core.Handler;
import io.vertx.ext.web.Route;
import io.vertx.ext.web.Router;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Default {@link SyncManager} implementation.
 *
 * <p>On start it registers an HTTP handler on {@code /sync}, runs one blocking
 * synchronization, then schedules further synchronizations separated by the
 * configured delay. Each synchronization runs either the repository
 * synchronizers or, when distributed sync is enabled and the current node is
 * not the primary one, the distributed synchronizers.
 *
 * <p>A failing synchronization never terminates the schedule: the error is
 * logged and recorded ({@link #lastSyncOnError()}, {@link #lastSyncErrorMessage()},
 * {@link #totalSyncOnError()}) and the next run is attempted as usual.
 */
public class DefaultSyncManager
extends AbstractService<SyncManager>
implements SyncManager {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(DefaultSyncManager.class);
    private static final String PATH = "/sync";
    /** Margin, in milliseconds, applied on both sides of the time window reported in the debug log. */
    public static final int TIMEFRAME_DELAY = 30000;
    /** Delay, in milliseconds, between two attempts of a failing synchronizer. */
    public static final int RETRY_DELAY_MS = 3000;
    private final Router router;
    private final List<RepositorySynchronizer> synchronizers;
    private final List<DistributedSynchronizer> distributedSynchronizers;
    private final int retryAttempt;
    private final DistributedSyncService distributedSyncService;
    private final int delay;
    private final TimeUnit unit;
    private final List<String> environments;
    private final AtomicLong syncCounter = new AtomicLong(0L);
    private final AtomicBoolean initialSync = new AtomicBoolean(false);
    private final AtomicLong totalSyncOnErrorCounter = new AtomicLong(0L);
    private final AtomicBoolean lastSyncOnError = new AtomicBoolean(false);
    private final AtomicReference<String> lastSyncErrorMessage = new AtomicReference<>();
    // NOTE(review): only ever set at start-up and flipped false -> true in resolveNextToTime();
    // nothing visible here resets it to false when the node stops being primary. Confirm whether
    // a primary -> secondary -> primary transition is expected to reload the distributed state.
    private final AtomicBoolean isClusterPrimaryNode = new AtomicBoolean(true);
    /**
     * Lower bound of the next synchronization window; {@code -1} until a first
     * synchronization has completed (or a distributed state has been loaded).
     * Volatile because it is written from the synchronization pipeline and read
     * through {@link #nextSyncTime()}, like the atomics used for the other counters.
     */
    private volatile long nextFromTime = -1L;
    private Disposable refreshDisposable;
    private Route routeHandler;

    /**
     * Creates the manager.
     *
     * @param router router on which the {@code /sync} route is registered at start
     * @param node current node; its {@code "environments"} metadata entry is copied into a list
     * @param synchronizers repository synchronizers, sorted in place by {@code order()} at start
     * @param distributedSynchronizers distributed synchronizers, may be {@code null}
     * @param distributedSyncService service exposing the distributed sync state
     * @param delay delay between two synchronizations
     * @param unit unit of {@code delay}
     * @param retryAttempt number of retries handed to {@code RxHelper.retry} for a failing synchronizer
     */
    public DefaultSyncManager(Router router, Node node, List<RepositorySynchronizer> synchronizers, List<DistributedSynchronizer> distributedSynchronizers, DistributedSyncService distributedSyncService, int delay, TimeUnit unit, int retryAttempt) {
        this.router = router;
        this.synchronizers = synchronizers;
        this.distributedSynchronizers = distributedSynchronizers;
        this.distributedSyncService = distributedSyncService;
        this.delay = delay;
        this.unit = unit;
        this.retryAttempt = retryAttempt;
        // The metadata map is untyped; the entry is expected to be a Set of environment ids.
        @SuppressWarnings("unchecked")
        final Set<String> nodeEnvironments = (Set<String>) node.metadata().get("environments");
        this.environments = new ArrayList<>(nodeEnvironments);
    }

    /**
     * Registers the HTTP handler, sorts the synchronizers, performs the first
     * synchronization (blocking the caller until it finishes) and then schedules
     * the periodic ones.
     */
    protected void doStart() {
        log.info("Starting sync manager");
        log.debug("Associate a new HTTP handler on {}", PATH);
        final SyncHandler syncHandler = new SyncHandler(this);
        this.routeHandler = this.router.get(PATH).produces("application/json").handler(syncHandler);
        this.synchronizers.sort(Comparator.comparingInt(RepositorySynchronizer::order));
        if (this.distributedSyncService.isEnabled()) {
            this.distributedSyncService.validate();
            if (this.distributedSynchronizers != null) {
                this.distributedSynchronizers.sort(Comparator.comparingInt(DistributedSynchronizer::order));
            }
            this.isClusterPrimaryNode.set(this.distributedSyncService.isPrimaryNode());
        }
        this.synchronize()
            .andThen(Completable.fromRunnable(this::scheduleNextSynchronizations))
            .blockingSubscribe();
    }

    /**
     * Starts the endless sequence of synchronizations. Each tick is delayed by the
     * configured delay and ticks are requested one at a time, so a new
     * synchronization only starts once the previous one has terminated.
     */
    private void scheduleNextSynchronizations() {
        log.info("Sync service has been scheduled with delay [{}{}]", this.delay, this.unit.name());
        this.refreshDisposable = Flowable
            .<Long, Long>generate(() -> 0L, (state, emitter) -> {
                emitter.onNext(state);
                return state + 1L;
            })
            .delay((long) this.delay, this.unit)
            .rebatchRequests(1)
            .concatMapCompletable(interval -> this.synchronize())
            .subscribe();
    }

    /**
     * Cancels the scheduled synchronizations and removes the HTTP route, if any.
     */
    protected void doStop() throws Exception {
        if (this.refreshDisposable != null) {
            this.refreshDisposable.dispose();
        }
        if (this.routeHandler != null) {
            this.routeHandler.remove();
        }
        super.doStop();
    }

    /**
     * Builds one synchronization run. The returned {@link Completable} never
     * signals an error: failures are logged and recorded instead.
     */
    private Completable synchronize() {
        return Completable
            .defer(() -> {
                log.debug("Running synchronization process...");
                return this.distributedSyncService.ready();
            })
            .andThen(Single.defer(this::resolveNextToTime))
            .flatMapCompletable(this::runSynchronizers)
            .onErrorResumeNext(throwable -> Completable.fromRunnable(() -> this.onSynchronizationFailure(throwable)));
    }

    /**
     * Determines the upper bound of the synchronization window.
     *
     * <p>When distributed sync is enabled and this node is primary, and either no
     * synchronization happened yet or the primary flag flips from false to true,
     * the stored distributed state is loaded: its {@code from} replaces
     * {@link #nextFromTime} and its {@code to} is returned. In every other case,
     * or if no state is stored, the current time is used.
     */
    private Single<Long> resolveNextToTime() {
        final boolean primaryOfCluster = this.distributedSyncService.isEnabled() && this.distributedSyncService.isPrimaryNode();
        if (primaryOfCluster && (this.nextFromTime == -1L || this.isClusterPrimaryNode.compareAndSet(false, true))) {
            return this.distributedSyncService
                .state()
                .map(distributedSyncState -> {
                    log.debug("Retrieving distributed sync state");
                    this.nextFromTime = distributedSyncState.getFrom();
                    return distributedSyncState.getTo();
                })
                .switchIfEmpty(Single.just(System.currentTimeMillis()));
        }
        return Single.just(System.currentTimeMillis());
    }

    /**
     * Runs every applicable synchronizer sequentially over the window
     * [{@link #nextFromTime}, {@code nextToTime}], stores the distributed state and
     * finally records the success.
     *
     * @param nextToTime upper bound of the window, in epoch milliseconds
     */
    private Completable runSynchronizers(final Long nextToTime) {
        log.debug("Synchronization #{} started at {}", this.syncCounter.incrementAndGet(), Instant.now());
        log.debug("Events from {} to {} would be synchronized.", Instant.ofEpochMilli(this.nextFromTime - TIMEFRAME_DELAY), Instant.ofEpochMilli(nextToTime + TIMEFRAME_DELAY));

        final Completable synchronizationCompletable;
        if (this.distributedSyncService.isEnabled() && !this.distributedSyncService.isPrimaryNode()) {
            log.debug("Distributed synchronizers will be used as distributed sync is enabled, and current node is secondary.");
            synchronizationCompletable = this.distributedSynchronizers != null
                ? Flowable
                    .fromIterable(this.distributedSynchronizers)
                    .concatMapCompletable(synchronizer ->
                        synchronizer
                            .synchronize(this.nextFromTime, nextToTime)
                            .compose(upstream -> this.retrySynchronizer(upstream, synchronizer.getClass().getSimpleName())))
                : Completable.complete();
        } else {
            synchronizationCompletable = Flowable
                .fromIterable(this.synchronizers)
                .concatMapCompletable(synchronizer ->
                    synchronizer
                        .synchronize(this.nextFromTime, nextToTime, this.environments)
                        .compose(upstream -> this.retrySynchronizer(upstream, synchronizer.getClass().getSimpleName())));
        }

        return synchronizationCompletable
            .andThen(this.distributedSyncService.storeState(this.nextFromTime, nextToTime))
            .doOnComplete(() -> this.onSynchronizationSuccess(nextToTime));
    }

    /**
     * Clears the error state, flags the initial synchronization as done when this
     * was the first one, and moves the window forward.
     */
    private void onSynchronizationSuccess(final Long nextToTime) {
        this.lastSyncOnError.set(false);
        this.lastSyncErrorMessage.set(null);
        if (this.nextFromTime == -1L) {
            this.initialSync.set(true);
        }
        this.nextFromTime = nextToTime;
        log.debug("Synchronization #{} ended at {} (took {}ms)", this.syncCounter.get(), Instant.now().toString(), System.currentTimeMillis() - nextToTime);
    }

    /**
     * Records a failed synchronization. The stack trace is only logged for the
     * first failure of a series to avoid flooding the logs.
     */
    private void onSynchronizationFailure(final Throwable throwable) {
        if (!this.lastSyncOnError.get()) {
            log.error("Synchronization process has failed", throwable);
        } else {
            log.error("Synchronization process is still failing.");
        }
        this.totalSyncOnErrorCounter.incrementAndGet();
        this.lastSyncOnError.set(true);
        this.lastSyncErrorMessage.set(throwable.getMessage());
    }

    /**
     * Decorates a synchronizer with retry and logging.
     *
     * @param upstream the synchronizer execution
     * @param synchronizerClazz simple class name of the synchronizer, used in log messages
     * @return the decorated execution; it still fails once all retries are exhausted
     */
    private Completable retrySynchronizer(Completable upstream, String synchronizerClazz) {
        return upstream
            .doOnError(throwable -> log.warn("An error occurs while executing synchronizer {}, retrying...", synchronizerClazz, throwable))
            .compose(RxHelper.retry(this.retryAttempt, RETRY_DELAY_MS, TimeUnit.MILLISECONDS))
            .doOnError(throwable -> log.error("Latest attempt of synchronizer {} has failed", synchronizerClazz, throwable));
    }

    /**
     * @return lower bound (epoch milliseconds) of the next synchronization window,
     *         or {@code -1} if none has completed yet
     */
    public long nextSyncTime() {
        return this.nextFromTime;
    }

    /** @return number of synchronizations started so far */
    public long syncCounter() {
        return this.syncCounter.longValue();
    }

    /** @return {@code true} once the initial synchronization has completed successfully */
    @Override
    public boolean syncDone() {
        return this.initialSync.get();
    }

    /** @return number of synchronizations that ended in error */
    public long totalSyncOnError() {
        return this.totalSyncOnErrorCounter.get();
    }

    /** @return {@code true} if the most recent synchronization failed */
    public boolean lastSyncOnError() {
        return this.lastSyncOnError.get();
    }

    /** @return message of the most recent failure, or {@code null} if the last synchronization succeeded */
    public String lastSyncErrorMessage() {
        return this.lastSyncErrorMessage.get();
    }
}

