/*
 * Decompiled with CFR 0.152.
 */
package org.opensearch.dataprepper.model.sink;

import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Timer;
import java.util.Collection;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventHandle;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.sink.Sink;
import org.opensearch.dataprepper.model.sink.SinkLatencyMetrics;
import org.opensearch.dataprepper.model.sink.SinkThread;

/**
 * Base class for sinks that adds metrics around output and a background
 * retry thread for sinks that are not ready after their first initialization.
 *
 * <p>Subclasses supply {@link #doInitialize()} and {@link #doOutput(Collection)};
 * this class wraps them with a {@code recordsIn} counter, a {@code timeElapsed}
 * timer and {@link SinkLatencyMetrics} updates.
 *
 * @param <T> the record type accepted by this sink
 */
public abstract class AbstractSink<T extends Record<?>>
implements Sink<T> {
    /** Retry count handed to {@link SinkThread} when the caller does not specify one. */
    protected static final int DEFAULT_MAX_RETRIES = 600;
    /** Wait time in milliseconds handed to {@link SinkThread} when the caller does not specify one. */
    protected static final int DEFAULT_WAIT_TIME_MS = 1000;
    protected final PluginMetrics pluginMetrics;
    private final Counter recordsInCounter;
    private final SinkLatencyMetrics latencyMetrics;
    private final Timer timeElapsedTimer;
    /** Background thread running a {@link SinkThread}; {@code null} until {@link #initialize()} starts one. */
    private Thread retryThread;
    private final int maxRetries;
    private final int waitTimeMs;

    /**
     * Creates a sink whose metrics are derived from the given plugin setting.
     *
     * @param pluginSetting setting used to build the {@link PluginMetrics} for this sink
     * @param numRetries    retry count passed to the {@link SinkThread} started by {@link #initialize()}
     * @param waitTimeMs    wait time in milliseconds passed to the same {@link SinkThread}
     */
    public AbstractSink(PluginSetting pluginSetting, int numRetries, int waitTimeMs) {
        this.pluginMetrics = PluginMetrics.fromPluginSetting(pluginSetting);
        this.recordsInCounter = this.pluginMetrics.counter("recordsIn");
        this.timeElapsedTimer = this.pluginMetrics.timer("timeElapsed");
        this.latencyMetrics = new SinkLatencyMetrics(this.pluginMetrics);
        this.maxRetries = numRetries;
        this.waitTimeMs = waitTimeMs;
    }

    /**
     * Creates a sink using {@link #DEFAULT_MAX_RETRIES} and {@link #DEFAULT_WAIT_TIME_MS}.
     *
     * @param pluginSetting setting used to build the {@link PluginMetrics} for this sink
     */
    public AbstractSink(PluginSetting pluginSetting) {
        this(pluginSetting, DEFAULT_MAX_RETRIES, DEFAULT_WAIT_TIME_MS);
    }

    /**
     * Performs the sink-specific initialization. Invoked by {@link #initialize()}.
     */
    public abstract void doInitialize();

    /**
     * Runs {@link #doInitialize()} and, if the sink still reports not ready and no
     * retry thread has been created yet, starts a thread running a {@link SinkThread}
     * configured with this sink's retry count and wait time.
     */
    @Override
    public void initialize() {
        this.doInitialize();
        if (!this.isReady() && this.retryThread == null) {
            this.retryThread = new Thread(new SinkThread(this, this.maxRetries, this.waitTimeMs));
            this.retryThread.start();
        }
    }

    /**
     * Adds the number of records to the {@code recordsIn} counter and delegates to
     * {@link #doOutput(Collection)} while recording its duration in the
     * {@code timeElapsed} timer.
     *
     * @param records the records to write
     */
    @Override
    public void output(Collection<T> records) {
        this.recordsInCounter.increment(records.size());
        this.timeElapsedTimer.record(() -> this.doOutput(records));
    }

    /**
     * Writes the given records to the sink's destination.
     *
     * @param var1 the records to write
     */
    public abstract void doOutput(Collection<T> var1);

    /**
     * Asks the retry thread, if one was started, to terminate.
     *
     * <p>The thread is interrupted rather than stopped: {@code Thread.stop()} is
     * deprecated for removal and throws {@link UnsupportedOperationException} on
     * Java 20 and later, which would make shutdown itself fail.
     *
     * <p>NOTE(review): this relies on {@link SinkThread} ending its loop when
     * interrupted - confirm against its {@code run()} implementation.
     */
    @Override
    public void shutdown() {
        final Thread thread = this.retryThread;
        if (thread != null) {
            thread.interrupt();
        }
    }

    /**
     * Registers a release callback on every record whose data is an {@link Event}.
     * When the callback is invoked with a {@code true} result, the event handle is
     * passed to {@link SinkLatencyMetrics#update}. Records carrying any other data
     * type are skipped.
     *
     * @param records the records whose events should report latency on release
     */
    @Override
    public void updateLatencyMetrics(Collection<T> records) {
        for (T record : records) {
            final Object data = record.getData();
            if (!(data instanceof Event)) {
                continue;
            }
            final Event event = (Event) data;
            event.getEventHandle().onRelease((eventHandle, result) -> {
                if (result) {
                    this.latencyMetrics.update((EventHandle) eventHandle);
                }
            });
        }
    }

    /**
     * Returns the state of the retry thread.
     *
     * @return the thread's current state, or {@code null} if no retry thread was started
     */
    Thread.State getRetryThreadState() {
        if (this.retryThread != null) {
            return this.retryThread.getState();
        }
        return null;
    }
}

