/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.parallelconsumer;

import io.confluent.csid.utils.StringUtils;
import io.confluent.csid.utils.TimeUtils;
import io.confluent.parallelconsumer.ParallelConsumer;
import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.ParallelStreamProcessor;
import io.confluent.parallelconsumer.PollContext;
import io.confluent.parallelconsumer.PollContextInternal;
import io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor;
import io.confluent.parallelconsumer.internal.InternalRuntimeException;
import io.confluent.parallelconsumer.internal.PCModule;
import io.confluent.parallelconsumer.internal.ProducerManager;
import io.confluent.parallelconsumer.internal.UserFunctions;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.errors.InvalidPidMappingException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.tlinkowski.unij.api.UniLists;

public class ParallelEoSStreamProcessor<K, V>
extends AbstractParallelEoSStreamProcessor<K, V>
implements ParallelStreamProcessor<K, V> {
    private static final Logger log = LoggerFactory.getLogger(ParallelEoSStreamProcessor.class);

    public ParallelEoSStreamProcessor(ParallelConsumerOptions<K, V> newOptions, PCModule<K, V> module) {
        super(newOptions, module);
    }

    public ParallelEoSStreamProcessor(ParallelConsumerOptions<K, V> newOptions) {
        super(newOptions);
    }

    @Override
    public void poll(Consumer<PollContext<K, V>> usersVoidConsumptionFunction) {
        Function wrappedUserFunc = context -> {
            log.trace("asyncPoll - Consumed a consumerRecord ({}), executing void function...", context);
            UserFunctions.carefullyRun(usersVoidConsumptionFunction, context.getPollContext());
            log.trace("asyncPoll - user function finished ok.");
            return UniLists.of();
        };
        Consumer<Object> voidCallBack = ignore -> log.trace("Void callback applied.");
        this.supervisorLoop(wrappedUserFunc, voidCallBack);
    }

    @Override
    public void pollAndProduceMany(Function<PollContext<K, V>, List<ProducerRecord<K, V>>> userFunction, Consumer<ParallelStreamProcessor.ConsumeProduceResult<K, V, K, V>> callback) {
        if (!this.getOptions().isProducerSupplied()) {
            throw new IllegalArgumentException("To use the produce flows you must supply a Producer in the options");
        }
        Function producingUserFunctionWrapper = context -> this.processAndProduceResults(userFunction, (PollContextInternal<K, V>)context);
        this.supervisorLoop(producingUserFunctionWrapper, callback);
    }

    private List<ParallelStreamProcessor.ConsumeProduceResult<K, V, K, V>> processAndProduceResults(Function<PollContext<K, V>, List<ProducerRecord<K, V>>> userFunction, PollContextInternal<K, V> context) {
        List<ProducerRecord<K, V>> recordListToProduce;
        ProducerManager<K, V> pm = super.getProducerManager().get();
        if (this.options.isUsingTransactionCommitMode() && !this.options.isAllowEagerProcessingDuringTransactionCommit()) {
            try {
                ProducerManager.ProducingLock produceLock = pm.beginProducing(context);
                context.setProducingLock(Optional.of(produceLock));
            }
            catch (TimeoutException e) {
                throw new RuntimeException(StringUtils.msg("Timeout trying to early acquire produce lock to send record in {} mode - could not START record processing phase", new Object[]{ParallelConsumerOptions.CommitMode.PERIODIC_TRANSACTIONAL_PRODUCER}), e);
            }
        }
        if ((recordListToProduce = UserFunctions.carefullyRun(userFunction, context.getPollContext())).isEmpty()) {
            log.debug("No result returned from function to send.");
            return UniLists.of();
        }
        log.trace("asyncPoll and Stream - Consumed a record ({}), and returning a derivative result record to be produced: {}", context, recordListToProduce);
        ArrayList<ParallelStreamProcessor.ConsumeProduceResult<K, V, K, V>> results = new ArrayList<ParallelStreamProcessor.ConsumeProduceResult<K, V, K, V>>();
        log.trace("Producing {} messages in result...", (Object)recordListToProduce.size());
        if (this.options.isUsingTransactionCommitMode() && this.options.isAllowEagerProcessingDuringTransactionCommit()) {
            try {
                ProducerManager.ProducingLock produceLock = pm.beginProducing(context);
                context.setProducingLock(Optional.of(produceLock));
            }
            catch (TimeoutException e) {
                throw new RuntimeException(StringUtils.msg("Timeout trying to late acquire produce lock to send record in {} mode", new Object[]{ParallelConsumerOptions.CommitMode.PERIODIC_TRANSACTIONAL_PRODUCER}), e);
            }
        }
        try {
            List futures = pm.produceMessages(recordListToProduce);
            TimeUtils.time(() -> {
                for (ParallelConsumer.Tuple futureTuple : futures) {
                    Future futureSend = (Future)futureTuple.getRight();
                    RecordMetadata recordMetadata = (RecordMetadata)futureSend.get(this.options.getSendTimeout().toMillis(), TimeUnit.MILLISECONDS);
                    ParallelStreamProcessor.ConsumeProduceResult result = new ParallelStreamProcessor.ConsumeProduceResult(context.getPollContext(), (ProducerRecord)futureTuple.getLeft(), recordMetadata);
                    results.add(result);
                }
                return null;
            });
        }
        catch (InvalidPidMappingException invalidPidMappingException) {
            log.error("Closing parallel Consumer due to InvalidPidMappingException", (Throwable)invalidPidMappingException);
            this.closeOnException((Exception)((Object)invalidPidMappingException));
        }
        catch (Exception e) {
            throw new InternalRuntimeException("Error while waiting for produce results", e);
        }
        return results;
    }

    @Override
    public void pollAndProduceMany(Function<PollContext<K, V>, List<ProducerRecord<K, V>>> userFunction) {
        this.pollAndProduceMany(userFunction, consumerRecord -> log.trace("No-op user callback"));
    }

    @Override
    public void pollAndProduce(Function<PollContext<K, V>, ProducerRecord<K, V>> userFunction) {
        this.pollAndProduce(userFunction, consumerRecord -> log.trace("No-op user callback"));
    }

    @Override
    public void pollAndProduce(Function<PollContext<K, V>, ProducerRecord<K, V>> userFunction, Consumer<ParallelStreamProcessor.ConsumeProduceResult<K, V, K, V>> callback) {
        this.pollAndProduceMany(consumerRecord -> UniLists.of((Object)((ProducerRecord)userFunction.apply((PollContext)consumerRecord))), callback);
    }
}

