/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.cloud.appbroker.logging.streaming;

import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.cloudfoundry.client.CloudFoundryClient;
import org.cloudfoundry.client.v2.applications.GetApplicationRequest;
import org.cloudfoundry.logcache.v1.Envelope;
import org.cloudfoundry.logcache.v1.EnvelopeType;
import org.cloudfoundry.logcache.v1.LogCacheClient;
import org.cloudfoundry.logcache.v1.ReadRequest;
import org.cloudfoundry.logcache.v1.ReadResponse;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.appbroker.logging.ApplicationIdsProvider;
import org.springframework.cloud.appbroker.logging.LoggingUtils;
import org.springframework.cloud.appbroker.logging.streaming.LogStreamPublisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.retry.Retry;

public class LogCacheStreamPublisher
implements LogStreamPublisher<org.cloudfoundry.dropsonde.events.Envelope> {
    private static final Logger LOG = LoggerFactory.getLogger(LogCacheStreamPublisher.class);
    private final CloudFoundryClient client;
    private final LogCacheClient logCacheClient;
    private final ApplicationIdsProvider applicationIdsProvider;

    public LogCacheStreamPublisher(CloudFoundryClient client, LogCacheClient logCacheClient, ApplicationIdsProvider applicationIdsProvider) {
        this.client = client;
        this.logCacheClient = logCacheClient;
        this.applicationIdsProvider = applicationIdsProvider;
    }

    @Override
    public Flux<org.cloudfoundry.dropsonde.events.Envelope> getLogStream(String serviceInstanceId) {
        return this.applicationIdsProvider.getApplicationIds(serviceInstanceId).doOnNext(id -> LOG.debug("Starting log streaming for app with ID {}", id)).flatMap(this::createApplicationStreamer);
    }

    protected Flux<org.cloudfoundry.dropsonde.events.Envelope> createApplicationStreamer(String applicationId) {
        return this.getApplicationName(applicationId).flatMapMany(appName -> {
            long initialStartTime = Instant.now().minus(5L, ChronoUnit.SECONDS).toEpochMilli() * 1000000L;
            return this.readLogCache(applicationId, initialStartTime).flatMapMany(initialResponse -> {
                AtomicLong lastTimestamp = LogCacheStreamPublisher.getLastTimestamp(initialResponse, initialStartTime);
                Flux<org.cloudfoundry.dropsonde.events.Envelope> initialLogs = LogCacheStreamPublisher.convertEnvelopesToDropsonde(initialResponse);
                Flux polledLogs = Flux.interval((Duration)Duration.ofSeconds(1L)).flatMap(tick -> this.readLogCache(applicationId, lastTimestamp.get() + 1L).flatMapMany(readResponse -> {
                    LogCacheStreamPublisher.updateLastTimestampFromResponse(readResponse, lastTimestamp);
                    return LogCacheStreamPublisher.convertEnvelopesToDropsonde(readResponse);
                })).onErrorResume(error -> {
                    LOG.error("Error during log polling", error);
                    return Flux.empty();
                });
                return Flux.merge((Publisher[])new Publisher[]{initialLogs, polledLogs}).retryWhen((Retry)Retry.backoff((long)3L, (Duration)Duration.ofSeconds(5L))).doOnError(error -> LOG.error("Streaming error", error));
            });
        });
    }

    private static AtomicLong getLastTimestamp(ReadResponse initialResponse, long initialStartTime) {
        return new AtomicLong(initialResponse.getEnvelopes().getBatch().stream().mapToLong(Envelope::getTimestamp).max().orElse(initialStartTime));
    }

    private static Flux<org.cloudfoundry.dropsonde.events.Envelope> convertEnvelopesToDropsonde(ReadResponse readResponse) {
        return Flux.fromIterable((Iterable)readResponse.getEnvelopes().getBatch()).map(LoggingUtils::convertLogCacheEnvelopeToDropsonde);
    }

    private Mono<String> getApplicationName(String applicationId) {
        return this.client.applicationsV2().get(GetApplicationRequest.builder().applicationId(applicationId).build()).map(response -> response.getEntity().getName());
    }

    private Mono<ReadResponse> readLogCache(String applicationId, long lastTimestamp) {
        return this.logCacheClient.read(ReadRequest.builder().sourceId(applicationId).envelopeTypes(new EnvelopeType[]{EnvelopeType.LOG}).startTime(Long.valueOf(lastTimestamp)).build());
    }

    private static void updateLastTimestampFromResponse(ReadResponse readResponse, AtomicLong lastTimestamp) {
        long maxTimestamp = readResponse.getEnvelopes().getBatch().stream().mapToLong(Envelope::getTimestamp).max().orElse(lastTimestamp.get());
        lastTimestamp.set(maxTimestamp);
    }
}

