/*
 * Decompiled with CFR 0.152.
 */
package org.apache.gobblin.runtime;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.google.common.io.Closer;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang3.BooleanUtils;
import org.apache.gobblin.Constructs;
import org.apache.gobblin.ack.Ackable;
import org.apache.gobblin.broker.EmptyKey;
import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes;
import org.apache.gobblin.broker.iface.NotConfiguredException;
import org.apache.gobblin.broker.iface.SharedResourceFactory;
import org.apache.gobblin.broker.iface.SharedResourceKey;
import org.apache.gobblin.broker.iface.SharedResourcesBroker;
import org.apache.gobblin.commit.SpeculativeAttemptAwareConstruct;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.configuration.WorkUnitState;
import org.apache.gobblin.converter.Converter;
import org.apache.gobblin.converter.DataConversionException;
import org.apache.gobblin.fork.CopyHelper;
import org.apache.gobblin.fork.CopyNotSupportedException;
import org.apache.gobblin.fork.Copyable;
import org.apache.gobblin.fork.ForkOperator;
import org.apache.gobblin.instrumented.extractor.InstrumentedExtractorBase;
import org.apache.gobblin.instrumented.extractor.InstrumentedExtractorDecorator;
import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.gobblin.metrics.event.FailureEventBuilder;
import org.apache.gobblin.publisher.DataPublisher;
import org.apache.gobblin.publisher.SingleTaskDataPublisher;
import org.apache.gobblin.qualitychecker.row.RowLevelPolicyCheckResults;
import org.apache.gobblin.qualitychecker.row.RowLevelPolicyChecker;
import org.apache.gobblin.records.RecordStreamProcessor;
import org.apache.gobblin.runtime.ExecutionModel;
import org.apache.gobblin.runtime.ForkBranchMismatchException;
import org.apache.gobblin.runtime.ForkException;
import org.apache.gobblin.runtime.ForkThrowableHolder;
import org.apache.gobblin.runtime.ForkThrowableHolderFactory;
import org.apache.gobblin.runtime.MultiConverter;
import org.apache.gobblin.runtime.StreamModelTaskRunner;
import org.apache.gobblin.runtime.TaskConfigurationKeys;
import org.apache.gobblin.runtime.TaskContext;
import org.apache.gobblin.runtime.TaskExecutor;
import org.apache.gobblin.runtime.TaskInstantiationException;
import org.apache.gobblin.runtime.TaskState;
import org.apache.gobblin.runtime.TaskStateTracker;
import org.apache.gobblin.runtime.fork.AsynchronousFork;
import org.apache.gobblin.runtime.fork.Fork;
import org.apache.gobblin.runtime.fork.SynchronousFork;
import org.apache.gobblin.runtime.task.TaskIFace;
import org.apache.gobblin.runtime.util.TaskMetrics;
import org.apache.gobblin.source.extractor.Extractor;
import org.apache.gobblin.source.extractor.JobCommitPolicy;
import org.apache.gobblin.source.extractor.StreamingExtractor;
import org.apache.gobblin.state.ConstructState;
import org.apache.gobblin.stream.RecordEnvelope;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.writer.AcknowledgableWatermark;
import org.apache.gobblin.writer.DataWriter;
import org.apache.gobblin.writer.FineGrainedWatermarkTracker;
import org.apache.gobblin.writer.MultiWriterWatermarkManager;
import org.apache.gobblin.writer.TrackerBasedWatermarkManager;
import org.apache.gobblin.writer.WatermarkAwareWriter;
import org.apache.gobblin.writer.WatermarkManager;
import org.apache.gobblin.writer.WatermarkStorage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

