package org.apache.kafka.streams.processor.internals.tasks;

import java.time.Duration;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskMigratedException;
import org.apache.kafka.streams.internals.StreamsConfigUtils;
import org.apache.kafka.streams.processor.internals.ProcessingThread;
import org.apache.kafka.streams.processor.internals.ReadOnlyTask;
import org.apache.kafka.streams.processor.internals.StreamTask;
import org.apache.kafka.streams.processor.internals.Task;
import org.apache.kafka.streams.processor.internals.TaskExecutionMetadata;
import org.slf4j.Logger;

/* loaded from: input_file:org/apache/kafka/streams/processor/internals/tasks/DefaultTaskExecutor.class */
public class DefaultTaskExecutor implements TaskExecutor {
    private final Time time;
    private final String name;
    private final TaskManager taskManager;
    private final TaskExecutionMetadata taskExecutionMetadata;
    private final Logger log;
    private StreamTask currentTask = null;
    private TaskExecutorThread taskExecutorThread = null;
    private CountDownLatch shutdownGate;

    /* loaded from: input_file:org/apache/kafka/streams/processor/internals/tasks/DefaultTaskExecutor$TaskExecutorThread.class */
    private class TaskExecutorThread extends Thread implements ProcessingThread {
        private final AtomicBoolean shutdownRequested;
        private final AtomicReference<KafkaFutureImpl<StreamTask>> taskReleaseRequested;
        private final Logger log;

        public TaskExecutorThread(String str) {
            super(str);
            this.shutdownRequested = new AtomicBoolean(false);
            this.taskReleaseRequested = new AtomicReference<>(null);
            this.log = new LogContext(String.format("%s ", str)).logger(DefaultTaskExecutor.class);
        }

        /* JADX WARN: Finally extract failed */
        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            this.log.info("Task executor thread started");
            while (!this.shutdownRequested.get()) {
                try {
                    try {
                        runOnce(DefaultTaskExecutor.this.time.milliseconds());
                    } catch (Exception e) {
                        handleException(new StreamsException(e));
                    } catch (StreamsException e2) {
                        handleException(e2);
                    }
                } catch (Throwable th) {
                    if (DefaultTaskExecutor.this.currentTask != null) {
                        this.log.debug("Releasing task {} due to shutdown.", DefaultTaskExecutor.this.currentTask.id());
                        unassignCurrentTask();
                    }
                    DefaultTaskExecutor.this.shutdownGate.countDown();
                    KafkaFutureImpl<StreamTask> andSet = this.taskReleaseRequested.getAndSet(null);
                    if (andSet != null) {
                        this.log.debug("Asked to return current task, but shutting down.");
                        andSet.complete((Object) null);
                    }
                    this.log.info("Task executor thread shutdown");
                    throw th;
                }
            }
            if (DefaultTaskExecutor.this.currentTask != null) {
                this.log.debug("Releasing task {} due to shutdown.", DefaultTaskExecutor.this.currentTask.id());
                unassignCurrentTask();
            }
            DefaultTaskExecutor.this.shutdownGate.countDown();
            KafkaFutureImpl<StreamTask> andSet2 = this.taskReleaseRequested.getAndSet(null);
            if (andSet2 != null) {
                this.log.debug("Asked to return current task, but shutting down.");
                andSet2.complete((Object) null);
            }
            this.log.info("Task executor thread shutdown");
        }

        private void handleTaskReleaseRequested() {
            KafkaFutureImpl<StreamTask> andSet = this.taskReleaseRequested.getAndSet(null);
            if (andSet != null) {
                if (DefaultTaskExecutor.this.currentTask != null) {
                    this.log.debug("Releasing task {} upon request.", DefaultTaskExecutor.this.currentTask.id());
                    andSet.complete(unassignCurrentTask());
                } else {
                    this.log.debug("Asked to return current task, but returned current task already.");
                    andSet.complete((Object) null);
                }
            }
        }

        /* JADX WARN: Multi-variable type inference failed */
        private void handleException(StreamsException streamsException) {
            if (DefaultTaskExecutor.this.currentTask == null) {
                throw streamsException;
            }
            DefaultTaskExecutor.this.taskManager.setUncaughtException(streamsException, DefaultTaskExecutor.this.currentTask.id());
            this.log.debug("Releasing task {} due to uncaught exception.", DefaultTaskExecutor.this.currentTask.id());
            unassignCurrentTask();
        }

