package com.hazelcast.spi.impl.eventservice.impl;

import com.hazelcast.cluster.Address;
import com.hazelcast.cluster.impl.MemberImpl;
import com.hazelcast.instance.EndpointQualifier;
import com.hazelcast.internal.cluster.ClusterService;
import com.hazelcast.internal.cluster.Versions;
import com.hazelcast.internal.metrics.MetricDescriptor;
import com.hazelcast.internal.metrics.MetricDescriptorConstants;
import com.hazelcast.internal.metrics.MetricsRegistry;
import com.hazelcast.internal.metrics.Probe;
import com.hazelcast.internal.metrics.ProbeLevel;
import com.hazelcast.internal.metrics.StaticMetricsProvider;
import com.hazelcast.internal.nio.Packet;
import com.hazelcast.internal.serialization.Data;
import com.hazelcast.internal.serialization.InternalSerializationService;
import com.hazelcast.internal.server.ServerConnection;
import com.hazelcast.internal.util.ConcurrencyUtil;
import com.hazelcast.internal.util.EmptyStatement;
import com.hazelcast.internal.util.FutureUtil;
import com.hazelcast.internal.util.InvocationUtil;
import com.hazelcast.internal.util.Preconditions;
import com.hazelcast.internal.util.ThreadUtil;
import com.hazelcast.internal.util.UuidUtil;
import com.hazelcast.internal.util.counters.MwCounter;
import com.hazelcast.internal.util.executor.StripedExecutor;
import com.hazelcast.logging.ILogger;
import com.hazelcast.spi.impl.InternalCompletableFuture;
import com.hazelcast.spi.impl.NodeEngineImpl;
import com.hazelcast.spi.impl.eventservice.EventFilter;
import com.hazelcast.spi.impl.eventservice.EventRegistration;
import com.hazelcast.spi.impl.eventservice.EventService;
import com.hazelcast.spi.impl.eventservice.impl.operations.DeregistrationOperationSupplier;
import com.hazelcast.spi.impl.eventservice.impl.operations.OnJoinRegistrationOperation;
import com.hazelcast.spi.impl.eventservice.impl.operations.RegistrationOperationSupplier;
import com.hazelcast.spi.impl.eventservice.impl.operations.SendEventOperation;
import com.hazelcast.spi.impl.operationservice.Operation;
import com.hazelcast.spi.properties.ClusterProperty;
import com.hazelcast.spi.properties.HazelcastProperties;
import java.io.Closeable;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.logging.Level;
import javax.annotation.Nonnull;

/* loaded from: input_file:com/hazelcast/spi/impl/eventservice/impl/EventServiceImpl.class */
public class EventServiceImpl implements EventService, StaticMetricsProvider {
    public static final String SERVICE_NAME = "hz:core:eventService";
    public static final String EVENT_SYNC_FREQUENCY_PROP = "hazelcast.event.sync.frequency";
    private static final EventRegistration[] EMPTY_REGISTRATIONS = new EventRegistration[0];
    private static final int EVENT_SYNC_FREQUENCY = 100000;
    private static final int SEND_RETRY_COUNT = 50;
    private static final int WARNING_LOG_FREQUENCY = 1000;
    private static final int MAX_RETRIES = 100;
    final ILogger logger;
    final NodeEngineImpl nodeEngine;
    private final ConcurrentMap<String, EventServiceSegment> segments;
    private final StripedExecutor eventExecutor;
    private final long eventQueueTimeoutMs;

    @Probe(name = MetricDescriptorConstants.EVENT_METRIC_EVENT_SERVICE_THREAD_COUNT)
    private final int eventThreadCount;

    @Probe(name = MetricDescriptorConstants.EVENT_METRIC_EVENT_SERVICE_QUEUE_CAPACITY)
    private final int eventQueueCapacity;
    private final int sendEventSyncTimeoutMillis;
    private final InternalSerializationService serializationService;
    private final int eventSyncFrequency;

    @Probe(name = MetricDescriptorConstants.EVENT_METRIC_EVENT_SERVICE_TOTAL_FAILURE_COUNT)
    private final MwCounter totalFailures = MwCounter.newMwCounter();

