/*
 * Decompiled with CFR 0.152.
 */
package io.seata.server.cluster.manager;

import io.seata.common.thread.NamedThreadFactory;
import io.seata.server.cluster.listener.ClusterChangeEvent;
import io.seata.server.cluster.listener.ClusterChangeListener;
import io.seata.server.cluster.watch.Watcher;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import javax.annotation.PostConstruct;
import javax.servlet.AsyncContext;
import javax.servlet.http.HttpServletResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.event.EventListener;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;

/**
 * Keeps track of pending long-polling watchers per group and answers them either
 * when a {@link ClusterChangeEvent} for their group arrives (HTTP 200) or when
 * their deadline has passed (HTTP 304).
 *
 * <p>The watcher registry and the per-group term map are {@code static}, so they
 * are shared by every instance of this class.
 */
@Component
public class ClusterWatcherManager implements ClusterChangeListener {
    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    /** Pending watchers, keyed by the group they are watching. */
    private static final Map<String, Queue<Watcher<?>>> WATCHERS = new ConcurrentHashMap<>();

    /** Term of the last change event seen for each group. */
    private static final Map<String, Long> GROUP_UPDATE_TIME = new ConcurrentHashMap<>();

    /** Single-threaded timer that periodically sweeps the pending watchers. */
    private final ScheduledThreadPoolExecutor scheduledThreadPoolExecutor =
        new ScheduledThreadPoolExecutor(1, new NamedThreadFactory("long-polling", 1));

    /**
     * Starts the periodic sweep (every second, after an initial one second delay)
     * that answers watchers whose deadline has passed.
     */
    @PostConstruct
    public void init() {
        this.scheduledThreadPoolExecutor.scheduleAtFixedRate(this::sweepWatchers, 1L, 1L, TimeUnit.SECONDS);
    }

    /**
     * Detaches every group's watcher queue and processes each watcher in it.
     *
     * <p>Nothing may escape from here: an exception thrown by a task scheduled with
     * {@code scheduleAtFixedRate} suppresses all subsequent executions, which would
     * silently stop the timeout handling for good.
     */
    private void sweepWatchers() {
        try {
            for (String group : WATCHERS.keySet()) {
                Optional.ofNullable(WATCHERS.remove(group))
                    .ifPresent(watchers -> watchers.parallelStream().forEach(this::expireOrRequeue));
            }
        } catch (Exception e) {
            this.logger.error("long-polling watcher sweep failed: {}", e.getMessage(), e);
        }
    }

    /**
     * Answers the watcher with 304 if the current time has reached its timeout
     * (compared against {@link System#currentTimeMillis()}); otherwise, unless it
     * is already done, hands it back to {@link #registryWatcher(Watcher)}.
     *
     * <p>Failures are contained per watcher. The queue this watcher came from has
     * already been removed from the registry, so letting one failure abort the
     * stream would drop the remaining watchers without answering them.
     *
     * @param watcher a watcher taken out of the registry by the sweep
     */
    private void expireOrRequeue(Watcher<?> watcher) {
        try {
            if (System.currentTimeMillis() >= watcher.getTimeout()) {
                AsyncContext asyncContext = (AsyncContext) watcher.getAsyncContext();
                HttpServletResponse httpServletResponse = (HttpServletResponse) asyncContext.getResponse();
                watcher.setDone(true);
                httpServletResponse.setStatus(HttpServletResponse.SC_NOT_MODIFIED);
                asyncContext.complete();
            }
            if (!watcher.isDone()) {
                this.registryWatcher(watcher);
            }
        } catch (Exception e) {
            // The watcher is dropped: its request could not be answered or re-queued.
            this.logger.warn("failed to process long-polling watcher of group: {}", watcher.getGroup(), e);
        }
    }

    /**
     * Records the event's term for its group and answers every watcher currently
     * pending on that group. Events whose term is not positive are ignored.
     *
     * @param event the cluster change event
     */
    @Override
    @EventListener
    @Async
    public void onChangeEvent(ClusterChangeEvent event) {
        if (event.getTerm() > 0L) {
            // Publish the term before detaching the queue so that a watcher registered
            // afterwards with an older term is answered by registryWatcher right away.
            GROUP_UPDATE_TIME.put(event.getGroup(), event.getTerm());
            Optional.ofNullable(WATCHERS.remove(event.getGroup()))
                .ifPresent(watchers -> watchers.parallelStream().forEach(this::notifyQuietly));
        }
    }

    /**
     * Calls {@link #notify(Watcher)} and logs, instead of propagating, any failure,
     * so that one broken request cannot prevent the other watchers of the same
     * (already detached) queue from being answered.
     *
     * @param watcher the watcher to answer
     */
    private void notifyQuietly(Watcher<?> watcher) {
        try {
            this.notify(watcher);
        } catch (Exception e) {
            this.logger.warn("failed to notify long-polling watcher of group: {}", watcher.getGroup(), e);
        }
    }

    /**
     * Marks the watcher as done and completes its pending request with HTTP 200.
     *
     * @param watcher the watcher to answer
     */
    private void notify(Watcher<?> watcher) {
        AsyncContext asyncContext = (AsyncContext) watcher.getAsyncContext();
        HttpServletResponse httpServletResponse = (HttpServletResponse) asyncContext.getResponse();
        watcher.setDone(true);
        if (this.logger.isDebugEnabled()) {
            this.logger.debug("notify cluster change event to: {}", asyncContext.getRequest().getRemoteAddr());
        }
        httpServletResponse.setStatus(HttpServletResponse.SC_OK);
        asyncContext.complete();
    }

    /**
     * Registers a watcher for its group. If no term has been recorded for the group
     * yet, or the watcher's term is not older than the recorded one, the watcher is
     * queued; otherwise it is answered immediately via {@link #notify(Watcher)}.
     *
     * @param watcher the watcher to register
     */
    public void registryWatcher(Watcher<?> watcher) {
        String group = watcher.getGroup();
        Long term = GROUP_UPDATE_TIME.get(group);
        if (term == null || watcher.getTerm() >= term) {
            WATCHERS.computeIfAbsent(group, key -> new ConcurrentLinkedQueue<>()).add(watcher);
        } else {
            this.notify(watcher);
        }
    }
}

