/*
 * Decompiled with CFR 0.152.
 */
package io.axoniq.axonserver.connector.event.impl;

import com.google.protobuf.Empty;
import io.axoniq.axonserver.connector.ResultStream;
import io.axoniq.axonserver.connector.event.SnapshotChannel;
import io.axoniq.axonserver.connector.impl.AbstractAxonServerChannel;
import io.axoniq.axonserver.connector.impl.AbstractBufferedStream;
import io.axoniq.axonserver.connector.impl.AxonServerManagedChannel;
import io.axoniq.axonserver.connector.impl.FutureStreamObserver;
import io.axoniq.axonserver.grpc.FlowControl;
import io.axoniq.axonserver.grpc.control.ClientIdentification;
import io.axoniq.axonserver.grpc.event.dcb.AddSnapshotRequest;
import io.axoniq.axonserver.grpc.event.dcb.AddSnapshotResponse;
import io.axoniq.axonserver.grpc.event.dcb.DcbSnapshotStoreGrpc;
import io.axoniq.axonserver.grpc.event.dcb.DeleteSnapshotsRequest;
import io.axoniq.axonserver.grpc.event.dcb.DeleteSnapshotsResponse;
import io.axoniq.axonserver.grpc.event.dcb.GetLastSnapshotRequest;
import io.axoniq.axonserver.grpc.event.dcb.GetLastSnapshotResponse;
import io.axoniq.axonserver.grpc.event.dcb.ListSnapshotsRequest;
import io.axoniq.axonserver.grpc.event.dcb.ListSnapshotsResponse;
import io.grpc.Channel;
import io.grpc.stub.StreamObserver;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;

/**
 * {@link SnapshotChannel} implementation that forwards snapshot operations to the
 * {@code DcbSnapshotStore} gRPC stub created on the supplied managed channel.
 *
 * <p>Streams handed out by {@link #listSnapshots(ListSnapshotsRequest)} are tracked in a
 * concurrent set so that they can be closed when the channel is reconnected or disconnected.
 */
