/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.client.proxy;

import com.hazelcast.client.impl.protocol.ClientMessage;
import com.hazelcast.client.impl.protocol.codec.TopicAddMessageListenerCodec;
import com.hazelcast.client.impl.protocol.codec.TopicPublishCodec;
import com.hazelcast.client.impl.protocol.codec.TopicRemoveMessageListenerCodec;
import com.hazelcast.client.proxy.PartitionSpecificClientProxy;
import com.hazelcast.client.spi.ClientClusterService;
import com.hazelcast.client.spi.EventHandler;
import com.hazelcast.client.spi.impl.ListenerMessageCodec;
import com.hazelcast.core.ITopic;
import com.hazelcast.core.Member;
import com.hazelcast.core.Message;
import com.hazelcast.core.MessageListener;
import com.hazelcast.internal.serialization.SerializationService;
import com.hazelcast.monitor.LocalTopicStats;
import com.hazelcast.nio.serialization.Data;

/**
 * Client-side proxy implementing {@link ITopic}.
 *
 * <p>Publishing encodes a {@link TopicPublishCodec} request and invokes it on the
 * partition; listener management goes through {@code registerListener} /
 * {@code deregisterListener} using the topic listener codecs.
 *
 * @param <E> type of the messages published to and received from the topic
 */
public class ClientTopicProxy<E> extends PartitionSpecificClientProxy implements ITopic<E> {

    /**
     * Creates the proxy by passing both arguments to the superclass constructor.
     *
     * @param serviceName name of the service this proxy belongs to
     * @param objectId    identifier of the proxied object
     */
    public ClientTopicProxy(String serviceName, String objectId) {
        super(serviceName, objectId);
    }

    /**
     * Serializes {@code message} and sends it as a topic publish request.
     *
     * @param message the message to publish; must not be {@code null}
     * @throws NullPointerException if {@code message} is {@code null}
     */
    @Override
    public void publish(E message) {
        // Fail fast with a descriptive message instead of an anonymous failure further down.
        if (message == null) {
            throw new NullPointerException("Null message is not allowed!");
        }
        SerializationService serializationService = getContext().getSerializationService();
        // The codec is handed the serialized form, so keep it typed as Data rather than Object.
        Data data = serializationService.toData(message);
        ClientMessage request = TopicPublishCodec.encodeRequest(name, data);
        invokeOnPartition(request);
    }

    /**
     * Registers {@code listener} so that it is invoked for messages delivered to this proxy.
     *
     * @param listener the listener to register; must not be {@code null}
     * @return the registration id returned by {@code registerListener}
     * @throws NullPointerException if {@code listener} is {@code null}
     */
    @Override
    public String addMessageListener(MessageListener<E> listener) {
        // A null listener would otherwise only blow up later, when the first event is handled.
        if (listener == null) {
            throw new NullPointerException("Null listener is not allowed!");
        }
        EventHandler<ClientMessage> handler = new TopicItemHandler(listener);
        return registerListener(new Codec(), handler);
    }

    /**
     * Deregisters a previously registered listener.
     *
     * @param registrationId the id obtained from {@link #addMessageListener(MessageListener)}
     * @return the result of {@code deregisterListener} for that id
     */
    @Override
    public boolean removeMessageListener(String registrationId) {
        return deregisterListener(registrationId);
    }

    /**
     * Not supported on this proxy.
     *
     * @return never returns normally
     * @throws UnsupportedOperationException always
     */
    @Override
    public LocalTopicStats getLocalTopicStats() {
        throw new UnsupportedOperationException("Locality is ambiguous for client!!!");
    }

    /**
     * @return a string of the form {@code ITopic{name='<name>'}}
     */
    @Override
    public String toString() {
        return "ITopic{name='" + name + '\'' + '}';
    }

    /**
     * Encodes and decodes the add/remove listener messages for this topic.
     */
    private class Codec implements ListenerMessageCodec {

        @Override
        public ClientMessage encodeAddRequest(boolean localOnly) {
            return TopicAddMessageListenerCodec.encodeRequest(ClientTopicProxy.this.name, localOnly);
        }

        @Override
        public String decodeAddResponse(ClientMessage clientMessage) {
            return TopicAddMessageListenerCodec.decodeResponse(clientMessage).response;
        }

        @Override
        public ClientMessage encodeRemoveRequest(String realRegistrationId) {
            return TopicRemoveMessageListenerCodec.encodeRequest(ClientTopicProxy.this.name, realRegistrationId);
        }

        @Override
        public boolean decodeRemoveResponse(ClientMessage clientMessage) {
            return TopicRemoveMessageListenerCodec.decodeResponse(clientMessage).response;
        }
    }

    /**
     * Event handler that turns a decoded topic event into a {@link Message} and
     * passes it to the wrapped {@link MessageListener}.
     */
    private final class TopicItemHandler
            extends TopicAddMessageListenerCodec.AbstractEventHandler
            implements EventHandler<ClientMessage> {

        private final MessageListener<E> listener;

        private TopicItemHandler(MessageListener<E> listener) {
            this.listener = listener;
        }

        /**
         * Deserializes the payload, looks up the member for {@code uuid} and notifies the listener.
         *
         * @param item        serialized message payload
         * @param publishTime publish time carried by the event
         * @param uuid        uuid used to look up the publishing member in the cluster service
         */
        @Override
        public void handle(Data item, long publishTime, String uuid) {
            SerializationService serializationService = ClientTopicProxy.this.getContext().getSerializationService();
            ClientClusterService clusterService = ClientTopicProxy.this.getContext().getClusterService();
            // Typed as E / Message<E> so the listener call involves no raw types.
            E messageObject = serializationService.toObject(item);
            Member member = clusterService.getMember(uuid);
            Message<E> message = new Message<E>(ClientTopicProxy.this.name, messageObject, publishTime, member);
            listener.onMessage(message);
        }

        /** Intentionally empty: nothing is done before the listener is registered. */
        @Override
        public void beforeListenerRegister() {
        }

        /** Intentionally empty: nothing is done once the listener is registered. */
        @Override
        public void onListenerRegister() {
        }
    }
}

