package org.factcast.store.internal.catchup;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import lombok.Generated;
import lombok.NonNull;
import org.factcast.core.Fact;
import org.factcast.core.subscription.SubscriptionImpl;
import org.factcast.core.subscription.TransformationException;
import org.factcast.core.subscription.transformation.FactTransformerService;
import org.factcast.core.subscription.transformation.FactTransformers;
import org.factcast.core.subscription.transformation.TransformationRequest;
import org.factcast.store.internal.AbstractFactInterceptor;
import org.factcast.store.internal.Pair;
import org.factcast.store.internal.PgMetrics;
import org.factcast.store.internal.filter.FactFilter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/factcast/store/internal/catchup/BufferingFactInterceptor.class */
public class BufferingFactInterceptor extends AbstractFactInterceptor {

    @SuppressFBWarnings(justification = "generated code")
    @Generated
    private static final Logger log = LoggerFactory.getLogger(BufferingFactInterceptor.class);
    private final FactTransformerService service;
    private final FactTransformers transformers;
    private final FactFilter filter;
    private final SubscriptionImpl targetSubscription;
    private final int maxBufferSize;
    private Mode mode;
    private final List<Pair<TransformationRequest, CompletableFuture<Fact>>> buffer;
    private final Map<UUID, CompletableFuture<Fact>> index;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/factcast/store/internal/catchup/BufferingFactInterceptor$Mode.class */
    public enum Mode {
        DIRECT,
        BUFFERING
    }

    public BufferingFactInterceptor(FactTransformerService factTransformerService, FactTransformers factTransformers, FactFilter factFilter, SubscriptionImpl subscriptionImpl, int i, PgMetrics pgMetrics) {
        super(pgMetrics);
        this.mode = Mode.DIRECT;
        this.service = factTransformerService;
        this.transformers = factTransformers;
        this.filter = factFilter;
        this.targetSubscription = subscriptionImpl;
        this.maxBufferSize = i;
        this.buffer = new ArrayList(i);
        this.index = new HashMap(i);
    }

    @Override // java.util.function.Consumer
    public void accept(@NonNull Fact fact) {
        Objects.requireNonNull(fact, "f is marked non-null but is null");
        if (this.filter.test(fact)) {
            TransformationRequest prepareTransformation = this.transformers.prepareTransformation(fact);
            if (this.mode == Mode.DIRECT) {
                acceptInDirectMode(fact, prepareTransformation);
            } else if (this.mode == Mode.BUFFERING) {
                acceptInBufferingMode(fact, prepareTransformation);
            }
        }
    }

    private void acceptInBufferingMode(@NonNull Fact fact, TransformationRequest transformationRequest) {
        Objects.requireNonNull(fact, "f is marked non-null but is null");
        if (transformationRequest == null) {
            this.buffer.add(completedTransformation(fact));
        } else {
            addScheduledTransformationToBuffer(scheduledTransformation(transformationRequest));
        }
    }

    private void addScheduledTransformationToBuffer(Pair<TransformationRequest, CompletableFuture<Fact>> pair) {
        this.buffer.add(pair);
        this.index.put(pair.left().toTransform().id(), pair.right());
        if (this.buffer.size() >= this.maxBufferSize) {
            flush();
        }
    }

    private void acceptInDirectMode(@NonNull Fact fact, TransformationRequest transformationRequest) {
        Objects.requireNonNull(fact, "f is marked non-null but is null");
        if (transformationRequest == null) {
            this.targetSubscription.notifyElement(fact);
            increaseNotifyMetric(1);
        } else {
            this.mode = Mode.BUFFERING;
            addScheduledTransformationToBuffer(scheduledTransformation(transformationRequest));
        }
    }

    @NonNull
    private Pair<TransformationRequest, CompletableFuture<Fact>> scheduledTransformation(@NonNull TransformationRequest transformationRequest) {
        Objects.requireNonNull(transformationRequest, "transformationRequest is marked non-null but is null");
        return Pair.of(transformationRequest, new CompletableFuture());
    }

    @NonNull
    private Pair<TransformationRequest, CompletableFuture<Fact>> completedTransformation(@NonNull Fact fact) {
        Objects.requireNonNull(fact, "f is marked non-null but is null");
        return Pair.of(null, CompletableFuture.completedFuture(fact));
    }

    public void flush() {
        if (this.buffer.isEmpty()) {
            return;
        }
        log.trace("flushing buffer of size " + this.buffer.size());
        List list = (List) this.buffer.stream().map((v0) -> {
            return v0.left();
        }).filter((v0) -> {
            return Objects.nonNull(v0);
        }).collect(Collectors.toList());
        if (!list.isEmpty()) {
            CompletableFuture.runAsync(() -> {
                try {
                    this.service.transform(list).forEach(fact -> {
                        CompletableFuture<Fact> completableFuture = this.index.get(fact.id());
                        if (completableFuture != null) {
                            completableFuture.complete(fact);
                        } else {
                            log.warn("found unexpected fact id after transformation: {}", fact.id());
                        }
                    });
                } catch (Exception e) {
                    this.index.values().forEach(completableFuture -> {
                        if (completableFuture.isDone()) {
                            return;
                        }
                        completableFuture.completeExceptionally(e);
                    });
                }
            });
        }
        this.buffer.forEach(pair -> {
            try {
                this.targetSubscription.notifyElement((Fact) ((CompletableFuture) pair.right()).get(30L, TimeUnit.SECONDS));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new TransformationException(e);
            } catch (ExecutionException | TimeoutException e2) {
                throw new TransformationException(e2);
            }
        });
        increaseNotifyMetric(this.buffer.size());
        this.buffer.clear();
        this.index.clear();
    }
}
