/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.parallelconsumer;

import io.confluent.csid.utils.Java8StreamUtils;
import io.confluent.parallelconsumer.JStreamParallelStreamProcessor;
import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.ParallelEoSStreamProcessor;
import io.confluent.parallelconsumer.ParallelStreamProcessor;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.function.Function;
import java.util.stream.Stream;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Variant of {@link ParallelEoSStreamProcessor} that publishes every consume/produce result
 * through a Java {@link Stream} in addition to performing the usual processing.
 *
 * <p>Results handed to the wrapper callback are appended to an internal deque; the stream
 * returned to the caller is built over that deque once, at construction time.
 *
 * @param <K> key type of the consumed and produced records
 * @param <V> value type of the consumed and produced records
 */
public class JStreamParallelEoSStreamProcessor<K, V>
        extends ParallelEoSStreamProcessor<K, V>
        implements JStreamParallelStreamProcessor<K, V> {
    private static final Logger log = LoggerFactory.getLogger(JStreamParallelEoSStreamProcessor.class);

    /** Stream view over {@link #userProcessResultsStream}; created once in the constructor. */
    private final Stream<ParallelStreamProcessor.ConsumeProduceResult<K, V, K, V>> stream;

    /**
     * Buffer that receives each result from the wrapper callback. A concurrent deque is used
     * because the callback is presumably invoked from worker threads while the stream is read
     * elsewhere - confirm against {@code pollAndProduceMany}.
     */
    private final ConcurrentLinkedDeque<ParallelStreamProcessor.ConsumeProduceResult<K, V, K, V>> userProcessResultsStream =
            new ConcurrentLinkedDeque<>();

    /**
     * Creates the processor and wires the result stream to the internal result buffer.
     *
     * @param parallelConsumerOptions options passed unchanged to the parent processor
     */
    public JStreamParallelEoSStreamProcessor(ParallelConsumerOptions parallelConsumerOptions) {
        super(parallelConsumerOptions);
        this.stream = Java8StreamUtils.setupStreamFromDeque(this.userProcessResultsStream);
    }

    /**
     * Registers {@code userFunction} with the parent processor and exposes the resulting
     * consume/produce results as a stream.
     *
     * <p>Every call returns the same {@link Stream} instance that was created in the
     * constructor; a new stream is not built per invocation.
     *
     * @param userFunction maps each consumed record to the list of records to produce
     * @return the stream backed by this processor's internal result buffer
     */
    @Override
    public Stream<ParallelStreamProcessor.ConsumeProduceResult<K, V, K, V>> pollProduceAndStream(
            Function<ConsumerRecord<K, V>, List<ProducerRecord<K, V>>> userFunction) {
        super.pollAndProduceMany(userFunction, result -> {
            log.trace("Wrapper callback applied, sending result to stream. Input: {}", result);
            // Queue the result so it becomes visible to consumers of the returned stream.
            this.userProcessResultsStream.add(result);
        });
        return this.stream;
    }
}

