/*
 * Decompiled with CFR 0.152.
 */
package org.opentripplanner.ext.siri.updater.azure;

import com.azure.messaging.servicebus.ServiceBusErrorContext;
import com.azure.messaging.servicebus.ServiceBusReceivedMessage;
import com.azure.messaging.servicebus.ServiceBusReceivedMessageContext;
import com.google.common.io.CharStreams;
import jakarta.xml.bind.JAXBException;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import javax.xml.stream.XMLStreamException;
import org.apache.http.client.utils.URIBuilder;
import org.opentripplanner.ext.siri.SiriAlertsUpdateHandler;
import org.opentripplanner.ext.siri.updater.azure.AbstractAzureSiriUpdater;
import org.opentripplanner.ext.siri.updater.azure.SiriAzureSXUpdaterParameters;
import org.opentripplanner.framework.io.HttpUtils;
import org.opentripplanner.framework.time.DurationUtils;
import org.opentripplanner.routing.impl.TransitAlertServiceImpl;
import org.opentripplanner.routing.services.TransitAlertService;
import org.opentripplanner.transit.service.TransitModel;
import org.opentripplanner.updater.alert.TransitAlertProvider;
import org.opentripplanner.updater.spi.HttpHeaders;
import org.rutebanken.siri20.util.SiriXml;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import uk.org.siri.siri20.Siri;

public class SiriAzureSXUpdater
extends AbstractAzureSiriUpdater
implements TransitAlertProvider {
    private final Logger LOG = LoggerFactory.getLogger(this.getClass());
    private final SiriAlertsUpdateHandler updateHandler;
    private final TransitAlertService transitAlertService;
    private static final transient AtomicLong messageCounter = new AtomicLong(0L);
    private final LocalDate fromDateTime;
    private final LocalDate toDateTime;
    private Instant startTime;

    public SiriAzureSXUpdater(SiriAzureSXUpdaterParameters config, TransitModel transitModel) {
        super(config, transitModel);
        this.fromDateTime = config.getFromDateTime();
        this.toDateTime = config.getToDateTime();
        this.transitAlertService = new TransitAlertServiceImpl(transitModel);
        this.updateHandler = new SiriAlertsUpdateHandler(this.feedId, transitModel, this.transitAlertService, this.fuzzyTripMatcher(), 0L);
    }

    @Override
    protected void messageConsumer(ServiceBusReceivedMessageContext messageContext) {
        ServiceBusReceivedMessage message = messageContext.getMessage();
        this.LOG.debug("Processing message. messageId={}, sequenceNumber={}, enqueued time={}", new Object[]{message.getMessageId(), message.getSequenceNumber(), message.getEnqueuedTime()});
        messageCounter.incrementAndGet();
        this.processMessage(message.getBody().toString(), message.getMessageId());
    }

    @Override
    protected void errorConsumer(ServiceBusErrorContext errorContext) {
        this.defaultErrorConsumer(errorContext);
    }

    @Override
    protected void initializeData(String url, Consumer<ServiceBusReceivedMessageContext> consumer) throws IOException, URISyntaxException {
        if (url == null) {
            this.LOG.info("No history url set up for Siri Azure Sx Updater");
            return;
        }
        while (!this.isPrimed()) {
            this.startTime = Instant.now();
            URI uri = new URIBuilder(url).addParameter("publishFromDateTime", this.fromDateTime.format(DateTimeFormatter.ISO_LOCAL_DATE)).addParameter("publishToDateTime", this.toDateTime.format(DateTimeFormatter.ISO_LOCAL_DATE)).build();
            this.LOG.info("Fetching initial Siri SX data from {}, timeout is {}ms", (Object)uri, (Object)this.timeout);
            long t1 = System.currentTimeMillis();
            HttpHeaders rh = HttpHeaders.of().acceptApplicationXML().build();
            InputStream data = HttpUtils.getData(uri, Duration.ofMillis(this.timeout), rh.asMap());
            long t2 = System.currentTimeMillis();
            if (data == null) {
                throw new IOException("Historical endpoint returned no data from url" + url);
            }
            InputStreamReader reader = new InputStreamReader(data);
            String string = CharStreams.toString((Readable)reader);
            this.LOG.info("Fetching initial data - finished after {} ms, got {} bytes", (Object)(t2 - t1), (Object)string.length());
            this.processHistory(string, "SX-INITIAL-1");
        }
    }

    private Siri getSiri(String message, String id) throws XMLStreamException, JAXBException {
        Siri siri = SiriXml.parseXml((String)message);
        if (siri.getServiceDelivery() == null || siri.getServiceDelivery().getSituationExchangeDeliveries() == null || siri.getServiceDelivery().getSituationExchangeDeliveries().isEmpty()) {
            if (siri.getHeartbeatNotification() != null) {
                this.LOG.info("Received SIRI heartbeat message");
            } else {
                this.LOG.warn("Empty Siri message for messageId {}", (Object)id);
                this.LOG.debug(message);
            }
            return null;
        }
        return siri;
    }

    private void processMessage(String message, String id) {
        try {
            Siri siri = this.getSiri(message, id);
            if (siri == null) {
                return;
            }
            this.saveResultOnGraph.execute((graph, transitModel) -> this.updateHandler.update(siri.getServiceDelivery()));
        }
        catch (JAXBException | XMLStreamException e) {
            this.LOG.error(e.getLocalizedMessage(), e);
        }
    }

    private void processHistory(String message, String id) {
        try {
            Siri siri = this.getSiri(message, id);
            if (siri == null) {
                this.LOG.info("Did not receive any SX messages from history endpoint.");
                return;
            }
            Future<?> f = this.saveResultOnGraph.execute((graph, transitModel) -> {
                try {
                    long t1 = System.currentTimeMillis();
                    this.updateHandler.update(siri.getServiceDelivery());
                    this.LOG.info("Azure SX updater initialized after {} ms: [time since startup: {}]", (Object)(System.currentTimeMillis() - t1), (Object)DurationUtils.durationToStr(Duration.between(this.startTime, Instant.now())));
                    this.setPrimed(true);
                }
                catch (Exception e) {
                    this.LOG.error("Could not process SX history", (Throwable)e);
                }
            });
            f.get();
        }
        catch (JAXBException | InterruptedException | ExecutionException | XMLStreamException e) {
            this.LOG.error(e.getLocalizedMessage(), e);
        }
    }

    @Override
    public TransitAlertService getTransitAlertService() {
        return this.transitAlertService;
    }
}