        private void runOnce(long j) {
            handleTaskReleaseRequested();
            if (DefaultTaskExecutor.this.currentTask == null) {
                DefaultTaskExecutor.this.currentTask = DefaultTaskExecutor.this.taskManager.assignNextTask(DefaultTaskExecutor.this);
            }
            if (DefaultTaskExecutor.this.currentTask == null) {
                try {
                    DefaultTaskExecutor.this.taskManager.awaitProcessableTasks();
                    return;
                } catch (InterruptedException e) {
                    return;
                }
            }
            boolean z = false;
            if (DefaultTaskExecutor.this.taskExecutionMetadata.canProcessTask(DefaultTaskExecutor.this.currentTask, j) && DefaultTaskExecutor.this.currentTask.isProcessable(j) && processTask(DefaultTaskExecutor.this.currentTask, j, DefaultTaskExecutor.this.time)) {
                this.log.trace("processed a record for {}", DefaultTaskExecutor.this.currentTask.id());
                z = true;
            }
            if (DefaultTaskExecutor.this.taskExecutionMetadata.canPunctuateTask(DefaultTaskExecutor.this.currentTask)) {
                if (DefaultTaskExecutor.this.currentTask.maybePunctuateStreamTime()) {
                    this.log.trace("punctuated stream time for task {} ", DefaultTaskExecutor.this.currentTask.id());
                    z = true;
                }
                if (DefaultTaskExecutor.this.currentTask.maybePunctuateSystemTime()) {
                    this.log.trace("punctuated system time for task {} ", DefaultTaskExecutor.this.currentTask.id());
                    z = true;
                }
            }
            if (z) {
                return;
            }
            this.log.debug("Releasing task {} because we are not making progress.", DefaultTaskExecutor.this.currentTask.id());
            unassignCurrentTask();
        }

        /* JADX WARN: Type inference failed for: r13v2, types: [java.lang.Throwable, org.apache.kafka.streams.errors.StreamsException] */
        private boolean processTask(Task task, long j, Time time) {
            boolean z = false;
            try {
                try {
                    try {
                        z = task.process(j);
                        if (z) {
                            this.log.trace("Successfully processed task {}", task.id());
                            task.clearTaskTimeout();
                            if (DefaultTaskExecutor.this.taskExecutionMetadata.hasNamedTopologies() && DefaultTaskExecutor.this.taskExecutionMetadata.processingMode() != StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_V2) {
                                DefaultTaskExecutor.this.taskExecutionMetadata.addToSuccessfullyProcessed(task);
                            }
                        }
                        task.recordProcessBatchTime(time.milliseconds() - j);
                    } catch (TaskMigratedException e) {
                        this.log.info("Failed to process stream task {} since it got migrated to another thread already. Will trigger a new rebalance and close all tasks as zombies together.", task.id());
                        throw e;
                    } catch (StreamsException e2) {
                        this.log.error(String.format("Failed to process stream task %s due to the following error:", task.id()), (Throwable) e2);
                        e2.setTaskId(task.id());
                        throw e2;
                    }
                } catch (RuntimeException e3) {
                    this.log.error(String.format("Failed to process stream task %s due to the following error:", task.id()), e3);
                    throw new StreamsException(e3, task.id());
                } catch (TimeoutException e4) {
                    task.maybeInitTaskTimeoutOrThrow(j, e4);
                    this.log.error(String.format("Could not complete processing records for %s due to the following exception; will move to next task and retry later", task.id()), e4);
                    task.recordProcessBatchTime(time.milliseconds() - j);
                }
                return z;
            } catch (Throwable th) {
                task.recordProcessBatchTime(time.milliseconds() - j);
                throw th;
            }
        }

