/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.api.writer;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.core.io.IOReadableWritable;
import org.apache.flink.core.memory.DataOutputSerializer;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.io.AvailabilityProvider;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.io.network.partition.ResultPartition;
import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.XORShiftRandom;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class RecordWriter<T extends IOReadableWritable>
implements AvailabilityProvider {
    @VisibleForTesting
    public static final String DEFAULT_OUTPUT_FLUSH_THREAD_NAME = "OutputFlusher";
    private static final Logger LOG = LoggerFactory.getLogger(RecordWriter.class);
    protected final ResultPartitionWriter targetPartition;
    protected final int numberOfSubpartitions;
    protected final DataOutputSerializer serializer;
    protected final Random rng = new XORShiftRandom();
    protected final boolean flushAlways;
    @Nullable
    private final OutputFlusher outputFlusher;
    private Throwable flusherException;
    private volatile Throwable volatileFlusherException;
    private int volatileFlusherExceptionCheckSkipCount;
    private static final int VOLATILE_FLUSHER_EXCEPTION_MAX_CHECK_SKIP_COUNT = 100;

    RecordWriter(ResultPartitionWriter writer, long timeout, String taskName) {
        this.targetPartition = writer;
        this.numberOfSubpartitions = writer.getNumberOfSubpartitions();
        this.serializer = new DataOutputSerializer(128);
        Preconditions.checkArgument((timeout >= -1L ? 1 : 0) != 0);
        boolean bl = this.flushAlways = timeout == 0L;
        if (timeout == -1L || timeout == 0L) {
            this.outputFlusher = null;
        } else {
            Object threadName = taskName == null ? DEFAULT_OUTPUT_FLUSH_THREAD_NAME : "OutputFlusher for " + taskName;
            this.outputFlusher = new OutputFlusher((String)threadName, timeout);
            this.outputFlusher.start();
        }
    }

    public void emit(T record, int targetSubpartition) throws IOException {
        this.checkErroneous();
        this.targetPartition.emitRecord(RecordWriter.serializeRecord(this.serializer, record), targetSubpartition);
        if (this.flushAlways) {
            this.targetPartition.flush(targetSubpartition);
        }
    }

    protected void emit(ByteBuffer record, int targetSubpartition) throws IOException {
        this.checkErroneous();
        this.targetPartition.emitRecord(record, targetSubpartition);
        if (this.flushAlways) {
            this.targetPartition.flush(targetSubpartition);
        }
    }

    public void broadcastEvent(AbstractEvent event) throws IOException {
        this.broadcastEvent(event, false);
    }

    public void broadcastEvent(AbstractEvent event, boolean isPriorityEvent) throws IOException {
        this.targetPartition.broadcastEvent(event, isPriorityEvent);
        if (this.flushAlways) {
            this.flushAll();
        }
    }

    public void alignedBarrierTimeout(long checkpointId) throws IOException {
        this.targetPartition.alignedBarrierTimeout(checkpointId);
    }

    public void abortCheckpoint(long checkpointId, CheckpointException cause) {
        this.targetPartition.abortCheckpoint(checkpointId, cause);
    }

    @VisibleForTesting
    public static ByteBuffer serializeRecord(DataOutputSerializer serializer, IOReadableWritable record) throws IOException {
        serializer.setPositionUnsafe(4);
        record.write((DataOutputView)serializer);
        serializer.writeIntUnsafe(serializer.length() - 4, 0);
        return serializer.wrapAsByteBuffer();
    }

    public void flushAll() {
        this.targetPartition.flushAll();
    }

    public void setMetricGroup(TaskIOMetricGroup metrics) {
        this.targetPartition.setMetricGroup(metrics);
    }

    public int getNumberOfSubpartitions() {
        return this.numberOfSubpartitions;
    }

    public boolean isSubpartitionDerivable() {
        return !(this.targetPartition instanceof ResultPartition) || !((ResultPartition)this.targetPartition).isNumberOfPartitionConsumerUndefined();
    }

    @Override
    public CompletableFuture<?> getAvailableFuture() {
        return this.targetPartition.getAvailableFuture();
    }

    public abstract void emit(T var1) throws IOException;

    public void randomEmit(T record) throws IOException {
        this.checkErroneous();
        int targetSubpartition = this.rng.nextInt(this.numberOfSubpartitions);
        this.emit(record, targetSubpartition);
    }

    public abstract void broadcastEmit(T var1) throws IOException;

    public void close() {
        if (this.outputFlusher != null) {
            this.outputFlusher.terminate();
            try {
                this.outputFlusher.join();
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    private void notifyFlusherException(Throwable t) {
        if (this.flusherException == null) {
            LOG.error("An exception happened while flushing the outputs", t);
            this.flusherException = t;
            this.volatileFlusherException = t;
        }
    }

    protected void checkErroneous() throws IOException {
        if (this.flusherException != null || this.volatileFlusherExceptionCheckSkipCount >= 100 && this.volatileFlusherException != null) {
            throw new IOException("An exception happened while flushing the outputs", this.volatileFlusherException);
        }
        if (++this.volatileFlusherExceptionCheckSkipCount >= 100) {
            this.volatileFlusherExceptionCheckSkipCount = 0;
        }
    }

    public void setMaxOverdraftBuffersPerGate(int maxOverdraftBuffersPerGate) {
        this.targetPartition.setMaxOverdraftBuffersPerGate(maxOverdraftBuffersPerGate);
    }

    @VisibleForTesting
    ResultPartitionWriter getTargetPartition() {
        return this.targetPartition;
    }

    private class OutputFlusher
    extends Thread {
        private final long timeout;
        private volatile boolean running;

        OutputFlusher(String name, long timeout) {
            super(name);
            this.running = true;
            this.setDaemon(true);
            this.timeout = timeout;
        }

        public void terminate() {
            this.running = false;
            this.interrupt();
        }

        @Override
        public void run() {
            try {
                while (this.running) {
                    block5: {
                        try {
                            Thread.sleep(this.timeout);
                        }
                        catch (InterruptedException e) {
                            if (!this.running) break block5;
                            throw new Exception(e);
                        }
                    }
                    RecordWriter.this.flushAll();
                }
            }
            catch (Throwable t) {
                RecordWriter.this.notifyFlusherException(t);
            }
        }
    }
}