public class SnapshotChannelImpl
        extends AbstractAxonServerChannel<Void>
        implements SnapshotChannel {

    /**
     * Second constructor argument passed to every list-snapshots buffered stream.
     * NOTE(review): presumably the buffer capacity in messages - confirm against AbstractBufferedStream.
     */
    private static final int BUFFER_SIZE = 512;

    /**
     * Third constructor argument passed to every list-snapshots buffered stream.
     * NOTE(review): presumably the number of consumed messages after which more are requested -
     * confirm against AbstractBufferedStream.
     */
    private static final int REFILL_BATCH = 16;

    /** Asynchronous gRPC stub used for all snapshot store calls. */
    private final DcbSnapshotStoreGrpc.DcbSnapshotStoreStub snapshotStore;

    /** Identification of this client; its client id is handed to each buffered list stream. */
    private final ClientIdentification clientIdentification;

    /** List streams that have been handed out and not yet closed or unregistered. */
    private final Set<ResultStream<ListSnapshotsResponse>> buffers = ConcurrentHashMap.newKeySet();

    /**
     * Creates the channel and the gRPC stub it uses.
     *
     * @param clientIdentification     identification of the client, also passed to the superclass
     * @param executor                 executor passed to the superclass
     * @param axonServerManagedChannel channel on which the snapshot store stub is created
     */
    public SnapshotChannelImpl(ClientIdentification clientIdentification,
                               ScheduledExecutorService executor,
                               AxonServerManagedChannel axonServerManagedChannel) {
        super(clientIdentification, executor, axonServerManagedChannel);
        this.snapshotStore = DcbSnapshotStoreGrpc.newStub(axonServerManagedChannel);
        this.clientIdentification = clientIdentification;
    }

    /**
     * Sends the given add-snapshot request through the stub.
     *
     * @param request the request to send
     * @return the future that also serves as the response observer of the call
     */
    @Override
    public CompletableFuture<AddSnapshotResponse> addSnapshot(AddSnapshotRequest request) {
        // NOTE(review): the null argument is presumably the value the future completes with when
        // the call finishes without a response message - confirm against FutureStreamObserver.
        FutureStreamObserver<AddSnapshotResponse> response = new FutureStreamObserver<>(null);
        this.snapshotStore.add(request, response);
        return response;
    }

    /**
     * Sends the given delete-snapshots request through the stub.
     *
     * @param request the request to send
     * @return the future that also serves as the response observer of the call
     */
    @Override
    public CompletableFuture<DeleteSnapshotsResponse> deleteSnapshots(DeleteSnapshotsRequest request) {
        FutureStreamObserver<DeleteSnapshotsResponse> response = new FutureStreamObserver<>(null);
        this.snapshotStore.delete(request, response);
        return response;
    }

    /**
     * Starts a list-snapshots call and returns the buffered stream that receives its responses.
     *
     * <p>The stream is registered for closing on reconnect/disconnect and unregisters itself
     * when a close is requested on it. If starting the call throws, the stream is unregistered
     * and the exception is rethrown.
     *
     * @param request the request to send
     * @return the stream from which the responses can be read
     */
    @Override
    public ResultStream<ListSnapshotsResponse> listSnapshots(ListSnapshotsRequest request) {
        AbstractBufferedStream<ListSnapshotsResponse, Empty> buffer =
                new AbstractBufferedStream<ListSnapshotsResponse, Empty>(
                        this.clientIdentification.getClientId(), BUFFER_SIZE, REFILL_BATCH) {

                    @Override
                    protected ListSnapshotsResponse terminalMessage() {
                        // An empty (default) response is used as the terminal message.
                        return ListSnapshotsResponse.newBuilder().build();
                    }

                    @Override
                    protected Empty buildFlowControlMessage(FlowControl flowControl) {
                        // NOTE(review): presumably null means no flow-control message is sent
                        // upstream for this call - confirm against AbstractBufferedStream.
                        return null;
                    }
                };
        this.buffers.add(buffer);
        buffer.onCloseRequested(() -> this.buffers.remove(buffer));
        try {
            this.snapshotStore.list(request, (StreamObserver<ListSnapshotsResponse>) buffer);
        } catch (Exception e) {
            // The call never started, so there is nothing to close later; stop tracking the stream.
            this.buffers.remove(buffer);
            throw e;
        }
        return buffer;
    }

    /**
     * Sends the given get-last-snapshot request through the stub.
     *
     * @param request the request to send
     * @return the future that also serves as the response observer of the call
     */
    @Override
    public CompletableFuture<GetLastSnapshotResponse> getLastSnapshot(GetLastSnapshotRequest request) {
        FutureStreamObserver<GetLastSnapshotResponse> response = new FutureStreamObserver<>(null);
        this.snapshotStore.getLast(request, response);
        return response;
    }

    /** Does nothing; this channel performs no work on connect. */
    @Override
    public void connect() {
    }

    /** Closes all tracked list streams. */
    @Override
    public void reconnect() {
        this.closeBuffers();
    }

    /** Closes all tracked list streams. */
    @Override
    public void disconnect() {
        this.closeBuffers();
    }

    /**
     * Reports this channel as ready.
     *
     * @return always {@code true}
     */
    @Override
    public boolean isReady() {
        return true;
    }

    /**
     * Closes and unregisters every tracked list stream visited by the iteration.
     *
     * <p>Each stream is removed individually rather than clearing the whole set afterwards, so a
     * stream registered concurrently by {@link #listSnapshots(ListSnapshotsRequest)} is never
     * dropped from tracking without having been closed. The set is backed by a
     * {@link ConcurrentHashMap}, whose iterators tolerate removal during iteration.
     */
    private void closeBuffers() {
        for (ResultStream<ListSnapshotsResponse> buffer : this.buffers) {
            this.buffers.remove(buffer);
            buffer.close();
        }
    }
}

