/*
 * Decompiled with CFR 0.152.
 */
package com.launchdarkly.sdk.server;

import com.launchdarkly.logging.LDLogger;
import com.launchdarkly.logging.LogValues;
import com.launchdarkly.sdk.server.StreamProcessorEvents;
import com.launchdarkly.sdk.server.interfaces.DataSourceStatusProvider;
import com.launchdarkly.sdk.server.interfaces.DataStoreStatusProvider;
import com.launchdarkly.sdk.server.subsystems.DataSource;
import com.launchdarkly.sdk.server.subsystems.DataSourceUpdateSink;
import com.launchdarkly.sdk.server.subsystems.DataStoreTypes;
import com.launchdarkly.sdk.server.subsystems.SerializationException;
import com.launchdarkly.shaded.com.google.common.annotations.VisibleForTesting;
import com.launchdarkly.shaded.com.google.gson.JsonParseException;
import com.launchdarkly.shaded.com.google.gson.stream.JsonReader;
import com.launchdarkly.shaded.com.launchdarkly.eventsource.ConnectionErrorHandler;
import com.launchdarkly.shaded.com.launchdarkly.eventsource.EventHandler;
import com.launchdarkly.shaded.com.launchdarkly.eventsource.EventSource;
import com.launchdarkly.shaded.com.launchdarkly.eventsource.MessageEvent;
import com.launchdarkly.shaded.com.launchdarkly.eventsource.UnsuccessfulResponseException;
import com.launchdarkly.shaded.com.launchdarkly.sdk.internal.events.DiagnosticStore;
import com.launchdarkly.shaded.com.launchdarkly.sdk.internal.http.HttpErrors;
import com.launchdarkly.shaded.com.launchdarkly.sdk.internal.http.HttpHelpers;
import com.launchdarkly.shaded.com.launchdarkly.sdk.internal.http.HttpProperties;
import com.launchdarkly.shaded.okhttp3.Headers;
import com.launchdarkly.shaded.okhttp3.OkHttpClient;
import java.io.IOException;
import java.io.Reader;
import java.net.URI;
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;

/**
 * {@link DataSource} implementation that keeps a server-sent-events connection open to
 * {@code <streamUri>/all} and forwards the {@code put}, {@code patch} and {@code delete}
 * events it receives to a {@link DataSourceUpdateSink}.
 *
 * <p>Connection failures are routed through {@link #connectionErrorHandler}, which reports
 * the failure to the update sink and decides whether the underlying {@link EventSource}
 * should keep retrying or shut down.
 */
