/*
 * Decompiled with CFR 0.152.
 */
package com.oracle.coherence.grpc.client.common.topics;

import com.google.protobuf.BoolValue;
import com.google.protobuf.ByteString;
import com.google.protobuf.Int32Value;
import com.google.protobuf.Message;
import com.google.protobuf.Timestamp;
import com.google.protobuf.util.Timestamps;
import com.oracle.coherence.common.base.Exceptions;
import com.oracle.coherence.grpc.BinaryHelper;
import com.oracle.coherence.grpc.ErrorsHelper;
import com.oracle.coherence.grpc.MessageHelper;
import com.oracle.coherence.grpc.TopicHelper;
import com.oracle.coherence.grpc.client.common.GrpcConnection;
import com.oracle.coherence.grpc.client.common.SimpleStreamObserver;
import com.oracle.coherence.grpc.client.common.topics.GrpcNamedTopicConnector;
import com.oracle.coherence.grpc.client.common.topics.TopicServiceGrpcConnection;
import com.oracle.coherence.grpc.messages.common.v1.CollectionOfInt32;
import com.oracle.coherence.grpc.messages.common.v1.ErrorMessage;
import com.oracle.coherence.grpc.messages.topic.v1.ChannelAndPosition;
import com.oracle.coherence.grpc.messages.topic.v1.CommitResponse;
import com.oracle.coherence.grpc.messages.topic.v1.CommitResponseStatus;
import com.oracle.coherence.grpc.messages.topic.v1.EnsureSubscriptionRequest;
import com.oracle.coherence.grpc.messages.topic.v1.InitializeSubscriptionRequest;
import com.oracle.coherence.grpc.messages.topic.v1.InitializeSubscriptionResponse;
import com.oracle.coherence.grpc.messages.topic.v1.MapOfChannelAndPosition;
import com.oracle.coherence.grpc.messages.topic.v1.ReceiveRequest;
import com.oracle.coherence.grpc.messages.topic.v1.ReceiveResponse;
import com.oracle.coherence.grpc.messages.topic.v1.ReceiveStatus;
import com.oracle.coherence.grpc.messages.topic.v1.SeekRequest;
import com.oracle.coherence.grpc.messages.topic.v1.SeekResponse;
import com.oracle.coherence.grpc.messages.topic.v1.SeekedPositions;
import com.oracle.coherence.grpc.messages.topic.v1.TopicElement;
import com.oracle.coherence.grpc.messages.topic.v1.TopicPosition;
import com.oracle.coherence.grpc.messages.topic.v1.TopicServiceRequestType;
import com.oracle.coherence.grpc.messages.topic.v1.TopicServiceResponse;
import com.tangosol.internal.net.topic.BaseRemoteSubscriber;
import com.tangosol.internal.net.topic.ReceiveResult;
import com.tangosol.internal.net.topic.SeekResult;
import com.tangosol.internal.net.topic.SimpleReceiveResult;
import com.tangosol.internal.net.topic.SubscriberConnector;
import com.tangosol.internal.net.topic.TopicSubscription;
import com.tangosol.internal.net.topic.impl.paged.agent.SeekProcessor;
import com.tangosol.internal.net.topic.impl.paged.model.SubscriberGroupId;
import com.tangosol.internal.net.topic.impl.paged.model.SubscriberId;
import com.tangosol.io.Serializer;
import com.tangosol.net.RequestIncompleteException;
import com.tangosol.net.RequestTimeoutException;
import com.tangosol.net.topic.Position;
import com.tangosol.net.topic.Subscriber;
import com.tangosol.net.topic.TopicDependencies;
import java.time.Instant;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.Queue;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;