    @Probe(name = MetricDescriptorConstants.EVENT_METRIC_EVENT_SERVICE_REJECTED_COUNT)
    private final MwCounter rejectedCount = MwCounter.newMwCounter();

    @Probe(name = MetricDescriptorConstants.EVENT_METRIC_EVENT_SERVICE_SYNC_DELIVERY_FAILURE_COUNT)
    private final MwCounter syncDeliveryFailureCount = MwCounter.newMwCounter();
    private final ConcurrentMap<UUID, Object> listenerCache = new ConcurrentHashMap();

    public EventServiceImpl(NodeEngineImpl nodeEngineImpl) {
        this.nodeEngine = nodeEngineImpl;
        this.serializationService = (InternalSerializationService) nodeEngineImpl.getSerializationService();
        this.logger = nodeEngineImpl.getLogger(EventService.class.getName());
        HazelcastProperties properties = nodeEngineImpl.getProperties();
        this.eventThreadCount = properties.getInteger(ClusterProperty.EVENT_THREAD_COUNT);
        this.eventQueueCapacity = properties.getInteger(ClusterProperty.EVENT_QUEUE_CAPACITY);
        this.eventQueueTimeoutMs = properties.getMillis(ClusterProperty.EVENT_QUEUE_TIMEOUT_MILLIS);
        this.sendEventSyncTimeoutMillis = properties.getInteger(ClusterProperty.EVENT_SYNC_TIMEOUT_MILLIS);
        this.eventSyncFrequency = loadEventSyncFrequency();
        this.eventExecutor = new StripedExecutor(nodeEngineImpl.getNode().getLogger(EventServiceImpl.class), ThreadUtil.createThreadName(nodeEngineImpl.getHazelcastInstance().getName(), MetricDescriptorConstants.EVENT_PREFIX), this.eventThreadCount, this.eventQueueCapacity);
        this.segments = new ConcurrentHashMap();
    }

    private static int loadEventSyncFrequency() {
        try {
            int parseInt = Integer.parseInt(System.getProperty(EVENT_SYNC_FREQUENCY_PROP));
            if (parseInt <= 0) {
                parseInt = 100000;
            }
            return parseInt;
        } catch (Exception e) {
            return 100000;
        }
    }

    @Override // com.hazelcast.internal.metrics.StaticMetricsProvider
    public void provideStaticMetrics(MetricsRegistry metricsRegistry) {
        metricsRegistry.registerStaticMetrics((MetricsRegistry) this, MetricDescriptorConstants.EVENT_PREFIX);
    }

    @Override // com.hazelcast.spi.impl.eventservice.EventService
    public void close(EventRegistration eventRegistration) {
        Object listener = ((Registration) eventRegistration).getListener();
        if (listener instanceof Closeable) {
            try {
                ((Closeable) listener).close();
            } catch (IOException e) {
                EmptyStatement.ignore(e);
            }
        }
    }

    @Override // com.hazelcast.spi.impl.eventservice.EventService
    public int getEventThreadCount() {
        return this.eventThreadCount;
    }

    @Override // com.hazelcast.spi.impl.eventservice.EventService
    public int getEventQueueCapacity() {
        return this.eventQueueCapacity;
    }

    @Override // com.hazelcast.spi.impl.eventservice.EventService
    @Probe(name = "eventQueueSize", level = ProbeLevel.MANDATORY)
    public int getEventQueueSize() {
        return this.eventExecutor.getWorkQueueSize();
    }

    @Probe(name = "eventsProcessed", level = ProbeLevel.MANDATORY)
    private long eventsProcessed() {
        return this.eventExecutor.processedCount();
    }

    @Override // com.hazelcast.spi.impl.eventservice.EventService
    public EventRegistration registerLocalListener(@Nonnull String str, @Nonnull String str2, @Nonnull Object obj) {
        return registerLocalListener(str, str2, TrueEventFilter.INSTANCE, obj);
    }

