package com.alibaba.nacos.common.trace.publisher;

import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.common.notify.Event;
import com.alibaba.nacos.common.notify.ShardedEventPublisher;
import com.alibaba.nacos.common.notify.listener.Subscriber;
import com.alibaba.nacos.common.utils.ConcurrentHashSet;
import com.alibaba.nacos.common.utils.ThreadUtils;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/alibaba/nacos/common/trace/publisher/TraceEventPublisher.class */
public class TraceEventPublisher extends Thread implements ShardedEventPublisher {
    private static final String THREAD_NAME = "trace.publisher-";
    private static final Logger LOGGER = LoggerFactory.getLogger("com.alibaba.nacos.common.trace.publisher");
    private static final int DEFAULT_WAIT_TIME = 60;
    private final Map<Class<? extends Event>, Set<Subscriber<? extends Event>>> subscribes = new ConcurrentHashMap();
    private volatile boolean initialized = false;
    private volatile boolean shutdown = false;
    private int queueMaxSize = -1;
    private BlockingQueue<Event> queue;
    private String publisherName;

    @Override // com.alibaba.nacos.common.notify.EventPublisher
    public void init(Class<? extends Event> cls, int i) {
        this.queueMaxSize = i;
        this.queue = new ArrayBlockingQueue(i);
        this.publisherName = cls.getSimpleName();
        super.setName(THREAD_NAME + this.publisherName);
        super.setDaemon(true);
        super.start();
        this.initialized = true;
    }

    @Override // com.alibaba.nacos.common.notify.EventPublisher
    public long currentEventSize() {
        return this.queue.size();
    }

    @Override // com.alibaba.nacos.common.notify.EventPublisher
    public void addSubscriber(Subscriber subscriber) {
        addSubscriber(subscriber, subscriber.subscribeType());
    }

    @Override // com.alibaba.nacos.common.notify.ShardedEventPublisher
    public void addSubscriber(Subscriber subscriber, Class<? extends Event> cls) {
        this.subscribes.computeIfAbsent(cls, cls2 -> {
            return new ConcurrentHashSet();
        }).add(subscriber);
    }

    @Override // com.alibaba.nacos.common.notify.EventPublisher
    public void removeSubscriber(Subscriber subscriber) {
        removeSubscriber(subscriber, subscriber.subscribeType());
    }

    @Override // com.alibaba.nacos.common.notify.ShardedEventPublisher
    public void removeSubscriber(Subscriber subscriber, Class<? extends Event> cls) {
        this.subscribes.computeIfPresent(cls, (cls2, set) -> {
            set.remove(subscriber);
            if (set.isEmpty()) {
                return null;
            }
            return set;
        });
    }

    @Override // com.alibaba.nacos.common.notify.EventPublisher
    public boolean publish(Event event) {
        checkIsStart();
        if (this.queue.offer(event)) {
            return true;
        }
        LOGGER.warn("Trace Event Publish failed, event : {}, publish queue size : {}", event, Long.valueOf(currentEventSize()));
        return true;
    }

    @Override // com.alibaba.nacos.common.notify.EventPublisher
    public void notifySubscriber(Subscriber subscriber, Event event) {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("[NotifyCenter] the {} will received by {}", event, subscriber);
        }
        Runnable runnable = () -> {
            subscriber.onEvent(event);
        };
        Executor executor = subscriber.executor();
        if (executor != null) {
            executor.execute(runnable);
            return;
        }
        try {
            runnable.run();
        } catch (Throwable th) {
            LOGGER.error("Event callback exception: ", th);
        }
    }

    @Override // com.alibaba.nacos.common.lifecycle.Closeable
    public void shutdown() throws NacosException {
        this.shutdown = true;
        this.queue.clear();
    }

    @Override // java.lang.Thread, java.lang.Runnable
    public void run() {
        try {
            waitSubscriberForInit();
            handleEvents();
        } catch (Exception e) {
            LOGGER.error("Trace Event Publisher {}, stop to handle event due to unexpected exception: ", this.publisherName, e);
        }
    }

    private void waitSubscriberForInit() {
        for (int i = DEFAULT_WAIT_TIME; i > 0 && !this.shutdown && this.subscribes.isEmpty(); i--) {
            ThreadUtils.sleep(1000L);
        }
    }

    private void handleEvents() {
        while (!this.shutdown) {
            try {
                handleEvent(this.queue.take());
            } catch (InterruptedException e) {
                LOGGER.warn("Trace Event Publisher {} take event from queue failed:", this.publisherName, e);
                Thread.currentThread().interrupt();
            }
        }
    }

    private void handleEvent(Event event) {
        Class<?> cls = event.getClass();
        Set<Subscriber<? extends Event>> set = this.subscribes.get(cls);
        if (null == set) {
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("[NotifyCenter] No subscribers for slow event {}", cls.getName());
            }
        } else {
            Iterator<Subscriber<? extends Event>> it = set.iterator();
            while (it.hasNext()) {
                notifySubscriber(it.next(), event);
            }
        }
    }

    void checkIsStart() {
        if (!this.initialized) {
            throw new IllegalStateException("Publisher does not start");
        }
    }

    public String getStatus() {
        return String.format("Publisher %-30s: shutdown=%5s, queue=%7d/%-7d", this.publisherName, Boolean.valueOf(this.shutdown), Long.valueOf(currentEventSize()), Integer.valueOf(this.queueMaxSize));
    }
}
