/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.nio.ByteBuffer;
import java.util.Base64;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskMigratedException;
import org.apache.kafka.streams.processor.Cancellable;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.Punctuator;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.TimestampExtractor;
import org.apache.kafka.streams.processor.internals.AbstractTask;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.PartitionGroup;
import org.apache.kafka.streams.processor.internals.ProcessorNode;
import org.apache.kafka.streams.processor.internals.ProcessorNodePunctuator;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
import org.apache.kafka.streams.processor.internals.PunctuationQueue;
import org.apache.kafka.streams.processor.internals.PunctuationSchedule;
import org.apache.kafka.streams.processor.internals.RecordCollector;
import org.apache.kafka.streams.processor.internals.RecordQueue;
import org.apache.kafka.streams.processor.internals.SourceNode;
import org.apache.kafka.streams.processor.internals.StampedRecord;
import org.apache.kafka.streams.processor.internals.StateDirectory;
import org.apache.kafka.streams.processor.internals.StateManagerUtil;
import org.apache.kafka.streams.processor.internals.StreamThread;
import org.apache.kafka.streams.processor.internals.Task;
import org.apache.kafka.streams.processor.internals.TaskManager;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
import org.apache.kafka.streams.processor.internals.metrics.ThreadMetrics;
import org.apache.kafka.streams.state.internals.ThreadCache;
import org.slf4j.Logger;

