package org.apache.rya.kafka.connect.api.sink;

import com.jcabi.manifests.Manifests;
import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
import edu.umd.cs.findbugs.annotations.NonNull;
import edu.umd.cs.findbugs.annotations.Nullable;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
import org.eclipse.rdf4j.model.Resource;
import org.eclipse.rdf4j.repository.sail.SailRepository;
import org.eclipse.rdf4j.repository.sail.SailRepositoryConnection;
import org.eclipse.rdf4j.sail.Sail;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@DefaultAnnotation({NonNull.class})
/* loaded from: input_file:org/apache/rya/kafka/connect/api/sink/RyaSinkTask.class */
public abstract class RyaSinkTask extends SinkTask {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) RyaSinkTask.class);

    @Nullable
    private SailRepository sailRepo = null;

    @Nullable
    private SailRepositoryConnection conn = null;

    protected abstract void checkRyaInstanceExists(Map<String, String> map) throws ConnectException;

    protected abstract Sail makeSail(Map<String, String> map) throws ConnectException;

    public String version() {
        return Manifests.exists("Build-Version") ? Manifests.read("Build-Version") : "UNKNOWN";
    }

    public void start(Map<String, String> map) throws ConnectException {
        Objects.requireNonNull(map);
        checkRyaInstanceExists(map);
        this.sailRepo = new SailRepository(makeSail(map));
        this.conn = this.sailRepo.getConnection();
    }

    public void put(Collection<SinkRecord> collection) {
        Objects.requireNonNull(collection);
        if (collection.isEmpty()) {
            return;
        }
        if (!this.conn.isActive()) {
            this.conn.begin();
        }
        Iterator<SinkRecord> it = collection.iterator();
        while (it.hasNext()) {
            this.conn.add((Set) it.next().value(), new Resource[0]);
        }
    }

    public void flush(Map<TopicPartition, OffsetAndMetadata> map) {
        Objects.requireNonNull(map);
        this.conn.commit();
    }

    public void stop() {
        try {
            if (this.conn != null) {
                this.conn.close();
            }
        } catch (Exception e) {
            log.error("Could not close the Sail Repository Connection.", (Throwable) e);
        }
        try {
            if (this.sailRepo != null) {
                this.sailRepo.shutDown();
            }
        } catch (Exception e2) {
            log.error("Could not shut down the Sail Repository.", (Throwable) e2);
        }
    }
}
