package org.apache.kafka.streams.processor.internals;

import java.text.MessageFormat;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.InvalidPidMappingException;
import org.apache.kafka.common.errors.InvalidProducerEpochException;
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.errors.OffsetMetadataTooLarge;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.errors.SecurityDisabledException;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.UnknownServerException;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.errors.ProductionExceptionHandler;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskCorruptedException;
import org.apache.kafka.streams.errors.TaskMigratedException;
import org.apache.kafka.streams.errors.internals.DefaultErrorHandlerContext;
import org.apache.kafka.streams.errors.internals.FailedProcessingException;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
import org.apache.kafka.streams.processor.internals.metrics.TopicMetrics;
import org.slf4j.Logger;

/* loaded from: input_file:org/apache/kafka/streams/processor/internals/RecordCollectorImpl.class */
public class RecordCollectorImpl implements RecordCollector {
    private static final String SEND_EXCEPTION_MESSAGE = "Error encountered sending record to topic %s for task %s due to:%n%s";
    private final Logger log;
    private final TaskId taskId;
    private final StreamsProducer streamsProducer;
    private final ProductionExceptionHandler productionExceptionHandler;
    private final boolean eosEnabled;
    private final Map<TopicPartition, Long> offsets;
    private final StreamsMetricsImpl streamsMetrics;
    private final Sensor droppedRecordsSensor;
    private final Map<String, Sensor> producedSensorByTopic = new HashMap();
    private final AtomicReference<KafkaException> sendException;

    public RecordCollectorImpl(LogContext logContext, TaskId taskId, StreamsProducer streamsProducer, ProductionExceptionHandler productionExceptionHandler, StreamsMetricsImpl streamsMetricsImpl, ProcessorTopology processorTopology) {
        this.log = logContext.logger(getClass());
        this.taskId = taskId;
        this.streamsProducer = streamsProducer;
        this.sendException = streamsProducer.sendException();
        this.productionExceptionHandler = productionExceptionHandler;
        this.eosEnabled = streamsProducer.eosEnabled();
        this.streamsMetrics = streamsMetricsImpl;
        String name = Thread.currentThread().getName();
        this.droppedRecordsSensor = TaskMetrics.droppedRecordsSensor(name, taskId.toString(), streamsMetricsImpl);
        for (String str : processorTopology.sinkTopics()) {
            this.producedSensorByTopic.put(str, TopicMetrics.producedSensor(name, taskId.toString(), processorTopology.sink(str).name(), str, streamsMetricsImpl));
        }
        this.offsets = new HashMap();
    }

    @Override // org.apache.kafka.streams.processor.internals.RecordCollector
    public void initialize() {
        if (this.eosEnabled) {
            this.streamsProducer.initTransaction();
        }
    }

    @Override // org.apache.kafka.streams.processor.internals.RecordCollector
    public <K, V> void send(String str, K k, V v, Headers headers, Long l, Serializer<K> serializer, Serializer<V> serializer2, String str2, InternalProcessorContext<Void, Void> internalProcessorContext, StreamPartitioner<? super K, ? super V> streamPartitioner) {
        if (streamPartitioner == null) {
            send(str, (String) k, (K) v, headers, (Integer) null, l, (Serializer<String>) serializer, (Serializer<K>) serializer2, str2, internalProcessorContext);
            return;
        }
        try {
            List<PartitionInfo> partitionsFor = this.streamsProducer.partitionsFor(str);
            if (partitionsFor.size() <= 0) {
                throw new StreamsException("Could not get partition information for topic " + str + " for task " + this.taskId + ". This can happen if the topic does not exist.");
            }
            Optional<Set<Integer>> partitions = streamPartitioner.partitions(str, k, v, partitionsFor.size());
            if (!partitions.isPresent()) {
                send(str, (String) k, (K) v, headers, (Integer) null, l, (Serializer<String>) serializer, (Serializer<K>) serializer2, str2, internalProcessorContext);
                return;
            }
            Set<Integer> set = partitions.get();
            if (set.isEmpty()) {
                this.log.warn("Skipping record as partitioner returned empty partitions. topic=[{}]", str);
                this.droppedRecordsSensor.record();
            } else {
                Iterator<Integer> it = set.iterator();
                while (it.hasNext()) {
                    send(str, (String) k, (K) v, headers, Integer.valueOf(it.next().intValue()), l, (Serializer<String>) serializer, (Serializer<K>) serializer2, str2, internalProcessorContext);
                }
            }
        } catch (KafkaException e) {
            throw new StreamsException("Could not determine the number of partitions for topic '" + str + "' for task " + this.taskId + " due to " + e, (Throwable) e);
        } catch (TimeoutException e2) {
            this.log.warn("Could not get partitions for topic {}, will retry", str);
            throw e2;
        }
    }

