/*
 * Decompiled with CFR 0.152.
 */
package org.terracotta.management.resource.services.events;

import java.io.IOException;
import java.security.Principal;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import javax.inject.Singleton;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.UriInfo;
import org.glassfish.jersey.media.sse.OutboundEvent;
import org.glassfish.jersey.media.sse.SseBroadcaster;
import org.glassfish.jersey.server.ChunkedOutput;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.terracotta.management.ServiceLocator;
import org.terracotta.management.resource.events.EventEntityV2;
import org.terracotta.management.resource.services.events.EventServiceV2;
import org.terracotta.management.resource.services.events.TerracottaEventOutput;

@Path(value="/v2/events")
@Singleton
public class AllEventsResourceServiceImplV2 {
    private static final Logger LOG = LoggerFactory.getLogger(AllEventsResourceServiceImplV2.class);
    public static final int BATCH_SIZE = Integer.getInteger("TerracottaEventOutput.batch_size", 32);
    public static final long TIMER_INTERVAL = Long.getLong("TerracottaEventOutput.timer_interval", 917L);
    public static final long MAX_IDLE_KEEPALIVE = Long.getLong("TerracottaEventOutput.max_idle_keepalive", 57917L);
    private final EventServiceV2 eventService = (EventServiceV2)ServiceLocator.locate(EventServiceV2.class);
    private final Broadcaster broadcaster = new Broadcaster();
    private static final Timer flushTimer = new Timer("sse-flush-timer", true);

    public AllEventsResourceServiceImplV2() {
        LOG.debug("sse-flush-timer being used: {}", (Object)flushTimer);
        flushTimer.schedule(new TimerTask(){

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            @Override
            public void run() {
                LOG.debug("There are {} registered SSE event output(s), checking them", (Object)AllEventsResourceServiceImplV2.this.broadcaster.outputs.size());
                for (Map.Entry entry : AllEventsResourceServiceImplV2.this.broadcaster.outputs.entrySet()) {
                    TerracottaEventOutput output = (TerracottaEventOutput)((Object)entry.getKey());
                    TerracottaEventOutputFlushingMetadata metadata = (TerracottaEventOutputFlushingMetadata)entry.getValue();
                    long idleTime = metadata.accumulatedIdleTime.addAndGet(TIMER_INTERVAL);
                    int unflushedCount = metadata.unflushedCount.get();
                    if (unflushedCount > 0) {
                        LOG.debug("A SSE event output accumulated {} unflushed events during max interval, flushing it", (Object)unflushedCount);
                        try {
                            output.flush();
                            continue;
                        }
                        catch (Exception e) {
                            LOG.debug("Error flushing SSE from timer, closing event output", (Throwable)e);
                            AllEventsResourceServiceImplV2.this.broadcaster.close(output);
                            continue;
                        }
                        finally {
                            metadata.unflushedCount.addAndGet(-unflushedCount);
                            continue;
                        }
                    }
                    if (idleTime >= MAX_IDLE_KEEPALIVE) {
                        LOG.debug("A SSE event output has been idle for too long {}, closing it", (Object)idleTime);
                        AllEventsResourceServiceImplV2.this.broadcaster.close(output);
                        continue;
                    }
                    LOG.debug("A SSE event output accumulated 0 event during flush interval");
                }
            }
        }, TIMER_INTERVAL, TIMER_INTERVAL);
    }

    @GET
    @Produces(value={"text/event-stream"})
    public TerracottaEventOutput getServerSentEvents(@Context UriInfo info, @QueryParam(value="localOnly") boolean localOnly, @Context HttpServletRequest request, @Context HttpServletResponse response) {
        Principal principal = request.getUserPrincipal();
        String userName = principal != null ? principal.getName() : "tc_no_security_ctxt";
        EventServiceListener eventOutput = new EventServiceListener(userName);
        LOG.debug("Invoking AllEventsResourceServiceImplV2.getServerSentEvents: info={}, localOnly={}, user={}", new Object[]{info.getRequestUri(), localOnly, userName});
        this.broadcaster.add(eventOutput);
        this.eventService.registerEventListener((EventServiceV2.EventListener)eventOutput, localOnly);
        return eventOutput;
    }