    @Override // com.hazelcast.spi.impl.eventservice.EventService
    public EventRegistration registerLocalListener(@Nonnull String str, @Nonnull String str2, @Nonnull EventFilter eventFilter, @Nonnull Object obj) {
        return registerListener0(str, str2, eventFilter, obj, true);
    }

    private EventRegistration registerListener0(@Nonnull String str, @Nonnull String str2, @Nonnull EventFilter eventFilter, @Nonnull Object obj, boolean z) {
        Preconditions.checkNotNull(obj, "Null listener is not allowed!");
        Preconditions.checkNotNull(eventFilter, "Null filter is not allowed!");
        EventServiceSegment segment = getSegment(str, true);
        Registration registration = new Registration(UuidUtil.newUnsecureUUID(), str, str2, eventFilter, this.nodeEngine.getThisAddress(), obj, z);
        if (segment.addRegistration(str2, registration)) {
            return registration;
        }
        return null;
    }

    @Override // com.hazelcast.spi.impl.eventservice.EventService
    public EventRegistration registerListener(@Nonnull String str, @Nonnull String str2, @Nonnull Object obj) {
        return (EventRegistration) FutureUtil.getValue(registerListenerAsync(str, str2, obj));
    }

    @Override // com.hazelcast.spi.impl.eventservice.EventService
    public EventRegistration registerListener(@Nonnull String str, @Nonnull String str2, @Nonnull EventFilter eventFilter, @Nonnull Object obj) {
        return (EventRegistration) FutureUtil.getValue(registerListenerAsync(str, str2, eventFilter, obj));
    }

    @Override // com.hazelcast.spi.impl.eventservice.EventService
    public CompletableFuture<EventRegistration> registerListenerAsync(@Nonnull String str, @Nonnull String str2, @Nonnull Object obj) {
        return registerListenerAsync(str, str2, TrueEventFilter.INSTANCE, obj);
    }

    @Override // com.hazelcast.spi.impl.eventservice.EventService
    public CompletableFuture<EventRegistration> registerListenerAsync(@Nonnull String str, @Nonnull String str2, @Nonnull EventFilter eventFilter, @Nonnull Object obj) {
        Registration registration = (Registration) registerListener0(str, str2, eventFilter, obj, false);
        if (registration == null) {
            InternalCompletableFuture.newCompletedFuture(null);
        }
        return invokeOnAllMembers(registration, new RegistrationOperationSupplier(registration, this.nodeEngine.getClusterService()));
    }

    private CompletableFuture<EventRegistration> invokeOnAllMembers(Registration registration, Supplier<Operation> supplier) {
        return InvocationUtil.invokeOnStableClusterSerial(this.nodeEngine, supplier, 100).thenApplyAsync(obj -> {
            return registration;
        }, ConcurrencyUtil.CALLER_RUNS);
    }

    private CompletableFuture<EventRegistration> invokeOnSubscriber(Registration registration, Supplier<Operation> supplier) {
        return this.nodeEngine.getOperationService().createInvocationBuilder(registration.getServiceName(), supplier.get(), registration.getSubscriber()).setTryCount(100).invoke().thenApplyAsync(obj -> {
            return registration;
        }, ConcurrencyUtil.CALLER_RUNS);
    }

    @Override // com.hazelcast.spi.impl.eventservice.EventService
    public boolean deregisterListener(@Nonnull String str, @Nonnull String str2, @Nonnull Object obj) {
        Boolean bool = (Boolean) FutureUtil.getValue(deregisterListenerAsync(str, str2, obj));
        return bool != null && bool.booleanValue();
    }

