/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.spi.impl;

import com.hazelcast.core.HazelcastException;
import com.hazelcast.core.MemberLeftException;
import com.hazelcast.instance.MemberImpl;
import com.hazelcast.instance.Node;
import com.hazelcast.logging.ILogger;
import com.hazelcast.nio.Address;
import com.hazelcast.nio.ObjectDataInput;
import com.hazelcast.nio.ObjectDataOutput;
import com.hazelcast.nio.Packet;
import com.hazelcast.nio.serialization.Data;
import com.hazelcast.nio.serialization.DataSerializable;
import com.hazelcast.nio.serialization.IdentifiedDataSerializable;
import com.hazelcast.spi.AbstractOperation;
import com.hazelcast.spi.EventFilter;
import com.hazelcast.spi.EventPublishingService;
import com.hazelcast.spi.EventRegistration;
import com.hazelcast.spi.EventService;
import com.hazelcast.spi.Invocation;
import com.hazelcast.spi.Operation;
import com.hazelcast.spi.PostJoinAwareService;
import com.hazelcast.spi.annotation.PrivateApi;
import com.hazelcast.spi.impl.NodeEngineImpl;
import com.hazelcast.spi.impl.SpiDataSerializerHook;
import com.hazelcast.util.ConcurrencyUtil;
import com.hazelcast.util.ConstructorFunction;
import com.hazelcast.util.executor.StripedExecutor;
import com.hazelcast.util.executor.StripedRunnable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;

