package org.springframework.batch.integration.chunk;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.listener.StepExecutionListenerSupport;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemStream;
import org.springframework.batch.item.ItemStreamException;
import org.springframework.batch.item.ItemWriter;
import org.springframework.integration.gateway.MessagingGateway;
import org.springframework.util.Assert;

/* loaded from: input_file:org/springframework/batch/integration/chunk/ChunkMessageChannelItemWriter.class */
/**
 * {@link ItemWriter} that sends each chunk of items through a {@link MessagingGateway} as a
 * {@link ChunkRequest} and consumes the corresponding {@link ChunkResponse}s from the same gateway.
 *
 * <p>The writer counts requests sent ("expected") and responses received ("actual"). The difference
 * is the outstanding backlog; {@link #write(List)} drains responses while that backlog exceeds the
 * throttle limit, and {@link #afterStep(StepExecution)} / {@link #open(ExecutionContext)} wait for
 * the backlog to reach zero. Both counters are stored in the {@link ExecutionContext} by
 * {@link #update(ExecutionContext)} under the {@link #EXPECTED} and {@link #ACTUAL} keys.
 *
 * @param <T> the type of item being written
 */
public class ChunkMessageChannelItemWriter<T> extends StepExecutionListenerSupport implements ItemWriter<T>, ItemStream, StepContributionSource {
    private static final Log logger = LogFactory.getLog(ChunkMessageChannelItemWriter.class);

    /** Execution context key under which the number of received responses is stored. */
    static final String ACTUAL = ChunkMessageChannelItemWriter.class.getName() + ".ACTUAL";

    /** Execution context key under which the number of sent requests is stored. */
    static final String EXPECTED = ChunkMessageChannelItemWriter.class.getName() + ".EXPECTED";

    private static final long DEFAULT_THROTTLE_LIMIT = 6;

    private static final int DEFAULT_MAX_WAIT_TIMEOUTS = 40;

    private MessagingGateway messagingGateway;

    private final LocalState localState = new LocalState();

    private long throttleLimit = DEFAULT_THROTTLE_LIMIT;

    private int maxWaitTimeouts = DEFAULT_MAX_WAIT_TIMEOUTS;

    /**
     * Bookkeeping for one writer: request/response counters, the current step execution and the
     * step contributions received but not yet handed out.
     */
    public static class LocalState {
        private final AtomicInteger actual = new AtomicInteger();
        private final AtomicInteger expected = new AtomicInteger();
        private final AtomicInteger redelivered = new AtomicInteger();
        private final Queue<StepContribution> contributions = new LinkedBlockingQueue<StepContribution>();
        private StepExecution stepExecution;

        private LocalState() {
        }

        /**
         * @return the number of requests sent for which no response has been counted yet
         */
        public int getExpecting() {
            return this.expected.get() - this.actual.get();
        }

        /**
         * Restores both counters.
         *
         * @param i the value for the "expected" (sent) counter
         * @param i2 the value for the "actual" (received) counter
         */
        public void open(int i, int i2) {
            this.actual.set(i2);
            this.expected.set(i);
        }

        /**
         * Removes and returns every contribution currently queued.
         *
         * @return the drained contributions in queue order; empty if none were queued
         */
        public Collection<StepContribution> pollStepContributions() {
            List<StepContribution> drained = new ArrayList<StepContribution>();
            synchronized (this.contributions) {
                StepContribution next;
                while ((next = this.contributions.poll()) != null) {
                    drained.add(next);
                }
            }
            return drained;
        }

        /**
         * Queues a contribution for a later {@link #pollStepContributions()}.
         *
         * @param stepContribution the contribution to queue
         */
        public void pushStepContribution(StepContribution stepContribution) {
            synchronized (this.contributions) {
                this.contributions.add(stepContribution);
            }
        }

        /** Counts one redelivered response. */
        public void incrementRedelivered() {
            this.redelivered.incrementAndGet();
        }

        /** Counts one received response. */
        public void incrementActual() {
            this.actual.incrementAndGet();
        }

        /** Counts one sent request. */
        public void incrementExpected() {
            this.expected.incrementAndGet();
        }

        /**
         * @return a new contribution created by the current step execution
         */
        public StepContribution createStepContribution() {
            return this.stepExecution.createStepContribution();
        }

        /**
         * @return the job id of the job execution owning the current step execution
         */
        public Long getJobId() {
            return this.stepExecution.getJobExecution().getJobId();
        }

