/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.operators.sort;

import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import javax.annotation.Nonnull;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.AlgorithmOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.memory.DataOutputSerializer;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
import org.apache.flink.runtime.io.AvailabilityProvider;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.jobgraph.tasks.TaskInvokable;
import org.apache.flink.runtime.memory.MemoryAllocationException;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.operators.sort.ExternalSorter;
import org.apache.flink.runtime.operators.sort.PushSorter;
import org.apache.flink.streaming.api.operators.sort.FixedLengthByteKeyComparator;
import org.apache.flink.streaming.api.operators.sort.KeyAndValueSerializer;
import org.apache.flink.streaming.api.operators.sort.VariableLengthByteKeyComparator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.io.DataInputStatus;
import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput;
import org.apache.flink.streaming.runtime.io.StreamTaskInput;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.MutableObjectIterator;

/**
 * A {@link StreamTaskInput} that buffers all records of a wrapped input in a sorter and replays
 * them in sorted order once the wrapped input reports {@link DataInputStatus#END_OF_DATA}.
 *
 * <p>Each incoming record is paired with the serialized bytes of its key (extracted with the
 * {@link KeySelector} and written with the key {@link TypeSerializer}); the sorter compares those
 * bytes with a {@link FixedLengthByteKeyComparator} when the key serializer reports a positive
 * fixed length, and with a {@link VariableLengthByteKeyComparator} otherwise.
 *
 * <p>While buffering, watermark statuses, latency markers and record attributes coming from the
 * wrapped input are dropped. Watermarks are not forwarded either; only the largest timestamp seen
 * is remembered and emitted as a single watermark after the last sorted record.
 *
 * @param <T> type of the record values
 * @param <K> type of the key the records are sorted by
 */
public final class SortingDataInput<T, K> implements StreamTaskInput<T> {

    private final StreamTaskInput<T> wrappedInput;
    private final PushSorter<Tuple2<byte[], StreamRecord<T>>> sorter;
    private final KeySelector<T, K> keySelector;
    private final TypeSerializer<K> keySerializer;
    /** Reusable buffer the key of every incoming record is serialized into. */
    private final DataOutputSerializer dataOutputSerializer;
    private final ForwardingDataOutput forwardingDataOutput;

    /** Iterator over the sorted records; {@code null} until the wrapped input is exhausted. */
    private MutableObjectIterator<Tuple2<byte[], StreamRecord<T>>> sortedInput = null;
    /** Set once the sorted iterator has been drained and {@code END_OF_DATA} was returned. */
    private boolean emittedLast;
    /** Largest watermark timestamp received so far; {@code Long.MIN_VALUE} means none. */
    private long watermarkSeen = Long.MIN_VALUE;

    /**
     * Creates the input and builds the underlying {@link ExternalSorter}.
     *
     * @param wrappedInput input whose records are buffered and sorted
     * @param typeSerializer serializer for the record values
     * @param keySerializer serializer used to turn keys into the bytes that are compared
     * @param keySelector extracts the key from a record value
     * @param memoryManager memory manager handed to the sorter builder
     * @param ioManager I/O manager the sorter uses for spilling
     * @param objectReuse object-reuse flag handed to the sorter builder
     * @param managedMemoryFraction memory fraction handed to the sorter builder
     * @param taskManagerConfiguration source of the spilling threshold, maximum fan and
     *     large-records-handler settings
     * @param containingTask task handed to the sorter builder as its parent
     * @param executionConfig execution config handed to the sorter builder
     * @throws RuntimeException wrapping a {@link MemoryAllocationException} raised while building
     *     the sorter
     */
    public SortingDataInput(
            StreamTaskInput<T> wrappedInput,
            TypeSerializer<T> typeSerializer,
            TypeSerializer<K> keySerializer,
            KeySelector<T, K> keySelector,
            MemoryManager memoryManager,
            IOManager ioManager,
            boolean objectReuse,
            double managedMemoryFraction,
            Configuration taskManagerConfiguration,
            TaskInvokable containingTask,
            ExecutionConfig executionConfig) {
        try {
            this.forwardingDataOutput = new ForwardingDataOutput();
            this.keySelector = keySelector;
            this.keySerializer = keySerializer;

            final int keyLength = keySerializer.getLength();
            final TypeComparator<Tuple2<byte[], StreamRecord<T>>> comparator;
            if (keyLength > 0) {
                // Fixed-length keys: the buffer can be sized exactly.
                this.dataOutputSerializer = new DataOutputSerializer(keyLength);
                comparator = new FixedLengthByteKeyComparator<>(keyLength);
            } else {
                // No fixed length reported: start with a 64-byte buffer.
                this.dataOutputSerializer = new DataOutputSerializer(64);
                comparator = new VariableLengthByteKeyComparator<>();
            }

            final KeyAndValueSerializer<T> keyAndValueSerializer =
                    new KeyAndValueSerializer<>(typeSerializer, keyLength);
            this.wrappedInput = wrappedInput;
            this.sorter =
                    ExternalSorter.newBuilder(
                                    memoryManager,
                                    containingTask,
                                    keyAndValueSerializer,
                                    comparator,
                                    executionConfig)
                            .memoryFraction(managedMemoryFraction)
                            .enableSpilling(
                                    ioManager,
                                    taskManagerConfiguration.get(
                                            AlgorithmOptions.SORT_SPILLING_THRESHOLD))
                            .maxNumFileHandles(
                                    taskManagerConfiguration.get(AlgorithmOptions.SPILLING_MAX_FAN))
                            .objectReuse(objectReuse)
                            .largeRecords(
                                    taskManagerConfiguration.get(
                                            AlgorithmOptions.USE_LARGE_RECORDS_HANDLER))
                            .build();
        } catch (MemoryAllocationException e) {
            throw new RuntimeException(e);
        }
    }

