/*
 * Decompiled with CFR 0.152.
 */
package com.oracle.coherence.grpc.client.common.topics;

import com.google.protobuf.Any;
import com.google.protobuf.Int32Value;
import com.google.protobuf.Message;
import com.google.protobuf.StringValue;
import com.oracle.coherence.grpc.MessageHelper;
import com.oracle.coherence.grpc.client.common.BaseClientChannel;
import com.oracle.coherence.grpc.client.common.BaseGrpcClient;
import com.oracle.coherence.grpc.client.common.ClientProtocol;
import com.oracle.coherence.grpc.client.common.GrpcConnection;
import com.oracle.coherence.grpc.client.common.SimpleStreamObserver;
import com.oracle.coherence.grpc.client.common.topics.GrpcRemoteTopicService;
import com.oracle.coherence.grpc.client.common.topics.GrpcTopicLifecycleEventDispatcher;
import com.oracle.coherence.grpc.client.common.topics.TopicServiceGrpcConnection;
import com.oracle.coherence.grpc.messages.topic.v1.EnsureSubscriberGroupRequest;
import com.oracle.coherence.grpc.messages.topic.v1.EnsureTopicRequest;
import com.oracle.coherence.grpc.messages.topic.v1.GetRemainingMessagesRequest;
import com.oracle.coherence.grpc.messages.topic.v1.NamedTopicEvent;
import com.oracle.coherence.grpc.messages.topic.v1.TopicEventType;
import com.oracle.coherence.grpc.messages.topic.v1.TopicServiceRequest;
import com.oracle.coherence.grpc.messages.topic.v1.TopicServiceRequestType;
import com.oracle.coherence.grpc.messages.topic.v1.TopicServiceResponse;
import com.tangosol.internal.net.topic.NamedTopicConnector;
import com.tangosol.internal.net.topic.NamedTopicSubscriber;
import com.tangosol.internal.net.topic.NamedTopicView;
import com.tangosol.internal.net.topic.PublisherConnector;
import com.tangosol.internal.net.topic.SubscriberConnector;
import com.tangosol.net.TopicService;
import com.tangosol.net.events.EventDispatcher;
import com.tangosol.net.topic.NamedTopicEvent;
import com.tangosol.net.topic.Publisher;
import com.tangosol.net.topic.Subscriber;
import com.tangosol.util.Filter;
import com.tangosol.util.ValueExtractor;
import io.grpc.Channel;
import java.util.concurrent.CompletableFuture;

/**
 * A {@link NamedTopicConnector} that talks to a topic over a
 * {@link TopicServiceGrpcConnection}.
 * <p>
 * On construction the connector sends an {@code EnsureTopic} request and
 * remembers the proxy identifier returned in the response; that identifier
 * is then passed on every subsequent topic-scoped request and is used to
 * filter the responses routed to {@link #onEvent(TopicServiceResponse)}.
 *
 * @param <V>  the type of the values in the topic
 */
