/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.repackaged.direct_java.sdk.fn.data;

import java.io.IOException;
import java.io.OutputStream;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.repackaged.direct_java.sdk.fn.data.BeamFnDataBufferingOutboundObserver;
import org.apache.beam.repackaged.direct_java.sdk.fn.data.LogicalEndpoint;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.ByteString;
import org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.StreamObserver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BeamFnDataSizeBasedBufferingOutboundObserver<T>
implements BeamFnDataBufferingOutboundObserver<T> {
    private static final Logger LOG = LoggerFactory.getLogger(BeamFnDataSizeBasedBufferingOutboundObserver.class);
    private long byteCounter;
    private long counter;
    private boolean closed;
    private final int sizeLimit;
    private final Coder<T> coder;
    private final LogicalEndpoint outputLocation;
    private final StreamObserver<BeamFnApi.Elements> outboundObserver;
    private final ByteString.Output bufferedElements;

    BeamFnDataSizeBasedBufferingOutboundObserver(int sizeLimit, LogicalEndpoint outputLocation, Coder<T> coder, StreamObserver<BeamFnApi.Elements> outboundObserver) {
        this.sizeLimit = sizeLimit;
        this.outputLocation = outputLocation;
        this.coder = coder;
        this.outboundObserver = outboundObserver;
        this.bufferedElements = ByteString.newOutput();
        this.closed = false;
    }

    @Override
    public void close() throws Exception {
        if (this.closed) {
            return;
        }
        this.closed = true;
        BeamFnApi.Elements.Builder elements = this.convertBufferForTransmission();
        if (this.outputLocation.isTimer()) {
            elements.addTimersBuilder().setInstructionId(this.outputLocation.getInstructionId()).setTransformId(this.outputLocation.getTransformId()).setTimerFamilyId(this.outputLocation.getTimerFamilyId()).setIsLast(true);
        } else {
            elements.addDataBuilder().setInstructionId(this.outputLocation.getInstructionId()).setTransformId(this.outputLocation.getTransformId()).setIsLast(true);
        }
        LOG.debug("Closing stream for instruction {} and transform {} having transmitted {} values {} bytes", new Object[]{this.outputLocation.getInstructionId(), this.outputLocation.getTransformId(), this.counter, this.byteCounter});
        this.outboundObserver.onNext((Object)elements.build());
    }

    @Override
    public void flush() throws IOException {
        if (this.bufferedElements.size() > 0) {
            this.outboundObserver.onNext((Object)this.convertBufferForTransmission().build());
        }
    }

    @Override
    public void accept(T t) throws IOException {
        if (this.closed) {
            throw new IllegalStateException("Already closed.");
        }
        this.coder.encode(t, (OutputStream)this.bufferedElements);
        ++this.counter;
        if (this.bufferedElements.size() >= this.sizeLimit) {
            this.flush();
        }
    }

    private BeamFnApi.Elements.Builder convertBufferForTransmission() {
        BeamFnApi.Elements.Builder elements = BeamFnApi.Elements.newBuilder();
        if (this.bufferedElements.size() == 0) {
            return elements;
        }
        if (this.outputLocation.isTimer()) {
            elements.addTimersBuilder().setInstructionId(this.outputLocation.getInstructionId()).setTransformId(this.outputLocation.getTransformId()).setTimerFamilyId(this.outputLocation.getTimerFamilyId()).setTimers(this.bufferedElements.toByteString());
        } else {
            elements.addDataBuilder().setInstructionId(this.outputLocation.getInstructionId()).setTransformId(this.outputLocation.getTransformId()).setData(this.bufferedElements.toByteString());
        }
        this.byteCounter += (long)this.bufferedElements.size();
        this.bufferedElements.reset();
        return elements;
    }
}

