/*
 * Decompiled with CFR 0.152.
 */
package io.axoniq.axonserver.connector.event.impl;

import io.axoniq.axonserver.connector.event.AggregateEventStream;
import io.axoniq.axonserver.connector.event.AppendEventsTransaction;
import io.axoniq.axonserver.connector.event.EventChannel;
import io.axoniq.axonserver.connector.event.EventStream;
import io.axoniq.axonserver.connector.event.impl.AppendEventsTransactionImpl;
import io.axoniq.axonserver.connector.event.impl.BufferedAggregateEventStream;
import io.axoniq.axonserver.connector.event.impl.BufferedEventStream;
import io.axoniq.axonserver.connector.impl.AbstractAxonServerChannel;
import io.axoniq.axonserver.connector.impl.AbstractBufferedStream;
import io.axoniq.axonserver.connector.impl.AxonServerManagedChannel;
import io.axoniq.axonserver.connector.impl.FutureStreamObserver;
import io.axoniq.axonserver.grpc.event.Confirmation;
import io.axoniq.axonserver.grpc.event.Event;
import io.axoniq.axonserver.grpc.event.EventStoreGrpc;
import io.axoniq.axonserver.grpc.event.EventWithToken;
import io.axoniq.axonserver.grpc.event.GetAggregateEventsRequest;
import io.axoniq.axonserver.grpc.event.GetAggregateSnapshotsRequest;
import io.axoniq.axonserver.grpc.event.GetFirstTokenRequest;
import io.axoniq.axonserver.grpc.event.GetLastTokenRequest;
import io.axoniq.axonserver.grpc.event.GetTokenAtRequest;
import io.axoniq.axonserver.grpc.event.ReadHighestSequenceNrRequest;
import io.axoniq.axonserver.grpc.event.ReadHighestSequenceNrResponse;
import io.axoniq.axonserver.grpc.event.TrackingToken;
import io.grpc.Channel;
import io.grpc.stub.StreamObserver;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;

/**
 * {@link EventChannel} implementation that forwards every operation to the
 * asynchronous {@link EventStoreGrpc} stub created on the supplied
 * {@link AxonServerManagedChannel}.
 *
 * <p>Event streams opened through {@link #openStream(long, int, int, boolean)}
 * are tracked in a concurrent set so they can be closed when the channel is
 * reconnected or disconnected. Aggregate and snapshot streams are not tracked
 * by this class.
 */
public class EventChannelImpl extends AbstractAxonServerChannel implements EventChannel {
    /** Lower bound applied to the buffer size requested in {@link #openStream(long, int, int, boolean)}. */
    private static final int MIN_BUFFER_SIZE = 64;
    /** Lower bound applied to the refill batch requested in {@link #openStream(long, int, int, boolean)}. */
    private static final int MIN_REFILL_BATCH = 16;

    // Values handed to FutureStreamObserver at construction time.
    // NOTE(review): presumably the result used when the server completes the call
    // without sending a response - confirm against FutureStreamObserver.
    private static final ReadHighestSequenceNrResponse UNKNOWN_HIGHEST_SEQ = ReadHighestSequenceNrResponse.newBuilder().setToSequenceNr(-1L).build();
    private static final TrackingToken NO_TOKEN_AVAILABLE = TrackingToken.newBuilder().setToken(-1L).build();

    private final EventStoreGrpc.EventStoreStub eventStore;
    /** Event streams opened via {@code openStream} that have not been closed or deregistered yet. */
    private final Set<BufferedEventStream> buffers = ConcurrentHashMap.newKeySet();

    /**
     * Creates the channel and the event store stub bound to {@code channel}.
     *
     * @param executor passed on to the {@link AbstractAxonServerChannel} constructor
     * @param channel  the managed channel on which the event store stub is created
     */
    public EventChannelImpl(ScheduledExecutorService executor, AxonServerManagedChannel channel) {
        super(executor, channel);
        this.eventStore = EventStoreGrpc.newStub((Channel)channel);
    }

    /** Does nothing: this implementation performs no work when connecting. */
    @Override
    public synchronized void connect() {
    }

    /** Closes all tracked event streams. */
    @Override
    public void reconnect() {
        closeEventStreams();
    }

    /** Closes all tracked event streams. */
    @Override
    public void disconnect() {
        closeEventStreams();
    }

