/*
 * Decompiled with CFR 0.152.
 */
package org.apache.gobblin.runtime;

import com.google.common.base.Optional;
import com.google.common.io.Closer;
import com.google.common.util.concurrent.Futures;
import io.reactivex.Flowable;
import io.reactivex.flowables.ConnectableFlowable;
import io.reactivex.schedulers.Schedulers;
import java.io.Closeable;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.gobblin.ack.Ackable;
import org.apache.gobblin.configuration.WorkUnitState;
import org.apache.gobblin.converter.Converter;
import org.apache.gobblin.fork.ForkOperator;
import org.apache.gobblin.fork.Forker;
import org.apache.gobblin.qualitychecker.row.RowLevelPolicyChecker;
import org.apache.gobblin.records.RecordStreamProcessor;
import org.apache.gobblin.records.RecordStreamWithMetadata;
import org.apache.gobblin.runtime.ExecutionModel;
import org.apache.gobblin.runtime.MultiConverter;
import org.apache.gobblin.runtime.Task;
import org.apache.gobblin.runtime.TaskContext;
import org.apache.gobblin.runtime.TaskExecutor;
import org.apache.gobblin.runtime.TaskState;
import org.apache.gobblin.runtime.fork.Fork;
import org.apache.gobblin.source.extractor.Extractor;
import org.apache.gobblin.source.extractor.StreamingExtractor;
import org.apache.gobblin.util.ExponentialBackoff;
import org.apache.gobblin.util.LoggingUncaughtExceptionHandler;
import org.apache.gobblin.writer.AcknowledgableWatermark;
import org.apache.gobblin.writer.FineGrainedWatermarkTracker;
import org.apache.gobblin.writer.WatermarkManager;
import org.apache.gobblin.writer.WatermarkStorage;

/**
 * Runs a {@link Task} by wiring the extractor's record stream through the configured
 * record stream processors (or converters), the row-level policy checker and the fork
 * operator, and then waiting for every resulting {@link Fork} to report completion.
 *
 * <p>All collaborators are supplied through the constructor; this class only stores them
 * and uses them from {@link #run()}.
 */
public class StreamModelTaskRunner {
    private final Task task;
    private final TaskState taskState;
    private final Closer closer;
    private final TaskContext taskContext;
    private final Extractor extractor;
    private final Converter converter;
    private final List<RecordStreamProcessor<?, ?, ?, ?>> recordStreamProcessors;
    private final RowLevelPolicyChecker rowChecker;
    private final TaskExecutor taskExecutor;
    private final ExecutionModel taskMode;
    private final AtomicBoolean shutdownRequested;
    private final Optional<FineGrainedWatermarkTracker> watermarkTracker;
    private final Optional<WatermarkManager> watermarkManager;
    private final Optional<WatermarkStorage> watermarkStorage;
    private final Map<Optional<Fork>, Optional<Future<?>>> forks;

    /**
     * Creates a runner over the given task collaborators. The arguments are stored as-is;
     * no validation or copying is performed.
     *
     * @param task the task being run; receives per-record and per-fork callbacks
     * @param taskState state passed to every stream processing stage and used to read configuration
     * @param closer closer with which the fork operator is registered
     * @param taskContext source of the fork operator and the raw source extractor
     * @param extractor extractor that produces the record stream
     * @param converter converter applied when {@code recordStreamProcessors} is empty
     * @param recordStreamProcessors processors applied in order; when non-empty they are used instead of {@code converter}
     * @param rowChecker row-level policy checker applied after conversion
     * @param taskExecutor provider of the executor on which forks observe their streams
     * @param taskMode execution model handed to each created {@link Fork}
     * @param shutdownRequested flag handed to the extractor when requesting its record stream
     * @param watermarkTracker optional tracker, started and fed only for streaming tasks
     * @param watermarkManager watermark manager, started for streaming tasks
     * @param watermarkStorage watermark storage, handed to the streaming extractor for streaming tasks
     * @param forks map into which every created fork is registered
     */
    public StreamModelTaskRunner(Task task, TaskState taskState, Closer closer, TaskContext taskContext, Extractor extractor, Converter converter, List<RecordStreamProcessor<?, ?, ?, ?>> recordStreamProcessors, RowLevelPolicyChecker rowChecker, TaskExecutor taskExecutor, ExecutionModel taskMode, AtomicBoolean shutdownRequested, Optional<FineGrainedWatermarkTracker> watermarkTracker, Optional<WatermarkManager> watermarkManager, Optional<WatermarkStorage> watermarkStorage, Map<Optional<Fork>, Optional<Future<?>>> forks) {
        this.task = task;
        this.taskState = taskState;
        this.closer = closer;
        this.taskContext = taskContext;
        this.extractor = extractor;
        this.converter = converter;
        this.recordStreamProcessors = recordStreamProcessors;
        this.rowChecker = rowChecker;
        this.taskExecutor = taskExecutor;
        this.taskMode = taskMode;
        this.shutdownRequested = shutdownRequested;
        this.watermarkTracker = watermarkTracker;
        this.watermarkManager = watermarkManager;
        this.watermarkStorage = watermarkStorage;
        this.forks = forks;
    }

