/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.spanner.processor.heartbeat;

import io.debezium.connector.base.ChangeEventQueue;
import io.debezium.connector.spanner.SpannerPartition;
import io.debezium.heartbeat.Heartbeat;
import io.debezium.pipeline.DataChangeEvent;
import io.debezium.pipeline.Sizeable;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.schema.SchemaNameAdjuster;
import java.time.Clock;
import java.time.Instant;
import java.util.Map;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SpannerHeartbeat
implements Heartbeat.ScheduledHeartbeat {
    private static final Logger LOGGER = LoggerFactory.getLogger(SpannerHeartbeat.class);
    private static final String PARTITION_TOKEN_KEY = "partitionToken";
    private final String topicName;
    private final Schema keySchema;
    private final Schema valueSchema;
    private final ChangeEventQueue<DataChangeEvent> queue;
    private final Clock clock;

    public SpannerHeartbeat(String topicName, SchemaNameAdjuster schemaNameAdjuster, ChangeEventQueue<DataChangeEvent> queue, Clock clock) {
        this.topicName = topicName;
        this.keySchema = SchemaBuilder.struct().name(schemaNameAdjuster.adjust("io.debezium.connector.spanner.processor.heartbeat.PartitionTokenKey")).field(PARTITION_TOKEN_KEY, Schema.STRING_SCHEMA).build();
        this.valueSchema = SchemaBuilder.struct().name(schemaNameAdjuster.adjust("io.debezium.connector.spanner.processor.heartbeat.SpannerHeartbeat")).field("ts_ms", Schema.INT64_SCHEMA).build();
        this.queue = queue;
        this.clock = clock;
    }

    public void emitWithDelay(Map<String, ?> partition, OffsetContext offset) throws InterruptedException {
        this.emit(partition, offset);
    }

    public void emit(Map<String, ?> partition, OffsetContext offset) throws InterruptedException {
        LOGGER.debug("Generating heartbeat event");
        this.queue.enqueue((Sizeable)new DataChangeEvent(this.heartbeatRecord(partition, offset.getOffset())));
    }

    public boolean isEnabled() {
        return true;
    }

    private Struct partitionTokenKey(String partitionToken) {
        Struct result = new Struct(this.keySchema);
        result.put(PARTITION_TOKEN_KEY, (Object)partitionToken);
        return result;
    }

    private Struct messageValue() {
        Struct result = new Struct(this.valueSchema);
        result.put("ts_ms", (Object)Instant.now(this.clock).toEpochMilli());
        return result;
    }

    private SourceRecord heartbeatRecord(Map<String, String> sourcePartition, Map<String, ?> sourceOffset) {
        String token = SpannerPartition.extractToken(sourcePartition);
        return new SourceRecord(sourcePartition, sourceOffset, this.topicName, Integer.valueOf(0), this.keySchema, (Object)this.partitionTokenKey(token), this.valueSchema, (Object)this.messageValue());
    }
}