    @Override // com.hazelcast.spi.impl.eventservice.EventService
    public CompletableFuture<Boolean> deregisterListenerAsync(@Nonnull String str, @Nonnull String str2, @Nonnull Object obj) {
        Registration removeRegistration;
        Preconditions.checkNotNull(str, "Null serviceName is not allowed!");
        Preconditions.checkNotNull(str2, "Null topic is not allowed!");
        Preconditions.checkNotNull(obj, "Null id is not allowed!");
        EventServiceSegment segment = getSegment(str, false);
        if (segment != null && (removeRegistration = segment.removeRegistration(str2, (UUID) obj)) != null) {
            return !removeRegistration.isLocalOnly() ? invokeOnAllMembers(removeRegistration, new DeregistrationOperationSupplier(removeRegistration, this.nodeEngine.getClusterService())).thenApplyAsync((v0) -> {
                return Objects.nonNull(v0);
            }, ConcurrencyUtil.CALLER_RUNS) : InternalCompletableFuture.newCompletedFuture(true);
        }
        return InternalCompletableFuture.newCompletedFuture(false);
    }

    @Override // com.hazelcast.spi.impl.eventservice.EventService
    public void deregisterAllLocalListeners(@Nonnull String str, @Nonnull String str2) {
        EventServiceSegment segment = getSegment(str, false);
        if (segment != null) {
            segment.removeRegistrations(str2);
        }
    }

    @Override // com.hazelcast.spi.impl.eventservice.EventService
    public void deregisterAllListeners(@Nonnull String str, @Nonnull String str2, int i) {
        EventServiceSegment segment = getSegment(str, false);
        Collection<Registration> removeRegistrations = segment == null ? null : segment.removeRegistrations(str2);
        ClusterService clusterService = this.nodeEngine.getClusterService();
        if (clusterService.getClusterVersion().isGreaterOrEqual(Versions.V5_3)) {
            InvocationUtil.invokeOnStableClusterSerial(this.nodeEngine, new DeregistrationOperationSupplier(str, str2, i, clusterService), 100);
            return;
        }
        if (removeRegistrations != null) {
            for (Registration registration : removeRegistrations) {
                if (!registration.isLocalOnly()) {
                    InvocationUtil.invokeOnStableClusterSerial(this.nodeEngine, new DeregistrationOperationSupplier(registration, clusterService), 100);
                }
            }
        }
    }

    public void cacheListener(Registration registration) {
        this.listenerCache.put(registration.getId(), registration.getListener());
    }

    public void handleRegistration(Registration registration) {
        if (isLocal(registration)) {
            Object remove = this.listenerCache.remove(registration.getId());
            if (remove == null) {
                return;
            } else {
                registration.setListener(remove);
            }
        }
        getSegment(registration.getServiceName(), true).addRegistration(registration.getTopic(), registration);
    }

    public EventRegistration handleAllRegistrations(Registration registration, int i) {
        RegistrationOperationSupplier registrationOperationSupplier = new RegistrationOperationSupplier(registration, i, this.nodeEngine.getClusterService());
        return (EventRegistration) FutureUtil.getValue(registration.isLocalOnly() ? invokeOnSubscriber(registration, registrationOperationSupplier) : invokeOnAllMembers(registration, registrationOperationSupplier));
    }

    public StripedExecutor getEventExecutor() {
        return this.eventExecutor;
    }

    @Override // com.hazelcast.spi.impl.eventservice.EventService
    public EventRegistration[] getRegistrationsAsArray(@Nonnull String str, @Nonnull String str2) {
        EventServiceSegment segment = getSegment(str, false);
        if (segment == null) {
            return EMPTY_REGISTRATIONS;
        }
        Collection<Registration> registrations = segment.getRegistrations(str2, false);
        return (registrations == null || registrations.isEmpty()) ? EMPTY_REGISTRATIONS : (EventRegistration[]) registrations.toArray(new Registration[0]);
    }

    @Override // com.hazelcast.spi.impl.eventservice.EventService
    public Collection<EventRegistration> getRegistrations(@Nonnull String str, @Nonnull String str2) {
        EventServiceSegment segment = getSegment(str, false);
        if (segment == null) {
            return Collections.emptySet();
        }
        Collection<Registration> registrations = segment.getRegistrations(str2, false);
        return (registrations == null || registrations.isEmpty()) ? Collections.emptySet() : Collections.unmodifiableCollection(registrations);
    }

