/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.errors.LockException;
import org.apache.kafka.streams.errors.TaskMigratedException;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.RestoringTasks;
import org.apache.kafka.streams.processor.internals.Task;
import org.apache.kafka.streams.processor.internals.TaskAction;
import org.slf4j.Logger;

/**
 * Bookkeeping for a group of {@link Task}s of one kind (named by {@code taskTypeName}, which is
 * only used in log messages).
 *
 * <p>Tasks are moved between four maps keyed by {@link TaskId}: {@code created} (added via
 * {@link #addNewTask(Task)}), {@code restoring}, {@code running} and {@code suspended}. Two
 * additional indexes map every input and changelog {@link TopicPartition} of a restoring or
 * running task back to that task.
 *
 * <p>NOTE(review): only {@code running} is a {@link ConcurrentHashMap}; all other state is held in
 * plain collections. Whether instances are touched by more than one thread cannot be told from
 * this file - confirm against the callers before relying on any thread-safety.
 */
class AssignedTasks
implements RestoringTasks {
    private final Logger log;
    private final String taskTypeName;
    private final TaskAction maybeCommitAction;
    private final TaskAction commitAction;
    private final Map<TaskId, Task> created = new HashMap<TaskId, Task>();
    private final Map<TaskId, Task> suspended = new HashMap<TaskId, Task>();
    private final Map<TaskId, Task> restoring = new HashMap<TaskId, Task>();
    private final Set<TopicPartition> restoredPartitions = new HashSet<TopicPartition>();
    private final Set<TaskId> previousActiveTasks = new HashSet<TaskId>();
    private final Map<TaskId, Task> running = new ConcurrentHashMap<TaskId, Task>();
    private final Map<TopicPartition, Task> runningByPartition = new HashMap<TopicPartition, Task>();
    private final Map<TopicPartition, Task> restoringByPartition = new HashMap<TopicPartition, Task>();
    // Number of tasks committed by the most recent maybeCommit() pass; reset at the start of each pass.
    private int committed = 0;

    /**
     * @param logContext   used to create this instance's logger
     * @param taskTypeName human-readable task kind, interpolated into log messages
     */
    AssignedTasks(LogContext logContext, String taskTypeName) {
        this.taskTypeName = taskTypeName;
        this.log = logContext.logger(this.getClass());
        // Commits a task only if the task itself reports that a commit is needed.
        this.maybeCommitAction = new TaskAction(){

            @Override
            public String name() {
                return "maybeCommit";
            }

            @Override
            public void apply(Task task) {
                if (task.commitNeeded()) {
                    // Counted before committing, so a task whose commit throws is still counted.
                    AssignedTasks.this.committed++;
                    task.commit();
                    AssignedTasks.this.log.debug("Committed active task {} per user request in", task.id());
                }
            }
        };
        // Commits a task unconditionally.
        this.commitAction = new TaskAction(){

            @Override
            public String name() {
                return "commit";
            }

            @Override
            public void apply(Task task) {
                task.commit();
            }
        };
    }

    /**
     * Registers a task in the {@code created} map, keyed by its id.
     */
    void addNewTask(Task task) {
        created.put(task.id(), task);
    }

    /**
     * @return the input partitions of all created (not yet initialized) tasks that have state
     *         stores; an immutable empty set when there are no created tasks
     */
    Set<TopicPartition> uninitializedPartitions() {
        if (created.isEmpty()) {
            return Collections.emptySet();
        }
        Set<TopicPartition> partitions = new HashSet<TopicPartition>();
        for (Task task : created.values()) {
            if (task.hasStateStores()) {
                partitions.addAll(task.partitions());
            }
        }
        return partitions;
    }

    /**
     * Tries to initialize the state stores of every created task. A task whose
     * {@code initializeStateStores()} returns {@code false} is moved to {@code restoring},
     * otherwise it is moved straight to {@code running}. A task that throws
     * {@link LockException} stays in {@code created} so that a later call can retry it.
     *
     * @return the input partitions of the stateful tasks that were moved to running by this call
     */
    Set<TopicPartition> initializeNewTasks() {
        Set<TopicPartition> readyPartitions = new HashSet<TopicPartition>();
        if (!created.isEmpty()) {
            log.debug("Initializing {}s {}", taskTypeName, created.keySet());
        }
        Iterator<Map.Entry<TaskId, Task>> it = created.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<TaskId, Task> entry = it.next();
            Task task = entry.getValue();
            try {
                if (!task.initializeStateStores()) {
                    log.debug("Transitioning {} {} to restoring", taskTypeName, entry.getKey());
                    addToRestoring(task);
                } else {
                    transitionToRunning(task, readyPartitions);
                }
                // Only reached when no exception was thrown, i.e. the task really left "created".
                it.remove();
            }
            catch (LockException e) {
                log.trace("Could not create {} {} due to {}; will retry", taskTypeName, entry.getKey(), e.getMessage());
            }
        }
        return readyPartitions;
    }

    /**
     * Records the given changelog partitions as restored and moves every restoring task whose
     * changelog partitions are now all restored to {@code running}.
     *
     * @param restored changelog partitions reported as restored
     * @return the input partitions of the stateful tasks that were moved to running by this call;
     *         an immutable empty set when {@code restored} is empty
     */
    Set<TopicPartition> updateRestored(Collection<TopicPartition> restored) {
        if (restored.isEmpty()) {
            return Collections.emptySet();
        }
        log.trace("{} changelog partitions that have completed restoring so far: {}", taskTypeName, restored);
        Set<TopicPartition> resume = new HashSet<TopicPartition>();
        restoredPartitions.addAll(restored);
        Iterator<Map.Entry<TaskId, Task>> it = restoring.entrySet().iterator();
        while (it.hasNext()) {
            Task task = it.next().getValue();
            if (restoredPartitions.containsAll(task.changelogPartitions())) {
                transitionToRunning(task, resume);
                it.remove();
                log.trace("{} {} completed restoration as all its changelog partitions {} have been applied to restore state", taskTypeName, task.id(), task.changelogPartitions());
            } else if (log.isTraceEnabled()) {
                // Computing the outstanding set is only worth it when it will actually be logged.
                Set<TopicPartition> outstandingPartitions = new HashSet<TopicPartition>(task.changelogPartitions());
                outstandingPartitions.removeAll(restoredPartitions);
                log.trace("{} {} cannot resume processing yet since some of its changelog partitions have not completed restoring: {}", taskTypeName, task.id(), outstandingPartitions);
            }
        }
        if (allTasksRunning()) {
            restoredPartitions.clear();
        }
        return resume;
    }

    /**
     * @return {@code true} when no task is in the created, suspended or restoring state
     */
    boolean allTasksRunning() {
        return created.isEmpty() && suspended.isEmpty() && restoring.isEmpty();
    }

    /**
     * @return a live view of the running tasks (backed by the internal map)
     */
    Collection<Task> running() {
        return running.values();
    }

    /**
     * Suspends all running tasks and closes all restoring and created tasks, then records the ids
     * still present in {@code running} as the previous active tasks and clears the running,
     * restoring and created maps together with both partition indexes.
     *
     * @return the first exception encountered, or {@code null} if everything succeeded
     */
    RuntimeException suspend() {
        AtomicReference<RuntimeException> firstException = new AtomicReference<RuntimeException>(null);
        log.trace("Suspending running {} {}", taskTypeName, runningTaskIds());
        firstException.compareAndSet(null, suspendTasks(running.values()));
        log.trace("Close restoring {} {}", taskTypeName, restoring.keySet());
        firstException.compareAndSet(null, closeNonRunningTasks(restoring.values()));
        log.trace("Close created {} {}", taskTypeName, created.keySet());
        firstException.compareAndSet(null, closeNonRunningTasks(created.values()));
        previousActiveTasks.clear();
        previousActiveTasks.addAll(running.keySet());
        running.clear();
        restoring.clear();
        created.clear();
        runningByPartition.clear();
        restoringByPartition.clear();
        return firstException.get();
    }

    /**
     * Closes each given task with {@code close(false, false)}, logging failures and carrying on.
     *
     * @return the first exception thrown by a close, or {@code null} if none was thrown
     */
    private RuntimeException closeNonRunningTasks(Collection<Task> tasks) {
        RuntimeException exception = null;
        for (Task task : tasks) {
            try {
                task.close(false, false);
            }
            catch (RuntimeException e) {
                log.error("Failed to close {}, {}", taskTypeName, task.id(), e);
                if (exception == null) {
                    exception = e;
                }
            }
        }
        return exception;
    }

    /**
     * Suspends each given task and records it in {@code suspended}. A task that throws
     * {@link TaskMigratedException} is closed as a zombie and removed from {@code tasks} through
     * the iterator; that exception is not reported. For any other runtime failure the task is
     * closed with {@code close(false, false)} (a failure of that close is only logged).
     *
     * @return the first non-migration exception thrown by a suspend, or {@code null}
     */
    private RuntimeException suspendTasks(Collection<Task> tasks) {
        RuntimeException exception = null;
        Iterator<Task> it = tasks.iterator();
        while (it.hasNext()) {
            Task task = it.next();
            try {
                task.suspend();
                suspended.put(task.id(), task);
            }
            catch (TaskMigratedException closeAsZombieAndSwallow) {
                closeZombieTask(task);
                it.remove();
            }
            catch (RuntimeException e) {
                log.error("Suspending {} {} failed due to the following error:", taskTypeName, task.id(), e);
                try {
                    task.close(false, false);
                }
                catch (Exception f) {
                    log.error("After suspending failed, closing the same {} {} failed again due to the following error:", taskTypeName, task.id(), f);
                }
                if (exception == null) {
                    exception = e;
                }
            }
        }
        return exception;
    }

    /**
     * Closes a task with {@code close(false, true)}; any failure is logged and ignored.
     */
    private void closeZombieTask(Task task) {
        log.warn("{} {} got migrated to another thread already. Closing it as zombie.", taskTypeName, task.id());
        try {
            task.close(false, true);
        }
        catch (Exception e) {
            log.warn("Failed to close zombie {} {} due to {}; ignore and proceed.", taskTypeName, task.id(), e.getMessage());
        }
    }

    boolean hasRunningTasks() {
        return !running.isEmpty();
    }

    /**
     * Resumes the suspended task with the given id if its partitions equal {@code partitions}.
     *
     * @return {@code true} if the task was resumed and moved to running; {@code false} if no such
     *         suspended task exists or its partitions differ (the task then stays suspended)
     * @throws TaskMigratedException if resuming fails with it; the task has then already been
     *         removed from the suspended map and closed as a zombie
     */
    boolean maybeResumeSuspendedTask(TaskId taskId, Set<TopicPartition> partitions) {
        Task task = suspended.get(taskId);
        if (task == null) {
            return false;
        }
        log.trace("found suspended {} {}", taskTypeName, taskId);
        if (!task.partitions().equals(partitions)) {
            log.warn("couldn't resume task {} assigned partitions {}, task partitions {}", taskId, partitions, task.partitions());
            return false;
        }
        // Removed up front so the task is no longer tracked as suspended even if resume() fails.
        suspended.remove(taskId);
        try {
            task.resume();
        }
        catch (TaskMigratedException e) {
            closeZombieTask(task);
            throw e;
        }
        transitionToRunning(task, new HashSet<TopicPartition>());
        log.trace("resuming suspended {} {}", taskTypeName, task.id());
        return true;
    }

    /**
     * Puts the task into {@code restoring} and indexes it under each of its input and changelog
     * partitions.
     */
    private void addToRestoring(Task task) {
        restoring.put(task.id(), task);
        for (TopicPartition topicPartition : task.partitions()) {
            restoringByPartition.put(topicPartition, task);
        }
        for (TopicPartition topicPartition : task.changelogPartitions()) {
            restoringByPartition.put(topicPartition, task);
        }
    }

    /**
     * Puts the task into {@code running}, initializes its topology and indexes it under each of
     * its input and changelog partitions.
     *
     * @param readyPartitions receives the task's input partitions if the task has state stores
     */
    private void transitionToRunning(Task task, Set<TopicPartition> readyPartitions) {
        log.debug("transitioning {} {} to running", taskTypeName, task.id());
        running.put(task.id(), task);
        task.initializeTopology();
        for (TopicPartition topicPartition : task.partitions()) {
            runningByPartition.put(topicPartition, task);
            if (task.hasStateStores()) {
                readyPartitions.add(topicPartition);
            }
        }
        for (TopicPartition topicPartition : task.changelogPartitions()) {
            runningByPartition.put(topicPartition, task);
        }
    }

    /**
     * @return the restoring task indexed under the given partition, or {@code null} if none
     */
    @Override
    public Task restoringTaskFor(TopicPartition partition) {
        return restoringByPartition.get(partition);
    }

    /**
     * @return the running task indexed under the given partition, or {@code null} if none
     */
    Task runningTaskFor(TopicPartition partition) {
        return runningByPartition.get(partition);
    }

    /**
     * @return a live view of the running task ids (backed by the internal map)
     */
    Set<TaskId> runningTaskIds() {
        return running.keySet();
    }

    /**
     * @return an unmodifiable view of the running tasks by id
     */
    Map<TaskId, Task> runningTaskMap() {
        return Collections.unmodifiableMap(running);
    }

    /**
     * Renders the running, suspended, restoring and new tasks, one section each, every section
     * prefixed with {@code indent} and terminated by a newline.
     */
    public String toString(String indent) {
        StringBuilder builder = new StringBuilder();
        describe(builder, running.values(), indent, "Running:");
        describe(builder, suspended.values(), indent, "Suspended:");
        describe(builder, restoring.values(), indent, "Restoring:");
        describe(builder, created.values(), indent, "New:");
        return builder.toString();
    }

    /**
     * Appends one section: the indented section name, then each task's own description.
     */
    private void describe(StringBuilder builder, Collection<Task> tasks, String indent, String name) {
        builder.append(indent).append(name);
        for (Task t : tasks) {
            builder.append(indent).append(t.toString(indent + "\t\t"));
        }
        builder.append("\n");
    }

    /**
     * @return a new list holding the running, suspended, restoring and created tasks, in that order
     */
    private List<Task> allTasks() {
        List<Task> tasks = new ArrayList<Task>();
        tasks.addAll(running.values());
        tasks.addAll(suspended.values());
        tasks.addAll(restoring.values());
        tasks.addAll(created.values());
        return tasks;
    }

    /**
     * @return an unmodifiable view of the restoring tasks
     */
    Collection<Task> restoringTasks() {
        return Collections.unmodifiableCollection(restoring.values());
    }

    /**
     * @return a new set holding the ids of all running, suspended, restoring and created tasks
     */
    Set<TaskId> allAssignedTaskIds() {
        Set<TaskId> taskIds = new HashSet<TaskId>();
        taskIds.addAll(running.keySet());
        taskIds.addAll(suspended.keySet());
        taskIds.addAll(restoring.keySet());
        taskIds.addAll(created.keySet());
        return taskIds;
    }

    /**
     * Forgets every tracked task and restored partition without closing anything.
     * {@code previousActiveTasks} is deliberately left untouched.
     */
    void clear() {
        runningByPartition.clear();
        restoringByPartition.clear();
        running.clear();
        created.clear();
        suspended.clear();
        // Must be cleared together with restoringByPartition; otherwise tasks already closed by
        // close(boolean) would still be reported by restoringTasks()/allAssignedTaskIds() and
        // would keep allTasksRunning() false.
        restoring.clear();
        restoredPartitions.clear();
    }

    /**
     * @return the ids that were in the running map at the end of the last {@link #suspend()};
     *         this is the internal set itself, not a copy
     */
    Set<TaskId> previousTaskIds() {
        return previousActiveTasks;
    }

    /**
     * Commits every running task.
     *
     * @return the number of tasks in the running map after the pass
     */
    int commit() {
        applyToRunningTasks(commitAction);
        return running.size();
    }

    /**
     * Commits every running task that reports {@code commitNeeded()}.
     *
     * @return the number of tasks for which a commit was attempted in this pass
     */
    int maybeCommit() {
        committed = 0;
        applyToRunningTasks(maybeCommitAction);
        return committed;
    }

    /**
     * Calls {@code process()} once on each running task.
     *
     * <p>NOTE(review): a task removed here as a zombie is not removed from
     * {@code runningByPartition}; that index is only reset by {@link #suspend()} and
     * {@link #clear()}.
     *
     * @return the number of tasks whose {@code process()} returned {@code true}
     * @throws TaskMigratedException rethrown after the task was closed as a zombie and removed
     *         from the running map; remaining tasks are not processed
     * @throws RuntimeException any other failure, rethrown after being logged
     */
    int process() {
        int processed = 0;
        Iterator<Map.Entry<TaskId, Task>> it = running.entrySet().iterator();
        while (it.hasNext()) {
            Task task = it.next().getValue();
            try {
                if (task.process()) {
                    ++processed;
                }
            }
            catch (TaskMigratedException e) {
                closeZombieTask(task);
                it.remove();
                throw e;
            }
            catch (RuntimeException e) {
                log.error("Failed to process {} {} due to the following error:", taskTypeName, task.id(), e);
                throw e;
            }
        }
        return processed;
    }

    /**
     * Triggers stream-time and then system-time punctuation on each running task.
     *
     * @return the number of punctuation calls that returned {@code true} (up to two per task)
     * @throws TaskMigratedException rethrown after the task was closed as a zombie and removed
     *         from the running map; remaining tasks are not punctuated
     * @throws KafkaException rethrown after being logged
     */
    int punctuate() {
        int punctuated = 0;
        Iterator<Map.Entry<TaskId, Task>> it = running.entrySet().iterator();
        while (it.hasNext()) {
            Task task = it.next().getValue();
            try {
                if (task.maybePunctuateStreamTime()) {
                    ++punctuated;
                }
                if (task.maybePunctuateSystemTime()) {
                    ++punctuated;
                }
            }
            catch (TaskMigratedException e) {
                closeZombieTask(task);
                it.remove();
                throw e;
            }
            catch (KafkaException e) {
                log.error("Failed to punctuate {} {} due to the following error:", taskTypeName, task.id(), e);
                throw e;
            }
        }
        return punctuated;
    }

    /**
     * Applies the action to every running task, continuing past failures. A task that throws
     * {@link TaskMigratedException} is closed as a zombie and removed from the running map; any
     * other runtime failure is logged and the task is kept.
     *
     * @throws RuntimeException the first exception thrown by any task, rethrown once all tasks
     *         have been visited
     */
    private void applyToRunningTasks(TaskAction action) {
        RuntimeException firstException = null;
        Iterator<Task> it = running().iterator();
        while (it.hasNext()) {
            Task task = it.next();
            try {
                action.apply(task);
            }
            catch (TaskMigratedException e) {
                closeZombieTask(task);
                it.remove();
                if (firstException == null) {
                    firstException = e;
                }
            }
            catch (RuntimeException t) {
                log.error("Failed to {} {} {} due to the following error:", action.name(), taskTypeName, task.id(), t);
                if (firstException == null) {
                    firstException = t;
                }
            }
        }
        if (firstException != null) {
            throw firstException;
        }
    }

    /**
     * Closes and forgets every suspended task that is either absent from {@code newAssignment}
     * or assigned a different set of partitions there. A failing close is logged; the task is
     * removed from the suspended map either way.
     *
     * <p>NOTE(review): this file was produced by a decompiler, which reported that it
     * restructured the try/catch in this method ("possible behaviour change"); compare with the
     * upstream source if the exact exception flow matters.
     *
     * @param newAssignment the partitions newly assigned to each task id
     */
    void closeNonAssignedSuspendedTasks(Map<TaskId, Set<TopicPartition>> newAssignment) {
        Iterator<Task> suspendedTaskIterator = suspended.values().iterator();
        while (suspendedTaskIterator.hasNext()) {
            Task suspendedTask = suspendedTaskIterator.next();
            boolean reassignedUnchanged = newAssignment.containsKey(suspendedTask.id())
                && suspendedTask.partitions().equals(newAssignment.get(suspendedTask.id()));
            if (reassignedUnchanged) {
                continue;
            }
            log.debug("Closing suspended and not re-assigned {} {}", taskTypeName, suspendedTask.id());
            try {
                suspendedTask.closeSuspended(true, false, null);
            }
            catch (Exception e) {
                log.error("Failed to remove suspended {} {} due to the following error:", taskTypeName, suspendedTask.id(), e);
            }
            finally {
                suspendedTaskIterator.remove();
            }
        }
    }

    /**
     * Closes every tracked task (running, suspended, restoring and created) and then forgets
     * all of them via {@link #clear()}.
     *
     * @param clean forwarded as the first argument of each task's {@code close}
     */
    void close(boolean clean) {
        close(allTasks(), clean);
        clear();
    }

    /**
     * Best-effort close: any {@link Throwable} from a task is logged and the loop continues.
     */
    private void close(Collection<Task> tasks, boolean clean) {
        for (Task task : tasks) {
            try {
                task.close(clean, false);
            }
            catch (Throwable t) {
                log.error("Failed while closing {} {} due to the following error:", task.getClass().getSimpleName(), task.id(), t);
            }
        }
    }
}

