/*
 * Decompiled with CFR 0.152.
 */
package io.axoniq.axonserver.connector.event.impl;

import com.google.protobuf.Empty;
import io.axoniq.axonserver.connector.AxonServerException;
import io.axoniq.axonserver.connector.ErrorCategory;
import io.axoniq.axonserver.connector.ResultStream;
import io.axoniq.axonserver.connector.event.DcbEventChannel;
import io.axoniq.axonserver.connector.impl.AbstractAxonServerChannel;
import io.axoniq.axonserver.connector.impl.AbstractBufferedStream;
import io.axoniq.axonserver.connector.impl.AxonServerManagedChannel;
import io.axoniq.axonserver.connector.impl.FutureStreamObserver;
import io.axoniq.axonserver.grpc.FlowControl;
import io.axoniq.axonserver.grpc.InstructionAck;
import io.axoniq.axonserver.grpc.control.ClientIdentification;
import io.axoniq.axonserver.grpc.event.dcb.AddTagsRequest;
import io.axoniq.axonserver.grpc.event.dcb.AddTagsResponse;
import io.axoniq.axonserver.grpc.event.dcb.AppendEventsRequest;
import io.axoniq.axonserver.grpc.event.dcb.AppendEventsResponse;
import io.axoniq.axonserver.grpc.event.dcb.CancelScheduledEventRequest;
import io.axoniq.axonserver.grpc.event.dcb.ConsistencyCondition;
import io.axoniq.axonserver.grpc.event.dcb.DcbEventSchedulerGrpc;
import io.axoniq.axonserver.grpc.event.dcb.DcbEventStoreGrpc;
import io.axoniq.axonserver.grpc.event.dcb.Event;
import io.axoniq.axonserver.grpc.event.dcb.GetHeadRequest;
import io.axoniq.axonserver.grpc.event.dcb.GetHeadResponse;
import io.axoniq.axonserver.grpc.event.dcb.GetSequenceAtRequest;
import io.axoniq.axonserver.grpc.event.dcb.GetSequenceAtResponse;
import io.axoniq.axonserver.grpc.event.dcb.GetTagsRequest;
import io.axoniq.axonserver.grpc.event.dcb.GetTagsResponse;
import io.axoniq.axonserver.grpc.event.dcb.GetTailRequest;
import io.axoniq.axonserver.grpc.event.dcb.GetTailResponse;
import io.axoniq.axonserver.grpc.event.dcb.RemoveTagsRequest;
import io.axoniq.axonserver.grpc.event.dcb.RemoveTagsResponse;
import io.axoniq.axonserver.grpc.event.dcb.RescheduleEventRequest;
import io.axoniq.axonserver.grpc.event.dcb.ScheduleEventRequest;
import io.axoniq.axonserver.grpc.event.dcb.ScheduleToken;
import io.axoniq.axonserver.grpc.event.dcb.SourceEventsRequest;
import io.axoniq.axonserver.grpc.event.dcb.SourceEventsResponse;
import io.axoniq.axonserver.grpc.event.dcb.StreamEventsRequest;
import io.axoniq.axonserver.grpc.event.dcb.StreamEventsResponse;
import io.axoniq.axonserver.grpc.event.dcb.Tag;
import io.axoniq.axonserver.grpc.event.dcb.TaggedEvent;
import io.grpc.Channel;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;
import java.time.Instant;
import java.util.Collection;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nullable;

