/*
 * Decompiled with CFR 0.152.
 */
package com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.tools;

import com.ververica.cdc.connectors.shaded.com.fasterxml.jackson.core.JsonProcessingException;
import com.ververica.cdc.connectors.shaded.com.fasterxml.jackson.databind.ObjectMapper;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.clients.consumer.OffsetAndMetadata;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.common.TopicPartition;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.errors.ConnectException;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.sink.SinkRecord;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.sink.SinkTask;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.tools.VerifiableSinkConnector;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Sink task that reports every record it receives, and every record it is asked to
 * flush, as one line of JSON on standard output.
 *
 * <p>Records handed to {@link #put(Collection)} are printed immediately and remembered
 * per topic-partition. When {@link #flush(Map)} is called, the remembered records of each
 * partition named in the offsets map are printed a second time with an added
 * {@code "flushed": true} entry and a refreshed {@code "time_ms"}, and are then forgotten.
 */
public class VerifiableSinkTask
extends SinkTask {
    public static final String NAME_CONFIG = "name";
    public static final String ID_CONFIG = "id";
    private static final ObjectMapper JSON_SERDE = new ObjectMapper();
    private String name;
    private int id;
    /** Records seen by {@code put} that have not yet been reported by {@code flush}, grouped by partition. */
    private final Map<TopicPartition, List<Map<String, Object>>> unflushed = new HashMap<>();

    /**
     * Returns the version reported by {@link VerifiableSinkConnector}.
     */
    @Override
    public String version() {
        return new VerifiableSinkConnector().version();
    }

    /**
     * Reads the task name and numeric id from the supplied configuration.
     *
     * @param props task configuration; the value under {@link #ID_CONFIG} must parse as an integer
     * @throws ConnectException if the id value is missing or is not a valid integer
     */
    @Override
    public void start(Map<String, String> props) {
        try {
            this.name = props.get(NAME_CONFIG);
            // Integer.parseInt(null) also throws NumberFormatException, so a missing id is reported the same way.
            this.id = Integer.parseInt(props.get(ID_CONFIG));
        }
        catch (NumberFormatException e) {
            throw new ConnectException("Invalid VerifiableSinkTask configuration", e);
        }
    }

    /**
     * Prints one JSON line per record and remembers the record until its partition is flushed.
     *
     * @param records the records delivered to this task
     */
    @Override
    public void put(Collection<SinkRecord> records) {
        // One timestamp is taken per call, so every record in the batch reports the same time_ms.
        long nowMs = System.currentTimeMillis();
        for (SinkRecord record : records) {
            Map<String, Object> data = new HashMap<>();
            data.put(NAME_CONFIG, this.name);
            data.put("task", record.key());
            data.put("sinkTask", this.id);
            data.put("topic", record.topic());
            data.put("time_ms", nowMs);
            data.put("seqno", record.value());
            data.put("offset", record.kafkaOffset());
            System.out.println(VerifiableSinkTask.toJson(data));
            TopicPartition partition = new TopicPartition(record.topic(), record.kafkaPartition());
            this.unflushed.computeIfAbsent(partition, tp -> new ArrayList<>()).add(data);
        }
    }

    /**
     * Re-prints, marked as flushed, every remembered record of each partition present in
     * {@code offsets}, then forgets those records. Partitions with no remembered records
     * are skipped.
     *
     * @param offsets map whose keys select the partitions to report; the values are not read
     */
    @Override
    public void flush(Map<TopicPartition, OffsetAndMetadata> offsets) {
        long nowMs = System.currentTimeMillis();
        for (TopicPartition topicPartition : offsets.keySet()) {
            // remove() both fetches and clears the partition's pending list in one lookup.
            List<Map<String, Object>> pending = this.unflushed.remove(topicPartition);
            if (pending == null) {
                continue;
            }
            for (Map<String, Object> data : pending) {
                data.put("time_ms", nowMs);
                data.put("flushed", true);
                System.out.println(VerifiableSinkTask.toJson(data));
            }
        }
    }

    /**
     * No-op; this task performs no work on stop.
     */
    @Override
    public void stop() {
    }

    /**
     * Serializes {@code data} to a JSON string, or returns a plain-text explanation if
     * serialization fails so that the caller always has a line to print.
     */
    private static String toJson(Map<String, Object> data) {
        try {
            return JSON_SERDE.writeValueAsString(data);
        }
        catch (JsonProcessingException e) {
            return "Bad data can't be written as json: " + e.getMessage();
        }
    }
}