    private static class TerracottaEventOutputFlushingMetadata {
        final AtomicInteger unflushedCount = new AtomicInteger();
        final AtomicLong accumulatedIdleTime = new AtomicLong();

        private TerracottaEventOutputFlushingMetadata() {
        }
    }

    public class EventServiceListener
    extends TerracottaEventOutput
    implements EventServiceV2.EventListener {
        private final String userName;

        public EventServiceListener(String userName) {
            this.userName = userName;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public synchronized void write(OutboundEvent chunk) throws IOException {
            if (this.isClosed()) {
                throw new IOException("closed");
            }
            TerracottaEventOutputFlushingMetadata metadata = (TerracottaEventOutputFlushingMetadata)AllEventsResourceServiceImplV2.this.broadcaster.outputs.get((Object)this);
            metadata.accumulatedIdleTime.set(0L);
            int unflushedCount = metadata.unflushedCount.incrementAndGet();
            try {
                super.write((Object)chunk);
            }
            finally {
                if (unflushedCount == BATCH_SIZE) {
                    LOG.debug("A SSE event output reached {} unflushed events, flushing it", (Object)unflushedCount);
                    metadata.unflushedCount.addAndGet(-unflushedCount);
                    super.flush();
                } else {
                    LOG.debug("A SSE event output accumulating {} unflushed events", (Object)unflushedCount);
                }
            }
        }

        public void onEvent(EventEntityV2 eventEntity) {
            OutboundEvent.Builder eventBuilder = new OutboundEvent.Builder();
            eventBuilder.reconnectDelay(100L);
            eventBuilder.mediaType(MediaType.APPLICATION_JSON_TYPE);
            eventBuilder.name(EventEntityV2.class.getSimpleName());
            eventBuilder.data(EventEntityV2.class, (Object)eventEntity);
            OutboundEvent event = eventBuilder.build();
            try {
                this.write(event);
            }
            catch (Exception e) {
                this.onError(e);
            }
            if (LOG.isDebugEnabled()) {
                LOG.debug(String.format("Event dispatched: {AgentId: %s, Type: %s, ApiVersion: %s, Representables: %s}", eventEntity.getAgentId(), eventEntity.getType(), eventEntity.getApiVersion(), eventEntity.getRootRepresentables()));
            }
        }

        public void onError(Throwable throwable) {
            LOG.debug("Error when waiting for management events.", throwable);
            try {
                AllEventsResourceServiceImplV2.this.broadcaster.close(this);
            }
            catch (Exception e) {
                LOG.debug("Error closing SSE event output", (Throwable)e);
            }
        }

        public String getUsername() {
            return this.userName;
        }

        public String toString() {
            return ((Object)((Object)this)).getClass().getName() + "@" + Integer.toHexString(this.hashCode());
        }
    }

    private class Broadcaster
    extends SseBroadcaster {
        private final Map<TerracottaEventOutput, TerracottaEventOutputFlushingMetadata> outputs = new ConcurrentHashMap<TerracottaEventOutput, TerracottaEventOutputFlushingMetadata>();

        private Broadcaster() {
        }

        public void onException(ChunkedOutput<OutboundEvent> chunkedOutput, Exception exception) {
            LOG.debug("Error writing to OutputEvent", (Throwable)exception);
            this.close(chunkedOutput);
        }

        public <OUT extends ChunkedOutput<OutboundEvent>> boolean add(OUT chunkedOutput) {
            this.outputs.put((TerracottaEventOutput)chunkedOutput, new TerracottaEventOutputFlushingMetadata());
            return super.add(chunkedOutput);
        }

        public void onClose(ChunkedOutput<OutboundEvent> chunkedOutput) {
            this.outputs.remove(chunkedOutput);
            AllEventsResourceServiceImplV2.this.eventService.unregisterEventListener((EventServiceV2.EventListener)((EventServiceListener)chunkedOutput));
        }

        public void close(ChunkedOutput<OutboundEvent> chunkedOutput) {
            try {
                if (!chunkedOutput.isClosed()) {
                    chunkedOutput.close();
                }
            }
            catch (Exception e) {
                LOG.debug("Error closing SSE event output from timer", (Throwable)e);
            }
            finally {
                this.onClose(chunkedOutput);
                this.remove(chunkedOutput);
            }
        }
    }
}

