/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.partition.consumer;

import java.util.concurrent.TimeoutException;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.io.network.partition.PartitionProducerStateProvider;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException;
import org.apache.flink.types.Either;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RemoteChannelStateChecker {
    private static final Logger LOG = LoggerFactory.getLogger(RemoteChannelStateChecker.class);
    private final ResultPartitionID resultPartitionId;
    private final String taskNameWithSubtask;

    public RemoteChannelStateChecker(ResultPartitionID resultPartitionId, String taskNameWithSubtask) {
        this.resultPartitionId = resultPartitionId;
        this.taskNameWithSubtask = taskNameWithSubtask;
    }

    public boolean isProducerReadyOrAbortConsumption(PartitionProducerStateProvider.ResponseHandle responseHandle) {
        Either<ExecutionState, Throwable> result = responseHandle.getProducerExecutionState();
        ExecutionState consumerExecutionState = responseHandle.getConsumerExecutionState();
        if (!RemoteChannelStateChecker.isConsumerStateValidForConsumption(consumerExecutionState)) {
            LOG.debug("Ignore a partition producer state notification for task {}, because it's not running.", (Object)this.taskNameWithSubtask);
        } else if (result.isLeft() || result.right() instanceof TimeoutException) {
            boolean isProducerConsumerReady = this.isProducerConsumerReady(responseHandle);
            if (isProducerConsumerReady) {
                return true;
            }
            this.abortConsumptionOrIgnoreCheckResult(responseHandle);
        } else {
            this.handleFailedCheckResult(responseHandle);
        }
        return false;
    }

    private static boolean isConsumerStateValidForConsumption(ExecutionState consumerExecutionState) {
        return consumerExecutionState == ExecutionState.RUNNING || consumerExecutionState == ExecutionState.DEPLOYING;
    }

    private boolean isProducerConsumerReady(PartitionProducerStateProvider.ResponseHandle responseHandle) {
        ExecutionState producerState = RemoteChannelStateChecker.getProducerState(responseHandle);
        return producerState == ExecutionState.SCHEDULED || producerState == ExecutionState.DEPLOYING || producerState == ExecutionState.RUNNING || producerState == ExecutionState.FINISHED;
    }

    private void abortConsumptionOrIgnoreCheckResult(PartitionProducerStateProvider.ResponseHandle responseHandle) {
        ExecutionState producerState = RemoteChannelStateChecker.getProducerState(responseHandle);
        if (producerState == ExecutionState.CANCELING || producerState == ExecutionState.CANCELED || producerState == ExecutionState.FAILED) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Cancelling task {} after the producer of partition {} with attempt ID {} has entered state {}.", new Object[]{this.taskNameWithSubtask, this.resultPartitionId.getPartitionId(), this.resultPartitionId.getProducerId(), producerState});
            }
            responseHandle.cancelConsumption();
        } else {
            String msg = String.format("Producer with attempt ID %s of partition %s in unexpected state %s.", new Object[]{this.resultPartitionId.getProducerId(), this.resultPartitionId.getPartitionId(), producerState});
            responseHandle.failConsumption(new IllegalStateException(msg));
        }
    }

    private static ExecutionState getProducerState(PartitionProducerStateProvider.ResponseHandle responseHandle) {
        Either<ExecutionState, Throwable> result = responseHandle.getProducerExecutionState();
        return result.isLeft() ? (ExecutionState)((Object)result.left()) : ExecutionState.RUNNING;
    }

    private void handleFailedCheckResult(PartitionProducerStateProvider.ResponseHandle responseHandle) {
        Throwable throwable = (Throwable)responseHandle.getProducerExecutionState().right();
        if (throwable instanceof PartitionProducerDisposedException) {
            String msg = String.format("Producer %s of partition %s disposed. Cancelling execution.", new Object[]{this.resultPartitionId.getProducerId(), this.resultPartitionId.getPartitionId()});
            LOG.info(msg, throwable);
            responseHandle.cancelConsumption();
        } else {
            responseHandle.failConsumption(throwable);
        }
    }
}

