/*
 * Decompiled with CFR 0.152.
 */
package org.opentripplanner.ext.siri.updater;

import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
import com.google.protobuf.ByteString;
import com.google.protobuf.Duration;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.pubsub.v1.ExpirationPolicy;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.ProjectTopicName;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.PushConfig;
import com.google.pubsub.v1.Subscription;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.time.ZonedDateTime;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.time.DurationFormatUtils;
import org.entur.protobuf.mapper.SiriMapper;
import org.opentripplanner.ext.siri.SiriTimetableSnapshotSource;
import org.opentripplanner.ext.siri.updater.SiriETGooglePubsubUpdaterParameters;
import org.opentripplanner.routing.graph.Graph;
import org.opentripplanner.updater.GraphUpdater;
import org.opentripplanner.updater.WriteToGraphCallback;
import org.opentripplanner.util.HttpUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import uk.org.siri.siri20.EstimatedTimetableDeliveryStructure;
import uk.org.siri.siri20.EstimatedVersionFrameStructure;
import uk.org.siri.siri20.Siri;
import uk.org.siri.www.siri.SiriType;

/**
 * Graph updater that receives SIRI ET (estimated timetable) messages through a Google Pubsub
 * subscription and applies them to the graph via a {@link SiriTimetableSnapshotSource}.
 *
 * <p>Life cycle, as implemented below:
 * <ol>
 *   <li>The constructor resolves the subscription/topic names, verifies that the environment
 *       variable {@code GOOGLE_APPLICATION_CREDENTIALS} is set, creates the
 *       {@link SubscriptionAdminClient} and registers a JVM shutdown hook calling
 *       {@link #teardown()}.</li>
 *   <li>{@link #run()} creates the subscription, fetches the initial data set over HTTP
 *       (retrying with a capped exponential backoff) and then keeps a {@link Subscriber}
 *       running, reconnecting after {@code reconnectPeriodSec} seconds whenever it stops.</li>
 *   <li>{@link #teardown()} deletes the subscription.</li>
 * </ol>
 *
 * <p>The message/update/size counters are static and therefore shared by all instances of this
 * class in the JVM.
 */