public class Task
implements TaskIFace {
    private static final Logger LOG = LoggerFactory.getLogger(Task.class);
    private static final String TASK_STATE = "taskState";
    private static final String FAILED_TASK_EVENT = "failedTask";
    private final String jobId;
    private final String taskId;
    private final String taskKey;
    private final boolean isIgnoreCloseFailures;
    private final TaskContext taskContext;
    private final TaskState taskState;
    private final TaskStateTracker taskStateTracker;
    private final TaskExecutor taskExecutor;
    private final Optional<CountDownLatch> countDownLatch;
    private final Map<Optional<Fork>, Optional<Future<?>>> forks = Maps.newLinkedHashMap();
    private final AtomicInteger retryCount = new AtomicInteger();
    private final Converter converter;
    private final InstrumentedExtractorBase extractor;
    private final RowLevelPolicyChecker rowChecker;
    private final ExecutionModel taskMode;
    private final String watermarkingStrategy;
    private final Optional<WatermarkManager> watermarkManager;
    private final Optional<FineGrainedWatermarkTracker> watermarkTracker;
    private final Optional<WatermarkStorage> watermarkStorage;
    private final List<RecordStreamProcessor<?, ?, ?, ?>> recordStreamProcessors;
    private final Closer closer;
    private long startTime;
    private volatile long lastRecordPulledTimestampMillis;
    private final AtomicLong recordsPulled;
    private final AtomicBoolean shutdownRequested;
    private volatile long shutdownRequestedTime = Long.MAX_VALUE;
    private final CountDownLatch shutdownLatch;
    private Future<?> taskFuture;

    public Task(TaskContext context, TaskStateTracker taskStateTracker, TaskExecutor taskExecutor, Optional<CountDownLatch> countDownLatch) {
        this.taskContext = context;
        this.taskState = context.getTaskState();
        this.jobId = this.taskState.getJobId();
        this.taskId = this.taskState.getTaskId();
        this.taskKey = this.taskState.getTaskKey();
        this.isIgnoreCloseFailures = this.taskState.getJobState().getPropAsBoolean("task.ignoreCloseFailures", false);
        this.taskStateTracker = taskStateTracker;
        this.taskExecutor = taskExecutor;
        this.countDownLatch = countDownLatch;
        this.closer = Closer.create();
        this.closer.register((Closeable)this.taskState.getTaskBrokerNullable());
        this.extractor = (InstrumentedExtractorBase)this.closer.register((Closeable)new InstrumentedExtractorDecorator((WorkUnitState)this.taskState, this.taskContext.getExtractor()));
        this.recordStreamProcessors = this.taskContext.getRecordStreamProcessors();
        for (RecordStreamProcessor<?, ?, ?, ?> r : this.recordStreamProcessors) {
            if (!(r instanceof Closeable)) continue;
            this.closer.register((Closeable)r);
        }
        List<Converter<?, ?, ?, ?>> converters = this.taskContext.getConverters();
        this.converter = (Converter)this.closer.register((Closeable)((Object)new MultiConverter(converters)));
        try {
            Preconditions.checkState((this.recordStreamProcessors.isEmpty() || converters.isEmpty() ? 1 : 0) != 0, (Object)"Converters cannot be specified when RecordStreamProcessors are specified");
        }
        catch (IllegalStateException e) {
            try {
                this.closer.close();
            }
            catch (Throwable t) {
                LOG.error("Failed to close all open resources", t);
            }
            throw new TaskInstantiationException("Converters cannot be specified when RecordStreamProcessors are specified");
        }
        try {
            this.rowChecker = (RowLevelPolicyChecker)this.closer.register((Closeable)this.taskContext.getRowLevelPolicyChecker());
        }
        catch (Exception e) {
            try {
                this.closer.close();
            }
            catch (Throwable t) {
                LOG.error("Failed to close all open resources", t);
            }
            throw new RuntimeException("Failed to instantiate row checker.", e);
        }
        this.taskMode = Task.getExecutionModel((State)this.taskState);
        this.recordsPulled = new AtomicLong(0L);
        this.lastRecordPulledTimestampMillis = 0L;
        this.shutdownRequested = new AtomicBoolean(false);
        this.shutdownLatch = new CountDownLatch(1);
        this.watermarkingStrategy = "FineGrain";
        if (this.isStreamingTask()) {
            Config config;
            Extractor underlyingExtractor = this.taskContext.getRawSourceExtractor();
            if (!(underlyingExtractor instanceof StreamingExtractor)) {
                LOG.error("Extractor {}  is not an instance of StreamingExtractor but the task is configured to run in continuous mode", (Object)underlyingExtractor.getClass().getName());
                throw new TaskInstantiationException("Extraction " + underlyingExtractor.getClass().getName() + " is not an instance of StreamingExtractor but the task is configured to run in continuous mode");
            }
            this.watermarkStorage = Optional.of((Object)this.taskContext.getWatermarkStorage());
            try {
                config = ConfigUtils.propertiesToConfig((Properties)this.taskState.getProperties());
            }
            catch (Exception e) {
                LOG.warn("Failed to deserialize taskState into Config.. continuing with an empty config", (Throwable)e);
                config = ConfigFactory.empty();
            }
            long commitIntervalMillis = ConfigUtils.getLong((Config)config, (String)"streaming.watermark.commitIntervalMillis", (Long)TaskConfigurationKeys.DEFAULT_STREAMING_WATERMARK_COMMIT_INTERVAL_MILLIS);
            if (this.watermarkingStrategy.equals("FineGrain")) {
                this.watermarkTracker = Optional.of((Object)this.closer.register((Closeable)new FineGrainedWatermarkTracker(config)));
                this.watermarkManager = Optional.of((Object)((WatermarkManager)this.closer.register((Closeable)new TrackerBasedWatermarkManager((WatermarkStorage)this.watermarkStorage.get(), (FineGrainedWatermarkTracker)this.watermarkTracker.get(), commitIntervalMillis, Optional.of((Object)LOG)))));
            } else {
                this.watermarkManager = Optional.of((Object)((WatermarkManager)this.closer.register((Closeable)new MultiWriterWatermarkManager((WatermarkStorage)this.watermarkStorage.get(), commitIntervalMillis, Optional.of((Object)LOG)))));
                this.watermarkTracker = Optional.absent();
            }
        } else {
            this.watermarkManager = Optional.absent();
            this.watermarkTracker = Optional.absent();
            this.watermarkStorage = Optional.absent();
        }
    }

    public static ForkThrowableHolder getForkThrowableHolder(SharedResourcesBroker<GobblinScopeTypes> broker) {
        try {
            return (ForkThrowableHolder)broker.getSharedResource((SharedResourceFactory)new ForkThrowableHolderFactory(), (SharedResourceKey)EmptyKey.INSTANCE);
        }
        catch (NotConfiguredException e) {
            LOG.error("Fail to get fork throwable holder instance from broker. Will not track fork exception.", (Throwable)e);
            throw new RuntimeException(e);
        }
    }

    public static ExecutionModel getExecutionModel(State state) {
        String mode = state.getProp("task.executionMode", "BATCH");
        try {
            return ExecutionModel.valueOf(mode.toUpperCase());
        }
        catch (Exception e) {
            LOG.warn("Could not find an execution model corresponding to {}, returning {}", new Object[]{mode, ExecutionModel.BATCH, e});
            return ExecutionModel.BATCH;
        }
    }

    protected boolean areSingleBranchTasksSynchronous(TaskContext taskContext) {
        return BooleanUtils.toBoolean((String)taskContext.getTaskState().getProp("gobblin.task.is.single.branch.synchronous", TaskConfigurationKeys.DEFAULT_TASK_IS_SINGLE_BRANCH_SYNCHRONOUS));
    }

    protected boolean isStreamingTask() {
        return this.taskMode.equals((Object)ExecutionModel.STREAMING);
    }

    @Override
    public boolean awaitShutdown(long timeoutInMillis) throws InterruptedException {
        return this.shutdownLatch.await(timeoutInMillis, TimeUnit.MILLISECONDS);
    }

    private void completeShutdown() {
        this.shutdownLatch.countDown();
    }

    private boolean shutdownRequested() {
        if (!this.shutdownRequested.get()) {
            this.shutdownRequested.set(Thread.currentThread().isInterrupted());
        }
        return this.shutdownRequested.get();
    }

    @Override
    public void shutdown() {
        this.shutdownRequested.set(true);
        this.shutdownRequestedTime = Math.min(System.currentTimeMillis(), this.shutdownRequestedTime);
    }

    @Override
    public String getProgress() {
        long currentTime = System.currentTimeMillis();
        long lastRecordTimeElapsed = currentTime - this.lastRecordPulledTimestampMillis;
        if (this.isStreamingTask()) {
            WatermarkManager.CommitStatus commitStatus = ((WatermarkManager)this.watermarkManager.get()).getCommitStatus();
            long lastWatermarkCommitTimeElapsed = currentTime - commitStatus.getLastWatermarkCommitSuccessTimestampMillis();
            String progressString = String.format("recordsPulled:%d, lastRecordExtracted: %d ms ago, lastWatermarkCommitted: %d ms ago, lastWatermarkCommitted: %s", this.recordsPulled.get(), lastRecordTimeElapsed, lastWatermarkCommitTimeElapsed, commitStatus.getLastCommittedWatermarks());
            return progressString;
        }
        String progressString = String.format("recordsPulled:%d, lastRecordExtracted: %d ms ago", this.recordsPulled.get(), lastRecordTimeElapsed);
        return progressString;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void run() {
        MDC.put((String)"task.key", (String)this.taskKey);
        this.startTime = System.currentTimeMillis();
        this.taskState.setStartTime(this.startTime);
        this.taskState.setWorkingState(WorkUnitState.WorkingState.RUNNING);
        this.forks.clear();
        try {
            if (this.taskState.getPropAsBoolean("task.execution.synchronousExecutionModel", true)) {
                LOG.warn("Synchronous task execution model is deprecated. Please consider using stream model.");
                this.runSynchronousModel();
            } else {
                new StreamModelTaskRunner(this, this.taskState, this.closer, this.taskContext, (Extractor)this.extractor, this.converter, this.recordStreamProcessors, this.rowChecker, this.taskExecutor, this.taskMode, this.shutdownRequested, this.watermarkTracker, this.watermarkManager, this.watermarkStorage, this.forks, this.watermarkingStrategy).run();
            }
            LOG.info("Extracted " + this.recordsPulled + " data records");
            LOG.info("Row quality checker finished with results: " + this.rowChecker.getResults().getResults());
            this.taskState.setProp("qualitychecker.rows.extracted", this.recordsPulled);
            this.taskState.setProp("qualitychecker.rows.expected", this.extractor.getExpectedRecordCount());
            if (!this.forks.keySet().stream().map(Optional::get).allMatch(Fork::isSucceeded)) {
                throw new RuntimeException("Some forks failed.");
            }
            if (this.watermarkManager.isPresent()) {
                ((WatermarkManager)this.watermarkManager.get()).close();
            }
            if (this.watermarkTracker.isPresent()) {
                ((FineGrainedWatermarkTracker)this.watermarkTracker.get()).close();
            }
        }
        catch (Throwable t) {
            this.failTask(t);
        }
        finally {
            Task task = this;
            synchronized (task) {
                if (this.taskFuture == null || !this.taskFuture.isCancelled()) {
                    this.taskStateTracker.onTaskRunCompletion(this);
                    this.completeShutdown();
                    this.taskFuture = null;
                } else {
                    LOG.info("will not decrease count down latch as this task is cancelled");
                }
            }
        }
    }

    @Deprecated
    private void runSynchronousModel() throws Exception {
        ForkOperator forkOperator = (ForkOperator)this.closer.register((Closeable)this.taskContext.getForkOperator());
        forkOperator.init((WorkUnitState)this.taskState);
        int branches = forkOperator.getBranches((WorkUnitState)this.taskState);
        this.taskState.setProp("fork.branches", branches);
        Object schema = this.converter.convertSchema(this.extractor.getSchema(), (WorkUnitState)this.taskState);
        List forkedSchemas = forkOperator.forkSchema((WorkUnitState)this.taskState, schema);
        if (forkedSchemas.size() != branches) {
            throw new ForkBranchMismatchException(String.format("Number of forked schemas [%d] is not equal to number of branches [%d]", forkedSchemas.size(), branches));
        }
        if (Task.inMultipleBranches(forkedSchemas) && !CopyHelper.isCopyable((Object)schema)) {
            throw new CopyNotSupportedException(schema + " is not copyable");
        }
        RowLevelPolicyCheckResults rowResults = new RowLevelPolicyCheckResults();
        if (!this.areSingleBranchTasksSynchronous(this.taskContext) || branches > 1) {
            for (int i = 0; i < branches; ++i) {
                if (((Boolean)forkedSchemas.get(i)).booleanValue()) {
                    AsynchronousFork asynchronousFork = (AsynchronousFork)this.closer.register((Closeable)new AsynchronousFork(this.taskContext, schema instanceof Copyable ? ((Copyable)schema).copy() : schema, branches, i, this.taskMode));
                    this.configureStreamingFork(asynchronousFork, this.watermarkingStrategy);
                    this.forks.put((Optional<Fork>)Optional.of((Object)asynchronousFork), Optional.of(this.taskExecutor.submit(asynchronousFork)));
                    continue;
                }
                this.forks.put((Optional<Fork>)Optional.absent(), Optional.absent());
            }
        } else {
            SynchronousFork fork = (SynchronousFork)this.closer.register((Closeable)new SynchronousFork(this.taskContext, schema instanceof Copyable ? ((Copyable)schema).copy() : schema, branches, 0, this.taskMode));
            this.configureStreamingFork(fork, this.watermarkingStrategy);
            this.forks.put((Optional<Fork>)Optional.of((Object)fork), Optional.of(this.taskExecutor.submit(fork)));
        }
        if (this.isStreamingTask()) {
            RecordEnvelope recordEnvelope;
            if (this.watermarkTracker.isPresent()) {
                ((FineGrainedWatermarkTracker)this.watermarkTracker.get()).start();
            }
            ((WatermarkManager)this.watermarkManager.get()).start();
            ((StreamingExtractor)this.taskContext.getRawSourceExtractor()).start((WatermarkStorage)this.watermarkStorage.get());
            while (!this.shutdownRequested() && (recordEnvelope = this.extractor.readRecordEnvelope()) != null) {
                this.onRecordExtract();
                AcknowledgableWatermark acknowledgableWatermark = new AcknowledgableWatermark(recordEnvelope.getWatermark());
                if (this.watermarkTracker.isPresent()) {
                    ((FineGrainedWatermarkTracker)this.watermarkTracker.get()).track(acknowledgableWatermark);
                }
                for (Iterator convertedRecord : this.converter.convertRecord(schema, (Object)recordEnvelope, (WorkUnitState)this.taskState)) {
                    this.processRecord(convertedRecord, forkOperator, this.rowChecker, rowResults, branches, acknowledgableWatermark.incrementAck());
                }
                acknowledgableWatermark.ack();
            }
        } else {
            RecordEnvelope record;
            long l = 0L;
            while ((record = this.extractor.readRecordEnvelope()) != null) {
                this.onRecordExtract();
                try {
                    for (Object convertedRecord : this.converter.convertRecord(schema, record.getRecord(), (WorkUnitState)this.taskState)) {
                        this.processRecord(convertedRecord, forkOperator, this.rowChecker, rowResults, branches, null);
                    }
                }
                catch (Exception e) {
                    if (!(e instanceof DataConversionException) && !(e.getCause() instanceof DataConversionException)) {
                        LOG.error("Processing record incurs an unexpected exception: ", (Throwable)e);
                        throw new RuntimeException(e.getCause());
                    }
                    if (++l <= this.taskState.getPropAsLong("task.skip.error.records", 0L)) continue;
                    throw new RuntimeException(e);
                }
            }
        }
        LOG.info("Extracted " + this.recordsPulled + " data records");
        LOG.info("Row quality checker finished with results: " + rowResults.getResults());
        this.taskState.setProp("qualitychecker.rows.extracted", this.recordsPulled);
        this.taskState.setProp("qualitychecker.rows.expected", this.extractor.getExpectedRecordCount());
        for (Optional<Fork> optional : this.forks.keySet()) {
            if (!optional.isPresent()) continue;
            ((Fork)optional.get()).markParentTaskDone();
        }
        for (Optional optional : this.forks.values()) {
            if (!optional.isPresent()) continue;
            try {
                long forkFutureStartTime = System.nanoTime();
                ((Future)optional.get()).get();
                long forkDuration = System.nanoTime() - forkFutureStartTime;
                LOG.info("Task shutdown: Fork future reaped in {} millis", (Object)(forkDuration / 1000000L));
            }
            catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
            }
        }
    }

    protected void configureStreamingFork(Fork fork, String watermarkingStrategy) throws IOException {
        if (this.isStreamingTask()) {
            DataWriter forkWriter = fork.getWriter();
            if (forkWriter instanceof WatermarkAwareWriter) {
                if (watermarkingStrategy.equals("WriterBased")) {
                    ((MultiWriterWatermarkManager)this.watermarkManager.get()).registerWriter((WatermarkAwareWriter)forkWriter);
                }
            } else {
                String errorMessage = String.format("The Task is configured to run in continuous mode, but the writer %s is not a WatermarkAwareWriter", forkWriter.getClass().getName());
                LOG.error(errorMessage);
                throw new RuntimeException(errorMessage);
            }
        }
    }

    protected void onRecordExtract() {
        this.recordsPulled.incrementAndGet();
        this.lastRecordPulledTimestampMillis = System.currentTimeMillis();
    }

    private void failTask(Throwable t) {
        LOG.error(String.format("Task %s failed", this.taskId), t);
        this.taskState.setWorkingState(WorkUnitState.WorkingState.FAILED);
        this.taskState.setProp("task.failure.exception", Throwables.getStackTraceAsString((Throwable)t));
        FailureEventBuilder failureEvent = new FailureEventBuilder(FAILED_TASK_EVENT);
        failureEvent.setRootCause(t);
        failureEvent.addMetadata(TASK_STATE, this.taskState.toString());
        failureEvent.submit(this.taskContext.getTaskMetrics().getMetricContext());
    }

    private boolean shouldPublishDataInTask() {
        boolean publishDataAtJobLevel = this.taskState.getPropAsBoolean("publish.data.at.job.level", true);
        if (publishDataAtJobLevel) {
            LOG.info(String.format("%s is true. Will publish data at the job level.", "publish.data.at.job.level"));
            return false;
        }
        JobCommitPolicy jobCommitPolicy = JobCommitPolicy.getCommitPolicy((State)this.taskState);
        if (jobCommitPolicy == JobCommitPolicy.COMMIT_SUCCESSFUL_TASKS) {
            return this.taskState.getWorkingState() == WorkUnitState.WorkingState.SUCCESSFUL;
        }
        if (jobCommitPolicy == JobCommitPolicy.COMMIT_ON_PARTIAL_SUCCESS) {
            return true;
        }
        LOG.info("Will publish data at the job level with job commit policy: " + jobCommitPolicy);
        return false;
    }

    private void publishTaskData() throws IOException {
        try (Closer closer = Closer.create();){
            Class<? extends DataPublisher> dataPublisherClass = this.getTaskPublisherClass();
            SingleTaskDataPublisher publisher = (SingleTaskDataPublisher)closer.register((Closeable)SingleTaskDataPublisher.getInstance(dataPublisherClass, (State)this.taskState));
            LOG.info("Publishing data from task " + this.taskId);
            publisher.publish((WorkUnitState)this.taskState);
        }
    }

    private Class<? extends DataPublisher> getTaskPublisherClass() throws ReflectiveOperationException {
        if (this.taskState.contains("data.publisher.task.type")) {
            return Class.forName(this.taskState.getProp("data.publisher.task.type"));
        }
        return Class.forName(this.taskState.getProp("data.publisher.type", "org.apache.gobblin.publisher.BaseDataPublisher"));
    }

    public String getJobId() {
        return this.jobId;
    }

    public String getTaskId() {
        return this.taskId;
    }

    public String getTaskKey() {
        return this.taskKey;
    }

    public TaskContext getTaskContext() {
        return this.taskContext;
    }

    public TaskState getTaskState() {
        return this.taskState;
    }

    @Override
    public State getPersistentState() {
        return this.getTaskState();
    }

    @Override
    public State getExecutionMetadata() {
        return this.getTaskState();
    }

    @Override
    public WorkUnitState.WorkingState getWorkingState() {
        return this.getTaskState().getWorkingState();
    }

    public List<Optional<Fork>> getForks() {
        return ImmutableList.copyOf(this.forks.keySet());
    }

    public void updateRecordMetrics() {
        for (Optional<Fork> fork : this.forks.keySet()) {
            if (!fork.isPresent()) continue;
            ((Fork)fork.get()).updateRecordMetrics();
        }
    }

    public void updateByteMetrics() {
        try {
            for (Optional<Fork> fork : this.forks.keySet()) {
                if (!fork.isPresent()) continue;
                ((Fork)fork.get()).updateByteMetrics();
            }
        }
        catch (IOException ioe) {
            LOG.error("Failed to update byte-level metrics for task " + this.taskId, (Throwable)ioe);
        }
    }

    public void incrementRetryCount() {
        this.retryCount.incrementAndGet();
    }

    public int getRetryCount() {
        return this.retryCount.get();
    }

    public void markTaskCompletion() {
        if (this.countDownLatch.isPresent()) {
            ((CountDownLatch)this.countDownLatch.get()).countDown();
        }
        this.taskState.setProp("task.retries", this.retryCount.get());
    }

    public String toString() {
        return this.taskId;
    }

    private void processRecord(Object convertedRecord, ForkOperator forkOperator, RowLevelPolicyChecker rowChecker, RowLevelPolicyCheckResults rowResults, int branches, AcknowledgableWatermark watermark) throws Exception {
        if (!rowChecker.executePolicies(convertedRecord, rowResults)) {
            if (watermark != null) {
                watermark.ack();
            }
            return;
        }
        List forkedRecords = forkOperator.forkDataRecord((WorkUnitState)this.taskState, convertedRecord);
        if (forkedRecords.size() != branches) {
            throw new ForkBranchMismatchException(String.format("Number of forked data records [%d] is not equal to number of branches [%d]", forkedRecords.size(), branches));
        }
        boolean needToCopy = Task.inMultipleBranches(forkedRecords);
        if (needToCopy && !CopyHelper.isCopyable((Object)convertedRecord)) {
            throw new CopyNotSupportedException(convertedRecord.getClass().getName() + " is not copyable");
        }
        int branch = 0;
        int copyInstance = 0;
        for (Optional<Fork> fork : this.forks.keySet()) {
            if (fork.isPresent() && ((Boolean)forkedRecords.get(branch)).booleanValue()) {
                Object recordForFork = needToCopy ? CopyHelper.copy((Object)convertedRecord) : convertedRecord;
                ++copyInstance;
                if (this.isStreamingTask()) {
                    ((RecordEnvelope)recordForFork).addCallBack((Ackable)watermark.incrementAck());
                }
                boolean succeeded = false;
                while (!succeeded) {
                    succeeded = ((Fork)fork.get()).putRecord(recordForFork);
                }
            }
            ++branch;
        }
        if (watermark != null) {
            watermark.ack();
        }
    }

    private static boolean inMultipleBranches(List<Boolean> branches) {
        int inBranches = 0;
        for (Boolean bool : branches) {
            if (bool.booleanValue() && ++inBranches > 1) break;
        }
        return inBranches > 1;
    }

    private long getRecordsWritten() {
        long recordsWritten = 0L;
        for (Optional<Fork> fork : this.forks.keySet()) {
            recordsWritten += ((Fork)fork.get()).getRecordsWritten();
        }
        return recordsWritten;
    }

    private long getBytesWritten() {
        long bytesWritten = 0L;
        for (Optional<Fork> fork : this.forks.keySet()) {
            bytesWritten += ((Fork)fork.get()).getBytesWritten();
        }
        return bytesWritten;
    }

    private void addConstructsFinalStateToTaskState(InstrumentedExtractorBase<?, ?> extractor, Converter<?, ?, ?, ?> converter, RowLevelPolicyChecker rowChecker) {
        ConstructState constructState = new ConstructState();
        if (extractor != null) {
            constructState.addConstructState(Constructs.EXTRACTOR, new ConstructState(extractor.getFinalState()));
        }
        if (converter != null) {
            constructState.addConstructState(Constructs.CONVERTER, new ConstructState(converter.getFinalState()));
        }
        if (rowChecker != null) {
            constructState.addConstructState(Constructs.ROW_QUALITY_CHECKER, new ConstructState(rowChecker.getFinalState()));
        }
        int forkIdx = 0;
        for (Optional<Fork> fork : this.forks.keySet()) {
            constructState.addConstructState(Constructs.FORK_OPERATOR, new ConstructState(((Fork)fork.get()).getFinalState()), Integer.toString(forkIdx));
            ++forkIdx;
        }
        constructState.mergeIntoWorkUnitState((WorkUnitState)this.taskState);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Loose catch block
     * Enabled force condition propagation
     * Lifted jumps to return sites
     */
    @Override
    public void commit() {
        boolean isTaskFailed;
        block41: {
            isTaskFailed = false;
            ArrayList<Integer> failedForkIds = new ArrayList<Integer>();
            for (Optional<Fork> fork : this.forks.keySet()) {
                if (!fork.isPresent()) continue;
                if (((Fork)fork.get()).isSucceeded()) {
                    if (((Fork)fork.get()).commit()) continue;
                    failedForkIds.add(((Fork)fork.get()).getIndex());
                    continue;
                }
                failedForkIds.add(((Fork)fork.get()).getIndex());
            }
            if (failedForkIds.size() == 0) {
                if (this.taskState.getWorkingState() != WorkUnitState.WorkingState.FAILED) {
                    this.taskState.setWorkingState(WorkUnitState.WorkingState.SUCCESSFUL);
                }
            } else {
                ForkThrowableHolder holder = Task.getForkThrowableHolder((SharedResourcesBroker<GobblinScopeTypes>)this.taskState.getTaskBroker());
                LOG.info("Holder for this task {} is {}", (Object)this.taskId, (Object)holder);
                if (!holder.isEmpty()) {
                    this.failTask(holder.getAggregatedException(failedForkIds, this.taskId));
                } else {
                    this.failTask(new ForkException("Fork branches " + failedForkIds + " failed for task " + this.taskId));
                }
            }
            this.addConstructsFinalStateToTaskState(this.extractor, this.converter, this.rowChecker);
            this.taskState.setProp("writer.records.written", this.getRecordsWritten());
            this.taskState.setProp("writer.bytes.written", this.getBytesWritten());
            this.submitTaskCommittedEvent();
            try {
                this.closer.close();
            }
            catch (Throwable t) {
                LOG.error("Failed to close all open resources", t);
                if (this.isIgnoreCloseFailures || isTaskFailed) break block41;
                LOG.error("Setting the task state to failed.");
                this.failTask(t);
            }
        }
        for (Map.Entry<Optional<Fork>, Optional<Future<?>>> forkAndFuture : this.forks.entrySet()) {
            if (!forkAndFuture.getKey().isPresent() || !forkAndFuture.getValue().isPresent()) continue;
            try {
                ((Future)forkAndFuture.getValue().get()).cancel(true);
            }
            catch (Throwable t) {
                LOG.error(String.format("Failed to cancel Fork \"%s\"", forkAndFuture.getKey().get()), t);
            }
        }
        try {
            if (!this.shouldPublishDataInTask() || this.taskState.getWorkingState() != WorkUnitState.WorkingState.SUCCESSFUL) return;
            this.publishTaskData();
            this.taskState.setWorkingState(WorkUnitState.WorkingState.COMMITTED);
            return;
        }
        catch (IOException ioe) {
            this.failTask(ioe);
            return;
        }
        finally {
            long endTime = System.currentTimeMillis();
            this.taskState.setEndTime(endTime);
            this.taskState.setTaskDuration(endTime - this.startTime);
            this.taskStateTracker.onTaskCommitCompletion(this);
        }
        catch (Throwable t) {
            block42: {
                this.failTask(t);
                isTaskFailed = true;
                this.addConstructsFinalStateToTaskState(this.extractor, this.converter, this.rowChecker);
                this.taskState.setProp("writer.records.written", this.getRecordsWritten());
                this.taskState.setProp("writer.bytes.written", this.getBytesWritten());
                this.submitTaskCommittedEvent();
                try {
                    this.closer.close();
                }
                catch (Throwable t2) {
                    LOG.error("Failed to close all open resources", t2);
                    if (this.isIgnoreCloseFailures || isTaskFailed) break block42;
                    LOG.error("Setting the task state to failed.");
                    this.failTask(t2);
                }
            }
            for (Map.Entry<Optional<Fork>, Optional<Future<?>>> forkAndFuture : this.forks.entrySet()) {
                if (!forkAndFuture.getKey().isPresent() || !forkAndFuture.getValue().isPresent()) continue;
                try {
                    ((Future)forkAndFuture.getValue().get()).cancel(true);
                }
                catch (Throwable t3) {
                    LOG.error(String.format("Failed to cancel Fork \"%s\"", forkAndFuture.getKey().get()), t3);
                }
            }
            try {
                if (!this.shouldPublishDataInTask() || this.taskState.getWorkingState() != WorkUnitState.WorkingState.SUCCESSFUL) return;
                this.publishTaskData();
                this.taskState.setWorkingState(WorkUnitState.WorkingState.COMMITTED);
                return;
            }
            catch (IOException ioe) {
                this.failTask(ioe);
                return;
            }
            finally {
                long endTime = System.currentTimeMillis();
                this.taskState.setEndTime(endTime);
                this.taskState.setTaskDuration(endTime - this.startTime);
                this.taskStateTracker.onTaskCommitCompletion(this);
            }
            catch (Throwable throwable) {
                block43: {
                    this.addConstructsFinalStateToTaskState(this.extractor, this.converter, this.rowChecker);
                    this.taskState.setProp("writer.records.written", this.getRecordsWritten());
                    this.taskState.setProp("writer.bytes.written", this.getBytesWritten());
                    this.submitTaskCommittedEvent();
                    try {
                        this.closer.close();
                    }
                    catch (Throwable t4) {
                        LOG.error("Failed to close all open resources", t4);
                        if (this.isIgnoreCloseFailures || isTaskFailed) break block43;
                        LOG.error("Setting the task state to failed.");
                        this.failTask(t4);
                    }
                }
                for (Map.Entry<Optional<Fork>, Optional<Future<?>>> forkAndFuture : this.forks.entrySet()) {
                    if (!forkAndFuture.getKey().isPresent() || !forkAndFuture.getValue().isPresent()) continue;
                    try {
                        ((Future)forkAndFuture.getValue().get()).cancel(true);
                    }
                    catch (Throwable t5) {
                        LOG.error(String.format("Failed to cancel Fork \"%s\"", forkAndFuture.getKey().get()), t5);
                    }
                }
                try {
                    if (!this.shouldPublishDataInTask() || this.taskState.getWorkingState() != WorkUnitState.WorkingState.SUCCESSFUL) throw throwable;
                    this.publishTaskData();
                    this.taskState.setWorkingState(WorkUnitState.WorkingState.COMMITTED);
                    throw throwable;
                }
                catch (IOException ioe) {
                    this.failTask(ioe);
                    throw throwable;
                }
                finally {
                    long endTime = System.currentTimeMillis();
                    this.taskState.setEndTime(endTime);
                    this.taskState.setTaskDuration(endTime - this.startTime);
                    this.taskStateTracker.onTaskCommitCompletion(this);
                }
            }
        }
    }

    protected void submitTaskCommittedEvent() {
        MetricContext taskMetricContext = TaskMetrics.get(this.taskState).getMetricContext();
        EventSubmitter eventSubmitter = new EventSubmitter.Builder(taskMetricContext, "gobblin.runtime.task").build();
        eventSubmitter.submit("taskCommitted", (Map)ImmutableMap.of((Object)"taskId", (Object)this.taskId, (Object)"taskAttemptId", (Object)this.taskState.getTaskAttemptId().or((Object)"")));
    }

    @Override
    public boolean isSpeculativeExecutionSafe() {
        if (this.extractor instanceof SpeculativeAttemptAwareConstruct && !((SpeculativeAttemptAwareConstruct)this.extractor).isSpeculativeAttemptSafe()) {
            return false;
        }
        if (this.converter instanceof SpeculativeAttemptAwareConstruct && !((SpeculativeAttemptAwareConstruct)this.converter).isSpeculativeAttemptSafe()) {
            return false;
        }
        for (Optional<Fork> fork : this.forks.keySet()) {
            if (!fork.isPresent() || ((Fork)fork.get()).isSpeculativeExecutionSafe()) continue;
            return false;
        }
        return true;
    }

    public synchronized void setTaskFuture(Future<?> taskFuture) {
        this.taskFuture = taskFuture;
    }

    @VisibleForTesting
    boolean hasTaskFuture() {
        return this.taskFuture != null;
    }

    public synchronized boolean cancel() {
        if (this.taskFuture != null && this.taskFuture.cancel(true)) {
            this.taskStateTracker.onTaskRunCompletion(this);
            this.completeShutdown();
            return true;
        }
        return false;
    }

    public Task() {
        this.jobId = null;
        this.taskId = null;
        this.taskKey = null;
        this.isIgnoreCloseFailures = false;
        this.taskContext = null;
        this.taskState = null;
        this.taskStateTracker = null;
        this.taskExecutor = null;
        this.countDownLatch = null;
        this.converter = null;
        this.extractor = null;
        this.rowChecker = null;
        this.taskMode = null;
        this.watermarkingStrategy = null;
        this.watermarkManager = null;
        this.watermarkTracker = null;
        this.watermarkStorage = null;
        this.recordStreamProcessors = null;
        this.closer = null;
        this.recordsPulled = null;
        this.shutdownRequested = null;
        this.shutdownLatch = null;
    }
}