public class StreamTask
extends AbstractTask
implements ProcessorNodePunctuator,
Task {
    private static final ConsumerRecord<Object, Object> DUMMY_RECORD = new ConsumerRecord("__null_topic__", -1, -1L, null, null);
    static final byte LATEST_MAGIC_BYTE = 1;
    private final Time time;
    private final Logger log;
    private final String logPrefix;
    private final Consumer<byte[], byte[]> mainConsumer;
    private final boolean eosEnabled;
    private final long maxTaskIdleMs;
    private final int maxBufferedSize;
    private final PartitionGroup partitionGroup;
    private final RecordCollector recordCollector;
    private final PartitionGroup.RecordInfo recordInfo;
    private final Map<TopicPartition, Long> consumedOffsets;
    private final PunctuationQueue streamTimePunctuationQueue;
    private final PunctuationQueue systemTimePunctuationQueue;
    private final StreamsMetricsImpl streamsMetrics;
    private long processTimeMs = 0L;
    private final Sensor closeTaskSensor;
    private final Sensor processRatioSensor;
    private final Sensor processLatencySensor;
    private final Sensor punctuateLatencySensor;
    private final Sensor bufferedRecordsSensor;
    private final Sensor enforcedProcessingSensor;
    private final Map<String, Sensor> e2eLatencySensors = new HashMap<String, Sensor>();
    private final InternalProcessorContext processorContext;
    private final RecordQueueCreator recordQueueCreator;
    private long idleStartTimeMs;
    private boolean commitNeeded = false;
    private boolean commitRequested = false;
    private boolean checkpointNeededForSuspended = false;

    public StreamTask(TaskId id, Set<TopicPartition> partitions, ProcessorTopology topology, Consumer<byte[], byte[]> mainConsumer, StreamsConfig config, StreamsMetricsImpl streamsMetrics, StateDirectory stateDirectory, ThreadCache cache, Time time, ProcessorStateManager stateMgr, RecordCollector recordCollector, InternalProcessorContext processorContext) {
        super(id, topology, stateDirectory, stateMgr, partitions);
        this.mainConsumer = mainConsumer;
        this.processorContext = processorContext;
        processorContext.transitionToActive(this, recordCollector, cache);
        String threadIdPrefix = String.format("stream-thread [%s] ", Thread.currentThread().getName());
        this.logPrefix = threadIdPrefix + String.format("%s [%s] ", "task", id);
        LogContext logContext = new LogContext(this.logPrefix);
        this.log = logContext.logger(this.getClass());
        this.time = time;
        this.recordCollector = recordCollector;
        this.eosEnabled = StreamThread.eosEnabled(config);
        String threadId = Thread.currentThread().getName();
        this.streamsMetrics = streamsMetrics;
        this.closeTaskSensor = ThreadMetrics.closeTaskSensor(threadId, streamsMetrics);
        String taskId = id.toString();
        if (streamsMetrics.version() == StreamsMetricsImpl.Version.FROM_0100_TO_24) {
            Sensor parent = ThreadMetrics.commitOverTasksSensor(threadId, streamsMetrics);
            this.enforcedProcessingSensor = TaskMetrics.enforcedProcessingSensor(threadId, taskId, streamsMetrics, parent);
        } else {
            this.enforcedProcessingSensor = TaskMetrics.enforcedProcessingSensor(threadId, taskId, streamsMetrics, new Sensor[0]);
        }
        this.processRatioSensor = TaskMetrics.activeProcessRatioSensor(threadId, taskId, streamsMetrics);
        this.processLatencySensor = TaskMetrics.processLatencySensor(threadId, taskId, streamsMetrics);
        this.punctuateLatencySensor = TaskMetrics.punctuateSensor(threadId, taskId, streamsMetrics);
        this.bufferedRecordsSensor = TaskMetrics.activeBufferedRecordsSensor(threadId, taskId, streamsMetrics);
        for (String string : topology.terminalNodes()) {
            this.e2eLatencySensors.put(string, TaskMetrics.e2ELatencySensor(threadId, taskId, string, Sensor.RecordingLevel.INFO, streamsMetrics));
        }
        for (ProcessorNode processorNode : topology.sources()) {
            String sourceNodeName = processorNode.name();
            this.e2eLatencySensors.put(sourceNodeName, TaskMetrics.e2ELatencySensor(threadId, taskId, sourceNodeName, Sensor.RecordingLevel.INFO, streamsMetrics));
        }
        this.streamTimePunctuationQueue = new PunctuationQueue();
        this.systemTimePunctuationQueue = new PunctuationQueue();
        this.maxTaskIdleMs = config.getLong("max.task.idle.ms");
        this.maxBufferedSize = config.getInt("buffered.records.per.partition");
        this.consumedOffsets = new HashMap<TopicPartition, Long>();
        this.recordQueueCreator = new RecordQueueCreator(logContext, config.defaultTimestampExtractor(), config.defaultDeserializationExceptionHandler());
        this.recordInfo = new PartitionGroup.RecordInfo();
        this.partitionGroup = new PartitionGroup(this.createPartitionQueues(), TaskMetrics.recordLatenessSensor(threadId, taskId, streamsMetrics));
        stateMgr.registerGlobalStateStores(topology.globalStateStores());
    }

    private Map<TopicPartition, RecordQueue> createPartitionQueues() {
        HashMap<TopicPartition, RecordQueue> partitionQueues = new HashMap<TopicPartition, RecordQueue>();
        for (TopicPartition partition : this.inputPartitions()) {
            partitionQueues.put(partition, this.recordQueueCreator.createQueue(partition));
        }
        return partitionQueues;
    }

    @Override
    public boolean isActive() {
        return true;
    }

    @Override
    public void initializeIfNeeded() {
        if (this.state() == Task.State.CREATED) {
            this.recordCollector.initialize();
            StateManagerUtil.registerStateStores(this.log, this.logPrefix, this.topology, this.stateMgr, this.stateDirectory, this.processorContext);
            this.transitionTo(Task.State.RESTORING);
            this.log.info("Initialized");
        }
    }

    @Override
    public void completeRestoration() {
        switch (this.state()) {
            case RUNNING: {
                return;
            }
            case RESTORING: {
                this.initializeMetadata();
                this.initializeTopology();
                this.processorContext.initialize();
                this.idleStartTimeMs = -1L;
                this.transitionTo(Task.State.RUNNING);
                this.log.info("Restored and ready to run");
                break;
            }
            case CREATED: 
            case SUSPENDED: 
            case CLOSED: {
                throw new IllegalStateException("Illegal state " + (Object)((Object)this.state()) + " while completing restoration for active task " + this.id);
            }
            default: {
                throw new IllegalStateException("Unknown state " + (Object)((Object)this.state()) + " while completing restoration for active task " + this.id);
            }
        }
    }

    @Override
    public void suspend() {
        switch (this.state()) {
            case CREATED: {
                this.log.info("Suspended created");
                this.checkpointNeededForSuspended = false;
                this.transitionTo(Task.State.SUSPENDED);
                break;
            }
            case RESTORING: {
                this.log.info("Suspended restoring");
                this.checkpointNeededForSuspended = true;
                this.transitionTo(Task.State.SUSPENDED);
                break;
            }
            case RUNNING: {
                try {
                    this.closeTopology();
                    this.checkpointNeededForSuspended = true;
                    break;
                }
                finally {
                    this.transitionTo(Task.State.SUSPENDED);
                    this.log.info("Suspended running");
                }
            }
            case SUSPENDED: {
                this.log.info("Skip suspending since state is {}", (Object)this.state());
                break;
            }
            case CLOSED: {
                throw new IllegalStateException("Illegal state " + (Object)((Object)this.state()) + " while suspending active task " + this.id);
            }
            default: {
                throw new IllegalStateException("Unknown state " + (Object)((Object)this.state()) + " while suspending active task " + this.id);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void closeTopology() {
        this.log.trace("Closing processor topology");
        RuntimeException exception = null;
        for (ProcessorNode<?, ?> node : this.topology.processors()) {
            this.processorContext.setCurrentNode(node);
            try {
                node.close();
            }
            catch (RuntimeException e) {
                exception = e;
            }
            finally {
                this.processorContext.setCurrentNode(null);
            }
        }
        if (exception != null) {
            throw exception;
        }
    }

    @Override
    public void resume() {
        switch (this.state()) {
            case RUNNING: 
            case RESTORING: 
            case CREATED: {
                this.log.trace("Skip resuming since state is {}", (Object)this.state());
                break;
            }
            case SUSPENDED: {
                this.transitionTo(Task.State.RESTORING);
                this.log.info("Resumed to restoring state");
                break;
            }
            case CLOSED: {
                throw new IllegalStateException("Illegal state " + (Object)((Object)this.state()) + " while resuming active task " + this.id);
            }
            default: {
                throw new IllegalStateException("Unknown state " + (Object)((Object)this.state()) + " while resuming active task " + this.id);
            }
        }
    }

    @Override
    public Map<TopicPartition, OffsetAndMetadata> prepareCommit() {
        Map<TopicPartition, OffsetAndMetadata> offsetsToCommit;
        switch (this.state()) {
            case RUNNING: 
            case RESTORING: 
            case CREATED: 
            case SUSPENDED: {
                this.stateMgr.flush();
                if (this.commitNeeded) {
                    this.recordCollector.flush();
                    this.log.debug("Prepared {} task for committing", (Object)this.state());
                    offsetsToCommit = this.committableOffsetsAndMetadata();
                    break;
                }
                this.log.debug("Skipped preparing {} task for commit since there is nothing to commit", (Object)this.state());
                offsetsToCommit = Collections.emptyMap();
                break;
            }
            case CLOSED: {
                throw new IllegalStateException("Illegal state " + (Object)((Object)this.state()) + " while preparing active task " + this.id + " for committing");
            }
            default: {
                throw new IllegalStateException("Unknown state " + (Object)((Object)this.state()) + " while preparing active task " + this.id + " for committing");
            }
        }
        return offsetsToCommit;
    }

    private Map<TopicPartition, OffsetAndMetadata> committableOffsetsAndMetadata() {
        Map<TopicPartition, OffsetAndMetadata> committableOffsets;
        switch (this.state()) {
            case RESTORING: 
            case CREATED: {
                committableOffsets = Collections.emptyMap();
                break;
            }
            case RUNNING: 
            case SUSPENDED: {
                Map<TopicPartition, Long> partitionTimes = this.extractPartitionTimes();
                committableOffsets = new HashMap<TopicPartition, OffsetAndMetadata>(this.consumedOffsets.size());
                for (Map.Entry<TopicPartition, Long> entry : this.consumedOffsets.entrySet()) {
                    TopicPartition partition = entry.getKey();
                    Long offset = this.partitionGroup.headRecordOffset(partition);
                    if (offset == null) {
                        try {
                            offset = this.mainConsumer.position(partition);
                        }
                        catch (TimeoutException error) {
                            throw new IllegalStateException(error);
                        }
                        catch (KafkaException fatal) {
                            throw new StreamsException(fatal);
                        }
                    }
                    long partitionTime = partitionTimes.get(partition);
                    committableOffsets.put(partition, new OffsetAndMetadata(offset.longValue(), StreamTask.encodeTimestamp(partitionTime)));
                }
                break;
            }
            case CLOSED: {
                throw new IllegalStateException("Illegal state " + (Object)((Object)this.state()) + " while getting committable offsets for active task " + this.id);
            }
            default: {
                throw new IllegalStateException("Unknown state " + (Object)((Object)this.state()) + " while post committing active task " + this.id);
            }
        }
        return committableOffsets;
    }

    @Override
    public void postCommit() {
        this.commitRequested = false;
        switch (this.state()) {
            case CREATED: {
                this.log.debug("Skipped writing checkpoint for created task");
                break;
            }
            case RESTORING: {
                this.writeCheckpoint();
                this.log.debug("Finalized commit for restoring task");
                break;
            }
            case RUNNING: {
                if (!this.eosEnabled) {
                    this.writeCheckpoint();
                }
                this.log.debug("Finalized commit for running task");
                break;
            }
            case SUSPENDED: {
                this.partitionGroup.clear();
                if (this.checkpointNeededForSuspended) {
                    this.writeCheckpoint();
                    this.log.debug("Finalized commit for suspended task");
                    this.checkpointNeededForSuspended = false;
                    break;
                }
                this.log.debug("Skipped writing checkpoint for uninitialized suspended task");
                break;
            }
            case CLOSED: {
                throw new IllegalStateException("Illegal state " + (Object)((Object)this.state()) + " while post committing active task " + this.id);
            }
            default: {
                throw new IllegalStateException("Unknown state " + (Object)((Object)this.state()) + " while post committing active task " + this.id);
            }
        }
    }

    private Map<TopicPartition, Long> extractPartitionTimes() {
        HashMap<TopicPartition, Long> partitionTimes = new HashMap<TopicPartition, Long>();
        for (TopicPartition partition : this.partitionGroup.partitions()) {
            partitionTimes.put(partition, this.partitionGroup.partitionTimestamp(partition));
        }
        return partitionTimes;
    }

    @Override
    public void closeClean() {
        this.validateClean();
        this.streamsMetrics.removeAllTaskLevelSensors(Thread.currentThread().getName(), this.id.toString());
        this.close(true);
        this.log.info("Closed clean");
    }

    @Override
    public void closeDirty() {
        this.streamsMetrics.removeAllTaskLevelSensors(Thread.currentThread().getName(), this.id.toString());
        this.close(false);
        this.log.info("Closed dirty");
    }

    @Override
    public void update(Set<TopicPartition> topicPartitions, Map<String, List<String>> nodeToSourceTopics) {
        super.update(topicPartitions, nodeToSourceTopics);
        this.partitionGroup.updatePartitions(topicPartitions, this.recordQueueCreator::createQueue);
    }

    @Override
    public void closeCleanAndRecycleState() {
        this.validateClean();
        this.streamsMetrics.removeAllTaskLevelSensors(Thread.currentThread().getName(), this.id.toString());
        switch (this.state()) {
            case SUSPENDED: {
                this.stateMgr.recycle();
                this.recordCollector.closeClean();
                break;
            }
            case RUNNING: 
            case RESTORING: 
            case CREATED: 
            case CLOSED: {
                throw new IllegalStateException("Illegal state " + (Object)((Object)this.state()) + " while recycling active task " + this.id);
            }
            default: {
                throw new IllegalStateException("Unknown state " + (Object)((Object)this.state()) + " while recycling active task " + this.id);
            }
        }
        this.closeTaskSensor.record();
        this.transitionTo(Task.State.CLOSED);
        this.log.info("Closed clean and recycled state");
    }

    private void writeCheckpoint() {
        if (this.commitNeeded) {
            this.stateMgr.checkpoint(this.checkpointableOffsets());
        } else {
            this.stateMgr.checkpoint(Collections.emptyMap());
        }
        this.commitNeeded = false;
    }

    private void validateClean() {
        if (this.commitNeeded) {
            this.log.debug("Tried to close clean but there was pending uncommitted data, this means we failed to commit and should close as dirty instead");
            throw new TaskMigratedException("Tried to close dirty task as clean");
        }
    }

    private void close(boolean clean) {
        switch (this.state()) {
            case SUSPENDED: {
                TaskManager.executeAndMaybeSwallow(clean, () -> StateManagerUtil.closeStateManager(this.log, this.logPrefix, clean, this.eosEnabled, this.stateMgr, this.stateDirectory, Task.TaskType.ACTIVE), "state manager close", this.log);
                TaskManager.executeAndMaybeSwallow(clean, clean ? this.recordCollector::closeClean : this.recordCollector::closeDirty, "record collector close", this.log);
                break;
            }
            case CLOSED: {
                this.log.trace("Skip closing since state is {}", (Object)this.state());
                return;
            }
            case RUNNING: 
            case RESTORING: 
            case CREATED: {
                throw new IllegalStateException("Illegal state " + (Object)((Object)this.state()) + " while closing active task " + this.id);
            }
            default: {
                throw new IllegalStateException("Unknown state " + (Object)((Object)this.state()) + " while closing active task " + this.id);
            }
        }
        this.partitionGroup.clear();
        this.closeTaskSensor.record();
        this.transitionTo(Task.State.CLOSED);
    }

    public boolean isProcessable(long wallClockTime) {
        if (this.state() == Task.State.CLOSED) {
            this.log.info("Stream task {} is already in {} state, skip processing it.", (Object)this.id(), (Object)this.state());
            return false;
        }
        if (this.partitionGroup.allPartitionsBuffered()) {
            this.idleStartTimeMs = -1L;
            return true;
        }
        if (this.partitionGroup.numBuffered() > 0) {
            if (this.idleStartTimeMs == -1L) {
                this.idleStartTimeMs = wallClockTime;
            }
            if (wallClockTime - this.idleStartTimeMs >= this.maxTaskIdleMs) {
                this.enforcedProcessingSensor.record(1.0, wallClockTime);
                return true;
            }
            return false;
        }
        this.idleStartTimeMs = -1L;
        return false;
    }

    @Override
    public boolean process(long wallClockTime) {
        if (!this.isProcessable(wallClockTime)) {
            return false;
        }
        StampedRecord record = this.partitionGroup.nextRecord(this.recordInfo, wallClockTime);
        if (record == null) {
            return false;
        }
        try {
            ProcessorNode<?, ?> currNode = this.recordInfo.node();
            TopicPartition partition = this.recordInfo.partition();
            this.log.trace("Start processing one record [{}]", (Object)record);
            this.updateProcessorContext(record, currNode, wallClockTime);
            this.maybeRecordE2ELatency(record.timestamp, wallClockTime, currNode.name());
            StreamsMetricsImpl.maybeMeasureLatency(() -> currNode.process(record.key(), record.value()), this.time, this.processLatencySensor);
            this.log.trace("Completed processing one record [{}]", (Object)record);
            this.consumedOffsets.put(partition, record.offset());
            this.commitNeeded = true;
            if (this.recordInfo.queue().size() == this.maxBufferedSize) {
                this.mainConsumer.resume(Collections.singleton(partition));
            }
        }
        catch (StreamsException e) {
            throw e;
        }
        catch (RuntimeException e) {
            String stackTrace = this.getStacktraceString(e);
            throw new StreamsException(String.format("Exception caught in process. taskId=%s, processor=%s, topic=%s, partition=%d, offset=%d, stacktrace=%s", this.id(), this.processorContext.currentNode().name(), record.topic(), record.partition(), record.offset(), stackTrace), e);
        }
        finally {
            this.processorContext.setCurrentNode(null);
        }
        return true;
    }

    @Override
    public void recordProcessBatchTime(long processBatchTime) {
        this.processTimeMs += processBatchTime;
    }

    @Override
    public void recordProcessTimeRatioAndBufferSize(long allTaskProcessMs, long now) {
        this.bufferedRecordsSensor.record((double)this.partitionGroup.numBuffered());
        this.processRatioSensor.record((double)this.processTimeMs / (double)allTaskProcessMs, now);
        this.processTimeMs = 0L;
    }

    private String getStacktraceString(RuntimeException e) {
        String stacktrace = null;
        try (StringWriter stringWriter = new StringWriter();
             PrintWriter printWriter = new PrintWriter(stringWriter);){
            e.printStackTrace(printWriter);
            stacktrace = stringWriter.toString();
        }
        catch (IOException ioe) {
            this.log.error("Encountered error extracting stacktrace from this exception", (Throwable)ioe);
        }
        return stacktrace;
    }

    @Override
    public void punctuate(ProcessorNode<?, ?> node, long timestamp, PunctuationType type, Punctuator punctuator) {
        if (this.processorContext.currentNode() != null) {
            throw new IllegalStateException(String.format("%sCurrent node is not null", this.logPrefix));
        }
        this.updateProcessorContext(new StampedRecord(DUMMY_RECORD, timestamp), node, this.time.milliseconds());
        if (this.log.isTraceEnabled()) {
            this.log.trace("Punctuating processor {} with timestamp {} and punctuation type {}", new Object[]{node.name(), timestamp, type});
        }
        try {
            StreamsMetricsImpl.maybeMeasureLatency(() -> node.punctuate(timestamp, punctuator), this.time, this.punctuateLatencySensor);
        }
        catch (StreamsException e) {
            throw e;
        }
        catch (RuntimeException e) {
            throw new StreamsException(String.format("%sException caught while punctuating processor '%s'", this.logPrefix, node.name()), e);
        }
        finally {
            this.processorContext.setCurrentNode(null);
        }
    }

    private void updateProcessorContext(StampedRecord record, ProcessorNode<?, ?> currNode, long wallClockTime) {
        this.processorContext.setRecordContext(new ProcessorRecordContext(record.timestamp, record.offset(), record.partition(), record.topic(), record.headers()));
        this.processorContext.setCurrentNode(currNode);
        this.processorContext.setSystemTimeMs(wallClockTime);
    }

    private Map<TopicPartition, Long> checkpointableOffsets() {
        HashMap<TopicPartition, Long> checkpointableOffsets = new HashMap<TopicPartition, Long>(this.recordCollector.offsets());
        for (Map.Entry<TopicPartition, Long> entry : this.consumedOffsets.entrySet()) {
            checkpointableOffsets.putIfAbsent(entry.getKey(), entry.getValue());
        }
        return checkpointableOffsets;
    }

    private void initializeMetadata() {
        try {
            Map<TopicPartition, OffsetAndMetadata> offsetsAndMetadata = this.mainConsumer.committed(this.inputPartitions()).entrySet().stream().filter(e -> e.getValue() != null).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
            this.initializeTaskTime(offsetsAndMetadata);
        }
        catch (TimeoutException e2) {
            this.log.warn("Encountered {} while trying to fetch committed offsets, will retry initializing the metadata in the next loop.\nConsider overwriting consumer config {} to a larger value to avoid timeout errors", (Object)e2.toString(), (Object)"default.api.timeout.ms");
            throw e2;
        }
        catch (KafkaException e3) {
            throw new StreamsException(String.format("task [%s] Failed to initialize offsets for %s", this.id, this.inputPartitions()), e3);
        }
    }

    private void initializeTaskTime(Map<TopicPartition, OffsetAndMetadata> offsetsAndMetadata) {
        for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsetsAndMetadata.entrySet()) {
            TopicPartition partition = entry.getKey();
            OffsetAndMetadata metadata = entry.getValue();
            if (metadata != null) {
                long committedTimestamp = this.decodeTimestamp(metadata.metadata());
                this.partitionGroup.setPartitionTime(partition, committedTimestamp);
                this.log.debug("A committed timestamp was detected: setting the partition time of partition {} to {} in stream task {}", new Object[]{partition, committedTimestamp, this.id});
                continue;
            }
            this.log.debug("No committed timestamp was found in metadata for partition {}", (Object)partition);
        }
        HashSet<TopicPartition> nonCommitted = new HashSet<TopicPartition>(this.inputPartitions());
        nonCommitted.removeAll(offsetsAndMetadata.keySet());
        for (TopicPartition partition : nonCommitted) {
            this.log.debug("No committed offset for partition {}, therefore no timestamp can be found for this partition", (Object)partition);
        }
    }

    @Override
    public Map<TopicPartition, Long> purgeableOffsets() {
        HashMap<TopicPartition, Long> purgeableConsumedOffsets = new HashMap<TopicPartition, Long>();
        for (Map.Entry<TopicPartition, Long> entry : this.consumedOffsets.entrySet()) {
            TopicPartition tp = entry.getKey();
            if (!this.topology.isRepartitionTopic(tp.topic())) continue;
            purgeableConsumedOffsets.put(tp, entry.getValue() + 1L);
        }
        return purgeableConsumedOffsets;
    }

    private void initializeTopology() {
        this.log.trace("Initializing processor nodes of the topology");
        for (ProcessorNode<?, ?> node : this.topology.processors()) {
            this.processorContext.setCurrentNode(node);
            try {
                node.init(this.processorContext);
            }
            finally {
                this.processorContext.setCurrentNode(null);
            }
        }
    }

    @Override
    public void addRecords(TopicPartition partition, Iterable<ConsumerRecord<byte[], byte[]>> records) {
        int newQueueSize = this.partitionGroup.addRawRecords(partition, records);
        if (this.log.isTraceEnabled()) {
            this.log.trace("Added records into the buffered queue of partition {}, new queue size is {}", (Object)partition, (Object)newQueueSize);
        }
        if (newQueueSize > this.maxBufferedSize) {
            this.mainConsumer.pause(Collections.singleton(partition));
        }
    }

    public Cancellable schedule(long interval, PunctuationType type, Punctuator punctuator) {
        switch (type) {
            case STREAM_TIME: {
                return this.schedule(0L, interval, type, punctuator);
            }
            case WALL_CLOCK_TIME: {
                return this.schedule(this.time.milliseconds() + interval, interval, type, punctuator);
            }
        }
        throw new IllegalArgumentException("Unrecognized PunctuationType: " + (Object)((Object)type));
    }

    private Cancellable schedule(long startTime, long interval, PunctuationType type, Punctuator punctuator) {
        if (this.processorContext.currentNode() == null) {
            throw new IllegalStateException(String.format("%sCurrent node is null", this.logPrefix));
        }
        PunctuationSchedule schedule = new PunctuationSchedule(this.processorContext.currentNode(), startTime, interval, punctuator);
        switch (type) {
            case STREAM_TIME: {
                return this.streamTimePunctuationQueue.schedule(schedule);
            }
            case WALL_CLOCK_TIME: {
                return this.systemTimePunctuationQueue.schedule(schedule);
            }
        }
        throw new IllegalArgumentException("Unrecognized PunctuationType: " + (Object)((Object)type));
    }

    @Override
    public boolean maybePunctuateStreamTime() {
        long streamTime = this.partitionGroup.streamTime();
        if (streamTime == -1L) {
            return false;
        }
        boolean punctuated = this.streamTimePunctuationQueue.mayPunctuate(streamTime, PunctuationType.STREAM_TIME, this);
        if (punctuated) {
            this.commitNeeded = true;
        }
        return punctuated;
    }

    @Override
    public boolean maybePunctuateSystemTime() {
        long systemTime = this.time.milliseconds();
        boolean punctuated = this.systemTimePunctuationQueue.mayPunctuate(systemTime, PunctuationType.WALL_CLOCK_TIME, this);
        if (punctuated) {
            this.commitNeeded = true;
        }
        return punctuated;
    }

    void maybeRecordE2ELatency(long recordTimestamp, long now, String nodeName) {
        Sensor e2eLatencySensor = this.e2eLatencySensors.get(nodeName);
        if (e2eLatencySensor == null) {
            throw new IllegalStateException("Requested to record e2e latency but could not find sensor for node " + nodeName);
        }
        if (e2eLatencySensor.shouldRecord() && e2eLatencySensor.hasMetrics()) {
            e2eLatencySensor.record((double)(now - recordTimestamp), now);
        }
    }

    void requestCommit() {
        this.commitRequested = true;
    }

    @Override
    public boolean commitRequested() {
        return this.commitRequested;
    }

    static String encodeTimestamp(long partitionTime) {
        ByteBuffer buffer = ByteBuffer.allocate(9);
        buffer.put((byte)1);
        buffer.putLong(partitionTime);
        return Base64.getEncoder().encodeToString(buffer.array());
    }

    long decodeTimestamp(String encryptedString) {
        if (encryptedString.isEmpty()) {
            return -1L;
        }
        ByteBuffer buffer = ByteBuffer.wrap(Base64.getDecoder().decode(encryptedString));
        byte version = buffer.get();
        switch (version) {
            case 1: {
                return buffer.getLong();
            }
        }
        this.log.warn("Unsupported offset metadata version found. Supported version {}. Found version {}.", (Object)1, (Object)version);
        return -1L;
    }

    public InternalProcessorContext processorContext() {
        return this.processorContext;
    }

    public String toString() {
        return this.toString("");
    }

    public String toString(String indent) {
        Set<TopicPartition> partitions;
        StringBuilder sb = new StringBuilder();
        sb.append(indent);
        sb.append("TaskId: ");
        sb.append(this.id);
        sb.append("\n");
        if (this.topology != null) {
            sb.append(indent).append(this.topology.toString(indent + "\t"));
        }
        if ((partitions = this.inputPartitions()) != null && !partitions.isEmpty()) {
            sb.append(indent).append("Partitions [");
            for (TopicPartition topicPartition : partitions) {
                sb.append(topicPartition).append(", ");
            }
            sb.setLength(sb.length() - 2);
            sb.append("]\n");
        }
        return sb.toString();
    }

    @Override
    public boolean commitNeeded() {
        return this.commitNeeded;
    }

    @Override
    public Map<TopicPartition, Long> changelogOffsets() {
        if (this.state() == Task.State.RUNNING) {
            return this.changelogPartitions().stream().collect(Collectors.toMap(Function.identity(), tp -> -2L));
        }
        return Collections.unmodifiableMap(this.stateMgr.changelogOffsets());
    }

    public boolean hasRecordsQueued() {
        return this.numBuffered() > 0;
    }

    RecordCollector recordCollector() {
        return this.recordCollector;
    }

    int numBuffered() {
        return this.partitionGroup.numBuffered();
    }

    long streamTime() {
        return this.partitionGroup.streamTime();
    }

    private class RecordQueueCreator {
        private final LogContext logContext;
        private final TimestampExtractor defaultTimestampExtractor;
        private final DeserializationExceptionHandler defaultDeserializationExceptionHandler;

        private RecordQueueCreator(LogContext logContext, TimestampExtractor defaultTimestampExtractor, DeserializationExceptionHandler defaultDeserializationExceptionHandler) {
            this.logContext = logContext;
            this.defaultTimestampExtractor = defaultTimestampExtractor;
            this.defaultDeserializationExceptionHandler = defaultDeserializationExceptionHandler;
        }

        public RecordQueue createQueue(TopicPartition partition) {
            SourceNode<?, ?> source = StreamTask.this.topology.source(partition.topic());
            TimestampExtractor sourceTimestampExtractor = source.getTimestampExtractor();
            TimestampExtractor timestampExtractor = sourceTimestampExtractor != null ? sourceTimestampExtractor : this.defaultTimestampExtractor;
            return new RecordQueue(partition, source, timestampExtractor, this.defaultDeserializationExceptionHandler, StreamTask.this.processorContext, this.logContext);
        }
    }
}