public class SiriETGooglePubsubUpdater
implements GraphUpdater {
    private static final Logger LOG = LoggerFactory.getLogger(SiriETGooglePubsubUpdater.class);

    /** Name of the environment variable that must be set for the Google client to authenticate. */
    private static final String CREDENTIALS_ENV_VARIABLE = "GOOGLE_APPLICATION_CREDENTIALS";

    /** Base delay for the initial-data retry loop; it is doubled before the first sleep. */
    private static final int INITIAL_RETRY_DELAY_MS = 1000;

    /**
     * Upper bound for the retry delay. Without a bound the doubled {@code int} delay eventually
     * overflows to a negative value, which makes {@link Thread#sleep(long)} throw.
     */
    private static final int MAX_RETRY_DELAY_MS = 600_000;

    /** Timeout passed to {@code HttpUtils.getData} when fetching the initial data set. */
    private static final long INITIAL_DATA_TIMEOUT_MS = 30000L;

    /** Message retention requested for the created subscription (10 minutes). */
    private static final long MESSAGE_RETENTION_SECONDS = 600L;

    /** Expiration TTL requested for the created subscription (24 hours). */
    private static final long SUBSCRIPTION_TTL_SECONDS = 86400L;

    /** A statistics line is logged every time this many messages have been received. */
    private static final long STATS_LOG_INTERVAL = 1000L;

    private static final AtomicLong messageCounter = new AtomicLong(0L);
    private static final AtomicLong updateCounter = new AtomicLong(0L);
    private static final AtomicLong sizeCounter = new AtomicLong(0L);

    private WriteToGraphCallback saveResultOnGraph;
    private SiriTimetableSnapshotSource snapshotSource;
    /** URL of the initial data set; {@code null} when no initial fetch is configured. */
    private final URI dataInitializationUrl;
    private final String feedId;
    private final int reconnectPeriodSec;
    private final SubscriptionAdminClient subscriptionAdminClient;
    private final ProjectSubscriptionName subscriptionName;
    private final ProjectTopicName topic;
    private final PushConfig pushConfig;
    private final String configRef;
    /** Epoch millis captured when {@link #run()} has created the subscription; 0 before that. */
    private long startTime;
    // volatile: isPrimed() is part of the GraphUpdater interface and is presumably polled from
    // threads other than the one executing run() - verify against the updater manager.
    private volatile boolean primed;

    /**
     * Creates the updater and the Google Pubsub admin client.
     *
     * <p>The subscription id is taken from the environment variable {@code HOSTNAME}; when that
     * is missing or empty, a random id of the form {@code otp-<uuid>} is used.
     *
     * @param config updater parameters (config ref, feed id, project/topic name, initial data
     *               URL and reconnect period)
     * @throws RuntimeException if {@code GOOGLE_APPLICATION_CREDENTIALS} is not defined, or if
     *                          creating the {@link SubscriptionAdminClient} fails with an
     *                          {@link IOException} (kept as the cause)
     */
    public SiriETGooglePubsubUpdater(SiriETGooglePubsubUpdaterParameters config) {
        this.configRef = config.getConfigRef();
        // A missing URL is tolerated here because initializeData() explicitly handles null;
        // previously URI.create(null) failed with a NullPointerException.
        String initializationUrl = config.getDataInitializationUrl();
        this.dataInitializationUrl = initializationUrl == null ? null : URI.create(initializationUrl);
        this.feedId = config.getFeedId();
        this.reconnectPeriodSec = config.getReconnectPeriodSec();

        String subscriptionId = System.getenv("HOSTNAME");
        if (subscriptionId == null || subscriptionId.isEmpty()) {
            subscriptionId = "otp-" + UUID.randomUUID();
        }
        String projectName = config.getProjectName();
        String topicName = config.getTopicName();
        this.subscriptionName = ProjectSubscriptionName.of(projectName, subscriptionId);
        this.topic = ProjectTopicName.of(projectName, topicName);
        this.pushConfig = PushConfig.getDefaultInstance();

        String credentials = System.getenv(CREDENTIALS_ENV_VARIABLE);
        if (credentials == null || credentials.isEmpty()) {
            throw new RuntimeException("Google Pubsub updater is configured, but environment variable 'GOOGLE_APPLICATION_CREDENTIALS' is not defined. See https://cloud.google.com/docs/authentication/getting-started");
        }
        try {
            this.subscriptionAdminClient = SubscriptionAdminClient.create();
            this.addShutdownHook();
        }
        catch (IOException e) {
            throw new RuntimeException(e.getMessage(), e);
        }
    }

    /**
     * Registers a JVM shutdown hook that runs {@link #teardown()}. If the JVM is already
     * shutting down (registration throws {@link IllegalStateException}), the teardown is
     * executed immediately instead.
     */
    private void addShutdownHook() {
        try {
            Runtime.getRuntime().addShutdownHook(new Thread(this::teardown));
            LOG.info("Shutdown-hook to clean up Google Pubsub subscription has been added.");
        }
        catch (IllegalStateException e) {
            LOG.info("Instance is already shutting down - cleaning up immediately.", e);
            this.teardown();
        }
    }

    /** Stores the callback used to schedule graph-writing tasks. */
    @Override
    public void setGraphUpdaterManager(WriteToGraphCallback saveResultOnGraph) {
        this.saveResultOnGraph = saveResultOnGraph;
    }

    /** Obtains (or sets up) the {@link SiriTimetableSnapshotSource} of the given graph. */
    @Override
    public void setup(Graph graph) throws Exception {
        this.snapshotSource = graph.getOrSetupTimetableSnapshotProvider(SiriTimetableSnapshotSource::new);
    }

    /**
     * Creates the subscription, loads the initial data set and then consumes messages until
     * the executing thread is interrupted.
     *
     * <p>The initial fetch is retried until the updater is primed; the delay between attempts
     * doubles on every failure and is capped at {@link #MAX_RETRY_DELAY_MS}. Afterwards a
     * {@link Subscriber} is started; whenever it terminates or fails, a new one is created
     * after {@code reconnectPeriodSec} seconds.
     *
     * <p>If the thread is interrupted while sleeping, the interrupt flag is restored and the
     * method returns.
     *
     * @throws IOException declared by the interface contract of this method
     */
    @Override
    public void run() throws IOException {
        LOG.info("Creating subscription {}", this.subscriptionName);
        Subscription subscription = this.subscriptionAdminClient.createSubscription(
            Subscription.newBuilder()
                .setTopic(this.topic.toString())
                .setName(this.subscriptionName.toString())
                .setPushConfig(this.pushConfig)
                .setMessageRetentionDuration(Duration.newBuilder().setSeconds(MESSAGE_RETENTION_SECONDS).build())
                .setExpirationPolicy(ExpirationPolicy.newBuilder().setTtl(Duration.newBuilder().setSeconds(SUBSCRIPTION_TTL_SECONDS).build()).build())
                .build());
        LOG.info("Created subscription {}", this.subscriptionName);
        this.startTime = this.now();

        EstimatedTimetableMessageReceiver receiver = new EstimatedTimetableMessageReceiver();
        if (!this.awaitInitialData(receiver)) {
            return;
        }

        while (true) {
            Subscriber subscriber = null;
            try {
                subscriber = Subscriber.newBuilder(subscription.getName(), receiver).build();
                subscriber.startAsync().awaitRunning();
                subscriber.awaitTerminated();
            }
            catch (IllegalStateException e) {
                LOG.warn("Pubsub subscriber failed, will reconnect after {} sec.", this.reconnectPeriodSec, e);
                if (subscriber != null) {
                    subscriber.stopAsync();
                }
            }
            try {
                // 1000L forces long arithmetic so a large configured period cannot overflow.
                Thread.sleep(this.reconnectPeriodSec * 1000L);
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                LOG.info("Interrupted while waiting to reconnect - stopping Pubsub updater.");
                return;
            }
        }
    }

    /**
     * Repeats {@link #initializeData(URI, EstimatedTimetableMessageReceiver)} until the updater
     * is primed, sleeping with a capped exponential backoff between failed attempts.
     *
     * @return {@code true} when primed; {@code false} if the thread was interrupted while
     *         waiting (the interrupt flag is restored)
     */
    private boolean awaitInitialData(EstimatedTimetableMessageReceiver receiver) {
        int sleepPeriod = INITIAL_RETRY_DELAY_MS;
        int attemptCounter = 1;
        while (!this.isPrimed()) {
            try {
                this.initializeData(this.dataInitializationUrl, receiver);
            }
            catch (Exception e) {
                sleepPeriod = Math.min(sleepPeriod * 2, MAX_RETRY_DELAY_MS);
                LOG.warn("Caught Exception while initializing data, will retry after {} ms - attempt number {}. ({})", sleepPeriod, attemptCounter, e.toString());
                attemptCounter++;
                try {
                    Thread.sleep(sleepPeriod);
                }
                catch (InterruptedException interrupted) {
                    Thread.currentThread().interrupt();
                    LOG.info("Interrupted while waiting to retry data initialization - stopping Pubsub updater.");
                    return false;
                }
            }
        }
        return true;
    }

    /** @return the current time as epoch milliseconds */
    private long now() {
        return ZonedDateTime.now().toInstant().toEpochMilli();
    }

    /**
     * Deletes the Pubsub subscription created for this instance and logs the time elapsed
     * since {@link #run()} recorded the start time.
     */
    @Override
    public void teardown() {
        if (this.subscriptionAdminClient != null) {
            LOG.info("Deleting subscription {}", this.subscriptionName);
            this.subscriptionAdminClient.deleteSubscription(this.subscriptionName);
            LOG.info("Subscription deleted {} - time since startup: {} sec", this.subscriptionName, (this.now() - this.startTime) / 1000L);
        }
    }

    /** @return {@code true} once the initial data set has been processed (or none is configured) */
    @Override
    public boolean isPrimed() {
        return this.primed;
    }

    @Override
    public String getConfigRef() {
        return this.configRef;
    }

    /** @return the time elapsed since the recorded start time, formatted as {@code HH:mm:ss} */
    private String getTimeSinceStartupString() {
        return DurationFormatUtils.formatDuration(this.now() - this.startTime, "HH:mm:ss");
    }

    /**
     * Fetches the initial data set and feeds it through the receiver as if it were a Pubsub
     * message. The updater becomes primed when the receiver acknowledges that message.
     *
     * <p>When no URL is configured there is nothing to fetch, so the updater is marked as
     * primed right away; otherwise the caller's retry loop would never terminate.
     *
     * @param dataInitializationUrl URL of the initial data set, may be {@code null}
     * @param receiver              receiver that parses and applies the fetched payload
     * @throws IOException if fetching or reading the data fails, or no data stream is returned
     */
    private void initializeData(URI dataInitializationUrl, EstimatedTimetableMessageReceiver receiver) throws IOException {
        if (dataInitializationUrl == null) {
            this.primed = true;
            return;
        }
        LOG.info("Fetching initial data from {}", dataInitializationUrl);
        long fetchStarted = System.currentTimeMillis();
        ByteString value;
        // try-with-resources closes the HTTP response stream even if reading fails.
        try (InputStream data = HttpUtils.getData(dataInitializationUrl, INITIAL_DATA_TIMEOUT_MS, Map.of("Content-Type", "application/x-protobuf"))) {
            if (data == null) {
                throw new IOException("No initial data received from " + dataInitializationUrl);
            }
            value = ByteString.readFrom(data);
        }
        final long fetchFinished = System.currentTimeMillis();
        LOG.info("Fetching initial data - finished after {} ms, got {} bytes", fetchFinished - fetchStarted, FileUtils.byteCountToDisplaySize((long)value.size()));

        PubsubMessage message = PubsubMessage.newBuilder().setData(value).build();
        receiver.receiveMessage(message, new AckReplyConsumer(){

            @Override
            public void ack() {
                SiriETGooglePubsubUpdater.this.primed = true;
                LOG.info("Pubsub updater initialized after {} ms: [messages: {},  updates: {}, total size: {}, time since startup: {}]", System.currentTimeMillis() - fetchFinished, messageCounter.get(), updateCounter.get(), FileUtils.byteCountToDisplaySize(sizeCounter.get()), SiriETGooglePubsubUpdater.this.getTimeSinceStartupString());
            }

            @Override
            public void nack() {
                // Nothing to do: the updater stays unprimed and the caller retries.
            }
        });
    }

    /**
     * Receiver that parses a protobuf-encoded SIRI payload, updates the statistics counters and
     * schedules the estimated timetable deliveries to be applied to the graph.
     */
    class EstimatedTimetableMessageReceiver
    implements MessageReceiver {
        EstimatedTimetableMessageReceiver() {
        }

        /**
         * Handles one message. The message is acknowledged after the update has been handed to
         * the graph writer callback, or right away when it carries no service delivery.
         *
         * @throws RuntimeException wrapping {@link InvalidProtocolBufferException} when the
         *                          payload cannot be parsed; the message is then not acked
         */
        @Override
        public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) {
            ByteString data = message.getData();
            sizeCounter.addAndGet(data.size());
            Siri siri;
            try {
                siri = SiriMapper.mapToJaxb(SiriType.parseFrom(data));
            }
            catch (InvalidProtocolBufferException e) {
                throw new RuntimeException(e);
            }
            if (siri.getServiceDelivery() != null) {
                List<EstimatedTimetableDeliveryStructure> estimatedTimetableDeliveries = siri.getServiceDelivery().getEstimatedTimetableDeliveries();
                long numberOfUpdates = updateCounter.addAndGet(this.countUpdatedTrips(estimatedTimetableDeliveries));
                long numberOfMessages = messageCounter.incrementAndGet();
                if (numberOfMessages % STATS_LOG_INTERVAL == 0L) {
                    this.logStatistics(numberOfMessages, numberOfUpdates, siri.getServiceDelivery().getResponseTimestamp());
                }
                SiriETGooglePubsubUpdater.this.saveResultOnGraph.execute(graph -> SiriETGooglePubsubUpdater.this.snapshotSource.applyEstimatedTimetable(graph, SiriETGooglePubsubUpdater.this.feedId, false, estimatedTimetableDeliveries));
            }
            consumer.ack();
        }

        /**
         * Counts the vehicle journeys in the first version frame of the first delivery. This is
         * only used for statistics, so missing or empty structures simply count as zero.
         */
        private int countUpdatedTrips(List<EstimatedTimetableDeliveryStructure> deliveries) {
            if (deliveries == null || deliveries.isEmpty() || deliveries.get(0) == null) {
                return 0;
            }
            List<EstimatedVersionFrameStructure> frames = deliveries.get(0).getEstimatedJourneyVersionFrames();
            if (frames == null || frames.isEmpty() || frames.get(0) == null) {
                return 0;
            }
            List<?> journeys = frames.get(0).getEstimatedVehicleJourneies();
            return journeys == null ? 0 : journeys.size();
        }

        /**
         * Logs the running totals together with the delay between the delivery's response
         * timestamp and now. A missing timestamp is reported as "n/a" instead of failing.
         */
        private void logStatistics(long numberOfMessages, long numberOfUpdates, ZonedDateTime responseTimestamp) {
            Object currentDelay;
            if (responseTimestamp == null) {
                currentDelay = "n/a";
            } else {
                currentDelay = SiriETGooglePubsubUpdater.this.now() - responseTimestamp.toInstant().toEpochMilli();
            }
            LOG.info("Pubsub stats: [messages: {}, updates: {}, total size: {}, current delay {} ms, time since startup: {}]", numberOfMessages, numberOfUpdates, FileUtils.byteCountToDisplaySize(sizeCounter.get()), currentDelay, SiriETGooglePubsubUpdater.this.getTimeSinceStartupString());
        }
    }
}