    @Override // org.apache.kafka.streams.processor.internals.RecordCollector
    public <K, V> void send(String str, K k, V v, Headers headers, Integer num, Long l, Serializer<K> serializer, Serializer<V> serializer2, String str2, InternalProcessorContext<Void, Void> internalProcessorContext) {
        checkForException();
        try {
            try {
                ProducerRecord<byte[], byte[]> producerRecord = new ProducerRecord<>(str, num, l, serializer.serialize(str, headers, k), serializer2.serialize(str, headers, v), headers);
                this.streamsProducer.send(producerRecord, (recordMetadata, exc) -> {
                    try {
                        if (this.sendException.get() != null) {
                            return;
                        }
                        if (exc == null) {
                            TopicPartition topicPartition = new TopicPartition(recordMetadata.topic(), recordMetadata.partition());
                            if (recordMetadata.offset() >= 0) {
                                this.offsets.put(topicPartition, Long.valueOf(recordMetadata.offset()));
                            } else {
                                this.log.warn("Received offset={} in produce response for {}", Long.valueOf(recordMetadata.offset()), topicPartition);
                            }
                            if (!str.endsWith("-changelog")) {
                                this.producedSensorByTopic.computeIfAbsent(str, str3 -> {
                                    return TopicMetrics.producedSensor(Thread.currentThread().getName(), this.taskId.toString(), str2, str, internalProcessorContext.metrics());
                                }).record(ClientUtils.producerRecordSizeInBytes(producerRecord), internalProcessorContext.currentSystemTimeMs());
                            }
                        } else {
                            recordSendError(str, exc, producerRecord, internalProcessorContext, str2);
                            this.log.trace("Failed record: (key {} value {} timestamp {}) topic=[{}] partition=[{}]", new Object[]{k, v, l, str, num});
                        }
                    } catch (RuntimeException e) {
                        this.sendException.set(new StreamsException("Producer.send `Callback` failed", e));
                    }
                });
            } catch (ClassCastException e) {
                throw createStreamsExceptionForClassCastException(ProductionExceptionHandler.SerializationExceptionOrigin.VALUE, str, v, serializer2, e);
            } catch (RuntimeException e2) {
                handleException(ProductionExceptionHandler.SerializationExceptionOrigin.VALUE, str, k, v, headers, num, l, str2, internalProcessorContext, e2);
            }
        } catch (ClassCastException e3) {
            throw createStreamsExceptionForClassCastException(ProductionExceptionHandler.SerializationExceptionOrigin.KEY, str, k, serializer, e3);
        } catch (RuntimeException e4) {
            handleException(ProductionExceptionHandler.SerializationExceptionOrigin.KEY, str, k, v, headers, num, l, str2, internalProcessorContext, e4);
        }
    }

    private <K, V> void handleException(ProductionExceptionHandler.SerializationExceptionOrigin serializationExceptionOrigin, String str, K k, V v, Headers headers, Integer num, Long l, String str2, InternalProcessorContext<Void, Void> internalProcessorContext, RuntimeException runtimeException) {
        this.log.debug(String.format("Error serializing record for topic %s", str), runtimeException);
        try {
            if (((ProductionExceptionHandler.ProductionExceptionHandlerResponse) Objects.requireNonNull(this.productionExceptionHandler.handleSerializationException(errorHandlerContext(internalProcessorContext, str2), new ProducerRecord(str, num, l, k, v, headers), runtimeException, serializationExceptionOrigin), "Invalid ProductionExceptionHandler response.")) == ProductionExceptionHandler.ProductionExceptionHandlerResponse.FAIL) {
                throw new StreamsException(String.format("Unable to serialize record. ProducerRecord(topic=[%s], partition=[%d], timestamp=[%d]", str, num, l), runtimeException);
            }
            this.log.warn("Unable to serialize record, continue processing. ProducerRecord(topic=[{}], partition=[{}], timestamp=[{}])", new Object[]{str, num, l});
            this.droppedRecordsSensor.record();
        } catch (RuntimeException e) {
            this.log.error(String.format("Production error callback failed after serialization error for record %s: %s", serializationExceptionOrigin.toString().toLowerCase(Locale.ROOT), errorHandlerContext(internalProcessorContext, str2)), runtimeException);
            throw new FailedProcessingException("Fatal user code error in production error callback", e);
        }
    }

    private DefaultErrorHandlerContext errorHandlerContext(InternalProcessorContext<Void, Void> internalProcessorContext, String str) {
        ProcessorRecordContext recordContext = internalProcessorContext != null ? internalProcessorContext.recordContext() : null;
        return recordContext != null ? new DefaultErrorHandlerContext(internalProcessorContext, recordContext.topic(), recordContext.partition(), recordContext.offset(), recordContext.headers(), str, this.taskId, recordContext.timestamp()) : new DefaultErrorHandlerContext(internalProcessorContext, null, -1, -1L, new RecordHeaders(), str, this.taskId, -1L);
    }