public class GrpcNamedTopicConnector<V>
extends BaseClientChannel<Dependencies, TopicServiceGrpcConnection>
implements ClientProtocol,
NamedTopicConnector<V> {
    /** The topic name, taken from the dependencies at construction time. */
    protected final String f_sName;

    /** The owning topic service; used to create publisher and subscriber connectors. */
    protected final GrpcRemoteTopicService f_topicService;

    /** The proxy identifier returned by the {@code EnsureTopic} response. */
    private final int f_nTopicId;

    /** The topic view events are dispatched to; {@code null} until {@link #setConnectedNamedTopic} is called. */
    protected NamedTopicView<V> m_topicView;

    /** Set to {@code true} once {@link #destroy()} has been called. */
    private boolean m_fDestroyed;

    /** Set to {@code true} once {@link #release()} has been called. */
    private boolean m_fReleased;

    /** The response listener registered with the connection for this topic's proxy identifier. */
    private final GrpcConnection.Listener<TopicServiceResponse> f_listener;

    /**
     * Create a connector and ensure the topic exists on the other end of the connection.
     * <p>
     * This sends an {@code EnsureTopic} request over {@code connection} and
     * registers a response observer that forwards responses carrying this
     * topic's proxy identifier to {@link #onEvent(TopicServiceResponse)}.
     *
     * @param dependencies  the dependencies; supplies the topic name
     * @param service       the owning topic service
     * @param connection    the connection used for all requests
     */
    public GrpcNamedTopicConnector(Dependencies dependencies, GrpcRemoteTopicService service, TopicServiceGrpcConnection connection) {
        super(dependencies, connection);
        this.f_topicService = service;
        this.f_sName = dependencies.getName();
        EnsureTopicRequest ensureRequest = EnsureTopicRequest.newBuilder().setTopic(this.f_sName).build();
        TopicServiceRequest request = TopicServiceRequest.newBuilder().setType(TopicServiceRequestType.EnsureTopic).setMessage(Any.pack((Message)ensureRequest)).build();
        TopicServiceResponse response = (TopicServiceResponse)connection.send((Message)request);
        this.f_nTopicId = response.getProxyId();
        // Only responses whose proxy id matches this topic are delivered to onEvent.
        SimpleStreamObserver<TopicServiceResponse> eventObserver = new SimpleStreamObserver<TopicServiceResponse>(this::onEvent);
        this.f_listener = new GrpcConnection.Listener<TopicServiceResponse>(eventObserver, r -> r.getProxyId() == this.f_nTopicId);
        connection.addResponseObserver(this.f_listener);
    }

    /**
     * @return {@code true} if {@link #destroy()} has been called on this connector
     */
    public boolean isDestroyed() {
        return this.m_fDestroyed;
    }

    /**
     * @return {@code true} if {@link #release()} has been called on this connector
     */
    public boolean isReleased() {
        return this.m_fReleased;
    }

    /**
     * Request the number of remaining messages for a subscriber group and
     * block until the response arrives.
     *
     * @param sSubscriberGroup  the subscriber group name
     * @param anChannel         the channels to include in the request, or
     *                          {@code null} to send the request with no channels
     *
     * @return the integer value carried by the response
     */
    public int getRemainingMessages(String sSubscriberGroup, int[] anChannel) {
        GetRemainingMessagesRequest.Builder builder = GetRemainingMessagesRequest.newBuilder().setSubscriberGroup(sSubscriberGroup);
        if (anChannel != null) {
            for (int nChannel : anChannel) {
                builder.addChannels(nChannel);
            }
        }
        TopicServiceGrpcConnection connection = (TopicServiceGrpcConnection)this.f_connection;
        // join() blocks the calling thread until the unpacked response is available.
        Object result = ((CompletableFuture)connection.poll(this.f_nTopicId, TopicServiceRequestType.GetRemainingMessages, (Message)builder.build()).thenApply(connection::unpackInteger)).join();
        return ((Int32Value)result).getValue();
    }

    /**
     * @return the topic service this connector was created with
     */
    public TopicService getTopicService() {
        return this.f_topicService;
    }

    /**
     * @return the name of the topic
     */
    public String getName() {
        return this.f_sName;
    }

    /**
     * Mark this connector destroyed, send a {@code DestroyTopic} request and
     * close the connection.
     */
    public void destroy() {
        this.release(true);
    }

    /**
     * Mark this connector released and close the connection, without
     * sending a destroy request.
     */
    public void release() {
        this.release(false);
    }

    /**
     * Send an {@code EnsureSubscriberGroup} request and block until it completes.
     *
     * @param sSubscriberGroup  the subscriber group name
     * @param filter            an optional filter, serialized into the request when not {@code null}
     * @param extractor         an optional extractor, serialized into the request when not {@code null}
     */
    public void ensureSubscriberGroup(String sSubscriberGroup, Filter<?> filter, ValueExtractor<?, ?> extractor) {
        EnsureSubscriberGroupRequest.Builder builder = EnsureSubscriberGroupRequest.newBuilder().setSubscriberGroup(sSubscriberGroup);
        if (filter != null) {
            builder.setFilter(this.toByteString(filter));
        }
        if (extractor != null) {
            // Serialize the extractor itself; the previous code serialized the
            // filter here, so the extractor was never sent.
            builder.setExtractor(this.toByteString(extractor));
        }
        ((TopicServiceGrpcConnection)this.f_connection).poll(this.f_nTopicId, TopicServiceRequestType.EnsureSubscriberGroup, (Message)builder.build()).join();
    }

    /**
     * Send a {@code DestroySubscriberGroup} request and block until it completes.
     *
     * @param sSubscriberGroup  the name of the subscriber group to destroy
     */
    public void destroySubscriberGroup(String sSubscriberGroup) {
        StringValue value = StringValue.newBuilder().setValue(sSubscriberGroup).build();
        ((TopicServiceGrpcConnection)this.f_connection).poll(this.f_nTopicId, TopicServiceRequestType.DestroySubscriberGroup, (Message)value).join();
    }

    /**
     * Create a publisher connector; delegates to {@link #createPublisherConnector}.
     *
     * @param options  the publisher options
     *
     * @return the publisher connector
     */
    public PublisherConnector<V> createPublisher(Publisher.Option<? super V>[] options) {
        return this.createPublisherConnector(options);
    }

    /**
     * Create a subscriber backed by a connector obtained from
     * {@link #createSubscriberConnector}.
     *
     * @param options  the subscriber options
     * @param <U>      the type of the values the subscriber receives
     *
     * @return a new {@link NamedTopicSubscriber} bound to the current topic view
     */
    public <U> NamedTopicSubscriber<U> createSubscriber(Subscriber.Option<? super V, U>[] options) {
        SubscriberConnector<U> connector = this.createSubscriberConnector(options);
        // NOTE(review): raw-type construction kept from the decompiled output;
        // parameterize once the NamedTopicSubscriber constructor signature is confirmed.
        return new NamedTopicSubscriber(this.m_topicView, connector, options);
    }

    /**
     * Set the topic view that {@link #onEvent(TopicServiceResponse)} dispatches
     * to and that new subscribers are bound to.
     *
     * @param namedTopicView  the topic view
     */
    public void setConnectedNamedTopic(NamedTopicView<V> namedTopicView) {
        this.m_topicView = namedTopicView;
    }

    /**
     * Obtain a publisher connector for this topic from the topic service.
     *
     * @param options  the publisher options
     *
     * @return the publisher connector
     */
    public PublisherConnector<V> createPublisherConnector(Publisher.Option<? super V>[] options) {
        return this.f_topicService.ensurePublisher(this.f_sName, options);
    }

    /**
     * Obtain a subscriber connector for this topic from the topic service.
     *
     * @param options  the subscriber options
     * @param <U>      the type of the values the subscriber receives
     *
     * @return the subscriber connector
     */
    public <U> SubscriberConnector<U> createSubscriberConnector(Subscriber.Option<? super V, U>[] options) {
        return this.f_topicService.ensureSubscriber(this, options);
    }

    /**
     * Handle a response routed to this topic by the registered listener.
     * <p>
     * When a topic view has been set and the response unpacks to an event of
     * type {@code TopicDestroyed}, a {@code Destroyed} event is dispatched on
     * the view. Every other response is ignored.
     *
     * @param response  the response received for this topic's proxy identifier
     */
    protected void onEvent(TopicServiceResponse response) {
        // NOTE(review): the file imports two types named NamedTopicEvent (the
        // protobuf message and com.tangosol.net.topic.NamedTopicEvent). The
        // local below is evidently the protobuf message (it is unpacked from an
        // Any and compared with TopicEventType), while Type.Destroyed looks like
        // the com.tangosol.net.topic event type. Decompiler artifact; the
        // simple names need disambiguating before this compiles.
        NamedTopicEvent event;
        NamedTopicView<V> view = this.m_topicView;
        if (view != null && (event = (NamedTopicEvent)MessageHelper.unpack((Any)response.getMessage(), NamedTopicEvent.class)) != null && event.getType() == TopicEventType.TopicDestroyed) {
            view.dispatchEvent(NamedTopicEvent.Type.Destroyed);
        }
    }

    /**
     * Release or destroy this connector and close the connection.
     *
     * @param fDestroy  {@code true} to flag the connector destroyed and send a
     *                  {@code DestroyTopic} request carrying the topic name;
     *                  {@code false} to only flag it released
     */
    protected void release(boolean fDestroy) {
        if (fDestroy) {
            this.m_fDestroyed = true;
            // NOTE(review): sent with proxy id 0 rather than f_nTopicId;
            // presumably destroy is addressed by topic name — confirm.
            ((TopicServiceGrpcConnection)this.f_connection).send(0, TopicServiceRequestType.DestroyTopic, (Message)StringValue.of((String)this.f_sName));
        } else {
            this.m_fReleased = true;
        }
        ((TopicServiceGrpcConnection)this.f_connection).close();
    }

    /**
     * The dependencies required by a {@link GrpcNamedTopicConnector}.
     */
    public static interface Dependencies
    extends BaseGrpcClient.Dependencies {
        /**
         * @return the heartbeat interval in milliseconds
         */
        public long getHeartbeatMillis();

        /**
         * @return {@code true} if heartbeat acknowledgements are required
         */
        public boolean isRequireHeartbeatAck();
    }

    /**
     * A mutable default implementation of {@link Dependencies}.
     */
    public static class DefaultDependencies
    extends BaseGrpcClient.DefaultDependencies
    implements Dependencies {
        /** The heartbeat interval in milliseconds; never negative, defaults to zero. */
        private long m_nEventsHeartbeat = 0L;

        /** Whether heartbeat acknowledgements are required; defaults to {@code false}. */
        private boolean m_fRequireHeartbeatAck;

        /**
         * Create the dependencies.
         *
         * @param sName       the name passed to the parent dependencies
         * @param channel     the gRPC channel
         * @param dispatcher  the lifecycle event dispatcher
         */
        public DefaultDependencies(String sName, Channel channel, GrpcTopicLifecycleEventDispatcher dispatcher) {
            super(sName, channel, (EventDispatcher)dispatcher);
        }

        @Override
        public long getHeartbeatMillis() {
            return this.m_nEventsHeartbeat;
        }

        @Override
        public boolean isRequireHeartbeatAck() {
            return this.m_fRequireHeartbeatAck;
        }

        /**
         * Set the heartbeat interval.
         *
         * @param nEventsHeartbeat  the interval in milliseconds; negative values are stored as zero
         */
        public void setHeartbeatMillis(long nEventsHeartbeat) {
            this.m_nEventsHeartbeat = Math.max(0L, nEventsHeartbeat);
        }

        /**
         * Set whether heartbeat acknowledgements are required.
         *
         * @param fRequireHeartbeatAck  {@code true} to require acknowledgements
         */
        public void setRequireHeartbeatAck(boolean fRequireHeartbeatAck) {
            this.m_fRequireHeartbeatAck = fRequireHeartbeatAck;
        }
    }
}

