/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.cloud.stream.app.throughput.sink;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.app.throughput.sink.ThroughputSinkProperties;
import org.springframework.cloud.stream.app.throughput.sink.TimeUnit;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.context.Lifecycle;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.Message;

@EnableBinding(value={Sink.class})
@EnableConfigurationProperties(value={ThroughputSinkProperties.class})
public class ThroughputSinkConfiguration
implements Lifecycle {
    private final Log logger = LogFactory.getLog(this.getClass());
    private final AtomicLong counter = new AtomicLong();
    private final AtomicLong start = new AtomicLong(-1L);
    private final AtomicLong bytes = new AtomicLong(-1L);
    private final AtomicLong intermediateCounter = new AtomicLong();
    private final AtomicLong intermediateBytes = new AtomicLong();
    private final TimeUnit timeUnit = TimeUnit.s;
    private final ExecutorService executorService = Executors.newFixedThreadPool(1);
    private volatile boolean running;
    private volatile boolean reportBytes = false;
    @Autowired
    private volatile ThroughputSinkProperties properties;

    public void start() {
        this.running = true;
    }

    public void stop() {
        this.running = false;
    }

    public boolean isRunning() {
        return this.running;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @ServiceActivator(inputChannel="input")
    public void throughputSink(Message<?> message) {
        if (this.start.get() == -1L) {
            AtomicLong atomicLong = this.start;
            synchronized (atomicLong) {
                if (this.start.get() == -1L) {
                    Object payload = message.getPayload();
                    if (payload instanceof byte[] || payload instanceof String) {
                        this.reportBytes = true;
                    }
                    this.start.set(System.currentTimeMillis());
                    this.executorService.execute(new ReportStats());
                }
            }
        }
        this.intermediateCounter.incrementAndGet();
        if (this.reportBytes) {
            Object payload = message.getPayload();
            if (payload instanceof byte[]) {
                this.intermediateBytes.addAndGet(((byte[])payload).length);
            } else if (payload instanceof String) {
                this.intermediateBytes.addAndGet(((String)payload).getBytes().length);
            }
        }
    }

    private class ReportStats
    implements Runnable {
        private ReportStats() {
        }

        @Override
        public void run() {
            int reportEveryMs = ThroughputSinkConfiguration.this.properties.getReportEveryMs();
            while (ThroughputSinkConfiguration.this.isRunning()) {
                long intervalStart = System.currentTimeMillis();
                try {
                    Thread.sleep(reportEveryMs);
                    long timeNow = System.currentTimeMillis();
                    long currentCounter = ThroughputSinkConfiguration.this.intermediateCounter.getAndSet(0L);
                    long currentBytes = ThroughputSinkConfiguration.this.intermediateBytes.getAndSet(0L);
                    long totalCounter = ThroughputSinkConfiguration.this.counter.addAndGet(currentCounter);
                    long totalBytes = ThroughputSinkConfiguration.this.bytes.addAndGet(currentBytes);
                    ThroughputSinkConfiguration.this.logger.info((Object)String.format("Messages: %10d in %5.2f%s = %11.2f/s", new Object[]{currentCounter, (double)(timeNow - intervalStart) / 1000.0, ThroughputSinkConfiguration.this.timeUnit, (double)currentCounter * 1000.0 / (double)reportEveryMs}));
                    ThroughputSinkConfiguration.this.logger.info((Object)String.format("Messages: %10d in %5.2f%s = %11.2f/s", new Object[]{totalCounter, (double)(timeNow - ThroughputSinkConfiguration.this.start.get()) / 1000.0, ThroughputSinkConfiguration.this.timeUnit, (double)totalCounter * 1000.0 / (double)(timeNow - ThroughputSinkConfiguration.this.start.get())}));
                    if (!ThroughputSinkConfiguration.this.reportBytes) continue;
                    ThroughputSinkConfiguration.this.logger.info((Object)String.format("Throughput: %12d in %5.2f%s = %11.2fMB/s, ", new Object[]{currentBytes, (double)(timeNow - intervalStart) / 1000.0, ThroughputSinkConfiguration.this.timeUnit, (double)currentBytes / 1048576.0 * 1000.0 / (double)reportEveryMs}));
                    ThroughputSinkConfiguration.this.logger.info((Object)String.format("Throughput: %12d in %5.2f%s = %11.2fMB/s", new Object[]{totalBytes, (double)(timeNow - ThroughputSinkConfiguration.this.start.get()) / 1000.0, ThroughputSinkConfiguration.this.timeUnit, (double)totalBytes / 1048576.0 * 1000.0 / (double)(timeNow - ThroughputSinkConfiguration.this.start.get())}));
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    ThroughputSinkConfiguration.this.logger.warn((Object)"Thread interrupted", (Throwable)e);
                    return;
                }
            }
        }
    }
}

