/*
 * Decompiled with CFR 0.152.
 */
package org.apache.gobblin.source.extractor;

import edu.umd.cs.findbugs.annotations.SuppressWarnings;
import io.reactivex.Flowable;
import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nullable;
import org.apache.gobblin.metadata.GlobalMetadata;
import org.apache.gobblin.records.RecordStreamWithMetadata;
import org.apache.gobblin.runtime.JobShutdownException;
import org.apache.gobblin.source.extractor.DataRecordException;
import org.apache.gobblin.stream.RecordEnvelope;
import org.apache.gobblin.stream.StreamEntity;
import org.apache.gobblin.util.Decorator;

/**
 * Contract for a component that exposes a schema and produces data records one at a time,
 * either by pulling them individually ({@link #readRecord(Object)},
 * {@link #readRecordEnvelope()}, {@link #readStreamEntity()}) or as a reactive stream
 * ({@link #recordStream(AtomicBoolean)}).
 *
 * <p>The default methods are layered: {@link #recordStream(AtomicBoolean)} pulls from
 * {@link #readStreamEntity()}, which delegates to {@link #readRecordEnvelope()}, which in turn
 * delegates to {@link #readRecord(Object)}. An implementation therefore has to override at
 * least one of these; otherwise the chain ends in the {@link UnsupportedOperationException}
 * thrown by the default {@link #readRecord(Object)}.
 *
 * @param <S> type of the schema returned by {@link #getSchema()}
 * @param <D> type of the data records produced
 */
public interface Extractor<S, D> extends Closeable {

    /**
     * Returns the schema associated with the records of this extractor.
     *
     * @return the schema
     * @throws IOException if the schema cannot be obtained
     */
    S getSchema() throws IOException;

    /**
     * Reads the next data record.
     *
     * <p>The default implementation always throws {@link UnsupportedOperationException}.
     *
     * @param reuse deprecated parameter; the default {@link #readRecordEnvelope()} always
     *              passes {@code null} here
     * @return the next record; {@code null} is interpreted by {@link #readRecordEnvelope()}
     *         as "no record available"
     * @throws DataRecordException declared for implementations that fail to produce a record
     * @throws IOException declared for implementations that fail while reading
     */
    @Nullable
    default D readRecord(@Deprecated D reuse) throws DataRecordException, IOException {
        throw new UnsupportedOperationException();
    }

    /**
     * Returns the expected record count.
     *
     * <p>NOTE(review): the exact semantics (e.g. whether this is an estimate) are defined by
     * implementations and are not visible in this interface.
     *
     * @return the expected number of records
     */
    long getExpectedRecordCount();

    /**
     * Returns the high watermark.
     *
     * <p>NOTE(review): the meaning of the value is implementation-defined and not visible here;
     * the method is marked deprecated.
     *
     * @return the high watermark value
     */
    @Deprecated
    long getHighWatermark();

    /**
     * Requests this extractor to shut down.
     *
     * <p>The default implementation supports only extractors that are {@link Decorator}s
     * wrapping another {@code Extractor}: the request is forwarded to the decorated extractor.
     * In every other case a {@link JobShutdownException} is thrown.
     *
     * @throws JobShutdownException if this extractor is not a decorator around another
     *         extractor, or if the decorated extractor fails to shut down
     */
    default void shutdown() throws JobShutdownException {
        if (this instanceof Decorator) {
            // Look the delegate up once and forward the request when it is itself an extractor.
            Object decorated = ((Decorator) this).getDecoratedObject();
            if (decorated instanceof Extractor) {
                ((Extractor<?, ?>) decorated).shutdown();
                return;
            }
        }
        throw new JobShutdownException(
                this.getClass().getName() + ": Extractor does not support shutdown.");
    }

    /**
     * Reads the next record via {@link #readRecord(Object)} and wraps it in a
     * {@link RecordEnvelope}.
     *
     * @return an envelope around the next record, or {@code null} when
     *         {@link #readRecord(Object)} returned {@code null}
     * @throws DataRecordException propagated from {@link #readRecord(Object)}
     * @throws IOException propagated from {@link #readRecord(Object)}
     */
    @SuppressWarnings(value={"RCN_REDUNDANT_NULLCHECK_OF_NONNULL_VALUE"}, justification="Findbugs believes readRecord(null) is non-null. This is not true.")
    default RecordEnvelope<D> readRecordEnvelope() throws DataRecordException, IOException {
        D record = readRecord(null);
        if (record == null) {
            return null;
        }
        return new RecordEnvelope<>(record);
    }

    /**
     * Reads the next stream entity. The default implementation returns the result of
     * {@link #readRecordEnvelope()}.
     *
     * @return the next stream entity, or {@code null} when {@link #readRecordEnvelope()}
     *         returned {@code null}
     * @throws DataRecordException propagated from {@link #readRecordEnvelope()}
     * @throws IOException propagated from {@link #readRecordEnvelope()}
     */
    default StreamEntity<D> readStreamEntity() throws DataRecordException, IOException {
        return readRecordEnvelope();
    }

    /**
     * Builds a record stream that pulls entities from {@link #readStreamEntity()} and pairs it
     * with metadata carrying the schema from {@link #getSchema()}.
     *
     * <p>On every generator turn the {@code shutdownRequest} flag is checked first. If it is
     * set, {@link #shutdown()} is invoked; a failure to shut down is signalled as a stream
     * error and the turn ends without reading another record. Otherwise (or after a successful
     * shutdown call) one entity is read: a non-null entity is emitted, a {@code null} entity
     * completes the stream, and a read failure is signalled as a stream error.
     * {@link #close()} is registered on the stream through {@code doFinally}.
     *
     * @param shutdownRequest flag polled before each read; when {@code true},
     *                        {@link #shutdown()} is invoked before reading
     * @return the record stream together with its schema metadata
     * @throws IOException if {@link #getSchema()} fails
     */
    default RecordStreamWithMetadata<D, S> recordStream(AtomicBoolean shutdownRequest) throws IOException {
        S schema = getSchema();
        Flowable<StreamEntity<D>> entities = Flowable.<StreamEntity<D>, AtomicBoolean>generate(
                () -> shutdownRequest,
                (state, emitter) -> {
                    if (state.get()) {
                        try {
                            shutdown();
                        } catch (JobShutdownException exc) {
                            emitter.onError(exc);
                            // The stream is terminated: do not consume a record that could no
                            // longer be delivered, and do not risk signalling a second error.
                            return;
                        }
                    }
                    try {
                        StreamEntity<D> entity = readStreamEntity();
                        if (entity == null) {
                            emitter.onComplete();
                        } else {
                            emitter.onNext(entity);
                        }
                    } catch (IOException | DataRecordException exc) {
                        emitter.onError(exc);
                    }
                });
        Flowable<StreamEntity<D>> closingEntities = entities.doFinally(this::close);
        return new RecordStreamWithMetadata<>(
                closingEntities, GlobalMetadata.<S>builder().schema(schema).build());
    }
}