    /**
     * Builds the record pipeline, starts it on a dedicated thread and blocks until all forks
     * are done or the configured maximum wait elapses.
     *
     * <p>Configuration read from the task state:
     * <ul>
     *   <li>{@code fork.max.wait.minutes} (default 60): maximum time, in minutes, to wait for forks;</li>
     *   <li>{@code fork.finished.check.interval} (default 1000): delay between fork completion checks
     *       (presumably milliseconds, as it is passed next to a millisecond max wait; confirm against
     *       {@code ExponentialBackoff});</li>
     *   <li>{@code fork.record.queue.capacity} (default 100): buffer size used when forks observe
     *       their stream on the fork executor.</li>
     * </ul>
     *
     * @throws TimeoutException if the forks are not all done within the configured maximum wait
     * @throws Exception if any pipeline stage or the completion wait fails
     */
    // Raw types are intentional: the processing stages are heterogeneous, so their record and
    // schema type parameters cannot be expressed statically across the chain.
    @SuppressWarnings({"unchecked", "rawtypes"})
    protected void run() throws Exception {
        long maxWaitInMinute = this.taskState.getPropAsLong("fork.max.wait.minutes", 60L);
        long initialDelay = this.taskState.getPropAsLong("fork.finished.check.interval", 1000L);

        // Register the fork operator with the closer so it is closed together with the task resources.
        ForkOperator forkOperator = this.closer.register(this.taskContext.getForkOperator());

        RecordStreamWithMetadata<?, ?> stream = this.extractor.recordStream(this.shutdownRequested);
        // A published (connectable) stream does not emit until connect() is called, which lets
        // the whole pipeline be assembled before any record flows.
        ConnectableFlowable connectableStream = stream.getRecordStream().publish();
        // Shut the extractor down when the downstream cancels the stream.
        Flowable streamWithShutdownOnCancel = connectableStream.doOnCancel(() -> this.extractor.shutdown());
        stream = stream.withRecordStream(streamWithShutdownOnCancel);

        // Notify the task of every extracted record; the record itself passes through unchanged.
        stream = stream.mapRecords(r -> {
            this.task.onRecordExtract();
            return r;
        });

        if (this.task.isStreamingTask()) {
            if (this.watermarkTracker.isPresent()) {
                this.watermarkTracker.get().start();
            }
            // NOTE(review): the manager and storage are dereferenced without a presence check, so
            // a streaming task constructed without them fails here. Presumably the caller always
            // supplies both for streaming tasks; confirm.
            this.watermarkManager.get().start();
            ((StreamingExtractor) this.taskContext.getRawSourceExtractor()).start(this.watermarkStorage.get());

            // Attach an acknowledgable watermark to each record as a callback, and hand it to the
            // tracker when one is configured.
            stream = stream.mapRecords(r -> {
                AcknowledgableWatermark ackableWatermark = new AcknowledgableWatermark(r.getWatermark());
                if (this.watermarkTracker.isPresent()) {
                    this.watermarkTracker.get().track(ackableWatermark);
                }
                r.addCallBack(ackableWatermark);
                return r;
            });
        }

        // Explicit record stream processors take precedence over the converter.
        if (!this.recordStreamProcessors.isEmpty()) {
            for (RecordStreamProcessor streamProcessor : this.recordStreamProcessors) {
                stream = streamProcessor.processStream(stream, this.taskState);
            }
        } else if (this.converter instanceof MultiConverter) {
            // Unpack the multi-converter and apply each wrapped converter in order.
            for (Converter wrappedConverter : ((MultiConverter) this.converter).getConverters()) {
                stream = wrappedConverter.processStream(stream, this.taskState);
            }
        } else {
            stream = this.converter.processStream(stream, this.taskState);
        }
        stream = this.rowChecker.processStream(stream, this.taskState);

        Forker.ForkedStream<?, ?> forkedStreams = new Forker().forkStream(stream, forkOperator, this.taskState);

        // Forks observe their stream on the fork executor unless the task asks for synchronous
        // single-branch execution and there is at most one branch.
        boolean isForkAsync = !this.task.areSingleBranchTasksSynchronous(this.taskContext)
                || forkedStreams.getForkedStreams().size() > 1;
        int bufferSize = this.taskState.getPropAsInt("fork.record.queue.capacity", 100);

        for (int fidx = 0; fidx < forkedStreams.getForkedStreams().size(); ++fidx) {
            RecordStreamWithMetadata<?, ?> forkedStream = forkedStreams.getForkedStreams().get(fidx);
            // Branches without a stream are skipped; no fork is created for them.
            if (forkedStream == null) {
                continue;
            }
            if (isForkAsync) {
                forkedStream = forkedStream.mapStream(
                        f -> f.observeOn(Schedulers.from(this.taskExecutor.getForkExecutor()), false, bufferSize));
            }
            Fork fork = new Fork(this.taskContext, forkedStream.getGlobalMetadata().getSchema(),
                    forkedStreams.getForkedStreams().size(), fidx, this.taskMode);
            fork.consumeRecordStream(forkedStream);
            // The fork is registered with an already-completed future; completion is detected
            // below by polling Fork.isDone().
            this.forks.put(Optional.of(fork), Optional.of(Futures.immediateFuture(null)));
            this.task.configureStreamingFork(fork);
        }

        // Connect the published stream on its own thread so this thread is free to wait on the forks.
        Thread thread = new Thread(() -> connectableStream.connect());
        thread.setName(this.getClass().getSimpleName());
        thread.setUncaughtExceptionHandler(new LoggingUncaughtExceptionHandler(Optional.absent()));
        thread.start();

        // Poll until every registered fork is done. Initial and maximum delay are both set to the
        // configured check interval.
        boolean allForksDone = ExponentialBackoff.awaitCondition()
                .callable(() -> this.forks.keySet().stream().map(Optional::get).allMatch(Fork::isDone))
                .initialDelay(initialDelay)
                .maxDelay(initialDelay)
                .maxWait(TimeUnit.MINUTES.toMillis(maxWaitInMinute))
                .await();
        if (!allForksDone) {
            throw new TimeoutException("Forks did not finish within specified timeout.");
        }
    }
}

