/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.repackaged.direct_java.sdk.fn.data;

import java.io.IOException;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.repackaged.direct_java.sdk.fn.data.BeamFnDataSizeBasedBufferingOutboundObserver;
import org.apache.beam.repackaged.direct_java.sdk.fn.data.LogicalEndpoint;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.StreamObserver;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;

public class BeamFnDataTimeBasedBufferingOutboundObserver<T>
extends BeamFnDataSizeBasedBufferingOutboundObserver<T> {
    @VisibleForTesting
    final ScheduledFuture<?> flushFuture;

    BeamFnDataTimeBasedBufferingOutboundObserver(int sizeLimit, long timeLimit, LogicalEndpoint outputLocation, Coder<T> coder, StreamObserver<BeamFnApi.Elements> outboundObserver) {
        super(sizeLimit, outputLocation, coder, outboundObserver);
        this.flushFuture = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setDaemon(true).setNameFormat("DataBufferOutboundFlusher-thread").build()).scheduleAtFixedRate(this::periodicFlush, timeLimit, timeLimit, TimeUnit.MILLISECONDS);
    }

    @Override
    public void close() throws Exception {
        this.checkFlushThreadException();
        this.flushFuture.cancel(false);
        try {
            this.flushFuture.get();
        }
        catch (ExecutionException ee) {
            this.unwrapExecutionException(ee);
        }
        catch (CancellationException cancellationException) {
            // empty catch block
        }
        super.close();
    }

    @Override
    public synchronized void flush() throws IOException {
        super.flush();
    }

    @Override
    public void accept(T t) throws IOException {
        this.checkFlushThreadException();
        super.accept(t);
    }

    private void periodicFlush() {
        try {
            this.flush();
        }
        catch (Throwable t) {
            throw new RuntimeException(t);
        }
    }

    private void checkFlushThreadException() throws IOException {
        if (this.flushFuture.isDone()) {
            try {
                this.flushFuture.get();
                throw new IOException("Periodic flushing thread finished unexpectedly.");
            }
            catch (ExecutionException ee) {
                this.unwrapExecutionException(ee);
            }
            catch (CancellationException ce) {
                throw new IOException(ce);
            }
            catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
                throw new IOException(ie);
            }
        }
    }

    private void unwrapExecutionException(ExecutionException ee) throws IOException {
        RuntimeException re = (RuntimeException)ee.getCause();
        if (re.getCause() instanceof IOException) {
            throw (IOException)re.getCause();
        }
        throw new IOException(re.getCause());
    }
}

