/*
 * Decompiled with CFR 0.152.
 */
package de.codecentric.boot.admin.server.domain.entities;

import de.codecentric.boot.admin.server.domain.entities.EventsourcingInstanceRepository;
import de.codecentric.boot.admin.server.domain.entities.Instance;
import de.codecentric.boot.admin.server.domain.events.InstanceEvent;
import de.codecentric.boot.admin.server.domain.values.InstanceId;
import de.codecentric.boot.admin.server.eventstore.InstanceEventStore;
import de.codecentric.boot.admin.server.eventstore.OptimisticLockingException;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;
import org.jspecify.annotations.Nullable;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class SnapshottingInstanceRepository
extends EventsourcingInstanceRepository {
    private static final Logger log = LoggerFactory.getLogger(SnapshottingInstanceRepository.class);
    private final ConcurrentMap<InstanceId, Instance> snapshots = new ConcurrentHashMap<InstanceId, Instance>();
    private final Set<InstanceId> outdatedSnapshots = ConcurrentHashMap.newKeySet();
    private final InstanceEventStore eventStore;
    private @Nullable Disposable subscription;

    public SnapshottingInstanceRepository(InstanceEventStore eventStore) {
        super(eventStore);
        this.eventStore = eventStore;
    }

    @Override
    public Flux<Instance> findAll() {
        return Mono.fromSupplier(this.snapshots::values).flatMapIterable(Function.identity());
    }

    @Override
    public Mono<Instance> find(InstanceId id) {
        return Mono.defer(() -> {
            if (!this.outdatedSnapshots.contains(id)) {
                return Mono.justOrEmpty((Object)((Instance)this.snapshots.get(id)));
            }
            return this.rehydrateSnapshot(id).doOnSuccess(v -> this.outdatedSnapshots.remove(v.getId()));
        });
    }

    @Override
    public Mono<Instance> save(Instance instance) {
        return super.save(instance).doOnError(OptimisticLockingException.class, e -> this.outdatedSnapshots.add(instance.getId()));
    }

    public void start() {
        this.subscription = this.eventStore.findAll().concatWith((Publisher)this.eventStore).subscribe(this::updateSnapshot);
    }

    public void stop() {
        if (this.subscription != null) {
            this.subscription.dispose();
            this.subscription = null;
        }
    }

    protected Mono<Instance> rehydrateSnapshot(InstanceId id) {
        return super.find(id).map(instance -> this.snapshots.compute(id, (? super K key, ? super V snapshot) -> {
            if (snapshot == null || instance.getVersion() >= snapshot.getVersion()) {
                return instance;
            }
            return snapshot;
        }));
    }

    protected void updateSnapshot(InstanceEvent event) {
        try {
            this.snapshots.compute(event.getInstance(), (? super K key, ? super V old) -> {
                Instance instance;
                Instance instance2 = instance = old != null ? old : Instance.create(key);
                if (event.getVersion() > instance.getVersion()) {
                    return instance.apply(event);
                }
                return instance;
            });
        }
        catch (Exception ex) {
            log.warn("Error while updating the snapshot with event {}", (Object)event, (Object)ex);
        }
    }
}

