/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.spanner.db.stream;

import io.debezium.connector.spanner.db.dao.ChangeStreamDao;
import io.debezium.connector.spanner.db.dao.ChangeStreamResultSet;
import io.debezium.connector.spanner.db.mapper.ChangeStreamRecordMapper;
import io.debezium.connector.spanner.db.model.Partition;
import io.debezium.connector.spanner.db.model.event.ChangeStreamEvent;
import io.debezium.connector.spanner.db.model.event.ChildPartitionsEvent;
import io.debezium.connector.spanner.db.model.event.FinishPartitionEvent;
import io.debezium.connector.spanner.db.model.event.HeartbeatEvent;
import io.debezium.connector.spanner.db.stream.ChangeStreamEventConsumer;
import io.debezium.connector.spanner.db.stream.PartitionEventListener;
import io.debezium.connector.spanner.metrics.MetricsEventPublisher;
import io.debezium.connector.spanner.metrics.event.DelayChangeStreamEventsMetricEvent;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SpannerChangeStreamService {
    private static final Logger LOGGER = LoggerFactory.getLogger(SpannerChangeStreamService.class);
    private final ChangeStreamDao changeStreamDao;
    private final ChangeStreamRecordMapper changeStreamRecordMapper;
    private final Duration heartbeatMillis;
    private final MetricsEventPublisher metricsEventPublisher;
    private final String taskUid;

    public SpannerChangeStreamService(String taskUid, ChangeStreamDao changeStreamDao, ChangeStreamRecordMapper changeStreamRecordMapper, Duration heartbeatMillis, MetricsEventPublisher metricsEventPublisher) {
        this.changeStreamDao = changeStreamDao;
        this.changeStreamRecordMapper = changeStreamRecordMapper;
        this.heartbeatMillis = heartbeatMillis;
        this.metricsEventPublisher = metricsEventPublisher;
        this.taskUid = taskUid;
    }

    public void getEvents(Partition partition, ChangeStreamEventConsumer changeStreamEventConsumer, PartitionEventListener partitionEventListener) throws InterruptedException, Exception {
        String token = partition.getToken();
        partitionEventListener.onRun(partition);
        LOGGER.info("Task: {}, Streaming {} from {} to {}", new Object[]{this.taskUid, token, partition.getStartTimestamp(), partition.getEndTimestamp()});
        try (ChangeStreamResultSet resultSet = this.changeStreamDao.streamQuery(token, partition.getStartTimestamp(), partition.getEndTimestamp(), this.heartbeatMillis.toMillis());){
            long start = this.now();
            while (resultSet.next()) {
                long delay = this.now() - start;
                List<ChangeStreamEvent> events = this.changeStreamRecordMapper.toChangeStreamEvents(partition, resultSet, resultSet.getMetadata());
                LOGGER.debug("Task: {}, Events receive from stream: {}", (Object)this.taskUid, events);
                if (!events.isEmpty() && events.get(0) instanceof HeartbeatEvent) {
                    HeartbeatEvent heartbeatEvent = (HeartbeatEvent)events.get(0);
                    long heartbeatLag = System.currentTimeMillis() - heartbeatEvent.getRecordTimestamp().toSqlTimestamp().toInstant().toEpochMilli();
                    if (heartbeatLag > 60000L) {
                        LOGGER.warn("Task: {}, heartbeat has very old timestamp, lag: {}, token: {}, event: {}", new Object[]{this.taskUid, heartbeatLag, heartbeatEvent.getMetadata().getPartitionToken(), heartbeatEvent});
                    }
                }
                this.processEvents(partition, events, changeStreamEventConsumer);
                if (!events.isEmpty() && !(events.get(0) instanceof HeartbeatEvent)) {
                    this.metricsEventPublisher.publishMetricEvent(new DelayChangeStreamEventsMetricEvent((int)delay));
                }
                start = this.now();
            }
        }
        catch (InterruptedException ex) {
            LOGGER.info("task {}, Interrupting streaming partition task with token {}", (Object)this.taskUid, (Object)partition.getToken());
            Thread.currentThread().interrupt();
        }
        partitionEventListener.onFinish(partition);
        LOGGER.info("Task {}, Finished consuming partition {}", (Object)this.taskUid, (Object)partition);
        changeStreamEventConsumer.acceptChangeStreamEvent(new FinishPartitionEvent(partition));
    }

    private long now() {
        return Instant.now().toEpochMilli();
    }

    private void processEvents(Partition partition, List<ChangeStreamEvent> events, ChangeStreamEventConsumer changeStreamEventConsumer) throws InterruptedException {
        for (ChangeStreamEvent changeStreamEvent : events) {
            if (changeStreamEvent instanceof ChildPartitionsEvent) {
                ChildPartitionsEvent childPartitionsEvent = (ChildPartitionsEvent)changeStreamEvent;
                LOGGER.info("Task: {}, Received child partition from partition {}:{}", new Object[]{this.taskUid, partition.getToken(), childPartitionsEvent});
            }
            LOGGER.debug("Task: {}, Received record from partition {}: {}", new Object[]{this.taskUid, partition.getToken(), changeStreamEvent});
            changeStreamEventConsumer.acceptChangeStreamEvent(changeStreamEvent);
        }
    }
}

