/*
 * Decompiled with CFR 0.152.
 */
package com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.tools;

import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.Schema;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.SchemaBuilder;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.Struct;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.errors.ConnectException;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.source.SourceRecord;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.source.SourceTask;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.tools.SchemaSourceConnector;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.tools.ThroughputThrottler;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SchemaSourceTask
extends SourceTask {
    private static final Logger log = LoggerFactory.getLogger(SchemaSourceTask.class);
    public static final String NAME_CONFIG = "name";
    public static final String ID_CONFIG = "id";
    public static final String TOPIC_CONFIG = "topic";
    public static final String NUM_MSGS_CONFIG = "num.messages";
    public static final String THROUGHPUT_CONFIG = "throughput";
    public static final String MULTIPLE_SCHEMA_CONFIG = "multiple.schema";
    public static final String PARTITION_COUNT_CONFIG = "partition.count";
    private static final String ID_FIELD = "id";
    private static final String SEQNO_FIELD = "seqno";
    private ThroughputThrottler throttler;
    private int id;
    private String topic;
    private Map<String, Integer> partition;
    private long startingSeqno;
    private long seqno;
    private long count;
    private long maxNumMsgs;
    private boolean multipleSchema;
    private int partitionCount;
    private static final Schema VALUE_SCHEMA = SchemaBuilder.struct().version(1).name("record").field("boolean", Schema.BOOLEAN_SCHEMA).field("int", Schema.INT32_SCHEMA).field("long", Schema.INT64_SCHEMA).field("float", Schema.FLOAT32_SCHEMA).field("double", Schema.FLOAT64_SCHEMA).field("partitioning", Schema.INT32_SCHEMA).field("id", Schema.INT32_SCHEMA).field("seqno", Schema.INT64_SCHEMA).build();
    private static final Schema VALUE_SCHEMA_2 = SchemaBuilder.struct().version(2).name("record").field("boolean", Schema.BOOLEAN_SCHEMA).field("int", Schema.INT32_SCHEMA).field("long", Schema.INT64_SCHEMA).field("float", Schema.FLOAT32_SCHEMA).field("double", Schema.FLOAT64_SCHEMA).field("partitioning", Schema.INT32_SCHEMA).field("string", SchemaBuilder.string().defaultValue("abc").build()).field("id", Schema.INT32_SCHEMA).field("seqno", Schema.INT64_SCHEMA).build();

    @Override
    public String version() {
        return new SchemaSourceConnector().version();
    }

    @Override
    public void start(Map<String, String> props) {
        long throughput;
        String name = props.get(NAME_CONFIG);
        try {
            this.id = Integer.parseInt(props.get("id"));
            this.topic = props.get(TOPIC_CONFIG);
            this.maxNumMsgs = Long.parseLong(props.get(NUM_MSGS_CONFIG));
            this.multipleSchema = Boolean.parseBoolean(props.get(MULTIPLE_SCHEMA_CONFIG));
            this.partitionCount = Integer.parseInt(props.getOrDefault(PARTITION_COUNT_CONFIG, "1"));
            throughput = Long.parseLong(props.get(THROUGHPUT_CONFIG));
        }
        catch (NumberFormatException e) {
            throw new ConnectException("Invalid SchemaSourceTask configuration", e);
        }
        this.throttler = new ThroughputThrottler(throughput, System.currentTimeMillis());
        this.partition = Collections.singletonMap("id", this.id);
        Map<String, Object> previousOffset = this.context.offsetStorageReader().offset(this.partition);
        this.seqno = previousOffset != null ? (Long)previousOffset.get(SEQNO_FIELD) + 1L : 0L;
        this.startingSeqno = this.seqno;
        this.count = 0L;
        log.info("Started SchemaSourceTask {}-{} producing to topic {} resuming from seqno {}", new Object[]{name, this.id, this.topic, this.startingSeqno});
    }

    @Override
    public List<SourceRecord> poll() {
        if (this.count < this.maxNumMsgs) {
            SourceRecord srcRecord;
            long sendStartMs = System.currentTimeMillis();
            if (this.throttler.shouldThrottle(this.seqno - this.startingSeqno, sendStartMs)) {
                this.throttler.throttle();
            }
            Map<String, Long> ccOffset = Collections.singletonMap(SEQNO_FIELD, this.seqno);
            int partitionVal = (int)(this.seqno % (long)this.partitionCount);
            if (!this.multipleSchema || this.count % 2L == 0L) {
                Struct data = new Struct(VALUE_SCHEMA).put("boolean", (Object)true).put("int", (Object)12).put("long", (Object)12L).put("float", (Object)Float.valueOf(12.2f)).put("double", (Object)12.2).put("partitioning", (Object)partitionVal).put("id", (Object)this.id).put(SEQNO_FIELD, (Object)this.seqno);
                srcRecord = new SourceRecord(this.partition, ccOffset, this.topic, this.id, Schema.STRING_SCHEMA, (Object)"key", VALUE_SCHEMA, data);
            } else {
                Struct data = new Struct(VALUE_SCHEMA_2).put("boolean", (Object)true).put("int", (Object)12).put("long", (Object)12L).put("float", (Object)Float.valueOf(12.2f)).put("double", (Object)12.2).put("partitioning", (Object)partitionVal).put("string", (Object)"def").put("id", (Object)this.id).put(SEQNO_FIELD, (Object)this.seqno);
                srcRecord = new SourceRecord(this.partition, ccOffset, this.topic, this.id, Schema.STRING_SCHEMA, (Object)"key", VALUE_SCHEMA_2, data);
            }
            System.out.println("{\"task\": " + this.id + ", \"seqno\": " + this.seqno + "}");
            ++this.seqno;
            ++this.count;
            return Collections.singletonList(srcRecord);
        }
        this.throttler.throttle();
        return Collections.emptyList();
    }

    @Override
    public void stop() {
        this.throttler.wakeup();
    }
}