        /**
         * @param stepExecution the step execution used for job id lookup and contribution creation
         */
        public void setStepExecution(StepExecution stepExecution) {
            this.stepExecution = stepExecution;
        }

        /** Sets the "expected" and "actual" counters back to zero. */
        public void reset() {
            this.expected.set(0);
            this.actual.set(0);
        }
    }

    /**
     * Sets the maximum number of receive attempts made while waiting for the backlog to drain
     * (at the end of the step and on open).
     *
     * @param i the maximum number of attempts
     */
    public void setMaxWaitTimeouts(int i) {
        this.maxWaitTimeouts = i;
    }

    /**
     * Sets the backlog size above which {@link #write(List)} consumes responses before sending
     * the next chunk.
     *
     * @param j the throttle limit
     */
    public void setThrottleLimit(long j) {
        this.throttleLimit = j;
    }

    /**
     * @param messagingGateway the gateway used to send requests and receive responses
     */
    public void setMessagingGateway(MessagingGateway messagingGateway) {
        this.messagingGateway = messagingGateway;
    }

    /**
     * Sends the given items as one {@link ChunkRequest}. Before sending, responses are consumed
     * for as long as the backlog exceeds the throttle limit. An empty list sends nothing.
     *
     * @param list the items of the chunk
     * @throws Exception if consuming a response or sending the request fails
     */
    public void write(List<? extends T> list) throws Exception {
        // Throttle: do not pile up more outstanding requests than the configured limit.
        while (this.localState.getExpecting() > this.throttleLimit) {
            getNextResult();
        }
        if (list.isEmpty()) {
            return;
        }
        if (logger.isDebugEnabled()) {
            logger.debug("Dispatching chunk: " + list);
        }
        this.messagingGateway.send(new ChunkRequest(list, this.localState.getJobId(), this.localState.createStepContribution()));
        this.localState.incrementExpected();
    }

    /**
     * Remembers the step execution so that job ids and step contributions can be derived from it.
     *
     * @param stepExecution the step execution about to start
     */
    public void beforeStep(StepExecution stepExecution) {
        this.localState.setStepExecution(stepExecution);
    }

    /**
     * Waits for the outstanding responses and applies all collected contributions to the step
     * execution.
     *
     * @param stepExecution the step execution that just finished
     * @return {@link ExitStatus#EXECUTING} if the step status is not {@code COMPLETED};
     * {@link ExitStatus#COMPLETED} (with a description) if the backlog was drained;
     * {@link ExitStatus#FAILED} (with a description) if waiting timed out or raised a
     * {@link RuntimeException}, in which case the step status is also set to {@code FAILED}
     */
    public ExitStatus afterStep(StepExecution stepExecution) {
        if (stepExecution.getStatus() != BatchStatus.COMPLETED) {
            return ExitStatus.EXECUTING;
        }
        long expecting = this.localState.getExpecting();
        try {
            logger.debug("Waiting for results in step listener...");
            boolean timedOut = !waitForResults();
            logger.debug("Finished waiting for results in step listener.");
            if (timedOut) {
                // Reported through the exit status, exactly like any other runtime failure.
                return failStep(stepExecution, new ItemStreamException("Timed out waiting for back log at end of step"));
            }
            return ExitStatus.COMPLETED.addExitDescription("Waited for " + expecting + " results.");
        } catch (RuntimeException e) {
            return failStep(stepExecution, e);
        } finally {
            // Whatever the outcome, hand over every contribution received so far.
            applyStepContributions(stepExecution);
        }
    }

    /** Marks the step execution as failed and builds the FAILED exit status describing the cause. */
    private static ExitStatus failStep(StepExecution stepExecution, RuntimeException failure) {
        logger.debug("Detected failure waiting for results in step listener.", failure);
        stepExecution.setStatus(BatchStatus.FAILED);
        return ExitStatus.FAILED.addExitDescription(failure.getClass().getName() + ": " + failure.getMessage());
    }

    /** Drains the queued contributions and applies each of them to the step execution. */
    private void applyStepContributions(StepExecution stepExecution) {
        for (StepContribution contribution : getStepContributions()) {
            stepExecution.apply(contribution);
        }
    }

    /**
     * Resets the request/response counters.
     */
    public void close() throws ItemStreamException {
        this.localState.reset();
    }