        /* JADX WARN: Multi-variable type inference failed */
        /* JADX WARN: Type inference failed for: r8v0, types: [java.lang.Throwable, org.apache.kafka.streams.errors.StreamsException] */
        private StreamTask unassignCurrentTask() {
            if (DefaultTaskExecutor.this.currentTask == null) {
                throw new IllegalStateException("Does not own any task while being ask to unassign from task manager");
            }
            if (!DefaultTaskExecutor.this.taskManager.hasUncaughtException(DefaultTaskExecutor.this.currentTask.id())) {
                try {
                    DefaultTaskExecutor.this.currentTask.flush();
                } catch (RuntimeException e) {
                    this.log.error(String.format("Failed to flush stream task %s due to the following error:", DefaultTaskExecutor.this.currentTask.id()), e);
                    DefaultTaskExecutor.this.taskManager.setUncaughtException(new StreamsException(e, DefaultTaskExecutor.this.currentTask.id()), DefaultTaskExecutor.this.currentTask.id());
                } catch (StreamsException e2) {
                    this.log.error(String.format("Failed to flush stream task %s due to the following error:", DefaultTaskExecutor.this.currentTask.id()), (Throwable) e2);
                    e2.setTaskId(DefaultTaskExecutor.this.currentTask.id());
                    DefaultTaskExecutor.this.taskManager.setUncaughtException(e2, DefaultTaskExecutor.this.currentTask.id());
                }
            }
            DefaultTaskExecutor.this.taskManager.unassignTask(DefaultTaskExecutor.this.currentTask, DefaultTaskExecutor.this);
            StreamTask streamTask = DefaultTaskExecutor.this.currentTask;
            DefaultTaskExecutor.this.currentTask = null;
            return streamTask;
        }
    }

    public DefaultTaskExecutor(TaskManager taskManager, String str, Time time, TaskExecutionMetadata taskExecutionMetadata) {
        this.time = time;
        this.name = str;
        this.taskManager = taskManager;
        this.taskExecutionMetadata = taskExecutionMetadata;
        this.log = new LogContext(str).logger(DefaultTaskExecutor.class);
    }

    @Override // org.apache.kafka.streams.processor.internals.tasks.TaskExecutor
    public String name() {
        return this.name;
    }

    @Override // org.apache.kafka.streams.processor.internals.tasks.TaskExecutor
    public void start() {
        if (this.taskExecutorThread == null) {
            this.taskExecutorThread = new TaskExecutorThread(this.name);
            this.taskExecutorThread.start();
            this.shutdownGate = new CountDownLatch(1);
        }
    }

    @Override // org.apache.kafka.streams.processor.internals.tasks.TaskExecutor
    public boolean isRunning() {
        return (this.taskExecutorThread == null || !this.taskExecutorThread.isAlive() || this.shutdownGate.getCount() == 0) ? false : true;
    }

    @Override // org.apache.kafka.streams.processor.internals.tasks.TaskExecutor
    public void requestShutdown() {
        if (this.taskExecutorThread != null) {
            this.taskExecutorThread.shutdownRequested.set(true);
        }
    }

    @Override // org.apache.kafka.streams.processor.internals.tasks.TaskExecutor
    public void awaitShutdown(Duration duration) {
        if (this.taskExecutorThread != null) {
            try {
                if (!this.shutdownGate.await(duration.toMillis(), TimeUnit.MILLISECONDS)) {
                    throw new StreamsException("State updater thread did not shutdown within the timeout");
                }
                this.taskExecutorThread = null;
            } catch (InterruptedException e) {
            }
        }
    }

    @Override // org.apache.kafka.streams.processor.internals.tasks.TaskExecutor
    public ReadOnlyTask currentTask() {
        if (this.currentTask != null) {
            return new ReadOnlyTask(this.currentTask);
        }
        return null;
    }

    @Override // org.apache.kafka.streams.processor.internals.tasks.TaskExecutor
    public KafkaFuture<StreamTask> unassign() {
        KafkaFutureImpl kafkaFutureImpl = new KafkaFutureImpl();
        if (this.taskExecutorThread != null) {
            this.log.debug("Asking {} to hand back task", this.taskExecutorThread.getName());
            if (!this.taskExecutorThread.taskReleaseRequested.compareAndSet(null, kafkaFutureImpl)) {
                throw new IllegalStateException("There was already a task release request registered");
            }
            if (this.shutdownGate.getCount() == 0) {
                this.log.debug("Completing future, because task executor was just shut down");
                kafkaFutureImpl.complete((Object) null);
            } else {
                this.taskManager.signalTaskExecutors();
            }
        } else {
            this.log.debug("Tried to unassign but no thread is running");
            kafkaFutureImpl.complete((Object) null);
        }
        return kafkaFutureImpl;
    }
}
