/*
 * Decompiled with CFR 0.152.
 */
package io.streamthoughts.kafka.connect.filepulse.source;

import io.streamthoughts.kafka.connect.filepulse.internal.Network;
import io.streamthoughts.kafka.connect.filepulse.offset.OffsetManager;
import io.streamthoughts.kafka.connect.filepulse.source.FileContext;
import io.streamthoughts.kafka.connect.filepulse.source.SourceFile;
import io.streamthoughts.kafka.connect.filepulse.source.SourceMetadata;
import io.streamthoughts.kafka.connect.filepulse.source.SourceOffset;
import io.streamthoughts.kafka.connect.filepulse.source.SourceStatus;
import io.streamthoughts.kafka.connect.filepulse.source.StateListener;
import io.streamthoughts.kafka.connect.filepulse.storage.StateBackingStore;
import java.util.Collections;
import java.util.Objects;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaFileStateReporter
implements StateListener {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaFileStateReporter.class);
    private final StateBackingStore<SourceFile> store;
    private final OffsetManager offsetManager;

    KafkaFileStateReporter(StateBackingStore<SourceFile> store, OffsetManager offsetManager) {
        Objects.requireNonNull(store, "store can't be null");
        Objects.requireNonNull(offsetManager, "offsetManager can't be null");
        this.store = store;
        this.offsetManager = offsetManager;
    }

    void notify(SourceMetadata metadata, SourceOffset offset, SourceStatus status) {
        Objects.requireNonNull(metadata, "metadata can't be null");
        Objects.requireNonNull(offset, "offset can't be null");
        Objects.requireNonNull(status, "status can't be null");
        String partition = this.offsetManager.toPartitionJson(metadata);
        SourceFile state = new SourceFile(metadata, offset, status, Collections.singletonMap("hostname", Network.HOSTNAME));
        this.store.putAsync(partition, state);
    }

    public void onScheduled(FileContext context) {
        Objects.requireNonNull(context, "context can't be null");
        LOG.debug("Scheduling source file '{}'", (Object)context.metadata());
        this.notify(context.metadata(), context.offset(), SourceStatus.SCHEDULED);
    }

    public void onInvalid(FileContext context) {
        Objects.requireNonNull(context, "context can't be null");
        this.notify(context.metadata(), context.offset(), SourceStatus.INVALID);
    }

    public void onStart(FileContext context) {
        Objects.requireNonNull(context, "context can't be null");
        LOG.debug("Starting to precess source file '{}'", (Object)context.metadata());
        this.notify(context.metadata(), context.offset(), SourceStatus.STARTED);
    }

    public void onCompleted(FileContext context) {
        Objects.requireNonNull(context, "context can't be null");
        LOG.debug("Completed source file '{}'", (Object)context.metadata());
        this.notify(context.metadata(), context.offset(), SourceStatus.COMPLETED);
    }

    public void onFailure(FileContext context, Throwable t) {
        Objects.requireNonNull(context, "context can't be null");
        LOG.error("Error while processing source file '{}'", (Object)context.metadata(), (Object)t);
        this.notify(context.metadata(), context.offset(), SourceStatus.FAILED);
    }
}