    @Override // com.hazelcast.spi.impl.eventservice.EventService
    public boolean hasEventRegistration(@Nonnull String str, @Nonnull String str2) {
        EventServiceSegment segment = getSegment(str, false);
        if (segment == null) {
            return false;
        }
        return segment.hasRegistration(str2);
    }

    @Override // com.hazelcast.spi.impl.eventservice.EventService
    public void publishEvent(String str, String str2, Object obj, int i) {
        publishEvent(str, getRegistrations(str, str2), obj, i);
    }

    @Override // com.hazelcast.spi.impl.eventservice.EventService
    public void publishEvent(String str, EventRegistration eventRegistration, Object obj, int i) {
        if (!(eventRegistration instanceof Registration)) {
            throw new IllegalArgumentException();
        }
        if (isLocal(eventRegistration)) {
            executeLocal(str, obj, eventRegistration, i);
        } else {
            sendEvent(eventRegistration.getSubscriber(), new EventEnvelope(eventRegistration.getId(), str, obj), i);
        }
    }

    @Override // com.hazelcast.spi.impl.eventservice.EventService
    public void publishEvent(String str, Collection<EventRegistration> collection, Object obj, int i) {
        Data data = null;
        for (EventRegistration eventRegistration : collection) {
            if (!(eventRegistration instanceof Registration)) {
                throw new IllegalArgumentException();
            }
            if (isLocal(eventRegistration)) {
                executeLocal(str, obj, eventRegistration, i);
            } else {
                if (data == null) {
                    data = this.serializationService.toData(obj);
                }
                sendEvent(eventRegistration.getSubscriber(), new EventEnvelope(eventRegistration.getId(), str, data), i);
            }
        }
    }

    @Override // com.hazelcast.spi.impl.eventservice.EventService
    public void publishRemoteEvent(String str, Collection<EventRegistration> collection, Object obj, int i) {
        if (collection.isEmpty()) {
            return;
        }
        Data data = this.serializationService.toData(obj);
        for (EventRegistration eventRegistration : collection) {
            if (!(eventRegistration instanceof Registration)) {
                throw new IllegalArgumentException();
            }
            if (!isLocal(eventRegistration)) {
                sendEvent(eventRegistration.getSubscriber(), new EventEnvelope(eventRegistration.getId(), str, data), i);
            }
        }
    }

    private void executeLocal(String str, Object obj, EventRegistration eventRegistration, int i) {
        if (this.nodeEngine.isRunning()) {
            Registration registration = (Registration) eventRegistration;
            try {
                if (registration.getListener() != null) {
                    this.eventExecutor.execute(new LocalEventDispatcher(this, str, obj, registration.getListener(), i, this.eventQueueTimeoutMs));
                } else {
                    this.logger.warning("Something seems wrong! Listener instance is null! -> " + registration);
                }
            } catch (RejectedExecutionException e) {
                this.rejectedCount.inc();
                if (this.eventExecutor.isLive()) {
                    logFailure("EventQueue overloaded! %s failed to publish to %s:%s", obj, registration.getServiceName(), registration.getTopic());
                }
            }
        }
    }

    private void sendEvent(Address address, EventEnvelope eventEnvelope, int i) {
        String serviceName = eventEnvelope.getServiceName();
        if (!(getSegment(serviceName, true).incrementPublish() % ((long) this.eventSyncFrequency) == 0)) {
            if (this.nodeEngine.getNode().getServer().getConnectionManager(EndpointQualifier.MEMBER).transmit(new Packet(this.serializationService.toBytes(eventEnvelope), i).setPacketType(Packet.Type.EVENT), address) || !this.nodeEngine.isRunning()) {
                return;
            }
            logFailure("Failed to send event packet to: %s, connection might not be alive.", address);
            return;
        }
        try {
            this.nodeEngine.getOperationService().createInvocationBuilder(serviceName, new SendEventOperation(eventEnvelope, i), address).setTryCount(50).invoke().get(this.sendEventSyncTimeoutMillis, TimeUnit.MILLISECONDS);
        } catch (Exception e) {
            this.syncDeliveryFailureCount.inc();
            if (this.logger.isFinestEnabled()) {
                this.logger.finest("Sync event delivery failed. Event: " + eventEnvelope, (Throwable) e);
            }
        }
    }