    /** Returns the input index of the wrapped input. */
    @Override
    public int getInputIndex() {
        return wrappedInput.getInputIndex();
    }

    /**
     * Always fails: this input does not support snapshots.
     *
     * @throws UnsupportedOperationException on every call
     */
    @Override
    public CompletableFuture<Void> prepareSnapshot(
            ChannelStateWriter channelStateWriter, long checkpointId) throws CheckpointException {
        throw new UnsupportedOperationException(
                "Checkpoints are not supported with sorted inputs in the BATCH runtime.");
    }

    /**
     * Closes the wrapped input and then the sorter. The sorter is closed even if closing the
     * wrapped input failed with an {@link IOException}; the first such exception is rethrown with
     * any later one attached as suppressed.
     *
     * @throws IOException if closing the wrapped input or the sorter failed
     */
    @Override
    public void close() throws IOException {
        IOException ex = null;
        try {
            wrappedInput.close();
        } catch (IOException e) {
            ex = ExceptionUtils.firstOrSuppressed(e, ex);
        }

        try {
            sorter.close();
        } catch (IOException e) {
            ex = ExceptionUtils.firstOrSuppressed(e, ex);
        }

        if (ex != null) {
            throw ex;
        }
    }

    /**
     * Advances the input by one step.
     *
     * <p>Until the wrapped input reports {@link DataInputStatus#END_OF_DATA}, the call is delegated
     * to the wrapped input with an internal output that feeds the sorter, and the wrapped input's
     * status is returned unchanged. On {@code END_OF_DATA} the sorter is finalized and, from then
     * on, each call emits the next sorted record to {@code output}.
     *
     * @param output receiver of the sorted records and the final watermark
     * @return the wrapped input's status while buffering, otherwise the status described in
     *     {@link #emitNextSortedRecord}
     */
    @Override
    public DataInputStatus emitNext(PushingAsyncDataInput.DataOutput<T> output) throws Exception {
        if (sortedInput != null) {
            return emitNextSortedRecord(output);
        }

        final DataInputStatus inputStatus = wrappedInput.emitNext(forwardingDataOutput);
        if (inputStatus == DataInputStatus.END_OF_DATA) {
            endSorting();
            return emitNextSortedRecord(output);
        }

        return inputStatus;
    }

    /**
     * Emits the next record from the sorted iterator.
     *
     * @return {@link DataInputStatus#MORE_AVAILABLE} after emitting a record; {@link
     *     DataInputStatus#END_OF_DATA} exactly once when the iterator is exhausted (after emitting
     *     the largest watermark seen, if any); {@link DataInputStatus#END_OF_INPUT} on every call
     *     after that
     */
    @Nonnull
    private DataInputStatus emitNextSortedRecord(PushingAsyncDataInput.DataOutput<T> output)
            throws Exception {
        if (emittedLast) {
            return DataInputStatus.END_OF_INPUT;
        }

        final Tuple2<byte[], StreamRecord<T>> next = sortedInput.next();
        if (next != null) {
            output.emitRecord(next.f1);
            return DataInputStatus.MORE_AVAILABLE;
        }

        emittedLast = true;
        if (watermarkSeen > Long.MIN_VALUE) {
            output.emitWatermark(new Watermark(watermarkSeen));
        }
        return DataInputStatus.END_OF_DATA;
    }

    /** Tells the sorter that no more records will be written and obtains the sorted iterator. */
    private void endSorting() throws Exception {
        sorter.finishReading();
        sortedInput = sorter.getIterator();
    }

    /**
     * Returns {@link AvailabilityProvider#AVAILABLE} once the sorted iterator exists; before that,
     * the availability future of the wrapped input.
     */
    @Override
    public CompletableFuture<?> getAvailableFuture() {
        if (sortedInput != null) {
            return AvailabilityProvider.AVAILABLE;
        }
        return wrappedInput.getAvailableFuture();
    }

    /**
     * Output handed to the wrapped input while buffering: records go into the sorter, watermarks
     * only update the largest timestamp seen, and every other element is ignored.
     */
    private class ForwardingDataOutput implements PushingAsyncDataInput.DataOutput<T> {

        /** Serializes the record's key and writes the (key bytes, record) pair to the sorter. */
        @Override
        public void emitRecord(StreamRecord<T> streamRecord) throws Exception {
            final K key = keySelector.getKey(streamRecord.getValue());

            keySerializer.serialize(key, dataOutputSerializer);
            final byte[] serializedKey = dataOutputSerializer.getCopyOfBuffer();
            // Reset the shared buffer so the next key is written into it from the start.
            dataOutputSerializer.clear();

            sorter.writeRecord(Tuple2.of(serializedKey, streamRecord));
        }

        /** Remembers the largest watermark timestamp; nothing is forwarded at this point. */
        @Override
        public void emitWatermark(Watermark watermark) {
            watermarkSeen = Math.max(watermarkSeen, watermark.getTimestamp());
        }

        /** Ignored. */
        @Override
        public void emitWatermarkStatus(WatermarkStatus watermarkStatus) {}

        /** Ignored. */
        @Override
        public void emitLatencyMarker(LatencyMarker latencyMarker) {}

        /** Ignored. */
        @Override
        public void emitRecordAttributes(RecordAttributes recordAttributes) {}
    }
}