    private <KV> StreamsException createStreamsExceptionForClassCastException(ProductionExceptionHandler.SerializationExceptionOrigin serializationExceptionOrigin, String str, KV kv, Serializer<KV> serializer, ClassCastException classCastException) {
        return new StreamsException(MessageFormat.format(String.format("ClassCastException while producing data to topic %s. The {0} serializer %s is not compatible to the actual {0} type: %s. Change the default {0} serde in StreamConfig or provide the correct {0} serde via method parameters (for example if using the DSL, `#to(String topic, Produced<K, V> produced)` with `Produced.{0}Serde(WindowedSerdes.timeWindowedSerdeFrom(String.class))`).", str, serializer.getClass().getName(), kv == null ? String.format("unknown because %s is null", serializationExceptionOrigin.toString().toLowerCase(Locale.ROOT)) : kv.getClass().getName()), serializationExceptionOrigin.toString().toLowerCase(Locale.ROOT)), classCastException);
    }

    private void recordSendError(String str, Exception exc, ProducerRecord<byte[], byte[]> producerRecord, InternalProcessorContext<Void, Void> internalProcessorContext, String str2) {
        String str3;
        String format = String.format(SEND_EXCEPTION_MESSAGE, str, this.taskId, exc.toString());
        if (isFatalException(exc)) {
            str3 = format + "\nWritten offsets would not be recorded and no more records would be sent since this is a fatal error.";
            this.sendException.set(new StreamsException(str3, exc));
        } else if ((exc instanceof ProducerFencedException) || (exc instanceof InvalidPidMappingException) || (exc instanceof InvalidProducerEpochException) || (exc instanceof OutOfOrderSequenceException)) {
            str3 = format + "\nWritten offsets would not be recorded and no more records would be sent since the producer is fenced, indicating the task may be migrated out";
            this.sendException.set(new TaskMigratedException(str3, exc));
        } else if (exc instanceof RetriableException) {
            str3 = format + "\nThe broker is either slow or in bad state (like not having enough replicas) in responding the request, or the connection to broker was interrupted sending the request or receiving the response. \nConsider overwriting `max.block.ms` and /or `delivery.timeout.ms` to a larger value to wait longer for such scenarios and avoid timeout errors";
            this.sendException.set(new TaskCorruptedException(Collections.singleton(this.taskId)));
        } else {
            try {
                if (((ProductionExceptionHandler.ProductionExceptionHandlerResponse) Objects.requireNonNull(this.productionExceptionHandler.handle(errorHandlerContext(internalProcessorContext, str2), producerRecord, exc), "Invalid ProductionExceptionHandler response.")) == ProductionExceptionHandler.ProductionExceptionHandlerResponse.FAIL) {
                    str3 = format + "\nException handler choose to FAIL the processing, no more records would be sent.";
                    this.sendException.set(new StreamsException(str3, exc));
                } else {
                    str3 = format + "\nException handler choose to CONTINUE processing in spite of this error but written offsets would not be recorded.";
                    this.droppedRecordsSensor.record();
                }
            } catch (RuntimeException e) {
                this.log.error("Production error callback failed after production error for record {}", producerRecord, exc);
                this.sendException.set(new FailedProcessingException("Fatal user code error in production error callback", e));
                return;
            }
        }
        this.log.error(str3, exc);
    }

    private boolean isFatalException(Exception exc) {
        return ((exc instanceof AuthenticationException) || (exc instanceof AuthorizationException) || (exc instanceof SecurityDisabledException)) || ((exc instanceof InvalidTopicException) || (exc instanceof UnknownServerException) || (exc instanceof SerializationException) || (exc instanceof OffsetMetadataTooLarge) || (exc instanceof IllegalStateException));
    }

    @Override // org.apache.kafka.streams.processor.internals.RecordCollector
    public void flush() {
        this.log.debug("Flushing record collector");
        this.streamsProducer.flush();
        checkForException();
    }

    @Override // org.apache.kafka.streams.processor.internals.RecordCollector
    public void closeClean() {
        this.log.info("Closing record collector clean");
        removeAllProducedSensors();
        close();
    }

    @Override // org.apache.kafka.streams.processor.internals.RecordCollector
    public void closeDirty() {
        this.log.info("Closing record collector dirty");
        if (this.eosEnabled) {
            this.streamsProducer.abortTransaction();
        }
        close();
    }

    private void close() {
        this.offsets.clear();
        checkForException();
    }

    private void removeAllProducedSensors() {
        Iterator<Sensor> it = this.producedSensorByTopic.values().iterator();
        while (it.hasNext()) {
            this.streamsMetrics.removeSensor(it.next());
        }
    }

    @Override // org.apache.kafka.streams.processor.internals.RecordCollector
    public Map<TopicPartition, Long> offsets() {
        return Collections.unmodifiableMap(new HashMap(this.offsets));
    }

    private void checkForException() {
        KafkaException kafkaException = this.sendException.get();
        if (kafkaException != null) {
            this.sendException.set(null);
            throw kafkaException;
        }
    }

    Producer<byte[], byte[]> producer() {
        return this.streamsProducer.kafkaProducer();
    }
}