    public EventServiceSegment getSegment(@Nonnull String str, boolean z) {
        EventServiceSegment eventServiceSegment = this.segments.get(str);
        if (eventServiceSegment == null && z) {
            EventServiceSegment eventServiceSegment2 = new EventServiceSegment(str, this.nodeEngine.getService(str));
            EventServiceSegment putIfAbsent = this.segments.putIfAbsent(str, eventServiceSegment2);
            if (putIfAbsent == null) {
                eventServiceSegment = eventServiceSegment2;
                MetricsRegistry metricsRegistry = this.nodeEngine.getMetricsRegistry();
                metricsRegistry.registerStaticMetrics(metricsRegistry.newMetricDescriptor().withPrefix(MetricDescriptorConstants.EVENT_PREFIX).withDiscriminator(MetricDescriptorConstants.EVENT_DISCRIMINATOR_SERVICE, str), (MetricDescriptor) eventServiceSegment2);
            } else {
                eventServiceSegment = putIfAbsent;
            }
        }
        return eventServiceSegment;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public boolean isLocal(EventRegistration eventRegistration) {
        return this.nodeEngine.getThisAddress().equals(eventRegistration.getSubscriber());
    }

    @Override // com.hazelcast.spi.impl.eventservice.EventService
    public void executeEventCallback(@Nonnull Runnable runnable) {
        if (this.nodeEngine.isRunning()) {
            try {
                this.eventExecutor.execute(runnable);
            } catch (RejectedExecutionException e) {
                this.rejectedCount.inc();
                if (this.eventExecutor.isLive()) {
                    logFailure("EventQueue overloaded! Failed to execute event callback: %s", runnable);
                }
            }
        }
    }

    @Override // java.util.function.Consumer
    public void accept(Packet packet) {
        try {
            this.eventExecutor.execute(new RemoteEventProcessor(this, packet));
        } catch (RejectedExecutionException e) {
            this.rejectedCount.inc();
            if (this.eventExecutor.isLive()) {
                ServerConnection conn = packet.getConn();
                logFailure("EventQueue overloaded! Failed to process event packet sent from: %s", conn.getRemoteAddress() != null ? conn.getRemoteAddress().toString() : conn.toString());
            }
        }
    }

    /* JADX WARN: Can't rename method to resolve collision */
    @Override // com.hazelcast.internal.services.PreJoinAwareService
    public OnJoinRegistrationOperation getPreJoinOperation() {
        return getOnJoinRegistrationOperation();
    }

    @Override // com.hazelcast.internal.services.PostJoinAwareService
    public Operation getPostJoinOperation() {
        return null;
    }

    private OnJoinRegistrationOperation getOnJoinRegistrationOperation() {
        LinkedList linkedList = new LinkedList();
        Iterator<EventServiceSegment> it = this.segments.values().iterator();
        while (it.hasNext()) {
            it.next().collectRemoteRegistrations(linkedList);
        }
        if (linkedList.isEmpty()) {
            return null;
        }
        return new OnJoinRegistrationOperation(linkedList);
    }

    public void shutdown() {
        this.logger.finest("Stopping event executor...");
        this.eventExecutor.shutdown();
        Iterator<EventServiceSegment> it = this.segments.values().iterator();
        while (it.hasNext()) {
            it.next().clear();
        }
        this.segments.clear();
    }

    public void onMemberLeft(MemberImpl memberImpl) {
        Address address = memberImpl.getAddress();
        Iterator<EventServiceSegment> it = this.segments.values().iterator();
        while (it.hasNext()) {
            it.next().onMemberLeft(address);
        }
    }

    private void logFailure(String str, Object... objArr) {
        this.totalFailures.inc();
        Level level = this.totalFailures.get() % 1000 == 0 ? Level.WARNING : Level.FINEST;
        if (this.logger.isLoggable(level)) {
            this.logger.log(level, String.format(str, objArr));
        }
    }
}
