/*
 * Decompiled with CFR 0.152.
 */
package io.gravitee.gateway.services.sync.process.distributed.synchronizer;

import io.gravitee.gateway.services.sync.process.common.deployer.Deployer;
import io.gravitee.gateway.services.sync.process.common.model.Deployable;
import io.gravitee.gateway.services.sync.process.common.model.SyncAction;
import io.gravitee.gateway.services.sync.process.distributed.DistributedSynchronizer;
import io.gravitee.gateway.services.sync.process.distributed.fetcher.DistributedEventFetcher;
import io.gravitee.repository.distributedsync.model.DistributedEvent;
import io.gravitee.repository.distributedsync.model.DistributedEventType;
import io.gravitee.repository.distributedsync.model.DistributedSyncAction;
import io.reactivex.rxjava3.core.Completable;
import io.reactivex.rxjava3.core.CompletableSource;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.Maybe;
import io.reactivex.rxjava3.schedulers.Schedulers;
import java.time.Instant;
import java.util.Locale;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicLong;
import lombok.Generated;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AbstractDistributedSynchronizer<T extends Deployable, Y extends Deployer<T>>
implements DistributedSynchronizer {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(AbstractDistributedSynchronizer.class);
    protected static final Set<DistributedSyncAction> INIT_SYNC_ACTIONS = Set.of(DistributedSyncAction.DEPLOY);
    protected static final Set<DistributedSyncAction> INCREMENTAL_SYNC_ACTIONS = Set.of(DistributedSyncAction.DEPLOY, DistributedSyncAction.UNDEPLOY);
    private final DistributedEventFetcher distributedEventFetcher;
    private final ThreadPoolExecutor syncFetcherExecutor;
    private final ThreadPoolExecutor syncDeployerExecutor;

    @Override
    public Completable synchronize(Long from, Long to) {
        boolean initialSync = from == null || from == -1L;
        AtomicLong launchTime = new AtomicLong();
        return this.distributedEventFetcher.fetchLatest(from, to, this.distributedEventType(), this.syncActions(initialSync)).subscribeOn(Schedulers.from((Executor)this.syncFetcherExecutor)).flatMapMaybe(this::mapTo).compose(upstream -> {
            Y deployer = this.createDeployer();
            return upstream.parallel(this.syncDeployerExecutor.getMaximumPoolSize()).runOn(Schedulers.from((Executor)this.syncDeployerExecutor)).flatMap(deployable -> {
                if (deployable.syncAction() == SyncAction.DEPLOY) {
                    return this.deploy(deployer, deployable);
                }
                if (deployable.syncAction() == SyncAction.UNDEPLOY) {
                    return this.undeploy(deployer, deployable);
                }
                return Flowable.empty();
            }).sequential(this.distributedEventFetcher.bulkItems());
        }).count().doOnSubscribe(disposable -> launchTime.set(Instant.now().toEpochMilli())).doOnSuccess(count -> {
            String logMsg = String.format("%s %s(s) synchronized in %sms", count, this.distributedEventType().name().toLowerCase(), System.currentTimeMillis() - launchTime.get());
            if (initialSync) {
                log.info(logMsg);
            } else {
                log.debug(logMsg);
            }
        }).ignoreElement();
    }

    protected Set<DistributedSyncAction> syncActions(boolean initialSync) {
        return initialSync ? INIT_SYNC_ACTIONS : INCREMENTAL_SYNC_ACTIONS;
    }

    protected abstract DistributedEventType distributedEventType();

    protected abstract Maybe<T> mapTo(DistributedEvent var1);

    protected abstract Y createDeployer();

    private Flowable<T> deploy(Y apiDeployer, T deployable) {
        return apiDeployer.deploy(deployable).andThen((CompletableSource)apiDeployer.doAfterDeployment(deployable)).andThen((Publisher)Flowable.just(deployable));
    }

    private Flowable<T> undeploy(Y apiDeployer, T deployable) {
        return apiDeployer.undeploy(deployable).andThen((CompletableSource)apiDeployer.doAfterUndeployment(deployable)).andThen((Publisher)Flowable.just(deployable));
    }

    @Generated
    public AbstractDistributedSynchronizer(DistributedEventFetcher distributedEventFetcher, ThreadPoolExecutor syncFetcherExecutor, ThreadPoolExecutor syncDeployerExecutor) {
        this.distributedEventFetcher = distributedEventFetcher;
        this.syncFetcherExecutor = syncFetcherExecutor;
        this.syncDeployerExecutor = syncDeployerExecutor;
    }
}

