/*
 * Decompiled with CFR 0.152.
 */
package org.apache.gobblin.runtime.fork;

import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.io.Closer;
import edu.umd.cs.findbugs.annotations.SuppressWarnings;
import java.io.Closeable;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.gobblin.Constructs;
import org.apache.gobblin.commit.SpeculativeAttemptAwareConstruct;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.configuration.WorkUnitState;
import org.apache.gobblin.converter.Converter;
import org.apache.gobblin.converter.DataConversionException;
import org.apache.gobblin.instrumented.Instrumented;
import org.apache.gobblin.metrics.GobblinMetrics;
import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.metrics.Tag;
import org.apache.gobblin.publisher.TaskPublisher;
import org.apache.gobblin.qualitychecker.row.RowLevelPolicyCheckResults;
import org.apache.gobblin.qualitychecker.row.RowLevelPolicyChecker;
import org.apache.gobblin.qualitychecker.task.TaskLevelPolicyCheckResults;
import org.apache.gobblin.records.RecordStreamConsumer;
import org.apache.gobblin.records.RecordStreamProcessor;
import org.apache.gobblin.records.RecordStreamWithMetadata;
import org.apache.gobblin.runtime.BoundedBlockingRecordQueue;
import org.apache.gobblin.runtime.ExecutionModel;
import org.apache.gobblin.runtime.MultiConverter;
import org.apache.gobblin.runtime.TaskContext;
import org.apache.gobblin.runtime.TaskState;
import org.apache.gobblin.runtime.util.TaskMetrics;
import org.apache.gobblin.state.ConstructState;
import org.apache.gobblin.stream.ControlMessage;
import org.apache.gobblin.stream.RecordEnvelope;
import org.apache.gobblin.util.FinalState;
import org.apache.gobblin.util.ForkOperatorUtils;
import org.apache.gobblin.writer.DataWriter;
import org.apache.gobblin.writer.DataWriterBuilder;
import org.apache.gobblin.writer.DataWriterWrapperBuilder;
import org.apache.gobblin.writer.Destination;
import org.apache.gobblin.writer.PartitionedDataWriter;
import org.apache.gobblin.writer.WatermarkAwareWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * One branch ("fork") of a task's record flow.
 *
 * <p>A fork converts incoming records with its configured converters, filters them through the
 * row-level policy checker, and writes the survivors with a {@link DataWriter} that is built
 * either eagerly in the constructor or lazily on the first record.
 *
 * <p>The fork tracks its lifecycle in a {@link ForkState}: it starts {@code PENDING}, moves to
 * {@code RUNNING} when processing starts, to {@code SUCCEEDED} when processing finishes, and to
 * {@code COMMITTED} after a successful {@link #commit()}. Any failure moves it to {@code FAILED}.
 *
 * <p>{@link #processRecords()} and {@link #putRecordImpl(Object)} throw
 * {@link UnsupportedOperationException} in this class; subclasses are expected to override them.
 *
 * @param <S> schema type, as used by {@link RecordStreamConsumer}
 * @param <D> data record type, as used by {@link RecordStreamConsumer}
 */
@SuppressWarnings(value={"unchecked"})
public class Fork<S, D>
implements Closeable,
FinalState,
RecordStreamConsumer<S, D>,
Runnable {
    private final Logger logger;
    private final TaskContext taskContext;
    private final TaskState taskState;
    // A private copy of the task state when there is more than one branch, otherwise the task state itself.
    private final TaskState forkTaskState;
    private final String taskId;
    private final Optional<String> taskAttemptId;
    private final int branches;
    private final int index;
    private final ExecutionModel executionModel;
    private final Converter converter;
    private final Optional<Object> convertedSchema;
    private final RowLevelPolicyChecker rowLevelPolicyChecker;
    private final RowLevelPolicyCheckResults rowLevelPolicyCheckingResult;
    // Owns the converter, the policy checker, the metric context and (once built) the writer.
    private final Closer closer = Closer.create();
    private Optional<DataWriter<Object>> writer = Optional.absent();
    private volatile boolean parentTaskDone = false;
    private final AtomicReference<ForkState> forkState;
    private static final String FORK_METRICS_BRANCH_NAME_KEY = "forkBranchName";
    /** Marker object passed to {@link #putRecord(Object)} by {@link #markParentTaskDone()}. */
    protected static final Object SHUTDOWN_RECORD = new Object();

    /**
     * Creates a fork for one branch of a task.
     *
     * <p>The writer is built immediately when {@code writer.eager.initialization} is set to true
     * in the task state or when the execution model is {@code STREAMING}; otherwise it is built
     * on the first record.
     *
     * @param taskContext context supplying the task state, converters, policy checkers and writer builder
     * @param schema input schema, converted with this fork's converters
     * @param branches total number of branches
     * @param index index of this branch
     * @param executionModel execution model the fork runs under
     * @throws Exception if schema conversion or eager writer construction fails
     */
    public Fork(TaskContext taskContext, Object schema, int branches, int index, ExecutionModel executionModel) throws Exception {
        this.logger = LoggerFactory.getLogger(Fork.class.getName() + "-" + index);
        this.taskContext = taskContext;
        this.taskState = this.taskContext.getTaskState();
        this.forkTaskState = branches > 1 ? new TaskState(this.taskState) : this.taskState;
        this.taskId = this.taskState.getTaskId();
        this.taskAttemptId = this.taskState.getTaskAttemptId();
        this.branches = branches;
        this.index = index;
        this.executionModel = executionModel;

        this.converter = this.closer.register(new MultiConverter(this.taskContext.getConverters(this.index, this.forkTaskState)));
        this.convertedSchema = Optional.fromNullable(this.converter.convertSchema(schema, this.taskState));
        this.rowLevelPolicyChecker = this.closer.register(this.taskContext.getRowLevelPolicyChecker(this.index));
        this.rowLevelPolicyCheckingResult = new RowLevelPolicyCheckResults();

        boolean useEagerWriterInitialization =
                this.taskState.getPropAsBoolean("writer.eager.initialization", false) || this.isStreamingMode();
        if (useEagerWriterInitialization) {
            this.buildWriterIfNotPresent();
        }

        this.forkState = new AtomicReference<ForkState>(ForkState.PENDING);

        if (GobblinMetrics.isEnabled(this.taskState)) {
            GobblinMetrics forkMetrics = GobblinMetrics.get(
                    Fork.getForkMetricsName(taskContext.getTaskMetrics(), this.taskState, index),
                    taskContext.getTaskMetrics().getMetricContext(),
                    Fork.getForkMetricsTags(this.taskState, index));
            this.closer.register(forkMetrics.getMetricContext());
            Instrumented.setMetricContextName(this.taskState, forkMetrics.getMetricContext().getName());
        }
    }

    /** Returns true when this fork runs under the {@code STREAMING} execution model. */
    private boolean isStreamingMode() {
        return this.executionModel.equals((Object)ExecutionModel.STREAMING);
    }

    /**
     * Wires this fork onto a record stream: conversion, row-level policy checking, fork-state
     * transitions, and finally a subscriber that writes each element with the writer.
     *
     * @param stream the incoming record stream with its metadata
     * @throws RecordStreamProcessor.StreamProcessingException if a processing stage cannot be attached
     */
    @SuppressWarnings(value={"RV_RETURN_VALUE_IGNORED"}, justification="We actually don't care about the return value of subscribe.")
    public void consumeRecordStream(RecordStreamWithMetadata<D, S> stream) throws RecordStreamProcessor.StreamProcessingException {
        if (this.converter instanceof MultiConverter) {
            // Unpack the multi-converter so each constituent converter processes the stream in turn.
            for (Converter<?, ?, ?, ?> cverter : ((MultiConverter)this.converter).getConverters()) {
                stream = cverter.processStream(stream, (WorkUnitState)this.taskState);
            }
        } else {
            stream = this.converter.processStream(stream, (WorkUnitState)this.taskState);
        }
        stream = this.rowLevelPolicyChecker.processStream(stream, (WorkUnitState)this.taskState);
        // Make sure the writer exists before any element reaches the subscriber below.
        stream = stream.mapStream(s -> s.map(r -> {
            this.onEachRecord();
            return r;
        }));
        // Fork-state transitions: PENDING -> RUNNING on subscribe, RUNNING -> SUCCEEDED on
        // completion or cancellation, RUNNING -> FAILED on error.
        stream = stream.mapStream(s -> s.doOnSubscribe(subscription -> this.onStart()));
        stream = stream.mapStream(s -> s.doOnComplete(() -> this.verifyAndSetForkState(ForkState.RUNNING, ForkState.SUCCEEDED)));
        stream = stream.mapStream(s -> s.doOnCancel(() -> this.verifyAndSetForkState(ForkState.RUNNING, ForkState.SUCCEEDED)));
        stream = stream.mapStream(s -> s.doOnError(exc -> {
            this.verifyAndSetForkState(ForkState.RUNNING, ForkState.FAILED);
            this.logger.error(String.format("Fork %d of task %s failed to process data records", this.index, this.taskId), exc);
        }));
        stream = stream.mapStream(s -> s.doFinally(this::cleanup));
        stream.getRecordStream().subscribe(r -> {
            if (r instanceof RecordEnvelope) {
                ((DataWriter)this.writer.get()).writeEnvelope((RecordEnvelope)r);
            } else if (r instanceof ControlMessage) {
                // Control messages go to the writer's message handler and are acked right away.
                ((DataWriter)this.writer.get()).getMessageHandler().handleMessage((ControlMessage)r);
                r.ack();
            }
        }, e -> this.logger.error("Failed to process record.", e), () -> {
            if (this.writer.isPresent()) {
                ((DataWriter)this.writer.get()).close();
            }
        });
    }

    /** Moves the fork from {@code PENDING} to {@code RUNNING}; a no-op in any other state. */
    private void onStart() throws IOException {
        this.compareAndSetForkState(ForkState.PENDING, ForkState.RUNNING);
    }

    /** Ensures the writer has been built before a stream element is written. */
    private void onEachRecord() throws IOException {
        this.buildWriterIfNotPresent();
    }

    /**
     * Runs {@link #processRecords()} and records the outcome in the fork state.
     *
     * <p>Any {@link Throwable} is logged and marks the fork {@code FAILED} instead of
     * propagating. {@link #cleanup()} always runs afterwards.
     */
    @Override
    public void run() {
        this.compareAndSetForkState(ForkState.PENDING, ForkState.RUNNING);
        try {
            this.processRecords();
            this.compareAndSetForkState(ForkState.RUNNING, ForkState.SUCCEEDED);
        }
        catch (Throwable t) {
            this.forkState.set(ForkState.FAILED);
            this.logger.error(String.format("Fork %d of task %s failed to process data records", this.index, this.taskId), t);
        }
        finally {
            this.cleanup();
        }
    }

    /**
     * Collects the final states of the converter, the row-level policy checker and, when it is
     * present and implements {@link FinalState}, the writer.
     *
     * @return a {@link ConstructState} holding one entry per contributing construct
     */
    @Override
    public State getFinalState() {
        ConstructState state = new ConstructState();
        if (this.converter != null) {
            state.addConstructState(Constructs.CONVERTER, new ConstructState(this.converter.getFinalState()));
        }
        if (this.rowLevelPolicyChecker != null) {
            state.addConstructState(Constructs.ROW_QUALITY_CHECKER, new ConstructState(this.rowLevelPolicyChecker.getFinalState()));
        }
        if (this.writer.isPresent() && this.writer.get() instanceof FinalState) {
            state.addConstructState(Constructs.WRITER, new ConstructState(((FinalState)this.writer.get()).getFinalState()));
        }
        return state;
    }

    /**
     * Hands a record to this fork via {@link #putRecordImpl(Object)}.
     *
     * @param record the record to hand over
     * @return whatever {@link #putRecordImpl(Object)} returns
     * @throws IllegalStateException if the fork has already failed
     * @throws InterruptedException if {@link #putRecordImpl(Object)} is interrupted
     */
    public boolean putRecord(Object record) throws InterruptedException {
        if (this.forkState.get() == ForkState.FAILED) {
            throw new IllegalStateException(String.format("Fork %d of task %s has failed and is no longer running", this.index, this.taskId));
        }
        return this.putRecordImpl(record);
    }

    /**
     * Flags the parent task as done and hands {@link #SHUTDOWN_RECORD} to the fork.
     *
     * <p>An interruption while handing over the shutdown record is logged and otherwise ignored.
     */
    public void markParentTaskDone() {
        this.parentTaskDone = true;
        try {
            this.putRecord(SHUTDOWN_RECORD);
        }
        catch (InterruptedException e) {
            // NOTE(review): the interrupt status is not restored here; confirm no caller relies on it.
            this.logger.info("Interrupted while writing a shutdown record into the fork queue. Ignoring");
        }
    }

    /** Returns true once {@link #markParentTaskDone()} or {@link #close()} has been called. */
    public boolean isParentTaskDone() {
        return this.parentTaskDone;
    }

    /** Copies the writer's record count into the task state for this branch, if a writer exists. */
    public void updateRecordMetrics() {
        if (this.writer.isPresent()) {
            this.taskState.updateRecordMetrics(this.writer.get().recordsWritten(), this.index);
        }
    }

    /**
     * Copies the writer's byte count into the task state for this branch, if a writer exists.
     *
     * @throws IOException if the writer cannot report its byte count
     */
    public void updateByteMetrics() throws IOException {
        if (this.writer.isPresent()) {
            this.taskState.updateByteMetrics(this.writer.get().bytesWritten(), this.index);
        }
    }

    /**
     * Commits this fork's data if the task-level quality check passes.
     *
     * @return true if the data was committed, false if quality checking did not pass
     * @throws IllegalStateException if the fork is not in the {@code SUCCEEDED} state when its
     *         state is advanced
     * @throws Exception any failure during checking or committing, rethrown via
     *         {@link Throwables#propagate(Throwable)} after the fork has been marked {@code FAILED}
     */
    public boolean commit() throws Exception {
        try {
            if (this.checkDataQuality(this.convertedSchema)) {
                this.logger.info(String.format("Committing data for fork %d of task %s", this.index, this.taskId));
                this.commitData();
                this.verifyAndSetForkState(ForkState.SUCCEEDED, ForkState.COMMITTED);
                return true;
            }
            this.logger.error(String.format("Fork %d of task %s failed to pass quality checking", this.index, this.taskId));
            this.verifyAndSetForkState(ForkState.SUCCEEDED, ForkState.FAILED);
            return false;
        }
        catch (Throwable t) {
            this.logger.error(String.format("Fork %d of task %s failed to commit data", this.index, this.taskId), t);
            this.forkState.set(ForkState.FAILED);
            Throwables.propagate(t);
            // Unreachable: propagate() always throws. Present only to satisfy the compiler.
            return false;
        }
    }

    /** Returns the ID of the task this fork belongs to. */
    public String getTaskId() {
        return this.taskId;
    }

    /** Returns the branch index of this fork. */
    public int getIndex() {
        return this.index;
    }

    /** Returns queue statistics; this implementation always returns an absent value. */
    public Optional<BoundedBlockingRecordQueue.QueueStats> queueStats() {
        return Optional.absent();
    }

    /** Returns true if the fork is currently in the {@code SUCCEEDED} state. */
    public boolean isSucceeded() {
        return this.forkState.get() == ForkState.SUCCEEDED;
    }

    /** Returns true if the fork is in the {@code SUCCEEDED} or {@code FAILED} state. */
    public boolean isDone() {
        // Read once so both comparisons see the same snapshot of the state.
        ForkState current = this.forkState.get();
        return current == ForkState.SUCCEEDED || current == ForkState.FAILED;
    }

    @Override
    public String toString() {
        return "Fork: TaskId = \"" + this.taskId + "\" Index: \"" + this.index + "\" State: \"" + this.forkState + "\"";
    }

    /**
     * Records the current fork state in the task state, closes every registered resource, and
     * then runs the writer's cleanup if a writer was built.
     *
     * @throws IOException if closing a resource or cleaning up the writer fails
     */
    @Override
    public void close() throws IOException {
        this.parentTaskDone = true;
        this.taskState.setProp(ForkOperatorUtils.getPropertyNameForBranch("fork.state", this.branches, this.index), this.forkState.get().name());
        try {
            this.closer.close();
        }
        finally {
            // Writer cleanup runs even when closing the registered resources failed.
            if (this.writer.isPresent()) {
                this.writer.get().cleanup();
            }
        }
    }

    /** Returns the number of records the writer reports as written, or 0 if no writer exists. */
    public long getRecordsWritten() {
        return this.writer.isPresent() ? this.writer.get().recordsWritten() : 0L;
    }

    /**
     * Returns the number of bytes the writer reports as written, or 0 if no writer exists or the
     * writer fails to report a value.
     */
    public long getBytesWritten() {
        try {
            return this.writer.isPresent() ? this.writer.get().bytesWritten() : 0L;
        }
        catch (Throwable t) {
            // Deliberately best-effort: any failure to obtain the count is reported as 0.
            return 0L;
        }
    }

    /**
     * Processes this fork's records. Not supported by this class.
     *
     * @throws UnsupportedOperationException always, in this implementation
     */
    protected void processRecords() throws IOException, DataConversionException {
        throw new UnsupportedOperationException();
    }

    /**
     * Converts one record, applies the row-level policies, and writes every converted record
     * that passes them.
     *
     * <p>In streaming mode the record is treated as a {@link RecordEnvelope} and is acked after
     * all of its converted records have been handled. Otherwise the writer is built on demand
     * and each converted record is wrapped in a new envelope.
     *
     * @param record the record to process; {@code null} and {@link #SHUTDOWN_RECORD} are skipped
     * @throws IllegalStateException if the fork has already failed
     * @throws IOException if building the writer or writing fails
     * @throws DataConversionException if conversion fails
     */
    protected void processRecord(Object record) throws IOException, DataConversionException {
        if (this.forkState.get() == ForkState.FAILED) {
            throw new IllegalStateException(String.format("Fork %d of task %s has failed and is no longer running", this.index, this.taskId));
        }
        if (record == null || record == SHUTDOWN_RECORD) {
            // Null records and the shutdown marker are skipped without conversion or writing.
            return;
        }
        if (this.isStreamingMode()) {
            // The writer was built in the constructor, because streaming implies eager initialization.
            RecordEnvelope recordEnvelope = (RecordEnvelope)record;
            for (Object convertedRecord : this.converter.convertRecord(this.convertedSchema, recordEnvelope.getRecord(), this.taskState)) {
                if (!this.rowLevelPolicyChecker.executePolicies(convertedRecord, this.rowLevelPolicyCheckingResult)) {
                    continue;
                }
                ((WatermarkAwareWriter)this.writer.get()).writeEnvelope(recordEnvelope.withRecord(convertedRecord));
            }
            recordEnvelope.ack();
            return;
        }
        this.buildWriterIfNotPresent();
        for (Object convertedRecord : this.converter.convertRecord(this.convertedSchema, record, this.taskState)) {
            if (!this.rowLevelPolicyChecker.executePolicies(convertedRecord, this.rowLevelPolicyCheckingResult)) {
                continue;
            }
            ((DataWriter)this.writer.get()).writeEnvelope(new RecordEnvelope(convertedRecord));
        }
    }

    /**
     * Accepts a record on behalf of {@link #putRecord(Object)}. Not supported by this class.
     *
     * @throws UnsupportedOperationException always, in this implementation
     */
    protected boolean putRecordImpl(Object record) throws InterruptedException {
        throw new UnsupportedOperationException();
    }

    /** Hook invoked when processing ends; does nothing in this class. */
    protected void cleanup() {
    }

    /**
     * Builds the writer for this branch: a {@link PartitionedDataWriter} configured from the
     * task context, wrapped by {@link DataWriterWrapperBuilder}.
     */
    private DataWriter<Object> buildWriter() throws IOException {
        DataWriterBuilder builder = this.taskContext.getDataWriterBuilder(this.branches, this.index)
                .writeTo(Destination.of(this.taskContext.getDestinationType(this.branches, this.index), this.taskState))
                .writeInFormat(this.taskContext.getWriterOutputFormat(this.branches, this.index))
                .withWriterId(this.taskId)
                .withSchema(this.convertedSchema.orNull())
                .withBranches(this.branches)
                .forBranch(this.index);
        if (this.taskAttemptId.isPresent()) {
            builder.withAttemptId(this.taskAttemptId.get());
        }
        PartitionedDataWriter partitionedWriter = new PartitionedDataWriter(builder, this.taskContext.getTaskState());
        this.logger.info("Wrapping writer " + partitionedWriter);
        return new DataWriterWrapperBuilder((DataWriter)partitionedWriter, this.taskState).build();
    }

    /** Builds the writer on first use and registers it with the closer. */
    private void buildWriterIfNotPresent() throws IOException {
        if (!this.writer.isPresent()) {
            this.writer = Optional.of(this.closer.register(this.buildWriter()));
        }
    }

    /**
     * Publishes the row counts and schema into the fork's task state, runs the task-level
     * policies, and asks the task publisher whether the data can be published.
     *
     * @param schema the converted schema, recorded under {@code extract.schema} when present
     * @return true only if the publisher reports {@code SUCCESS}; false otherwise, including
     *         when the check itself throws
     */
    private boolean checkDataQuality(Optional<Object> schema) throws Exception {
        if (this.branches > 1) {
            // forkTaskState is a separate copy in this case, so refresh the counts from the task state.
            this.forkTaskState.setProp("qualitychecker.rows.expected", this.taskState.getProp("qualitychecker.rows.expected"));
            this.forkTaskState.setProp("qualitychecker.rows.extracted", this.taskState.getProp("qualitychecker.rows.extracted"));
        }
        String writerRecordsWrittenKey = ForkOperatorUtils.getPropertyNameForBranch("writer.records.written", this.branches, this.index);
        // Read the count once so both properties carry the same value.
        long recordsWritten = this.writer.isPresent() ? this.writer.get().recordsWritten() : 0L;
        this.forkTaskState.setProp("qualitychecker.rows.written", recordsWritten);
        this.taskState.setProp(writerRecordsWrittenKey, recordsWritten);
        if (schema.isPresent()) {
            this.forkTaskState.setProp("extract.schema", schema.get().toString());
        }
        try {
            TaskLevelPolicyCheckResults taskResults = this.taskContext.getTaskLevelPolicyChecker(this.forkTaskState, this.branches > 1 ? this.index : -1).executePolicies();
            TaskPublisher publisher = this.taskContext.getTaskPublisher(this.forkTaskState, taskResults);
            switch (publisher.canPublish()) {
                case SUCCESS: {
                    return true;
                }
                case CLEANUP_FAIL: {
                    this.logger.error("Cleanup failed for task " + this.taskId);
                    break;
                }
                case POLICY_TESTS_FAIL: {
                    this.logger.error("Not all quality checking passed for task " + this.taskId);
                    break;
                }
                case COMPONENTS_NOT_FINISHED: {
                    this.logger.error("Not all components completed for task " + this.taskId);
                    break;
                }
            }
            return false;
        }
        catch (Throwable t) {
            this.logger.error("Failed to check task-level data quality", t);
            return false;
        }
    }

    /**
     * Commits the writer, if one exists, and then refreshes the record and byte metrics when
     * metrics are enabled for the work unit. A metrics failure is logged, not rethrown.
     *
     * @throws IOException if the writer fails to commit
     */
    private void commitData() throws IOException {
        if (this.writer.isPresent()) {
            this.writer.get().commit();
        }
        try {
            if (GobblinMetrics.isEnabled(this.taskState.getWorkunit())) {
                this.updateRecordMetrics();
                this.updateByteMetrics();
            }
        }
        catch (IOException ioe) {
            // The data is already committed; keep the cause in the log instead of dropping it.
            this.logger.error("Failed to update byte-level metrics of task " + this.taskId, ioe);
        }
    }

    /** Atomically moves the fork state from {@code expectedState} to {@code newState}. */
    private boolean compareAndSetForkState(ForkState expectedState, ForkState newState) {
        return this.forkState.compareAndSet(expectedState, newState);
    }

    /**
     * Moves the fork state from {@code expectedState} to {@code newState}.
     *
     * @throws IllegalStateException if the fork was not in {@code expectedState}
     */
    private void verifyAndSetForkState(ForkState expectedState, ForkState newState) {
        if (!this.compareAndSetForkState(expectedState, newState)) {
            throw new IllegalStateException(String.format("Expected fork state %s; actual fork state %s", expectedState.name(), this.forkState.get().name()));
        }
    }

    /** Returns the single metric tag that carries this fork's branch name. */
    private static List<Tag<?>> getForkMetricsTags(State state, int index) {
        return ImmutableList.<Tag<?>>of(new Tag<>(FORK_METRICS_BRANCH_NAME_KEY, Fork.getForkMetricsId(state, index)));
    }

    /** Returns the fork's metrics name: the task metrics name, a dot, and the fork's metrics ID. */
    private static String getForkMetricsName(TaskMetrics taskMetrics, State state, int index) {
        return taskMetrics.getName() + "." + Fork.getForkMetricsId(state, index);
    }

    /** Returns the value of {@code fork.branch.name.<index>}, defaulting to {@code fork_<index>}. */
    private static String getForkMetricsId(State state, int index) {
        return state.getProp("fork.branch.name." + index, "fork_" + index);
    }

    /**
     * Reports whether this fork is safe for speculative execution.
     *
     * @return true if no writer has been built; false if the writer is not a
     *         {@link SpeculativeAttemptAwareConstruct}; otherwise the writer's own answer
     */
    public boolean isSpeculativeExecutionSafe() {
        if (!this.writer.isPresent()) {
            return true;
        }
        if (!(this.writer.get() instanceof SpeculativeAttemptAwareConstruct)) {
            this.logger.info("Writer is not speculative safe: " + this.writer.get().getClass().toString());
            return false;
        }
        return ((SpeculativeAttemptAwareConstruct)this.writer.get()).isSpeculativeAttemptSafe();
    }

    /**
     * Returns the writer.
     *
     * @throws IllegalStateException if the writer has not been built yet
     */
    public DataWriter getWriter() throws IOException {
        Preconditions.checkState(this.writer.isPresent(), "Asked to get a writer, but writer is null");
        return this.writer.get();
    }

    /** Lifecycle states of a fork. */
    enum ForkState {
        PENDING,
        RUNNING,
        SUCCEEDED,
        FAILED,
        COMMITTED;

    }
}