    /**
     * Closes every tracked event stream and stops tracking it.
     *
     * <p>Each stream is removed individually instead of clearing the whole set
     * afterwards. The set's iterator is weakly consistent, so a stream registered
     * concurrently by {@code openStream} may not be visited by this loop; a blanket
     * {@code clear()} would drop such a stream from tracking without closing it
     * (see {@link AbstractBufferedStream#close()}).
     */
    private void closeEventStreams() {
        for (BufferedEventStream stream : buffers) {
            stream.close();
            // No-op when the stream's close-requested callback already removed it.
            buffers.remove(stream);
        }
    }

    /**
     * Always reports {@code true}; this implementation does not track any
     * connection state of its own.
     *
     * @return {@code true}
     */
    @Override
    public boolean isConnected() {
        return true;
    }

    /**
     * Starts an {@code appendEvent} call on the event store and wraps it in a transaction.
     *
     * @return a transaction built from the outbound event stream and the observer
     *         receiving the server's {@link Confirmation}
     */
    @Override
    public AppendEventsTransaction startAppendEventsTransaction() {
        FutureStreamObserver<Confirmation> result = new FutureStreamObserver<Confirmation>(null);
        StreamObserver<Event> clientStream = eventStore.appendEvent(result);
        return new AppendEventsTransactionImpl(clientStream, result);
    }

    /**
     * Requests the highest sequence number stored for the given aggregate.
     *
     * @param aggregateId identifier of the aggregate to look up
     * @return a future with the {@code toSequenceNr} value of the response
     */
    @Override
    public CompletableFuture<Long> findHighestSequence(String aggregateId) {
        FutureStreamObserver<ReadHighestSequenceNrResponse> result = new FutureStreamObserver<ReadHighestSequenceNrResponse>(UNKNOWN_HIGHEST_SEQ);
        ReadHighestSequenceNrRequest request = ReadHighestSequenceNrRequest.newBuilder()
                .setAggregateId(aggregateId)
                .build();
        eventStore.readHighestSequenceNr(request, result);
        return result.thenApply(ReadHighestSequenceNrResponse::getToSequenceNr);
    }

    /**
     * Opens a buffered event stream and registers it so that it is closed on
     * {@link #reconnect()} and {@link #disconnect()}.
     *
     * <p>The effective buffer size is at least {@value #MIN_BUFFER_SIZE}; the
     * effective refill batch is the smaller of {@code bufferSize} and
     * {@code refillBatch}, but at least {@value #MIN_REFILL_BATCH}.
     *
     * @param token               token passed to the {@link BufferedEventStream}
     * @param bufferSize          requested buffer size
     * @param refillBatch         requested refill batch size
     * @param forceReadFromLeader flag passed to the {@link BufferedEventStream}
     * @return the opened stream
     */
    @Override
    public EventStream openStream(long token, int bufferSize, int refillBatch, boolean forceReadFromLeader) {
        int effectiveBufferSize = Math.max(MIN_BUFFER_SIZE, bufferSize);
        int effectiveRefillBatch = Math.max(MIN_REFILL_BATCH, Math.min(bufferSize, refillBatch));
        BufferedEventStream buffer = new BufferedEventStream(token, effectiveBufferSize, effectiveRefillBatch, forceReadFromLeader);
        buffers.add(buffer);
        // Stop tracking the stream once its close is requested.
        buffer.onCloseRequested(() -> buffers.remove(buffer));
        try {
            eventStore.listEvents((StreamObserver<EventWithToken>)buffer);
        }
        catch (Exception e) {
            // The call could not be started, so the stream must not stay registered.
            buffers.remove(buffer);
            throw e;
        }
        // Flow control is enabled only after the call has been started.
        buffer.enableFlowControl();
        return buffer;
    }

    /**
     * Opens a stream of the events of one aggregate.
     *
     * @param aggregateIdentifier identifier of the aggregate
     * @param allowSnapshots      value for the request's {@code allowSnapshots} field
     * @return the stream receiving the aggregate's events
     */
    @Override
    public AggregateEventStream openAggregateStream(String aggregateIdentifier, boolean allowSnapshots) {
        GetAggregateEventsRequest request = GetAggregateEventsRequest.newBuilder()
                .setAggregateId(aggregateIdentifier)
                .setAllowSnapshots(allowSnapshots)
                .build();
        return doGetAggregateStream(request);
    }