public class EventServiceImpl
implements EventService,
PostJoinAwareService {
    private static final EventRegistration[] EMPTY_REGISTRATIONS = new EventRegistration[0];
    private final ILogger logger;
    private final NodeEngineImpl nodeEngine;
    private final ConcurrentMap<String, EventServiceSegment> segments;
    private final StripedExecutor eventExecutor;

    EventServiceImpl(NodeEngineImpl nodeEngine) {
        this.nodeEngine = nodeEngine;
        this.logger = nodeEngine.getLogger(EventService.class.getName());
        Node node = nodeEngine.getNode();
        int eventThreadCount = node.getGroupProperties().EVENT_THREAD_COUNT.getInteger();
        this.eventExecutor = new StripedExecutor(nodeEngine.executionService.getCachedExecutor(), eventThreadCount);
        this.segments = new ConcurrentHashMap<String, EventServiceSegment>();
    }

    @Override
    public EventRegistration registerLocalListener(String serviceName, String topic, Object listener) {
        return this.registerListenerInternal(serviceName, topic, new EmptyFilter(), listener, true);
    }

    @Override
    public EventRegistration registerLocalListener(String serviceName, String topic, EventFilter filter, Object listener) {
        return this.registerListenerInternal(serviceName, topic, filter, listener, true);
    }

    @Override
    public EventRegistration registerListener(String serviceName, String topic, Object listener) {
        return this.registerListenerInternal(serviceName, topic, new EmptyFilter(), listener, false);
    }

    @Override
    public EventRegistration registerListener(String serviceName, String topic, EventFilter filter, Object listener) {
        return this.registerListenerInternal(serviceName, topic, filter, listener, false);
    }

    private EventRegistration registerListenerInternal(String serviceName, String topic, EventFilter filter, Object listener, boolean localOnly) {
        Registration reg;
        if (listener == null) {
            throw new IllegalArgumentException("Listener required!");
        }
        if (filter == null) {
            throw new IllegalArgumentException("EventFilter required!");
        }
        EventServiceSegment segment = this.getSegment(serviceName, true);
        if (segment.addRegistration(topic, reg = new Registration(UUID.randomUUID().toString(), serviceName, topic, filter, this.nodeEngine.getThisAddress(), listener, localOnly))) {
            if (!localOnly) {
                this.invokeRegistrationOnOtherNodes(serviceName, reg);
            }
            return reg;
        }
        return null;
    }

    private boolean handleRegistration(Registration reg) {
        EventServiceSegment segment = this.getSegment(reg.serviceName, true);
        return segment.addRegistration(reg.topic, reg);
    }

    @Override
    public boolean deregisterListener(String serviceName, String topic, Object id) {
        EventServiceSegment segment = this.getSegment(serviceName, false);
        if (segment != null) {
            Registration reg = segment.removeRegistration(topic, String.valueOf(id));
            if (reg != null && !reg.isLocalOnly()) {
                this.invokeDeregistrationOnOtherNodes(serviceName, topic, String.valueOf(id));
            }
            return reg != null;
        }
        return false;
    }

    @Override
    public void deregisterAllListeners(String serviceName, String topic) {
        EventServiceSegment segment = this.getSegment(serviceName, false);
        if (segment != null) {
            segment.removeRegistrations(topic);
        }
    }

    private void deregisterSubscriber(String serviceName, String topic, String id) {
        EventServiceSegment segment = this.getSegment(serviceName, false);
        if (segment != null) {
            segment.removeRegistration(topic, id);
        }
    }

    private void invokeRegistrationOnOtherNodes(String serviceName, Registration reg) {
        Collection<MemberImpl> members = this.nodeEngine.getClusterService().getMemberList();
        ArrayList<Future> calls = new ArrayList<Future>(members.size());
        for (MemberImpl member : members) {
            if (member.localMember()) continue;
            Invocation inv = this.nodeEngine.getOperationService().createInvocationBuilder(serviceName, (Operation)new RegistrationOperation(reg), member.getAddress()).build();
            calls.add(inv.invoke());
        }
        for (Future f : calls) {
            try {
                f.get(5L, TimeUnit.SECONDS);
            }
            catch (InterruptedException ignored) {
            }
            catch (TimeoutException ignored) {
            }
            catch (MemberLeftException e) {
                this.logger.log(Level.FINEST, "Member left while registering listener...", e);
            }
            catch (ExecutionException e) {
                throw new HazelcastException(e);
            }
        }
    }

    private void invokeDeregistrationOnOtherNodes(String serviceName, String topic, String id) {
        Collection<MemberImpl> members = this.nodeEngine.getClusterService().getMemberList();
        ArrayList<Future> calls = new ArrayList<Future>(members.size());
        for (MemberImpl member : members) {
            if (member.localMember()) continue;
            Invocation inv = this.nodeEngine.getOperationService().createInvocationBuilder(serviceName, (Operation)new DeregistrationOperation(topic, id), member.getAddress()).build();
            calls.add(inv.invoke());
        }
        for (Future f : calls) {
            try {
                f.get(5L, TimeUnit.SECONDS);
            }
            catch (InterruptedException ignored) {
            }
            catch (TimeoutException ignored) {
            }
            catch (MemberLeftException e) {
                this.logger.log(Level.FINEST, "Member left while de-registering listener...", e);
            }
            catch (ExecutionException e) {
                throw new HazelcastException(e);
            }
        }
    }

    @Override
    public EventRegistration[] getRegistrationsAsArray(String serviceName, String topic) {
        EventServiceSegment segment = this.getSegment(serviceName, false);
        if (segment != null) {
            Collection registrations = segment.getRegistrations(topic, false);
            return registrations != null && !registrations.isEmpty() ? (EventRegistration[])registrations.toArray(new Registration[registrations.size()]) : EMPTY_REGISTRATIONS;
        }
        return EMPTY_REGISTRATIONS;
    }

    @Override
    public Collection<EventRegistration> getRegistrations(String serviceName, String topic) {
        EventServiceSegment segment = this.getSegment(serviceName, false);
        if (segment != null) {
            Collection registrations = segment.getRegistrations(topic, false);
            return registrations != null && !registrations.isEmpty() ? Collections.unmodifiableCollection(registrations) : Collections.emptySet();
        }
        return Collections.emptySet();
    }

    @Override
    public void publishEvent(String serviceName, EventRegistration registration, Object event, int orderKey) {
        if (!(registration instanceof Registration)) {
            throw new IllegalArgumentException();
        }
        Registration reg = (Registration)registration;
        if (reg.isLocal()) {
            this.executeLocal(serviceName, event, reg, orderKey);
        } else {
            Address subscriber = registration.getSubscriber();
            this.sendEventPacket(subscriber, new EventPacket(registration.getId(), serviceName, event), orderKey);
        }
    }

    @Override
    public void publishEvent(String serviceName, Collection<EventRegistration> registrations, Object event, int orderKey) {
        Iterator<EventRegistration> iter = registrations.iterator();
        Data eventData = null;
        while (iter.hasNext()) {
            EventRegistration registration = iter.next();
            if (!(registration instanceof Registration)) {
                throw new IllegalArgumentException();
            }
            Registration reg = (Registration)registration;
            if (reg.isLocal()) {
                this.executeLocal(serviceName, event, reg, orderKey);
                continue;
            }
            if (eventData == null) {
                eventData = this.nodeEngine.toData(event);
            }
            Address subscriber = registration.getSubscriber();
            this.sendEventPacket(subscriber, new EventPacket(registration.getId(), serviceName, eventData), orderKey);
        }
    }

    private void executeLocal(String serviceName, Object event, Registration reg, int orderKey) {
        if (this.nodeEngine.isActive()) {
            try {
                this.eventExecutor.execute(new LocalEventDispatcher(serviceName, event, reg.listener, orderKey));
            }
            catch (RejectedExecutionException e) {
                this.logger.log(Level.WARNING, e.toString());
            }
        }
    }

    private void sendEventPacket(Address subscriber, EventPacket eventPacket, int orderKey) {
        boolean sync;
        String serviceName = eventPacket.serviceName;
        EventServiceSegment segment = this.getSegment(serviceName, true);
        boolean bl = sync = segment.incrementPublish() % 100000 == 0;
        if (sync) {
            Invocation inv = this.nodeEngine.getOperationService().createInvocationBuilder(serviceName, (Operation)new SendEventOperation(eventPacket, orderKey), subscriber).setTryCount(50).build();
            try {
                inv.invoke().get(3L, TimeUnit.SECONDS);
            }
            catch (Exception ignored) {}
        } else {
            Packet packet = new Packet(this.nodeEngine.toData(eventPacket), orderKey, this.nodeEngine.getSerializationContext());
            packet.setHeader(2);
            this.nodeEngine.send(packet, subscriber);
        }
    }

    private EventServiceSegment getSegment(String service, boolean forceCreate) {
        EventServiceSegment segment = (EventServiceSegment)this.segments.get(service);
        if (segment == null && forceCreate) {
            return ConcurrencyUtil.getOrPutIfAbsent(this.segments, service, new ConstructorFunction<String, EventServiceSegment>(){

                @Override
                public EventServiceSegment createNew(String key) {
                    return new EventServiceSegment(key);
                }
            });
        }
        return segment;
    }

    @PrivateApi
    void executeEvent(Runnable eventRunnable) {
        if (this.nodeEngine.isActive()) {
            try {
                this.eventExecutor.execute(eventRunnable);
            }
            catch (RejectedExecutionException e) {
                this.logger.log(Level.WARNING, e.toString());
            }
        }
    }

    @PrivateApi
    void handleEvent(Packet packet) {
        try {
            this.eventExecutor.execute(new RemoteEventPacketProcessor(packet));
        }
        catch (RejectedExecutionException e) {
            this.logger.log(Level.WARNING, e.toString());
        }
    }

    @Override
    public PostJoinRegistrationOperation getPostJoinOperation() {
        LinkedList<Registration> registrations = new LinkedList<Registration>();
        for (EventServiceSegment segment : this.segments.values()) {
            for (Registration reg : segment.registrationIdMap.values()) {
                if (reg.isLocalOnly()) continue;
                registrations.add(reg);
            }
        }
        return registrations.isEmpty() ? null : new PostJoinRegistrationOperation(registrations);
    }

    void shutdown() {
        this.logger.log(Level.FINEST, "Stopping event executor...");
        this.eventExecutor.shutdown();
        for (EventServiceSegment segment : this.segments.values()) {
            segment.clear();
        }
        this.segments.clear();
    }

    void onMemberLeft(MemberImpl member) {
        Address address = member.getAddress();
        for (EventServiceSegment segment : this.segments.values()) {
            segment.onMemberLeft(address);
        }
    }

    public static class PostJoinRegistrationOperation
    extends AbstractOperation {
        private Collection<Registration> registrations;

        public PostJoinRegistrationOperation() {
        }

        public PostJoinRegistrationOperation(Collection<Registration> registrations) {
            this.registrations = registrations;
        }

        @Override
        public void run() throws Exception {
            if (this.registrations != null && this.registrations.size() > 0) {
                NodeEngineImpl nodeEngine = (NodeEngineImpl)this.getNodeEngine();
                EventServiceImpl eventService = nodeEngine.eventService;
                for (Registration reg : this.registrations) {
                    eventService.handleRegistration(reg);
                }
            }
        }

        @Override
        public boolean returnsResponse() {
            return false;
        }

        @Override
        protected void writeInternal(ObjectDataOutput out) throws IOException {
            super.writeInternal(out);
            int len = this.registrations != null ? this.registrations.size() : 0;
            out.writeInt(len);
            if (len > 0) {
                for (Registration reg : this.registrations) {
                    reg.writeData(out);
                }
            }
        }

        @Override
        protected void readInternal(ObjectDataInput in) throws IOException {
            super.readInternal(in);
            int len = in.readInt();
            if (len > 0) {
                this.registrations = new ArrayList<Registration>(len);
                for (int i = 0; i < len; ++i) {
                    Registration reg = new Registration();
                    this.registrations.add(reg);
                    reg.readData(in);
                }
            }
        }
    }

    public static class DeregistrationOperation
    extends AbstractOperation {
        private String topic;
        private String id;

        DeregistrationOperation() {
        }

        private DeregistrationOperation(String topic, String id) {
            this.topic = topic;
            this.id = id;
        }

        @Override
        public void run() throws Exception {
            EventServiceImpl eventService = (EventServiceImpl)this.getNodeEngine().getEventService();
            eventService.deregisterSubscriber(this.getServiceName(), this.topic, this.id);
        }

        @Override
        public Object getResponse() {
            return true;
        }

        @Override
        public boolean returnsResponse() {
            return true;
        }

        @Override
        protected void writeInternal(ObjectDataOutput out) throws IOException {
            out.writeUTF(this.topic);
            out.writeUTF(this.id);
        }

        @Override
        protected void readInternal(ObjectDataInput in) throws IOException {
            this.topic = in.readUTF();
            this.id = in.readUTF();
        }
    }

    public static class RegistrationOperation
    extends AbstractOperation {
        private Registration registration;
        private transient boolean response = false;

        public RegistrationOperation() {
        }

        private RegistrationOperation(Registration registration) {
            this.registration = registration;
        }

        @Override
        public void run() throws Exception {
            EventServiceImpl eventService = (EventServiceImpl)this.getNodeEngine().getEventService();
            this.response = eventService.handleRegistration(this.registration);
        }

        @Override
        public Object getResponse() {
            return this.response;
        }

        @Override
        public boolean returnsResponse() {
            return true;
        }

        @Override
        protected void writeInternal(ObjectDataOutput out) throws IOException {
            this.registration.writeData(out);
        }

        @Override
        protected void readInternal(ObjectDataInput in) throws IOException {
            this.registration = new Registration();
            this.registration.readData(in);
        }
    }

    public static class SendEventOperation
    extends AbstractOperation {
        private EventPacket eventPacket;
        private int orderKey;

        public SendEventOperation() {
        }

        public SendEventOperation(EventPacket eventPacket, int orderKey) {
            this.eventPacket = eventPacket;
            this.orderKey = orderKey;
        }

        @Override
        public void run() throws Exception {
            EventServiceImpl eventService;
            EventServiceImpl eventServiceImpl = eventService = (EventServiceImpl)this.getNodeEngine().getEventService();
            eventServiceImpl.getClass();
            eventService.executeEvent(eventServiceImpl.new EventPacketProcessor(this.eventPacket, this.orderKey));
        }

        @Override
        public boolean returnsResponse() {
            return true;
        }

        @Override
        protected void writeInternal(ObjectDataOutput out) throws IOException {
            super.writeInternal(out);
            this.eventPacket.writeData(out);
            out.writeInt(this.orderKey);
        }

        @Override
        protected void readInternal(ObjectDataInput in) throws IOException {
            super.readInternal(in);
            this.eventPacket = new EventPacket();
            this.eventPacket.readData(in);
            this.orderKey = in.readInt();
        }
    }

    public static final class EmptyFilter
    implements EventFilter,
    DataSerializable {
        @Override
        public boolean eval(Object arg) {
            return true;
        }

        @Override
        public void writeData(ObjectDataOutput out) throws IOException {
        }

        @Override
        public void readData(ObjectDataInput in) throws IOException {
        }

        public boolean equals(Object obj) {
            return obj instanceof EmptyFilter;
        }

        public int hashCode() {
            return 0;
        }
    }

    public static final class EventPacket
    implements IdentifiedDataSerializable {
        private String id;
        private String serviceName;
        private Object event;

        public EventPacket() {
        }

        EventPacket(String id, String serviceName, Object event) {
            this.event = event;
            this.id = id;
            this.serviceName = serviceName;
        }

        @Override
        public void writeData(ObjectDataOutput out) throws IOException {
            out.writeUTF(this.id);
            out.writeUTF(this.serviceName);
            out.writeObject(this.event);
        }

        @Override
        public void readData(ObjectDataInput in) throws IOException {
            this.id = in.readUTF();
            this.serviceName = in.readUTF();
            this.event = in.readObject();
        }

        @Override
        public int getFactoryId() {
            return SpiDataSerializerHook.F_ID;
        }

        @Override
        public int getId() {
            return 6;
        }
    }

    public static class Registration
    implements EventRegistration {
        private String id;
        private String serviceName;
        private String topic;
        private EventFilter filter;
        private Address subscriber;
        private transient boolean localOnly;
        private transient Object listener;

        public Registration() {
        }

        public Registration(String id, String serviceName, String topic, EventFilter filter, Address subscriber, Object listener, boolean localOnly) {
            this.filter = filter;
            this.id = id;
            this.listener = listener;
            this.serviceName = serviceName;
            this.topic = topic;
            this.subscriber = subscriber;
            this.localOnly = localOnly;
        }

        @Override
        public EventFilter getFilter() {
            return this.filter;
        }

        @Override
        public String getId() {
            return this.id;
        }

        @Override
        public Address getSubscriber() {
            return this.subscriber;
        }

        @Override
        public boolean isLocalOnly() {
            return this.localOnly;
        }

        private boolean isLocal() {
            return this.listener != null;
        }

        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || this.getClass() != o.getClass()) {
                return false;
            }
            Registration that = (Registration)o;
            if (this.id != null ? !this.id.equals(that.id) : that.id != null) {
                return false;
            }
            if (this.serviceName != null ? !this.serviceName.equals(that.serviceName) : that.serviceName != null) {
                return false;
            }
            if (this.topic != null ? !this.topic.equals(that.topic) : that.topic != null) {
                return false;
            }
            if (this.filter != null ? !this.filter.equals(that.filter) : that.filter != null) {
                return false;
            }
            return !(this.subscriber != null ? !this.subscriber.equals(that.subscriber) : that.subscriber != null);
        }

        public int hashCode() {
            int result = this.id != null ? this.id.hashCode() : 0;
            result = 31 * result + (this.serviceName != null ? this.serviceName.hashCode() : 0);
            result = 31 * result + (this.topic != null ? this.topic.hashCode() : 0);
            result = 31 * result + (this.filter != null ? this.filter.hashCode() : 0);
            result = 31 * result + (this.subscriber != null ? this.subscriber.hashCode() : 0);
            return result;
        }

        @Override
        public void writeData(ObjectDataOutput out) throws IOException {
            out.writeUTF(this.id);
            out.writeUTF(this.serviceName);
            out.writeUTF(this.topic);
            this.subscriber.writeData(out);
            out.writeObject(this.filter);
        }

        @Override
        public void readData(ObjectDataInput in) throws IOException {
            this.id = in.readUTF();
            this.serviceName = in.readUTF();
            this.topic = in.readUTF();
            this.subscriber = new Address();
            this.subscriber.readData(in);
            this.filter = (EventFilter)in.readObject();
        }

        public String toString() {
            StringBuilder sb = new StringBuilder();
            sb.append("Registration");
            sb.append("{filter=").append(this.filter);
            sb.append(", id='").append(this.id).append('\'');
            sb.append(", serviceName='").append(this.serviceName).append('\'');
            sb.append(", subscriber=").append(this.subscriber);
            sb.append(", listener=").append(this.listener);
            sb.append('}');
            return sb.toString();
        }
    }

    private class LocalEventDispatcher
    implements StripedRunnable {
        final String serviceName;
        final Object event;
        final Object listener;
        final int orderKey;

        private LocalEventDispatcher(String serviceName, Object event, Object listener, int orderKey) {
            this.serviceName = serviceName;
            this.event = event;
            this.listener = listener;
            this.orderKey = orderKey;
        }

        @Override
        public final void run() {
            EventPublishingService service = (EventPublishingService)EventServiceImpl.this.nodeEngine.getService(this.serviceName);
            if (service != null) {
                service.dispatchEvent(this.event, this.listener);
            } else if (EventServiceImpl.this.nodeEngine.isActive()) {
                throw new IllegalArgumentException("Service[" + this.serviceName + "] could not be found!");
            }
        }

        @Override
        public int getKey() {
            return this.orderKey;
        }
    }

    private class RemoteEventPacketProcessor
    extends EventPacketProcessor
    implements StripedRunnable {
        private Packet packet;

        public RemoteEventPacketProcessor(Packet packet) {
            this.packet = packet;
            this.orderKey = packet.getPartitionId();
        }

        @Override
        public void run() {
            Data data = this.packet.getData();
            EventPacket eventPacket = (EventPacket)EventServiceImpl.this.nodeEngine.toObject(data);
            this.process(eventPacket);
        }
    }

    private class EventPacketProcessor
    implements StripedRunnable {
        private EventPacket eventPacket;
        int orderKey;

        private EventPacketProcessor() {
        }

        public EventPacketProcessor(EventPacket packet, int orderKey) {
            this.eventPacket = packet;
            this.orderKey = orderKey;
        }

        @Override
        public void run() {
            this.process(this.eventPacket);
        }

        void process(EventPacket eventPacket) {
            Object eventObject = eventPacket.event;
            if (eventObject instanceof Data) {
                eventObject = EventServiceImpl.this.nodeEngine.toObject(eventObject);
            }
            String serviceName = eventPacket.serviceName;
            EventPublishingService service = (EventPublishingService)EventServiceImpl.this.nodeEngine.getService(serviceName);
            if (service == null) {
                EventServiceImpl.this.logger.log(Level.WARNING, "There is no service named: " + serviceName);
                return;
            }
            EventServiceSegment segment = EventServiceImpl.this.getSegment(serviceName, false);
            if (segment == null) {
                EventServiceImpl.this.logger.log(Level.WARNING, "No service registration found for " + serviceName);
                return;
            }
            Registration registration = (Registration)segment.registrationIdMap.get(eventPacket.id);
            if (registration == null) {
                EventServiceImpl.this.logger.log(Level.WARNING, "No registration found for " + serviceName + " / " + eventPacket.id);
                return;
            }
            if (!registration.isLocal()) {
                EventServiceImpl.this.logger.log(Level.WARNING, "Invalid target for  " + registration);
                return;
            }
            service.dispatchEvent(eventObject, registration.listener);
        }

        @Override
        public int getKey() {
            return this.orderKey;
        }
    }

    private static class EventServiceSegment {
        final String serviceName;
        final ConcurrentMap<String, Collection<Registration>> registrations = new ConcurrentHashMap<String, Collection<Registration>>();
        final ConcurrentMap<String, Registration> registrationIdMap = new ConcurrentHashMap<String, Registration>();
        final AtomicInteger totalPublishes = new AtomicInteger();

        EventServiceSegment(String serviceName) {
            this.serviceName = serviceName;
        }

        private Collection<Registration> getRegistrations(String topic, boolean forceCreate) {
            Collection listenerList = (Collection)this.registrations.get(topic);
            if (listenerList == null && forceCreate) {
                return ConcurrencyUtil.getOrPutIfAbsent(this.registrations, topic, new ConstructorFunction<String, Collection<Registration>>(){

                    @Override
                    public Collection<Registration> createNew(String key) {
                        return Collections.newSetFromMap(new ConcurrentHashMap());
                    }
                });
            }
            return listenerList;
        }

        private boolean addRegistration(String topic, Registration registration) {
            Collection<Registration> registrations = this.getRegistrations(topic, true);
            if (registrations.add(registration)) {
                this.registrationIdMap.put(registration.id, registration);
                return true;
            }
            return false;
        }

        private Registration removeRegistration(String topic, String id) {
            Collection all;
            Registration registration = (Registration)this.registrationIdMap.remove(id);
            if (registration != null && (all = (Collection)this.registrations.get(topic)) != null) {
                all.remove(registration);
            }
            return registration;
        }

        void removeRegistrations(String topic) {
            Collection all = (Collection)this.registrations.remove(topic);
            if (all != null) {
                for (Registration reg : all) {
                    this.registrationIdMap.remove(reg.getId());
                }
            }
        }

        void clear() {
            this.registrations.clear();
            this.registrationIdMap.clear();
        }

        void onMemberLeft(Address address) {
            for (Collection all : this.registrations.values()) {
                Iterator iter = all.iterator();
                while (iter.hasNext()) {
                    Registration reg = (Registration)iter.next();
                    if (!address.equals(reg.getSubscriber())) continue;
                    iter.remove();
                    this.registrationIdMap.remove(reg.id);
                }
            }
        }

        int incrementPublish() {
            return this.totalPublishes.incrementAndGet();
        }
    }
}