final class StreamProcessor
implements DataSource {
    private static final String PUT = "put";
    private static final String PATCH = "patch";
    private static final String DELETE = "delete";
    /** Passed to the EventSource builder as its read timeout. */
    private static final Duration DEAD_CONNECTION_INTERVAL = Duration.ofSeconds(300L);
    private static final String ERROR_CONTEXT_MESSAGE = "in stream connection";
    private static final String WILL_RETRY_MESSAGE = "will retry";

    private final DataSourceUpdateSink dataSourceUpdates;
    private final HttpProperties httpProperties;
    private final Headers headers;
    @VisibleForTesting
    final URI streamUri;
    @VisibleForTesting
    final Duration initialReconnectDelay;
    private final DiagnosticStore diagnosticAccumulator;
    private final int threadPriority;
    /** Non-null only when the data store status provider has status monitoring enabled. */
    private final DataStoreStatusProvider.StatusListener statusListener;
    private volatile EventSource es;
    private final AtomicBoolean initialized = new AtomicBoolean(false);
    /** Wall-clock millis at which the current connection attempt began; 0 means "nothing to record". */
    private volatile long esStarted = 0L;
    private volatile boolean lastStoreUpdateFailed = false;
    private final LDLogger logger;
    /** Package-private and non-final so it can be replaced; consulted on every connection error. */
    ConnectionErrorHandler connectionErrorHandler = this.createDefaultConnectionErrorHandler();

    /**
     * Creates the processor without opening any connection; call {@link #start()} for that.
     *
     * <p>If the update sink exposes a data store status provider with status monitoring
     * enabled, a listener is registered on it so that the stream can be restarted when the
     * store reports that a refresh is needed.
     *
     * @param httpProperties        HTTP configuration; supplies the base headers and is applied to the HTTP client
     * @param dataSourceUpdates     sink that receives data and status updates
     * @param threadPriority        thread priority handed to the EventSource builder
     * @param diagnosticAccumulator receives stream-init timing records; may be null
     * @param streamUri             base stream URI ({@code /all} is appended in {@link #start()})
     * @param initialReconnectDelay reconnect time handed to the EventSource builder
     * @param logger                logger used by this class and by the EventSource
     */
    StreamProcessor(HttpProperties httpProperties, DataSourceUpdateSink dataSourceUpdates, int threadPriority, DiagnosticStore diagnosticAccumulator, URI streamUri, Duration initialReconnectDelay, LDLogger logger) {
        this.dataSourceUpdates = dataSourceUpdates;
        this.httpProperties = httpProperties;
        this.diagnosticAccumulator = diagnosticAccumulator;
        this.threadPriority = threadPriority;
        this.streamUri = streamUri;
        this.initialReconnectDelay = initialReconnectDelay;
        this.logger = logger;
        this.headers = httpProperties.toHeadersBuilder().add("Accept", "text/event-stream").build();
        if (dataSourceUpdates.getDataStoreStatusProvider() != null && dataSourceUpdates.getDataStoreStatusProvider().isStatusMonitoringEnabled()) {
            this.statusListener = this::onStoreStatusChanged;
            dataSourceUpdates.getDataStoreStatusProvider().addStatusListener(this.statusListener);
        } else {
            this.statusListener = null;
        }
    }

    /**
     * Data store status callback: restarts the stream when the store is available again
     * and reports that a refresh is needed. Does nothing if the stream has not been started.
     */
    private void onStoreStatusChanged(DataStoreStatusProvider.Status newStatus) {
        if (!newStatus.isAvailable() || !newStatus.isRefreshNeeded()) {
            return;
        }
        // Read the volatile field once so the null check and the call use the same instance.
        EventSource stream = es;
        if (stream != null) {
            logger.warn("Restarting stream to refresh data after data store outage");
            stream.restart();
        }
    }

    /**
     * Builds the default handler for connection-level errors.
     *
     * <p>Every error is first recorded as a failed stream init. For an
     * {@link UnsuccessfulResponseException} the HTTP status decides the outcome: if
     * {@code HttpErrors.checkIfErrorIsRecoverableAndLog} reports it as recoverable the status
     * becomes INTERRUPTED and the stream proceeds, otherwise the status becomes OFF and the
     * stream is shut down. Any other throwable sets the status to INTERRUPTED and proceeds.
     */
    private ConnectionErrorHandler createDefaultConnectionErrorHandler() {
        return t -> {
            recordStreamInit(true);

            if (t instanceof UnsuccessfulResponseException) {
                int status = ((UnsuccessfulResponseException) t).getCode();
                DataSourceStatusProvider.ErrorInfo httpErrorInfo = DataSourceStatusProvider.ErrorInfo.fromHttpError(status);
                boolean recoverable = HttpErrors.checkIfErrorIsRecoverableAndLog(logger, HttpErrors.httpErrorDescription(status), ERROR_CONTEXT_MESSAGE, status, WILL_RETRY_MESSAGE);
                if (!recoverable) {
                    dataSourceUpdates.updateStatus(DataSourceStatusProvider.State.OFF, httpErrorInfo);
                    return ConnectionErrorHandler.Action.SHUTDOWN;
                }
                dataSourceUpdates.updateStatus(DataSourceStatusProvider.State.INTERRUPTED, httpErrorInfo);
                // Restart the clock so the next init record measures the retry attempt.
                esStarted = System.currentTimeMillis();
                return ConnectionErrorHandler.Action.PROCEED;
            }

            // NOTE(review): unlike the recoverable-HTTP branch above, esStarted is not reset
            // on this path — confirm whether that asymmetry is intentional.
            HttpErrors.checkIfErrorIsRecoverableAndLog(logger, t.toString(), ERROR_CONTEXT_MESSAGE, 0, WILL_RETRY_MESSAGE);
            DataSourceStatusProvider.ErrorKind kind = t instanceof IOException
                    ? DataSourceStatusProvider.ErrorKind.NETWORK_ERROR
                    : DataSourceStatusProvider.ErrorKind.UNKNOWN;
            DataSourceStatusProvider.ErrorInfo errorInfo = DataSourceStatusProvider.ErrorInfo.fromException(kind, t);
            dataSourceUpdates.updateStatus(DataSourceStatusProvider.State.INTERRUPTED, errorInfo);
            return ConnectionErrorHandler.Action.PROCEED;
        };
    }

    /**
     * Builds and starts the EventSource for {@code <streamUri>/all}.
     *
     * @return a future that is completed (with null) once the first {@code put} event has been
     *         stored successfully, or once the connection error handler decides to shut down
     */
    @Override
    public Future<Void> start() {
        CompletableFuture<Void> initFuture = new CompletableFuture<Void>();

        // Delegates to whatever handler is installed at the time of the error, and additionally
        // releases anyone waiting on the init future when the stream is shut down for good.
        ConnectionErrorHandler wrappedConnectionErrorHandler = t -> {
            ConnectionErrorHandler.Action result = connectionErrorHandler.onConnectionError(t);
            if (result == ConnectionErrorHandler.Action.SHUTDOWN) {
                initFuture.complete(null);
            }
            return result;
        };

        EventHandler handler = new StreamEventHandler(initFuture);
        URI endpointUri = HttpHelpers.concatenateUriPath(streamUri, "/all");
        EventSource.Builder builder = new EventSource.Builder(handler, endpointUri)
                .threadPriority(threadPriority)
                .logger(logger)
                .readBufferSize(5000)
                .streamEventData(true)
                .expectFields("event")
                .clientBuilderActions(new EventSource.Builder.ClientConfigurer() {
                    @Override
                    public void configure(OkHttpClient.Builder clientBuilder) {
                        StreamProcessor.this.httpProperties.applyToHttpClientBuilder(clientBuilder);
                    }
                })
                .connectionErrorHandler(wrappedConnectionErrorHandler)
                .headers(headers)
                .reconnectTime(initialReconnectDelay.toMillis(), TimeUnit.MILLISECONDS)
                .readTimeout(DEAD_CONNECTION_INTERVAL.toMillis(), TimeUnit.MILLISECONDS);

        EventSource stream = builder.build();
        es = stream;
        esStarted = System.currentTimeMillis();
        stream.start();
        return initFuture;
    }

    /**
     * Reports one stream-init attempt to the diagnostic accumulator, if there is one and a
     * start time is currently recorded.
     *
     * @param failed whether the attempt being recorded failed
     */
    private void recordStreamInit(boolean failed) {
        // Snapshot the volatile start time so the timestamp and the computed duration agree
        // even if another thread resets esStarted in the meantime.
        long startedAt = esStarted;
        if (diagnosticAccumulator != null && startedAt != 0L) {
            diagnosticAccumulator.recordStreamInit(startedAt, System.currentTimeMillis() - startedAt, failed);
        }
    }

    /**
     * Unregisters the store status listener (if one was registered), closes the EventSource
     * (if one was started) and sets the data source status to OFF.
     */
    @Override
    public void close() throws IOException {
        logger.info("Closing LaunchDarkly StreamProcessor");
        if (statusListener != null) {
            dataSourceUpdates.getDataStoreStatusProvider().removeStatusListener(statusListener);
        }
        EventSource stream = es;
        if (stream != null) {
            stream.close();
        }
        dataSourceUpdates.updateStatus(DataSourceStatusProvider.State.OFF, null);
    }

    /** @return true once a {@code put} event has been stored successfully */
    @Override
    public boolean isInitialized() {
        return initialized.get();
    }

    /**
     * Wraps {@code r} in a {@link JsonReader}, runs {@code parser} on it and closes the reader.
     *
     * @param parser function that consumes the JSON reader and produces the parsed value
     * @param r      source of the JSON text
     * @return whatever {@code parser} returns
     * @throws StreamInputException wrapping any {@link JsonParseException},
     *         {@link SerializationException} or {@link IOException} raised while parsing or closing
     */
    private static <T> T parseStreamJson(Function<JsonReader, T> parser, Reader r) throws StreamInputException {
        try (JsonReader jr = new JsonReader(r)) {
            return parser.apply(jr);
        } catch (JsonParseException | SerializationException | IOException e) {
            throw new StreamInputException(e);
        }
    }

    /** Signals that the update sink reported failure for an init or upsert. */
    @SuppressWarnings("serial")
    private static final class StreamStoreException
    extends Exception {
        private StreamStoreException() {
        }
    }

    /** Signals that event data could not be read or parsed; the underlying error is the cause. */
    @SuppressWarnings("serial")
    private static final class StreamInputException
    extends Exception {
        public StreamInputException(Throwable cause) {
            super(cause);
        }
    }

    /**
     * Receives events from the EventSource and applies them to the update sink.
     */
    private class StreamEventHandler
    implements EventHandler {
        private final CompletableFuture<Void> initFuture;

        /**
         * @param initFuture future to complete when the first {@code put} has been stored
         */
        StreamEventHandler(CompletableFuture<Void> initFuture) {
            this.initFuture = initFuture;
        }

        @Override
        public void onOpen() throws Exception {
        }

        @Override
        public void onClosed() throws Exception {
        }

        /**
         * Dispatches one stream event by name. After any event that is handled without an
         * exception (including an unrecognised one, which is only logged) the status is set
         * to VALID. Input errors set the status to INTERRUPTED and restart the stream; store
         * failures restart the stream only when no store status listener is registered.
         */
        @Override
        public void onMessage(String eventName, MessageEvent event) throws Exception {
            try {
                switch (eventName) {
                    case PUT:
                        handlePut(event.getDataReader());
                        break;
                    case PATCH:
                        handlePatch(event.getDataReader());
                        break;
                    case DELETE:
                        handleDelete(event.getDataReader());
                        break;
                    default:
                        logger.warn("Unexpected event found in stream: {}", eventName);
                        break;
                }
                lastStoreUpdateFailed = false;
                dataSourceUpdates.updateStatus(DataSourceStatusProvider.State.VALID, null);
            } catch (StreamInputException e) {
                logger.error("LaunchDarkly service request failed or received invalid data: {}", LogValues.exceptionSummary(e));
                logger.debug(LogValues.exceptionTrace(e));

                Throwable cause = e.getCause();
                DataSourceStatusProvider.ErrorKind kind = cause instanceof IOException
                        ? DataSourceStatusProvider.ErrorKind.NETWORK_ERROR
                        : DataSourceStatusProvider.ErrorKind.INVALID_DATA;
                String message = cause == null ? e.getMessage() : cause.toString();
                DataSourceStatusProvider.ErrorInfo errorInfo = new DataSourceStatusProvider.ErrorInfo(kind, 0, message, Instant.now());
                dataSourceUpdates.updateStatus(DataSourceStatusProvider.State.INTERRUPTED, errorInfo);

                es.restart();
            } catch (StreamStoreException e) {
                // With a status listener registered, onStoreStatusChanged triggers the restart
                // instead, so only restart here when there is no listener.
                if (statusListener == null) {
                    if (!lastStoreUpdateFailed) {
                        // Logged only on the first failure of a run of consecutive failures.
                        logger.warn("Restarting stream to ensure that we have the latest data");
                    }
                    es.restart();
                }
                lastStoreUpdateFailed = true;
            } catch (Exception e) {
                logger.warn("Unexpected error from stream processor: {}", LogValues.exceptionSummary(e));
                logger.debug(LogValues.exceptionTrace(e));
            }
        }

        /**
         * Handles a {@code put} event: records a successful stream init, parses the payload
         * and hands the full data set to the sink. On the first success, marks the processor
         * initialized and completes the init future.
         *
         * @throws StreamInputException if the payload cannot be parsed
         * @throws StreamStoreException if the sink's {@code init} returns false
         */
        private void handlePut(Reader eventData) throws StreamInputException, StreamStoreException {
            recordStreamInit(false);
            // Zero means there is no pending attempt left to record.
            esStarted = 0L;

            StreamProcessorEvents.PutData putData = parseStreamJson(StreamProcessorEvents::parsePutData, eventData);
            if (!dataSourceUpdates.init(putData.data)) {
                throw new StreamStoreException();
            }
            if (!initialized.getAndSet(true)) {
                initFuture.complete(null);
                logger.info("Initialized LaunchDarkly client.");
            }
        }

        /**
         * Handles a {@code patch} event by upserting the parsed item. Events whose parsed
         * {@code kind} is null are ignored.
         *
         * @throws StreamInputException if the payload cannot be parsed
         * @throws StreamStoreException if the sink's {@code upsert} returns false
         */
        private void handlePatch(Reader eventData) throws StreamInputException, StreamStoreException {
            StreamProcessorEvents.PatchData patch = parseStreamJson(StreamProcessorEvents::parsePatchData, eventData);
            if (patch.kind == null) {
                return;
            }
            if (!dataSourceUpdates.upsert(patch.kind, patch.key, patch.item)) {
                throw new StreamStoreException();
            }
        }

        /**
         * Handles a {@code delete} event by upserting a descriptor that carries the event's
         * version and a null item. Events whose parsed {@code kind} is null are ignored.
         *
         * @throws StreamInputException if the payload cannot be parsed
         * @throws StreamStoreException if the sink's {@code upsert} returns false
         */
        private void handleDelete(Reader eventData) throws StreamInputException, StreamStoreException {
            StreamProcessorEvents.DeleteData deletion = parseStreamJson(StreamProcessorEvents::parseDeleteData, eventData);
            if (deletion.kind == null) {
                return;
            }
            DataStoreTypes.ItemDescriptor placeholder = new DataStoreTypes.ItemDescriptor(deletion.version, null);
            if (!dataSourceUpdates.upsert(deletion.kind, deletion.key, placeholder)) {
                throw new StreamStoreException();
            }
        }

        /** Logs every stream comment as a heartbeat at debug level. */
        @Override
        public void onComment(String comment) {
            logger.debug("Received a heartbeat");
        }

        /** Logs the error summary at warn level and the stack trace at debug level. */
        @Override
        public void onError(Throwable throwable) {
            logger.warn("Encountered EventSource error: {}", LogValues.exceptionSummary(throwable));
            logger.debug(LogValues.exceptionTrace(throwable));
        }
    }
}