public class GrpcSubscriberConnector<V>
extends BaseRemoteSubscriber<V>
implements GrpcConnection.ConnectionListener {
    private final GrpcNamedTopicConnector<?> f_connector;
    private final TopicServiceGrpcConnection f_connection;
    private final GrpcConnection.Listener<TopicServiceResponse> f_listener;
    private final int f_nProxyId;
    private final Serializer f_serializer;
    private SubscriberConnector.ConnectedSubscriber<V> m_subscriber;

    public GrpcSubscriberConnector(GrpcNamedTopicConnector<?> connector, int proxyId, TopicServiceGrpcConnection connection, String sTopicName, SubscriberId subscriberId, SubscriberGroupId groupId) {
        super(sTopicName, subscriberId, groupId);
        this.f_connector = connector;
        this.f_nProxyId = proxyId;
        this.f_connection = connection;
        this.f_serializer = connector.getTopicService().getSerializer();
        SimpleStreamObserver<TopicServiceResponse> eventObserver = new SimpleStreamObserver<TopicServiceResponse>(this::onEvent);
        this.f_listener = new GrpcConnection.Listener<TopicServiceResponse>(eventObserver, r -> r.getProxyId() == proxyId);
        connection.addResponseObserver(this.f_listener);
        connection.addConnectionListener(this);
    }

    public void postConstruct(SubscriberConnector.ConnectedSubscriber<V> subscriber) {
        this.m_subscriber = subscriber;
    }

    public boolean isActive() {
        return this.f_connection.isConnected();
    }

    public void ensureConnected() {
    }

    public void close() {
        this.f_connection.removeConnectionListener(this);
        if (this.f_listener != null) {
            this.f_connection.removeResponseObserver(this.f_listener);
        }
    }

    public Position[] initialize(SubscriberConnector.ConnectedSubscriber<V> subscriber, boolean fForceReconnect, boolean fReconnect, boolean fDisconnected) {
        InitializeSubscriptionRequest request = InitializeSubscriptionRequest.newBuilder().setDisconnected(fDisconnected).setForceReconnect(fForceReconnect).setReconnect(fReconnect).build();
        InitializeSubscriptionResponse response = this.send(TopicServiceRequestType.InitializeSubscription, (Message)request, InitializeSubscriptionResponse.class);
        this.m_subscriptionId = response.getSubscriptionId();
        this.m_connectionTimestamp = Timestamps.toMillis((Timestamp)response.getTimestamp());
        return (Position[])response.getHeadsList().stream().map(TopicHelper::fromProtobufPosition).toArray(Position[]::new);
    }

    public boolean ensureSubscription(SubscriberConnector.ConnectedSubscriber<V> subscriber, long subscriptionId, boolean fForceReconnect) {
        EnsureSubscriptionRequest request = EnsureSubscriptionRequest.newBuilder().setSubscriptionId(subscriptionId).setForceReconnect(fForceReconnect).build();
        BoolValue value = this.send(TopicServiceRequestType.EnsureSubscription, (Message)request, BoolValue.class);
        return value != null && value.getValue();
    }

    protected void sendHeartbeat(boolean fAsync) {
        if (fAsync) {
            this.poll(TopicServiceRequestType.SubscriberHeartbeat, (Message)BoolValue.of((boolean)fAsync));
        } else {
            this.send(TopicServiceRequestType.SubscriberHeartbeat, (Message)BoolValue.of((boolean)fAsync));
        }
    }

    protected SimpleReceiveResult receiveInternal(int nChannel, Position headPosition, long lVersion, int cMaxElements) {
        ReceiveRequest request = ReceiveRequest.newBuilder().setChannel(nChannel).setMaxMessages(cMaxElements).build();
        CompletionStage future = this.poll(TopicServiceRequestType.Receive, (Message)request, ReceiveResponse.class).thenApply(this::onReceiveResponse);
        long cMillis = this.getRequestTimeoutMillis();
        try {
            return (SimpleReceiveResult)((CompletableFuture)future).get(cMillis, TimeUnit.MILLISECONDS);
        }
        catch (ExecutionException e) {
            throw Exceptions.ensureRuntimeException((Throwable)e);
        }
        catch (InterruptedException | TimeoutException e) {
            ((CompletableFuture)future).cancel(true);
            throw new RequestTimeoutException((Throwable)e);
        }
    }

    protected void onEvent(TopicServiceResponse response) {
        this.dispatchEvent(TopicHelper.fromProtobufSubscriberEvent((SubscriberConnector)this, (TopicServiceResponse)response));
    }

    protected SimpleReceiveResult onReceiveResponse(ReceiveResponse response) {
        Position head = TopicHelper.fromProtobufPosition((TopicPosition)response.getHeadPosition());
        ReceiveResult.Status status = switch (response.getStatus()) {
            default -> throw new IncompatibleClassChangeError();
            case ReceiveStatus.ReceiveSuccess -> ReceiveResult.Status.Success;
            case ReceiveStatus.ChannelExhausted -> ReceiveResult.Status.Exhausted;
            case ReceiveStatus.ChannelNotAllocatedChannel -> ReceiveResult.Status.NotAllocatedChannel;
            case ReceiveStatus.UnknownSubscriber -> ReceiveResult.Status.UnknownSubscriber;
            case ReceiveStatus.UNRECOGNIZED -> throw new IllegalArgumentException("Unknown subscriber status: " + String.valueOf(response.getStatus()));
        };
        Queue elements = response.getValuesList().stream().map(BinaryHelper::toBinary).collect(Collectors.toCollection(LinkedList::new));
        return new SimpleReceiveResult(elements, response.getRemainingValues(), status, head);
    }

    protected void commitInternal(int nChannel, Position position, BaseRemoteSubscriber.CommitHandler handler) {
        ChannelAndPosition request = ChannelAndPosition.newBuilder().setChannel(nChannel).setPosition(TopicHelper.toProtobufPosition((Position)position)).build();
        CommitResponse response = this.send(TopicServiceRequestType.CommitPosition, (Message)request, CommitResponse.class);
        Position head = TopicHelper.fromProtobufPosition((TopicPosition)response.getPosition());
        Subscriber.CommitResultStatus status = switch (response.getStatus()) {
            default -> throw new IncompatibleClassChangeError();
            case CommitResponseStatus.Committed -> Subscriber.CommitResultStatus.Committed;
            case CommitResponseStatus.AlreadyCommitted -> Subscriber.CommitResultStatus.AlreadyCommitted;
            case CommitResponseStatus.Rejected -> Subscriber.CommitResultStatus.Rejected;
            case CommitResponseStatus.Unowned -> Subscriber.CommitResultStatus.Unowned;
            case CommitResponseStatus.NothingToCommit -> Subscriber.CommitResultStatus.NothingToCommit;
            case CommitResponseStatus.UNRECOGNIZED -> throw new IllegalArgumentException("Unknown subscriber status: " + String.valueOf(response.getStatus()));
        };
        RequestIncompleteException error = null;
        if (response.hasError()) {
            error = ErrorsHelper.createException((ErrorMessage)response.getError(), (Serializer)this.f_serializer);
        }
        Subscriber.CommitResult result = new Subscriber.CommitResult(nChannel, position, status, (Throwable)error);
        handler.committed(result, head);
    }

    public Subscriber.Element<V> peek(int nChannel, Position position) {
        ChannelAndPosition request = ChannelAndPosition.newBuilder().setChannel(nChannel).setPosition(TopicHelper.toProtobufPosition((Position)position)).build();
        CompletionStage future = this.poll(TopicServiceRequestType.PeekAtPosition, (Message)request, TopicElement.class).thenApply(this::createElement);
        long cMillis = this.getRequestTimeoutMillis();
        try {
            return (Subscriber.Element)((CompletableFuture)future).get(cMillis, TimeUnit.MILLISECONDS);
        }
        catch (ExecutionException e) {
            throw Exceptions.ensureRuntimeException((Throwable)e);
        }
        catch (InterruptedException | TimeoutException e) {
            ((CompletableFuture)future).cancel(true);
            throw new RequestTimeoutException((Throwable)e);
        }
    }

    protected Subscriber.Element<V> createElement(TopicElement element) {
        return this.m_subscriber.createElement(BinaryHelper.toBinary((ByteString)element.getValue()), element.getChannel());
    }

    public int getRemainingMessages(SubscriberGroupId groupId, int[] anChannel) {
        return this.f_connector.getRemainingMessages(groupId.getGroupName(), anChannel);
    }

    public boolean isCommitted(SubscriberGroupId groupId, int nChannel, Position position) {
        ChannelAndPosition request = ChannelAndPosition.newBuilder().setChannel(nChannel).setPosition(TopicHelper.toProtobufPosition((Position)position)).build();
        CompletionStage future = this.poll(TopicServiceRequestType.IsPositionCommitted, (Message)request, BoolValue.class).thenApply(value -> value != null && value.getValue());
        long cMillis = this.getRequestTimeoutMillis();
        try {
            return (Boolean)((CompletableFuture)future).get(cMillis, TimeUnit.MILLISECONDS);
        }
        catch (ExecutionException e) {
            throw Exceptions.ensureRuntimeException((Throwable)e);
        }
        catch (InterruptedException | TimeoutException e) {
            ((CompletableFuture)future).cancel(true);
            throw new RequestTimeoutException((Throwable)e);
        }
    }

    public TopicSubscription getSubscription(SubscriberConnector.ConnectedSubscriber<V> subscriber, long id) {
        throw new UnsupportedOperationException();
    }

    public SortedSet<Integer> getOwnedChannels(SubscriberConnector.ConnectedSubscriber<V> subscriber) {
        CompletionStage future = this.poll(TopicServiceRequestType.GetOwnedChannels, CollectionOfInt32.class).thenApply(col -> new TreeSet(col.getValuesList()));
        long cMillis = this.getRequestTimeoutMillis();
        try {
            return (SortedSet)((CompletableFuture)future).get(cMillis, TimeUnit.MILLISECONDS);
        }
        catch (ExecutionException e) {
            throw Exceptions.ensureRuntimeException((Throwable)e);
        }
        catch (InterruptedException | TimeoutException e) {
            ((CompletableFuture)future).cancel(true);
            throw new RequestTimeoutException((Throwable)e);
        }
    }

    public Map<Integer, Position> getTopicHeads(int[] anChannel) {
        CollectionOfInt32 col = MessageHelper.toCollectionOfInt32((int[])anChannel);
        CompletionStage future = this.poll(TopicServiceRequestType.GetSubscriberHeads, (Message)col, MapOfChannelAndPosition.class).thenApply(TopicHelper::fromProtobufChannelAndPosition);
        long cMillis = this.getRequestTimeoutMillis();
        try {
            return (Map)((CompletableFuture)future).get(cMillis, TimeUnit.MILLISECONDS);
        }
        catch (ExecutionException e) {
            throw Exceptions.ensureRuntimeException((Throwable)e);
        }
        catch (InterruptedException | TimeoutException e) {
            ((CompletableFuture)future).cancel(true);
            throw new RequestTimeoutException((Throwable)e);
        }
    }

    public Map<Integer, Position> getTopicTails() {
        CompletionStage future = this.poll(TopicServiceRequestType.GetTails, MapOfChannelAndPosition.class).thenApply(TopicHelper::fromProtobufChannelAndPosition);
        long cMillis = this.getRequestTimeoutMillis();
        try {
            return (Map)((CompletableFuture)future).get(cMillis, TimeUnit.MILLISECONDS);
        }
        catch (ExecutionException e) {
            throw Exceptions.ensureRuntimeException((Throwable)e);
        }
        catch (InterruptedException | TimeoutException e) {
            ((CompletableFuture)future).cancel(true);
            throw new RequestTimeoutException((Throwable)e);
        }
    }

    public Map<Integer, Position> getLastCommittedInGroup(SubscriberGroupId groupId) {
        CompletionStage future = this.poll(TopicServiceRequestType.GetLastCommited, MapOfChannelAndPosition.class).thenApply(TopicHelper::fromProtobufChannelAndPosition);
        long cMillis = this.getRequestTimeoutMillis();
        try {
            return (Map)((CompletableFuture)future).get(cMillis, TimeUnit.MILLISECONDS);
        }
        catch (ExecutionException e) {
            throw Exceptions.ensureRuntimeException((Throwable)e);
        }
        catch (InterruptedException | TimeoutException e) {
            ((CompletableFuture)future).cancel(true);
            throw new RequestTimeoutException((Throwable)e);
        }
    }

    public Map<Integer, SeekResult> seekToPosition(SubscriberConnector.ConnectedSubscriber<V> subscriber, Map<Integer, Position> map) {
        SeekRequest request = SeekRequest.newBuilder().setByPosition(TopicHelper.toProtobufChannelAndPosition(map)).build();
        CompletionStage future = this.poll(TopicServiceRequestType.SeekSubscriber, (Message)request, SeekResponse.class).thenApply(this::handleSeekResponse);
        long cMillis = this.getRequestTimeoutMillis();
        try {
            return (Map)((CompletableFuture)future).get(cMillis, TimeUnit.MILLISECONDS);
        }
        catch (ExecutionException e) {
            throw Exceptions.ensureRuntimeException((Throwable)e);
        }
        catch (InterruptedException | TimeoutException e) {
            ((CompletableFuture)future).cancel(true);
            throw new RequestTimeoutException((Throwable)e);
        }
    }

    public Map<Integer, SeekResult> seekToTimestamp(SubscriberConnector.ConnectedSubscriber<V> subscriber, Map<Integer, Instant> map) {
        SeekRequest request = SeekRequest.newBuilder().setByTimestamp(TopicHelper.toProtobufChannelAndTimestamp(map)).build();
        CompletionStage future = this.poll(TopicServiceRequestType.SeekSubscriber, (Message)request, SeekResponse.class).thenApply(this::handleSeekResponse);
        long cMillis = this.getRequestTimeoutMillis();
        try {
            return (Map)((CompletableFuture)future).get(cMillis, TimeUnit.MILLISECONDS);
        }
        catch (ExecutionException e) {
            throw Exceptions.ensureRuntimeException((Throwable)e);
        }
        catch (InterruptedException | TimeoutException e) {
            ((CompletableFuture)future).cancel(true);
            throw new RequestTimeoutException((Throwable)e);
        }
    }

    protected Map<Integer, SeekResult> handleSeekResponse(SeekResponse response) {
        HashMap<Integer, SeekResult> map = new HashMap<Integer, SeekResult>();
        Map seekedMap = response.getPositionsMap();
        for (Map.Entry entry : seekedMap.entrySet()) {
            SeekedPositions pos = (SeekedPositions)entry.getValue();
            Position head = TopicHelper.fromProtobufPosition((TopicPosition)pos.getHead());
            Position seeked = TopicHelper.fromProtobufPosition((TopicPosition)pos.getSeekedTo());
            map.put((Integer)entry.getKey(), (SeekResult)new SeekProcessor.Result(head, seeked));
        }
        return map;
    }

    public void closeSubscription(SubscriberConnector.ConnectedSubscriber<V> subscriber, boolean fDestroyed) {
        if (this.f_connection.isConnected()) {
            this.f_connection.send(0, TopicServiceRequestType.DestroySubscriber, (Message)Int32Value.of((int)this.f_nProxyId));
            this.f_connection.close();
        }
    }

    public TopicDependencies getTopicDependencies() {
        return this.f_connector.getTopicService().getTopicBackingMapManager().getTopicDependencies(this.f_sTopicName);
    }

    @Override
    public void onConnectionEvent(GrpcConnection.ConnectionEvent event) {
        if (event.getType() == GrpcConnection.ConnectionEvent.Type.Disconnected) {
            this.dispatchEvent(new SubscriberConnector.SubscriberEvent((SubscriberConnector)this, SubscriberConnector.SubscriberEvent.Type.Disconnected));
        }
    }

    protected long getRequestTimeoutMillis() {
        return this.f_connector.getTopicService().getDependencies().getRequestTimeoutMillis();
    }

    protected CompletableFuture<TopicServiceResponse> poll(TopicServiceRequestType type) {
        return this.f_connection.poll(this.f_nProxyId, type);
    }

    protected CompletableFuture<TopicServiceResponse> poll(TopicServiceRequestType type, Message message) {
        return this.f_connection.poll(this.f_nProxyId, type, message);
    }

    protected <M extends Message> CompletableFuture<M> poll(TopicServiceRequestType requestType, Class<M> resultType) {
        return this.f_connection.poll(this.f_nProxyId, requestType).thenApply(m -> this.f_connection.unpackMessage((TopicServiceResponse)m, resultType));
    }

    protected <M extends Message> CompletableFuture<M> poll(TopicServiceRequestType requestType, Message message, Class<M> resultType) {
        return this.f_connection.poll(this.f_nProxyId, requestType, message).thenApply(m -> this.f_connection.unpackMessage((TopicServiceResponse)m, resultType));
    }

    protected TopicServiceResponse send(TopicServiceRequestType requestType, Message message) {
        CompletableFuture<TopicServiceResponse> future = this.poll(requestType, message);
        long cMillis = this.getRequestTimeoutMillis();
        try {
            return future.get(cMillis, TimeUnit.MILLISECONDS);
        }
        catch (ExecutionException e) {
            throw Exceptions.ensureRuntimeException((Throwable)e);
        }
        catch (InterruptedException | TimeoutException e) {
            future.cancel(true);
            throw new RequestTimeoutException((Throwable)e);
        }
    }

    protected <M extends Message> M send(TopicServiceRequestType requestType, Message message, Class<M> resultType) {
        CompletableFuture<M> future = this.poll(requestType, message, resultType);
        long cMillis = this.getRequestTimeoutMillis();
        try {
            return (M)((Message)future.get(cMillis, TimeUnit.MILLISECONDS));
        }
        catch (ExecutionException e) {
            throw Exceptions.ensureRuntimeException((Throwable)e);
        }
        catch (InterruptedException | TimeoutException e) {
            future.cancel(true);
            throw new RequestTimeoutException((Throwable)e);
        }
    }
}

