/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.checkpoint.channel;

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.BiConsumer;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateCheckpointWriter;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
import org.apache.flink.runtime.checkpoint.channel.CheckpointAbortRequest;
import org.apache.flink.runtime.checkpoint.channel.CheckpointInProgressRequest;
import org.apache.flink.runtime.checkpoint.channel.CheckpointStartRequest;
import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
import org.apache.flink.runtime.checkpoint.channel.ResultSubpartitionInfo;
import org.apache.flink.runtime.checkpoint.channel.SubtaskRegisterRequest;
import org.apache.flink.runtime.checkpoint.channel.SubtaskReleaseRequest;
import org.apache.flink.runtime.io.AvailabilityProvider;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.ThrowingConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base class of channel-state write requests.
 *
 * <p>Every request carries the job vertex, subtask index and checkpoint id it belongs to, plus a
 * human-readable name that is only used by {@link #toString()}. The static factory methods in this
 * class create the concrete request types declared elsewhere in this package.
 */
abstract class ChannelStateWriteRequest {
    private static final Logger LOG = LoggerFactory.getLogger(ChannelStateWriteRequest.class);

    private final JobVertexID jobVertexID;
    private final int subtaskIndex;
    private final long checkpointId;
    private final String name;

    /**
     * Stores the identifying attributes of the request.
     *
     * @param jobVertexID job vertex the request belongs to
     * @param subtaskIndex index of the subtask the request belongs to
     * @param checkpointId checkpoint the request belongs to
     * @param name label used as the prefix of {@link #toString()}
     */
    public ChannelStateWriteRequest(JobVertexID jobVertexID, int subtaskIndex, long checkpointId, String name) {
        this.jobVertexID = jobVertexID;
        this.subtaskIndex = subtaskIndex;
        this.checkpointId = checkpointId;
        this.name = name;
    }

    /** @return the job vertex id passed to the constructor */
    public final JobVertexID getJobVertexID() {
        return jobVertexID;
    }

    /** @return the subtask index passed to the constructor */
    public final int getSubtaskIndex() {
        return subtaskIndex;
    }

    /** @return the checkpoint id passed to the constructor */
    public final long getCheckpointId() {
        return checkpointId;
    }

    /**
     * Returns the future associated with the readiness of this request. The default is
     * {@link AvailabilityProvider#AVAILABLE}; the method is not final, so subclasses may supply a
     * different future.
     *
     * @return the readiness future of this request
     */
    public CompletableFuture<?> getReadyFuture() {
        return AvailabilityProvider.AVAILABLE;
    }

    /** Renders the request as {@code name {jobVertexID=..., subtaskIndex=..., checkpointId=...}}. */
    @Override
    public String toString() {
        return name
                + " {jobVertexID=" + jobVertexID
                + ", subtaskIndex=" + subtaskIndex
                + ", checkpointId=" + checkpointId
                + "}";
    }

    /**
     * Cancels this request. The concrete behaviour is defined by the subclasses.
     *
     * @param cause the reason for the cancellation
     * @throws Exception if the subclass fails while cancelling
     */
    abstract void cancel(Throwable cause) throws Exception;

    /**
     * Creates a request named {@code "completeInput"} whose action calls
     * {@code writer.completeInput(jobVertexID, subtaskIndex)}.
     */
    static CheckpointInProgressRequest completeInput(JobVertexID jobVertexID, int subtaskIndex, long checkpointId) {
        return new CheckpointInProgressRequest(
                "completeInput",
                jobVertexID,
                subtaskIndex,
                checkpointId,
                writer -> writer.completeInput(jobVertexID, subtaskIndex));
    }

    /**
     * Creates a request named {@code "completeOutput"} whose action calls
     * {@code writer.completeOutput(jobVertexID, subtaskIndex)}.
     */
    static CheckpointInProgressRequest completeOutput(JobVertexID jobVertexID, int subtaskIndex, long checkpointId) {
        return new CheckpointInProgressRequest(
                "completeOutput",
                jobVertexID,
                subtaskIndex,
                checkpointId,
                writer -> writer.completeOutput(jobVertexID, subtaskIndex));
    }

    /**
     * Creates a {@code "writeInput"} request that passes every buffer of {@code iterator} to
     * {@code writer.writeInput} for the given input channel.
     *
     * @param info input channel the buffers belong to
     * @param iterator source of the buffers; it is closed by the request's second callback
     */
    static ChannelStateWriteRequest write(JobVertexID jobVertexID, int subtaskIndex, long checkpointId, InputChannelInfo info, CloseableIterator<Buffer> iterator) {
        return buildWriteRequest(
                jobVertexID,
                subtaskIndex,
                checkpointId,
                "writeInput",
                iterator,
                (writer, buffer) -> writer.writeInput(jobVertexID, subtaskIndex, info, buffer));
    }

    /**
     * Creates a {@code "writeOutput"} request that passes every given buffer to
     * {@code writer.writeOutput} for the given result subpartition.
     *
     * @param info result subpartition the buffers belong to
     * @param buffers buffers to write; they are wrapped in a {@link CloseableIterator} created with
     *     {@code Buffer::recycleBuffer} as its element callback
     */
    static ChannelStateWriteRequest write(JobVertexID jobVertexID, int subtaskIndex, long checkpointId, ResultSubpartitionInfo info, Buffer... buffers) {
        return buildWriteRequest(
                jobVertexID,
                subtaskIndex,
                checkpointId,
                "writeOutput",
                CloseableIterator.ofElements(Buffer::recycleBuffer, buffers),
                (writer, buffer) -> writer.writeOutput(jobVertexID, subtaskIndex, info, buffer));
    }

    /**
     * Creates a {@code "writeOutputFuture"} request that, once {@code dataFuture} is done, passes
     * every buffer of its result to {@code writer.writeOutput} for the given result subpartition.
     *
     * @param info result subpartition the buffers belong to
     * @param dataFuture future providing the buffers to write
     */
    static ChannelStateWriteRequest write(JobVertexID jobVertexID, int subtaskIndex, long checkpointId, ResultSubpartitionInfo info, CompletableFuture<List<Buffer>> dataFuture) {
        return buildFutureWriteRequest(
                jobVertexID,
                subtaskIndex,
                checkpointId,
                "writeOutputFuture",
                dataFuture,
                (writer, buffer) -> writer.writeOutput(jobVertexID, subtaskIndex, info, buffer));
    }

    /**
     * Builds a request whose buffers are supplied by a future.
     *
     * <p>The action requires {@code dataFuture} to be done already. If the future completed
     * exceptionally, only {@code writer.fail(jobVertexID, subtaskIndex, cause)} is invoked;
     * otherwise each buffer is validated with {@link #checkBufferIsBuffer(Buffer)} and handed to
     * {@code bufferConsumer}. The second callback recycles the buffers once the future has
     * completed normally, logging (not rethrowing) any failure. {@code dataFuture} is also passed
     * as the last constructor argument.
     * NOTE(review): presumably the second callback is the discard/cancel action and the last
     * argument is the request's ready future - confirm against CheckpointInProgressRequest.
     *
     * @param name label of the created request
     * @param dataFuture future providing the buffers to write
     * @param bufferConsumer sink invoked with the writer and each buffer
     */
    static ChannelStateWriteRequest buildFutureWriteRequest(JobVertexID jobVertexID, int subtaskIndex, long checkpointId, String name, CompletableFuture<List<Buffer>> dataFuture, BiConsumer<ChannelStateCheckpointWriter, Buffer> bufferConsumer) {
        return new CheckpointInProgressRequest(
                name,
                jobVertexID,
                subtaskIndex,
                checkpointId,
                writer -> {
                    Preconditions.checkState(dataFuture.isDone(), "It should be executed when dataFuture is done.");
                    // Typed list: iterating a raw List would yield Object elements and not compile.
                    final List<Buffer> buffers;
                    try {
                        buffers = dataFuture.get();
                    } catch (ExecutionException e) {
                        // The future failed: report it to the writer for this subtask and stop.
                        writer.fail(jobVertexID, subtaskIndex, e);
                        return;
                    }
                    for (Buffer buffer : buffers) {
                        checkBufferIsBuffer(buffer);
                        bufferConsumer.accept(writer, buffer);
                    }
                },
                throwable ->
                        dataFuture.thenAccept(
                                buffers -> {
                                    try {
                                        CloseableIterator.fromList(buffers, Buffer::recycleBuffer).close();
                                    } catch (Exception e) {
                                        LOG.error("Failed to recycle the output buffer of channel state.", e);
                                    }
                                }),
                dataFuture);
    }

    /**
     * Builds a request whose buffers are supplied by an iterator.
     *
     * <p>The action drains {@code iterator}, validating each buffer with
     * {@link #checkBufferIsBuffer(Buffer)} before handing it to {@code bufferConsumer}. The second
     * callback closes the iterator.
     * NOTE(review): presumably the second callback is the discard/cancel action - confirm against
     * CheckpointInProgressRequest.
     *
     * @param name label of the created request
     * @param iterator source of the buffers to write
     * @param bufferConsumer sink invoked with the writer and each buffer
     */
    static ChannelStateWriteRequest buildWriteRequest(JobVertexID jobVertexID, int subtaskIndex, long checkpointId, String name, CloseableIterator<Buffer> iterator, BiConsumer<ChannelStateCheckpointWriter, Buffer> bufferConsumer) {
        return new CheckpointInProgressRequest(
                name,
                jobVertexID,
                subtaskIndex,
                checkpointId,
                writer -> {
                    while (iterator.hasNext()) {
                        Buffer buffer = iterator.next();
                        checkBufferIsBuffer(buffer);
                        bufferConsumer.accept(writer, buffer);
                    }
                },
                throwable -> iterator.close());
    }

    /**
     * Verifies that {@code buffer.isBuffer()} holds. If the check throws, the buffer is recycled
     * before the exception is rethrown.
     *
     * @param buffer the buffer to validate
     */
    static void checkBufferIsBuffer(Buffer buffer) {
        try {
            Preconditions.checkArgument(buffer.isBuffer());
        } catch (Exception e) {
            // Recycle the rejected buffer here, before the exception propagates.
            buffer.recycleBuffer();
            throw e;
        }
    }

    /** Creates a {@link CheckpointStartRequest} from the given arguments. */
    static ChannelStateWriteRequest start(JobVertexID jobVertexID, int subtaskIndex, long checkpointId, ChannelStateWriter.ChannelStateWriteResult targetResult, CheckpointStorageLocationReference locationReference) {
        return new CheckpointStartRequest(jobVertexID, subtaskIndex, checkpointId, targetResult, locationReference);
    }

    /** Creates a {@link CheckpointAbortRequest} carrying the given cause. */
    static ChannelStateWriteRequest abort(JobVertexID jobVertexID, int subtaskIndex, long checkpointId, Throwable cause) {
        return new CheckpointAbortRequest(jobVertexID, subtaskIndex, checkpointId, cause);
    }

    /** Creates a {@link SubtaskRegisterRequest} for the given subtask. */
    static ChannelStateWriteRequest registerSubtask(JobVertexID jobVertexID, int subtaskIndex) {
        return new SubtaskRegisterRequest(jobVertexID, subtaskIndex);
    }

    /** Creates a {@link SubtaskReleaseRequest} for the given subtask. */
    static ChannelStateWriteRequest releaseSubtask(JobVertexID jobVertexID, int subtaskIndex) {
        return new SubtaskReleaseRequest(jobVertexID, subtaskIndex);
    }

    /**
     * Returns a consumer that ignores its argument and recycles every buffer of the given array.
     *
     * @param flinkBuffers buffers to recycle when the returned consumer is invoked
     */
    static ThrowingConsumer<Throwable, Exception> recycle(Buffer[] flinkBuffers) {
        return unused -> {
            for (Buffer b : flinkBuffers) {
                b.recycleBuffer();
            }
        };
    }
}