    /**
     * Restores the counters from the execution context, if present, and waits for the restored
     * backlog to drain.
     *
     * @param executionContext the context holding the {@link #EXPECTED} and {@link #ACTUAL} values
     * @throws ItemStreamException if the backlog is not drained within the allowed attempts
     */
    public void open(ExecutionContext executionContext) throws ItemStreamException {
        if (!executionContext.containsKey(EXPECTED)) {
            return;
        }
        this.localState.open(executionContext.getInt(EXPECTED), executionContext.getInt(ACTUAL));
        if (!waitForResults()) {
            throw new ItemStreamException("Timed out waiting for back log on open");
        }
    }

    /**
     * Stores the current counters in the execution context.
     *
     * @param executionContext the context receiving the {@link #EXPECTED} and {@link #ACTUAL} values
     */
    public void update(ExecutionContext executionContext) throws ItemStreamException {
        executionContext.putInt(EXPECTED, this.localState.expected.intValue());
        executionContext.putInt(ACTUAL, this.localState.actual.intValue());
    }

    /**
     * Removes and returns the contributions received since the last call.
     *
     * @return the drained contributions; empty if none are pending
     */
    @Override // org.springframework.batch.integration.chunk.StepContributionSource
    public Collection<StepContribution> getStepContributions() {
        return this.localState.pollStepContributions();
    }

    /**
     * Consumes responses until the backlog is empty or the maximum number of attempts is used up.
     * A failure in one attempt does not stop the loop; the most recent failure is rethrown once
     * the loop has ended.
     *
     * @return {@code true} if no results are outstanding any more, {@code false} if the attempts
     * ran out first
     * @throws AsynchronousFailureException if any attempt failed
     */
    private boolean waitForResults() throws AsynchronousFailureException {
        int attempts = 0;
        final int maxAttempts = this.maxWaitTimeouts;
        Throwable failure = null;
        logger.info("Waiting for " + this.localState.getExpecting() + " results");
        while (this.localState.getExpecting() > 0 && attempts++ < maxAttempts) {
            try {
                getNextResult();
            } catch (Throwable t) {
                logger.error("Detected error in remote result. Trying to recover " + this.localState.getExpecting() + " outstanding results before completing.", t);
                failure = t;
            }
        }
        if (failure != null) {
            throw wrapIfNecessary(failure);
        }
        // Judge success by the backlog itself: draining it on the very last permitted attempt
        // (or having nothing to wait for at all) is not a timeout.
        return this.localState.getExpecting() <= 0;
    }

    /**
     * Receives one response from the gateway and records it. Nothing is recorded when the gateway
     * returns {@code null}.
     *
     * @throws AsynchronousFailureException if the response is marked as unsuccessful
     * @throws IllegalStateException if the response carries no job id or a different job id
     */
    private void getNextResult() throws AsynchronousFailureException {
        ChunkResponse chunkResponse = (ChunkResponse) this.messagingGateway.receive();
        if (chunkResponse == null) {
            return;
        }
        if (logger.isDebugEnabled()) {
            logger.debug("Found result: " + chunkResponse);
        }
        Long jobId = chunkResponse.getJobId();
        Assert.state(jobId != null, "Message did not contain job instance id.");
        Assert.state(jobId.equals(this.localState.getJobId()), "Message contained wrong job instance id [" + jobId + "] should have been [" + this.localState.getJobId() + "].");
        if (chunkResponse.isRedelivered()) {
            logger.warn("Redelivered result detected, which may indicate stale state. In the best case, we just picked up a timed out message from a previous failed execution. In the worst case (and if this is not a restart), the step may now timeout.  In that case if you believe that all messages from workers have been sent, the business state is probably inconsistent, and the step will fail.");
            this.localState.incrementRedelivered();
        }
        // The response is counted and its contribution kept even when it reports a failure.
        this.localState.pushStepContribution(chunkResponse.getStepContribution());
        this.localState.incrementActual();
        if (!chunkResponse.isSuccessful()) {
            throw new AsynchronousFailureException("Failure or interrupt detected in handler: " + chunkResponse.getMessage());
        }
    }

    /**
     * Converts a failure into an {@link AsynchronousFailureException}.
     *
     * @param th the failure to convert
     * @return {@code th} itself if it already is an {@link AsynchronousFailureException}, otherwise
     * a new one wrapping it
     * @throws Error if {@code th} is an {@link Error}; it is rethrown unchanged
     */
    private static AsynchronousFailureException wrapIfNecessary(Throwable th) {
        if (th instanceof Error) {
            throw ((Error) th);
        }
        if (th instanceof AsynchronousFailureException) {
            return (AsynchronousFailureException) th;
        }
        return new AsynchronousFailureException("Exception in remote process", th);
    }
}
