/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;
import javax.annotation.Nullable;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferCompressor;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.BufferAccumulator;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageProducerMetricUpdate;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierProducerAgent;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;

public class TieredStorageProducerClient {
    private final boolean isBroadcastOnly;
    private final int numSubpartitions;
    private final BufferAccumulator bufferAccumulator;
    private final BufferCompressor bufferCompressor;
    private final List<TierProducerAgent> tierProducerAgents;
    private final int[] currentSubpartitionSegmentId;
    private final TierProducerAgent[] currentSubpartitionTierAgent;
    @Nullable
    private Consumer<TieredStorageProducerMetricUpdate> metricStatisticsUpdater;

    public TieredStorageProducerClient(int numSubpartitions, boolean isBroadcastOnly, BufferAccumulator bufferAccumulator, @Nullable BufferCompressor bufferCompressor, List<TierProducerAgent> tierProducerAgents) {
        this.isBroadcastOnly = isBroadcastOnly;
        this.numSubpartitions = numSubpartitions;
        this.bufferAccumulator = bufferAccumulator;
        this.bufferCompressor = bufferCompressor;
        this.tierProducerAgents = tierProducerAgents;
        this.currentSubpartitionSegmentId = new int[numSubpartitions];
        this.currentSubpartitionTierAgent = new TierProducerAgent[numSubpartitions];
        Arrays.fill(this.currentSubpartitionSegmentId, -1);
        bufferAccumulator.setup(this::writeAccumulatedBuffers);
    }

    public void write(ByteBuffer record, TieredStorageSubpartitionId subpartitionId, Buffer.DataType dataType, boolean isBroadcast) throws IOException {
        if (isBroadcast && !this.isBroadcastOnly) {
            for (int i = 0; i < this.numSubpartitions; ++i) {
                this.bufferAccumulator.receive(record.duplicate(), new TieredStorageSubpartitionId(i), dataType, isBroadcast);
            }
        } else {
            this.bufferAccumulator.receive(record, subpartitionId, dataType, isBroadcast);
        }
    }

    public void setMetricStatisticsUpdater(Consumer<TieredStorageProducerMetricUpdate> metricStatisticsUpdater) {
        this.metricStatisticsUpdater = (Consumer)Preconditions.checkNotNull(metricStatisticsUpdater);
    }

    public void close() {
        this.bufferAccumulator.close();
        this.tierProducerAgents.forEach(TierProducerAgent::close);
    }

    private void writeAccumulatedBuffers(TieredStorageSubpartitionId subpartitionId, List<Buffer> accumulatedBuffers) {
        Iterator<Buffer> bufferIterator = accumulatedBuffers.iterator();
        int numWriteBytes = 0;
        int numWriteBuffers = 0;
        while (bufferIterator.hasNext()) {
            Buffer buffer = bufferIterator.next();
            ++numWriteBuffers;
            numWriteBytes += buffer.readableBytes();
            try {
                this.writeAccumulatedBuffer(subpartitionId, buffer);
            }
            catch (IOException ioe) {
                buffer.recycleBuffer();
                while (bufferIterator.hasNext()) {
                    bufferIterator.next().recycleBuffer();
                }
                ExceptionUtils.rethrow((Throwable)ioe);
            }
        }
        this.updateMetricStatistics(numWriteBuffers, numWriteBytes);
    }

    private void writeAccumulatedBuffer(TieredStorageSubpartitionId subpartitionId, Buffer accumulatedBuffer) throws IOException {
        Buffer compressedBuffer = this.compressBufferIfPossible(accumulatedBuffer);
        if (this.currentSubpartitionTierAgent[subpartitionId.getSubpartitionId()] == null) {
            this.chooseStorageTierToStartSegment(subpartitionId);
        }
        if (!this.currentSubpartitionTierAgent[subpartitionId.getSubpartitionId()].tryWrite(subpartitionId, compressedBuffer, this.bufferAccumulator)) {
            this.chooseStorageTierToStartSegment(subpartitionId);
            Preconditions.checkState((boolean)this.currentSubpartitionTierAgent[subpartitionId.getSubpartitionId()].tryWrite(subpartitionId, compressedBuffer, this.bufferAccumulator), (Object)"Failed to write the first buffer to the new segment");
        }
    }

    private void chooseStorageTierToStartSegment(TieredStorageSubpartitionId subpartitionId) throws IOException {
        int subpartitionIndex = subpartitionId.getSubpartitionId();
        int segmentIndex = this.currentSubpartitionSegmentId[subpartitionIndex];
        int nextSegmentIndex = segmentIndex + 1;
        for (TierProducerAgent tierProducerAgent : this.tierProducerAgents) {
            if (!tierProducerAgent.tryStartNewSegment(subpartitionId, nextSegmentIndex)) continue;
            this.currentSubpartitionSegmentId[subpartitionIndex] = nextSegmentIndex;
            this.currentSubpartitionTierAgent[subpartitionIndex] = tierProducerAgent;
            return;
        }
        throw new IOException("Failed to choose a storage tier to start a new segment.");
    }

    private Buffer compressBufferIfPossible(Buffer buffer) {
        if (!this.canBeCompressed(buffer)) {
            return buffer;
        }
        return ((BufferCompressor)Preconditions.checkNotNull((Object)this.bufferCompressor)).compressToOriginalBuffer(buffer);
    }

    private boolean canBeCompressed(Buffer buffer) {
        return this.bufferCompressor != null && buffer.isBuffer() && buffer.readableBytes() > 0;
    }

    private void updateMetricStatistics(int numWriteBuffersDelta, int numWriteBytesDelta) {
        ((Consumer)Preconditions.checkNotNull(this.metricStatisticsUpdater)).accept(new TieredStorageProducerMetricUpdate(numWriteBuffersDelta, numWriteBytesDelta));
    }
}

