/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.ChangelogReader;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.processor.internals.ProcessorContextImpl;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
import org.apache.kafka.streams.processor.internals.StandbyTask;
import org.apache.kafka.streams.processor.internals.StateDirectory;
import org.apache.kafka.streams.processor.internals.StreamTask;
import org.apache.kafka.streams.processor.internals.StreamThread;
import org.apache.kafka.streams.processor.internals.Task;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.processor.internals.metrics.ThreadMetrics;
import org.apache.kafka.streams.state.internals.ThreadCache;
import org.slf4j.Logger;

class StandbyTaskCreator {
    private final InternalTopologyBuilder builder;
    private final StreamsConfig config;
    private final StreamsMetricsImpl streamsMetrics;
    private final StateDirectory stateDirectory;
    private final ChangelogReader storeChangelogReader;
    private final ThreadCache dummyCache;
    private final Logger log;
    private final Sensor createTaskSensor;

    StandbyTaskCreator(InternalTopologyBuilder builder, StreamsConfig config, StreamsMetricsImpl streamsMetrics, StateDirectory stateDirectory, ChangelogReader storeChangelogReader, String threadId, Logger log) {
        this.builder = builder;
        this.config = config;
        this.streamsMetrics = streamsMetrics;
        this.stateDirectory = stateDirectory;
        this.storeChangelogReader = storeChangelogReader;
        this.log = log;
        this.createTaskSensor = ThreadMetrics.createTaskSensor(threadId, streamsMetrics);
        this.dummyCache = new ThreadCache(new LogContext(String.format("stream-thread [%s] ", Thread.currentThread().getName())), 0L, streamsMetrics);
    }

    Collection<Task> createTasks(Map<TaskId, Set<TopicPartition>> tasksToBeCreated) {
        ArrayList<Task> createdTasks = new ArrayList<Task>();
        for (Map.Entry<TaskId, Set<TopicPartition>> newTaskAndPartitions : tasksToBeCreated.entrySet()) {
            TaskId taskId = newTaskAndPartitions.getKey();
            Set<TopicPartition> partitions = newTaskAndPartitions.getValue();
            ProcessorTopology topology = this.builder.buildSubtopology(taskId.topicGroupId);
            if (topology.hasStateWithChangelogs()) {
                ProcessorStateManager stateManager = new ProcessorStateManager(taskId, Task.TaskType.STANDBY, StreamThread.eosEnabled(this.config), this.getLogContext(taskId), this.stateDirectory, this.storeChangelogReader, topology.storeToChangelogTopic(), partitions);
                ProcessorContextImpl context = new ProcessorContextImpl(taskId, this.config, stateManager, this.streamsMetrics, this.dummyCache);
                createdTasks.add(this.createStandbyTask(taskId, partitions, topology, stateManager, context));
                continue;
            }
            this.log.trace("Skipped standby task {} with assigned partitions {} since it does not have any state stores to materialize", (Object)taskId, partitions);
        }
        return createdTasks;
    }

    StandbyTask createStandbyTaskFromActive(StreamTask streamTask, Set<TopicPartition> partitions) {
        InternalProcessorContext context = streamTask.processorContext();
        ProcessorStateManager stateManager = streamTask.stateMgr;
        streamTask.closeCleanAndRecycleState();
        stateManager.transitionTaskType(Task.TaskType.STANDBY, this.getLogContext(streamTask.id()));
        return this.createStandbyTask(streamTask.id(), partitions, this.builder.buildSubtopology(streamTask.id.topicGroupId), stateManager, context);
    }

    StandbyTask createStandbyTask(TaskId taskId, Set<TopicPartition> partitions, ProcessorTopology topology, ProcessorStateManager stateManager, InternalProcessorContext context) {
        StandbyTask task = new StandbyTask(taskId, partitions, topology, this.config, this.streamsMetrics, stateManager, this.stateDirectory, this.dummyCache, context);
        this.log.trace("Created task {} with assigned partitions {}", (Object)taskId, partitions);
        this.createTaskSensor.record();
        return task;
    }

    public InternalTopologyBuilder builder() {
        return this.builder;
    }

    public StateDirectory stateDirectory() {
        return this.stateDirectory;
    }

    private LogContext getLogContext(TaskId taskId) {
        String threadIdPrefix = String.format("stream-thread [%s] ", Thread.currentThread().getName());
        String logPrefix = threadIdPrefix + String.format("%s [%s] ", "standby-task", taskId);
        return new LogContext(logPrefix);
    }
}

