/*
 * Decompiled with CFR 0.152.
 */
package org.apache.druid.emitter.graphite;

import com.codahale.metrics.graphite.Graphite;
import com.codahale.metrics.graphite.GraphiteSender;
import com.codahale.metrics.graphite.PickledGraphite;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.IOException;
import java.net.SocketException;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Pattern;
import org.apache.druid.emitter.graphite.DruidToGraphiteEventConverter;
import org.apache.druid.emitter.graphite.GraphiteEmitterConfig;
import org.apache.druid.emitter.graphite.GraphiteEvent;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.emitter.core.Emitter;
import org.apache.druid.java.util.emitter.core.Event;
import org.apache.druid.java.util.emitter.service.AlertEvent;
import org.apache.druid.java.util.emitter.service.SegmentMetadataEvent;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.server.log.RequestLogEvent;

public class GraphiteEmitter
implements Emitter {
    private static Logger log = new Logger(GraphiteEmitter.class);
    private final DruidToGraphiteEventConverter graphiteEventConverter;
    private final GraphiteEmitterConfig graphiteEmitterConfig;
    private final List<Emitter> alertEmitters;
    private final List<Emitter> requestLogEmitters;
    private final AtomicBoolean started = new AtomicBoolean(false);
    private final LinkedBlockingQueue<GraphiteEvent> eventsQueue;
    private static final long FLUSH_TIMEOUT_MILLIS = TimeUnit.MINUTES.toMillis(1L);
    private static final Pattern DOT_OR_WHITESPACE_PATTERN = Pattern.compile("[\\s]+|[.]+");
    private final ScheduledExecutorService exec = Executors.newScheduledThreadPool(2, new ThreadFactoryBuilder().setDaemon(true).setNameFormat("GraphiteEmitter-%s").build());
    private AtomicLong countLostEvents = new AtomicLong(0L);

    public GraphiteEmitter(GraphiteEmitterConfig graphiteEmitterConfig, List<Emitter> alertEmitters, List<Emitter> requestLogEmitters) {
        this.alertEmitters = alertEmitters;
        this.requestLogEmitters = requestLogEmitters;
        this.graphiteEmitterConfig = graphiteEmitterConfig;
        this.graphiteEventConverter = graphiteEmitterConfig.getDruidToGraphiteEventConverter();
        this.eventsQueue = new LinkedBlockingQueue(graphiteEmitterConfig.getMaxQueueSize());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void start() {
        log.info("Starting Graphite Emitter.", new Object[0]);
        AtomicBoolean atomicBoolean = this.started;
        synchronized (atomicBoolean) {
            if (!this.started.get()) {
                this.exec.scheduleAtFixedRate(new ConsumerRunnable(), this.graphiteEmitterConfig.getFlushPeriod(), this.graphiteEmitterConfig.getFlushPeriod(), TimeUnit.MILLISECONDS);
                this.started.set(true);
            }
        }
    }

    public void emit(Event event) {
        if (!this.started.get()) {
            throw new ISE("Emit called unexpectedly before service start", new Object[0]);
        }
        if (event instanceof ServiceMetricEvent) {
            GraphiteEvent graphiteEvent = this.graphiteEventConverter.druidEventToGraphite((ServiceMetricEvent)event);
            if (graphiteEvent == null) {
                return;
            }
            try {
                boolean isSuccessful = this.eventsQueue.offer(graphiteEvent, this.graphiteEmitterConfig.getEmitWaitTime(), TimeUnit.MILLISECONDS);
                if (!isSuccessful && this.countLostEvents.getAndIncrement() % 1000L == 0L) {
                    log.error("Lost total of [%s] events because of emitter queue is full. Please increase the capacity or/and the consumer frequency", new Object[]{this.countLostEvents.get()});
                }
            }
            catch (InterruptedException e) {
                log.error((Throwable)e, "got interrupted with message [%s]", new Object[]{e.getMessage()});
                Thread.currentThread().interrupt();
            }
        } else if (event instanceof RequestLogEvent) {
            for (Emitter emitter : this.requestLogEmitters) {
                emitter.emit(event);
            }
        } else if (!this.alertEmitters.isEmpty() && event instanceof AlertEvent) {
            for (Emitter emitter : this.alertEmitters) {
                emitter.emit(event);
            }
        } else if (event instanceof AlertEvent) {
            AlertEvent alertEvent = (AlertEvent)event;
            log.error("The following alert is dropped, description is [%s], severity is [%s]", new Object[]{alertEvent.getDescription(), alertEvent.getSeverity()});
        } else if (!(event instanceof SegmentMetadataEvent)) {
            log.error("unknown event type [%s]", new Object[]{event.getClass()});
        }
    }

    public void flush() {
        if (this.started.get()) {
            ScheduledFuture<?> future = this.exec.schedule(new ConsumerRunnable(), 0L, TimeUnit.MILLISECONDS);
            try {
                future.get(FLUSH_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
            }
            catch (InterruptedException | ExecutionException | TimeoutException e) {
                if (e instanceof InterruptedException) {
                    throw new RuntimeException("interrupted flushing elements from queue", e);
                }
                log.error((Throwable)e, e.getMessage(), new Object[0]);
            }
        }
    }

    public void close() {
        this.flush();
        this.started.set(false);
        this.exec.shutdown();
    }

    protected static String sanitize(String namespace) {
        return GraphiteEmitter.sanitize(namespace, false);
    }

    protected static String sanitize(String namespace, Boolean replaceSlashToDot) {
        String sanitizedNamespace = DOT_OR_WHITESPACE_PATTERN.matcher(namespace).replaceAll("_");
        if (replaceSlashToDot.booleanValue()) {
            sanitizedNamespace = sanitizedNamespace.replace('/', '.');
        }
        return sanitizedNamespace;
    }

    private class ConsumerRunnable
    implements Runnable {
        private final GraphiteSender graphite;

        public ConsumerRunnable() {
            this.graphite = GraphiteEmitter.this.graphiteEmitterConfig.getProtocol().equals("plaintext") ? new Graphite(GraphiteEmitter.this.graphiteEmitterConfig.getHostname(), GraphiteEmitter.this.graphiteEmitterConfig.getPort()) : new PickledGraphite(GraphiteEmitter.this.graphiteEmitterConfig.getHostname(), GraphiteEmitter.this.graphiteEmitterConfig.getPort(), GraphiteEmitter.this.graphiteEmitterConfig.getBatchSize());
            log.info("Using %s protocol.", new Object[]{GraphiteEmitter.this.graphiteEmitterConfig.getProtocol()});
        }

        @Override
        public void run() {
            block7: {
                try {
                    if (!this.graphite.isConnected()) {
                        log.info("trying to connect to graphite server", new Object[0]);
                        this.graphite.connect();
                    }
                    while (GraphiteEmitter.this.eventsQueue.size() > 0 && !GraphiteEmitter.this.exec.isShutdown()) {
                        try {
                            GraphiteEvent graphiteEvent = GraphiteEmitter.this.eventsQueue.poll(GraphiteEmitter.this.graphiteEmitterConfig.getWaitForEventTime(), TimeUnit.MILLISECONDS);
                            if (graphiteEvent == null) continue;
                            log.debug("sent [%s] with value [%s] and time [%s]", new Object[]{graphiteEvent.getEventPath(), graphiteEvent.getValue(), graphiteEvent.getTimestamp()});
                            this.graphite.send(graphiteEvent.getEventPath(), graphiteEvent.getValue(), graphiteEvent.getTimestamp());
                        }
                        catch (IOException | InterruptedException e) {
                            log.error((Throwable)e, e.getMessage(), new Object[0]);
                            if (e instanceof InterruptedException) {
                                Thread.currentThread().interrupt();
                                break;
                            }
                            if (!(e instanceof SocketException)) continue;
                            this.graphite.close();
                            log.warn("Trying to re-connect to graphite server", new Object[0]);
                            this.graphite.connect();
                        }
                    }
                }
                catch (Exception e) {
                    log.error((Throwable)e, e.getMessage(), new Object[0]);
                    if (!(e instanceof InterruptedException)) break block7;
                    Thread.currentThread().interrupt();
                }
            }
        }
    }
}

