/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iotdb.commons.pipe.agent.task.subtask;

import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeConnectorCriticalException;
import org.apache.iotdb.commons.pipe.agent.task.execution.PipeSubtaskScheduler;
import org.apache.iotdb.commons.pipe.agent.task.subtask.PipeReportableSubtask;
import org.apache.iotdb.commons.pipe.agent.task.subtask.PipeSubtask;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.pipe.api.PipeConnector;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.exception.PipeConnectionException;
import org.apache.iotdb.pipe.api.exception.PipeConsensusRetryWithIncreasingIntervalException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class PipeAbstractConnectorSubtask
extends PipeReportableSubtask {
    private static final Logger LOGGER = LoggerFactory.getLogger(PipeAbstractConnectorSubtask.class);
    protected PipeConnector outputPipeConnector;
    protected ExecutorService subtaskCallbackListeningExecutor;
    protected volatile boolean isSubmitted = false;
    protected volatile Event lastExceptionEvent;

    protected PipeAbstractConnectorSubtask(String taskID, long creationTime, PipeConnector outputPipeConnector) {
        super(taskID, creationTime);
        this.outputPipeConnector = outputPipeConnector;
    }

    @Override
    public void bindExecutors(ListeningExecutorService subtaskWorkerThreadPoolExecutor, ExecutorService subtaskCallbackListeningExecutor, PipeSubtaskScheduler subtaskScheduler) {
        this.subtaskWorkerThreadPoolExecutor = subtaskWorkerThreadPoolExecutor;
        this.subtaskCallbackListeningExecutor = subtaskCallbackListeningExecutor;
        this.subtaskScheduler = subtaskScheduler;
    }

    @Override
    public synchronized void onSuccess(Boolean hasAtLeastOneEventProcessed) {
        this.isSubmitted = false;
        super.onSuccess(hasAtLeastOneEventProcessed);
    }

    @Override
    public synchronized void onFailure(Throwable throwable) {
        this.isSubmitted = false;
        if (this.isClosed.get()) {
            LOGGER.info("onFailure in pipe transfer, ignored because the connector subtask is dropped.", throwable);
            this.clearReferenceCountAndReleaseLastEvent(null);
            return;
        }
        if (this.lastExceptionEvent instanceof EnrichedEvent && ((EnrichedEvent)this.lastExceptionEvent).isReleased()) {
            LOGGER.info("onFailure in pipe transfer, ignored because the failure event is released.", throwable);
            this.submitSelf();
            return;
        }
        if (this.lastEvent != this.lastExceptionEvent) {
            LOGGER.info("onFailure in pipe transfer, ignored because the failure event's pipe is dropped.", throwable);
            this.clearReferenceCountAndReleaseLastExceptionEvent();
            this.submitSelf();
            return;
        }
        if (throwable instanceof PipeConnectionException && this.onPipeConnectionException(throwable)) {
            return;
        }
        if (throwable instanceof PipeRuntimeConnectorCriticalException || throwable instanceof PipeConsensusRetryWithIncreasingIntervalException) {
            super.onFailure(throwable);
        } else {
            LOGGER.warn("A non PipeRuntimeConnectorCriticalException occurred, will throw a PipeRuntimeConnectorCriticalException.", throwable);
            super.onFailure((Throwable)((Object)new PipeRuntimeConnectorCriticalException(throwable.getMessage())));
        }
    }

    private boolean onPipeConnectionException(Throwable throwable) {
        LOGGER.warn("PipeConnectionException occurred, {} retries to handshake with the target system.", (Object)this.outputPipeConnector.getClass().getName(), (Object)throwable);
        int retry = 0;
        while (retry < 5) {
            try {
                this.outputPipeConnector.handshake();
                LOGGER.info("{} handshakes with the target system successfully.", (Object)this.outputPipeConnector.getClass().getName());
                break;
            }
            catch (Exception e) {
                LOGGER.warn("{} failed to handshake with the target system for {} times, will retry at most {} times.", new Object[]{this.outputPipeConnector.getClass().getName(), ++retry, 5, e});
                try {
                    Thread.sleep((long)retry * PipeConfig.getInstance().getPipeConnectorRetryIntervalMs());
                }
                catch (InterruptedException interruptedException) {
                    LOGGER.info("Interrupted while sleeping, will retry to handshake with the target system.", (Throwable)interruptedException);
                    Thread.currentThread().interrupt();
                }
            }
        }
        if (retry == 5 && this.lastEvent instanceof EnrichedEvent) {
            this.report((EnrichedEvent)this.lastEvent, new PipeRuntimeConnectorCriticalException(throwable.getMessage() + ", root cause: " + this.getRootCause(throwable)));
            LOGGER.warn("{} failed to handshake with the target system after {} times, stopping current subtask {} (creation time: {}, simple class: {}). Status shown when query the pipe will be 'STOPPED'. Please restart the task by executing 'START PIPE' manually if needed.", new Object[]{this.outputPipeConnector.getClass().getName(), 5, this.taskID, this.creationTime, this.getClass().getSimpleName(), throwable});
            return true;
        }
        return false;
    }

    @Override
    public synchronized void submitSelf() {
        if (this.shouldStopSubmittingSelf.get() || this.isSubmitted) {
            return;
        }
        ListenableFuture nextFuture = this.subtaskWorkerThreadPoolExecutor.submit((Callable)this);
        this.registerCallbackHookAfterSubmit((ListenableFuture<Boolean>)nextFuture);
        this.isSubmitted = true;
    }

    protected void registerCallbackHookAfterSubmit(ListenableFuture<Boolean> future) {
        Futures.addCallback(future, (FutureCallback)this, (Executor)this.subtaskCallbackListeningExecutor);
    }

    protected synchronized void setLastExceptionEvent(Event event) {
        this.lastExceptionEvent = event;
    }

    protected synchronized void clearReferenceCountAndReleaseLastExceptionEvent() {
        if (this.lastExceptionEvent != null) {
            if (this.lastExceptionEvent instanceof EnrichedEvent && !((EnrichedEvent)this.lastExceptionEvent).isReleased()) {
                ((EnrichedEvent)this.lastExceptionEvent).clearReferenceCount(PipeSubtask.class.getName());
            }
            this.lastExceptionEvent = null;
        }
    }
}

