/*
 * Decompiled with CFR 0.152.
 */
package io.axoniq.axonserver.connector.event.transformation.impl.grpc;

import io.axoniq.axonserver.connector.event.transformation.impl.EventTransformationService;
import io.axoniq.axonserver.connector.impl.StreamClosedException;
import io.axoniq.axonserver.grpc.event.DeletedEvent;
import io.axoniq.axonserver.grpc.event.Event;
import io.axoniq.axonserver.grpc.event.TransformRequest;
import io.axoniq.axonserver.grpc.event.TransformationId;
import io.axoniq.axonserver.grpc.event.TransformedEvent;
import io.grpc.stub.StreamObserver;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import javax.annotation.Nonnull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link EventTransformationService.TransformationStream} implementation that writes
 * {@link TransformRequest} messages to a gRPC request {@link StreamObserver} and tracks, per
 * sequence number, a {@link CompletableFuture} that is completed when the matching
 * acknowledgement is received or failed when the stream is completed by the server.
 */
public class GrpcTransformationStream
        implements EventTransformationService.TransformationStream {
    private static final Logger logger = LoggerFactory.getLogger(GrpcTransformationStream.class);

    private final String transformationId;
    /** Futures of requests that were sent but not yet acknowledged, keyed by request sequence. */
    private final Map<Long, CompletableFuture<Void>> pendingRequests = new ConcurrentHashMap<>();
    private final StreamObserver<TransformRequest> requestStreamObserver;
    private final AtomicReference<Consumer<Throwable>> onCompletedByServerListener = new AtomicReference<>();
    /** Reason the stream was closed; {@code null} while the stream is still open. */
    private final AtomicReference<Throwable> completed = new AtomicReference<>();

    /**
     * Creates the stream and hooks its internal handlers into the supplied registrations.
     *
     * @param transformationId        identifier placed on every request sent through this stream
     * @param requestStreamObserver   observer the transformation requests are written to
     * @param ackListenerRegistration receives this stream's acknowledgement handler, which must be
     *                                invoked with the sequence of each acknowledged request
     * @param onCompleteRegistration  receives this stream's completion handler, which must be
     *                                invoked when the server completes the stream
     */
    public GrpcTransformationStream(String transformationId, StreamObserver<TransformRequest> requestStreamObserver, Consumer<Consumer<Long>> ackListenerRegistration, Consumer<Consumer<Throwable>> onCompleteRegistration) {
        this.transformationId = transformationId;
        this.requestStreamObserver = requestStreamObserver;
        ackListenerRegistration.accept(this::onTransformationActionAck);
        onCompleteRegistration.accept(this::completedByServer);
    }

    /** Builds the replace-event payload for the given token and replacement event. */
    @Nonnull
    private static TransformedEvent transformedEvent(long token, Event event) {
        return TransformedEvent.newBuilder().setEvent(event).setToken(token).build();
    }

    /**
     * Sends a request to delete the event with the given token.
     *
     * @param token    token of the event to delete
     * @param sequence sequence number of this request within the transformation
     * @return a future completed once the request with this sequence is acknowledged, or completed
     *         exceptionally when the server completes the stream first
     * @throws StreamClosedException if this stream has already been completed
     */
    @Override
    public CompletableFuture<Void> deleteEvent(long token, long sequence) {
        this.checkCompleted();
        TransformRequest request = TransformRequest.newBuilder()
                .setTransformationId(this.transformationId())
                .setSequence(sequence)
                .setDeleteEvent(this.deleteEvent(token))
                .build();
        logger.trace("Sending delete event {} to Axon Server.", token);
        return this.send(sequence, request);
    }

    /** Builds the delete-event payload for the given token. */
    private DeletedEvent deleteEvent(long token) {
        return DeletedEvent.newBuilder().setToken(token).build();
    }

    /**
     * Fails fast when the stream was already completed by the client or by the server.
     *
     * @throws StreamClosedException wrapping the recorded completion cause
     */
    private void checkCompleted() {
        Throwable throwable = this.completed.get();
        if (throwable != null) {
            throw new StreamClosedException(throwable);
        }
    }

    /**
     * Sends a request to replace the event with the given token.
     *
     * @param token    token of the event to replace
     * @param event    the replacement event
     * @param sequence sequence number of this request within the transformation
     * @return a future completed once the request with this sequence is acknowledged, or completed
     *         exceptionally when the server completes the stream first
     * @throws StreamClosedException if this stream has already been completed
     */
    @Override
    public CompletableFuture<Void> replaceEvent(long token, Event event, long sequence) {
        // Same guard as deleteEvent: never write to a stream that is already closed.
        this.checkCompleted();
        TransformRequest request = TransformRequest.newBuilder()
                .setTransformationId(this.transformationId())
                .setSequence(sequence)
                .setReplaceEvent(GrpcTransformationStream.transformedEvent(token, event))
                .build();
        logger.trace("Sending replace event {} to Axon Server.", token);
        return this.send(sequence, request);
    }

    /**
     * Registers a pending future for {@code sequence} and writes {@code request} to the request
     * stream. The future is registered before sending so the sequence is already tracked when the
     * request goes out; it is unregistered again if the send itself fails.
     */
    private CompletableFuture<Void> send(long sequence, TransformRequest request) {
        CompletableFuture<Void> result = new CompletableFuture<>();
        this.pendingRequests.put(sequence, result);
        try {
            this.requestStreamObserver.onNext(request);
        } catch (RuntimeException | Error e) {
            // The caller never receives the future, so do not leave it behind in the map.
            this.pendingRequests.remove(sequence, result);
            throw e;
        }
        return result;
    }

    /** Wraps the transformation identifier of this stream in its gRPC message type. */
    @Nonnull
    private TransformationId transformationId() {
        return TransformationId.newBuilder().setId(this.transformationId).build();
    }

    /**
     * Marks this stream as completed by the client (unless a completion cause was already
     * recorded) and completes the underlying request stream.
     */
    @Override
    public void complete() {
        this.completed.compareAndSet(null, new RuntimeException("Completed by the client"));
        this.requestStreamObserver.onCompleted();
    }

    /**
     * Sets the callback invoked when the server completes the stream. A later call replaces the
     * previously set callback.
     *
     * @param onCompleted callback receiving the error reported on completion
     */
    @Override
    public void onCompletedByServer(Consumer<Throwable> onCompleted) {
        this.onCompletedByServerListener.set(onCompleted);
    }

    /**
     * Handles completion of the stream by the server: records the cause, fails every pending
     * request and notifies the registered callback, if any, with the original {@code error}.
     */
    private void completedByServer(Throwable error) {
        // CompletableFuture#completeExceptionally rejects null, and a null cause would leave the
        // stream looking open, so substitute a descriptive cause when none is supplied.
        Throwable cause = error != null ? error : new RuntimeException("Completed by the server");
        this.completed.compareAndSet(null, cause);
        logger.warn("Transformation stream completed by server with error: ", error);
        this.completePending(cause);
        Consumer<Throwable> completedByServerCallback = this.onCompletedByServerListener.get();
        if (completedByServerCallback != null) {
            completedByServerCallback.accept(error);
        }
    }

    /** Completes every pending request exceptionally with {@code error} and unregisters it. */
    private void completePending(Throwable error) {
        for (Map.Entry<Long, CompletableFuture<Void>> pending : this.pendingRequests.entrySet()) {
            pending.getValue().completeExceptionally(error);
            // Conditional remove: only drop the mapping if it still holds the future just failed.
            this.pendingRequests.remove(pending.getKey(), pending.getValue());
        }
    }

    /** Completes and unregisters the pending request with the acknowledged sequence, if any. */
    private void onTransformationActionAck(long sequence) {
        logger.trace("Acknowledge received for transformation sequence {}. ", sequence);
        CompletableFuture<Void> acknowledged = this.pendingRequests.remove(sequence);
        if (acknowledged != null) {
            acknowledged.complete(null);
        }
    }
}