    /**
     * Opens a stream of the events of one aggregate, limited by sequence number.
     *
     * @param aggregateIdentifier identifier of the aggregate
     * @param initialSequence     value for the request's {@code initialSequence} field
     * @param maxSequence         value for the request's {@code maxSequence} field
     * @return the stream receiving the aggregate's events
     */
    @Override
    public AggregateEventStream openAggregateStream(String aggregateIdentifier, long initialSequence, long maxSequence) {
        GetAggregateEventsRequest request = GetAggregateEventsRequest.newBuilder()
                .setAggregateId(aggregateIdentifier)
                .setInitialSequence(initialSequence)
                .setMaxSequence(maxSequence)
                .build();
        return doGetAggregateStream(request);
    }

    /**
     * Sends a snapshot event to the event store.
     *
     * @param snapshotEvent the snapshot to append
     * @return a future receiving the server's {@link Confirmation}
     */
    @Override
    public CompletableFuture<Confirmation> appendSnapshot(Event snapshotEvent) {
        Confirmation unsuccessful = Confirmation.newBuilder().setSuccess(false).build();
        FutureStreamObserver<Confirmation> result = new FutureStreamObserver<Confirmation>(unsuccessful);
        eventStore.appendSnapshot(snapshotEvent, result);
        return result;
    }

    /**
     * Requests the snapshots of one aggregate.
     *
     * @param aggregateIdentifier identifier of the aggregate
     * @param initialSequence     value for the request's {@code initialSequence} field
     * @param maxSequence         value for the request's {@code maxSequence} field
     * @param maxResults          value for the request's {@code maxResults} field; also
     *                            passed to the {@link BufferedAggregateEventStream} constructor
     * @return the stream receiving the snapshots
     */
    @Override
    public AggregateEventStream loadSnapshots(String aggregateIdentifier, long initialSequence, long maxSequence, int maxResults) {
        BufferedAggregateEventStream buffer = new BufferedAggregateEventStream(maxResults);
        GetAggregateSnapshotsRequest request = GetAggregateSnapshotsRequest.newBuilder()
                .setInitialSequence(initialSequence)
                .setMaxResults(maxResults)
                .setMaxSequence(maxSequence)
                .setAggregateId(aggregateIdentifier)
                .build();
        eventStore.listAggregateSnapshots(request, (StreamObserver<Event>)buffer);
        return buffer;
    }

    /**
     * Requests the last token from the event store.
     *
     * @return a future with the {@code token} value of the response
     */
    @Override
    public CompletableFuture<Long> getLastToken() {
        FutureStreamObserver<TrackingToken> result = new FutureStreamObserver<TrackingToken>(NO_TOKEN_AVAILABLE);
        eventStore.getLastToken(GetLastTokenRequest.newBuilder().build(), result);
        return result.thenApply(TrackingToken::getToken);
    }

    /**
     * Requests the first token from the event store.
     *
     * @return a future with the {@code token} value of the response
     */
    @Override
    public CompletableFuture<Long> getFirstToken() {
        FutureStreamObserver<TrackingToken> result = new FutureStreamObserver<TrackingToken>(NO_TOKEN_AVAILABLE);
        eventStore.getFirstToken(GetFirstTokenRequest.newBuilder().build(), result);
        return result.thenApply(TrackingToken::getToken);
    }

    /**
     * Requests the token for the given instant from the event store.
     *
     * @param instant value for the request's {@code instant} field
     *                (NOTE(review): unit is not visible here - presumably epoch milliseconds; confirm)
     * @return a future with the {@code token} value of the response
     */
    @Override
    public CompletableFuture<Long> getTokenAt(long instant) {
        FutureStreamObserver<TrackingToken> result = new FutureStreamObserver<TrackingToken>(NO_TOKEN_AVAILABLE);
        eventStore.getTokenAt(GetTokenAtRequest.newBuilder().setInstant(instant).build(), result);
        return result.thenApply(TrackingToken::getToken);
    }

    /**
     * Starts a {@code listAggregateEvents} call for {@code request} with a fresh buffer
     * as the response observer.
     */
    private AggregateEventStream doGetAggregateStream(GetAggregateEventsRequest request) {
        BufferedAggregateEventStream buffer = new BufferedAggregateEventStream();
        eventStore.listAggregateEvents(request, (StreamObserver<Event>)buffer);
        return buffer;
    }
}