public class DcbEventChannelImpl
extends AbstractAxonServerChannel<Void>
implements DcbEventChannel {
    private static final int BUFFER_SIZE = 512;
    private static final int REFILL_BATCH = 16;
    private final DcbEventStoreGrpc.DcbEventStoreStub eventStore;
    private final DcbEventSchedulerGrpc.DcbEventSchedulerStub eventScheduler;
    private final ClientIdentification clientIdentification;
    private final Set<ResultStream<StreamEventsResponse>> buffers = ConcurrentHashMap.newKeySet();

    public DcbEventChannelImpl(ClientIdentification clientIdentification, ScheduledExecutorService executor, AxonServerManagedChannel axonServerManagedChannel) {
        super(clientIdentification, executor, axonServerManagedChannel);
        this.eventStore = DcbEventStoreGrpc.newStub((Channel)axonServerManagedChannel);
        this.eventScheduler = DcbEventSchedulerGrpc.newStub((Channel)axonServerManagedChannel);
        this.clientIdentification = clientIdentification;
    }

    @Override
    public void connect() {
    }

    @Override
    public void reconnect() {
        this.closeBuffers();
    }

    @Override
    public void disconnect() {
        this.closeBuffers();
    }

    private void closeBuffers() {
        this.buffers.forEach(ResultStream::close);
        this.buffers.clear();
    }

    @Override
    public boolean isReady() {
        return true;
    }

    @Override
    public DcbEventChannel.AppendEventsTransaction startTransaction() {
        FutureStreamObserver<AppendEventsResponse> response = new FutureStreamObserver<AppendEventsResponse>(null);
        StreamObserver<AppendEventsRequest> clientStream = this.eventStore.append(response);
        return new AppendEventsTransactionImpl(clientStream, response);
    }

    @Override
    public DcbEventChannel.AppendEventsTransaction startTransaction(ConsistencyCondition condition) {
        FutureStreamObserver<AppendEventsResponse> response = new FutureStreamObserver<AppendEventsResponse>(null);
        StreamObserver<AppendEventsRequest> clientStream = this.eventStore.append(response);
        return new AppendEventsTransactionImpl(clientStream, response).condition(condition);
    }

    @Override
    public ResultStream<StreamEventsResponse> stream(StreamEventsRequest request, int bufferSize, int refillBatch) {
        AbstractBufferedStream<StreamEventsResponse, Empty> buffer = new AbstractBufferedStream<StreamEventsResponse, Empty>(this.clientIdentification.getClientId(), Math.max(64, bufferSize), Math.max(16, Math.min(bufferSize, refillBatch))){

            @Override
            protected Empty buildFlowControlMessage(FlowControl flowControl) {
                return null;
            }

            @Override
            protected StreamEventsResponse terminalMessage() {
                return StreamEventsResponse.getDefaultInstance();
            }
        };
        this.buffers.add((ResultStream<StreamEventsResponse>)buffer);
        buffer.onCloseRequested(() -> this.buffers.remove(buffer));
        try {
            this.eventStore.stream(request, (StreamObserver<StreamEventsResponse>)buffer);
        }
        catch (Exception e) {
            this.buffers.remove(buffer);
            throw e;
        }
        return buffer;
    }

    @Override
    public ResultStream<SourceEventsResponse> source(SourceEventsRequest request) {
        AbstractBufferedStream<SourceEventsResponse, Empty> result = new AbstractBufferedStream<SourceEventsResponse, Empty>(this.clientIdentification.getClientId(), 512, 16){

            @Override
            protected SourceEventsResponse terminalMessage() {
                return SourceEventsResponse.newBuilder().build();
            }

            @Override
            protected Empty buildFlowControlMessage(FlowControl flowControl) {
                return null;
            }
        };
        this.eventStore.source(request, (StreamObserver<SourceEventsResponse>)result);
        return result;
    }

    @Override
    public CompletableFuture<GetTagsResponse> tagsFor(long sequence) {
        FutureStreamObserver<GetTagsResponse> future = new FutureStreamObserver<GetTagsResponse>(null);
        this.eventStore.getTags(GetTagsRequest.newBuilder().setSequence(sequence).build(), future);
        return future;
    }

    @Override
    public CompletableFuture<AppendEventsResponse> append(Collection<TaggedEvent> taggedEvents, ConsistencyCondition condition) {
        if (taggedEvents == null || taggedEvents.isEmpty()) {
            return CompletableFuture.failedFuture(new IllegalArgumentException("taggedEvents must not be null or empty"));
        }
        FutureStreamObserver<AppendEventsResponse> response = new FutureStreamObserver<AppendEventsResponse>(null);
        StreamObserver<AppendEventsRequest> clientStream = this.eventStore.append(response);
        return new AppendEventsTransactionImpl(clientStream, response).append(taggedEvents, condition).commit();
    }

    @Override
    public CompletableFuture<GetHeadResponse> head() {
        FutureStreamObserver<GetHeadResponse> future = new FutureStreamObserver<GetHeadResponse>(null);
        this.eventStore.getHead(GetHeadRequest.getDefaultInstance(), future);
        return future;
    }

    @Override
    public CompletableFuture<GetTailResponse> tail() {
        FutureStreamObserver<GetTailResponse> future = new FutureStreamObserver<GetTailResponse>(null);
        this.eventStore.getTail(GetTailRequest.getDefaultInstance(), future);
        return future;
    }

    @Override
    public CompletableFuture<GetSequenceAtResponse> getSequenceAt(Instant timestamp) {
        FutureStreamObserver<GetSequenceAtResponse> future = new FutureStreamObserver<GetSequenceAtResponse>(null);
        this.eventStore.getSequenceAt(GetSequenceAtRequest.newBuilder().setTimestamp(timestamp.toEpochMilli()).build(), future);
        return future;
    }

    @Override
    public CompletableFuture<AddTagsResponse> addTags(long sequence, Collection<Tag> tags) {
        FutureStreamObserver<AddTagsResponse> future = new FutureStreamObserver<AddTagsResponse>(null);
        AddTagsRequest request = AddTagsRequest.newBuilder().setSequence(sequence).addAllTag(tags).build();
        this.eventStore.addTags(request, future);
        return future;
    }

    @Override
    public CompletableFuture<RemoveTagsResponse> removeTags(long sequence, Collection<Tag> tags) {
        FutureStreamObserver<RemoveTagsResponse> future = new FutureStreamObserver<RemoveTagsResponse>(null);
        RemoveTagsRequest request = RemoveTagsRequest.newBuilder().setSequence(sequence).addAllTag(tags).build();
        this.eventStore.removeTags(request, future);
        return future;
    }

    @Override
    public CompletableFuture<String> scheduleEvent(Instant scheduleTime, Event event) {
        FutureStreamObserver<ScheduleToken> responseObserver = new FutureStreamObserver<ScheduleToken>(new AxonServerException(ErrorCategory.INSTRUCTION_ACK_ERROR, "An unknown error occurred while scheduling an Event. No response received from Server.", ""));
        this.eventScheduler.scheduleEvent(ScheduleEventRequest.newBuilder().setEvent(event).setInstant(scheduleTime.toEpochMilli()).build(), responseObserver);
        return responseObserver.thenApply(ScheduleToken::getToken);
    }

    @Override
    public CompletableFuture<InstructionAck> cancelSchedule(String token) {
        FutureStreamObserver<InstructionAck> responseObserver = new FutureStreamObserver<InstructionAck>(new AxonServerException(ErrorCategory.INSTRUCTION_ACK_ERROR, "An unknown error occurred while cancelling a scheduled Event. No response received from Server.", ""));
        this.eventScheduler.cancelScheduledEvent(CancelScheduledEventRequest.newBuilder().setToken(token).build(), responseObserver);
        return responseObserver;
    }

    @Override
    public CompletableFuture<String> reschedule(String scheduleToken, Instant scheduleTime, @Nullable Event event) {
        FutureStreamObserver<ScheduleToken> responseObserver = new FutureStreamObserver<ScheduleToken>(new AxonServerException(ErrorCategory.INSTRUCTION_ACK_ERROR, "An unknown error occurred while rescheduling Event. No response received from Server.", ""));
        RescheduleEventRequest.Builder builder = RescheduleEventRequest.newBuilder().setToken(scheduleToken).setInstant(scheduleTime.toEpochMilli());
        if (event != null) {
            builder.setEvent(event);
        }
        this.eventScheduler.rescheduleEvent(builder.build(), responseObserver);
        return responseObserver.thenApply(ScheduleToken::getToken);
    }

    private static class AppendEventsTransactionImpl
    implements DcbEventChannel.AppendEventsTransaction {
        private final StreamObserver<AppendEventsRequest> stream;
        private final CompletableFuture<AppendEventsResponse> result;
        private final AtomicBoolean conditionSet = new AtomicBoolean(false);

        AppendEventsTransactionImpl(StreamObserver<AppendEventsRequest> stream, CompletableFuture<AppendEventsResponse> result) {
            this.stream = stream;
            this.result = result;
        }

        private DcbEventChannel.AppendEventsTransaction append(Collection<TaggedEvent> taggedEvent, ConsistencyCondition condition) throws IllegalStateException {
            this.stream.onNext((Object)this.createConsistencyCondition(condition).addAllEvent(taggedEvent).build());
            return this;
        }

        private AppendEventsRequest.Builder createConsistencyCondition(ConsistencyCondition consistencyCondition) throws IllegalStateException {
            if (this.conditionSet.compareAndSet(false, true)) {
                return AppendEventsRequest.newBuilder().setCondition(consistencyCondition);
            }
            throw new IllegalStateException("Consistency Condition already set.");
        }

        public DcbEventChannel.AppendEventsTransaction condition(ConsistencyCondition condition) {
            this.stream.onNext((Object)this.createConsistencyCondition(condition).build());
            return this;
        }

        @Override
        public DcbEventChannel.AppendEventsTransaction append(TaggedEvent taggedEvent) {
            this.stream.onNext((Object)AppendEventsRequest.newBuilder().addEvent(taggedEvent).build());
            return this;
        }

        @Override
        public CompletableFuture<AppendEventsResponse> commit() {
            this.stream.onCompleted();
            return this.result;
        }

        @Override
        public void rollback() {
            this.stream.onError((Throwable)new StatusRuntimeException(Status.CANCELLED));
        }
    }
}

