/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.gcp.pubsublite;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.api.core.ApiService;
import com.google.api.gax.rpc.ApiException;
import com.google.cloud.pubsublite.Message;
import com.google.cloud.pubsublite.PublishMetadata;
import com.google.cloud.pubsublite.internal.CheckedApiException;
import com.google.cloud.pubsublite.internal.ExtractStatus;
import com.google.cloud.pubsublite.internal.Publisher;
import com.google.cloud.pubsublite.proto.PubSubMessage;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.function.Consumer;
import org.apache.beam.sdk.io.gcp.pubsublite.PerServerPublisherCache;
import org.apache.beam.sdk.io.gcp.pubsublite.PublisherOptions;
import org.apache.beam.sdk.io.gcp.pubsublite.PublisherOrError;
import org.apache.beam.sdk.io.gcp.pubsublite.Publishers;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.MoreExecutors;

class PubsubLiteSink
extends DoFn<PubSubMessage, Void> {
    private final PublisherOptions options;
    @GuardedBy(value="this")
    private transient PublisherOrError publisherOrError;
    @GuardedBy(value="this")
    private transient int outstanding;
    @GuardedBy(value="this")
    private transient Deque<CheckedApiException> errorsSinceLastFinish;
    private static final Executor executor = Executors.newCachedThreadPool();

    PubsubLiteSink(PublisherOptions options) {
        this.options = options;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @DoFn.Setup
    public void setup() throws ApiException {
        Publisher<PublishMetadata> publisher = this.options.usesCache() ? PerServerPublisherCache.PUBLISHER_CACHE.get(this.options) : Publishers.newPublisher(this.options);
        PubsubLiteSink pubsubLiteSink = this;
        synchronized (pubsubLiteSink) {
            this.outstanding = 0;
            this.errorsSinceLastFinish = new ArrayDeque<CheckedApiException>();
            this.publisherOrError = PublisherOrError.ofPublisher(publisher);
        }
        final Consumer<Throwable> onFailure = t -> {
            PubsubLiteSink pubsubLiteSink = this;
            synchronized (pubsubLiteSink) {
                this.publisherOrError = PublisherOrError.ofError(ExtractStatus.toCanonical((Throwable)t));
            }
        };
        publisher.addListener(new ApiService.Listener(){

            public void failed(ApiService.State s, Throwable t) {
                onFailure.accept(t);
            }
        }, MoreExecutors.directExecutor());
        if (!this.options.usesCache()) {
            publisher.startAsync();
        }
    }

    private synchronized void decrementOutstanding() {
        --this.outstanding;
        ((Object)((Object)this)).notify();
    }

    @DoFn.ProcessElement
    public synchronized void processElement(@DoFn.Element PubSubMessage message) throws CheckedApiException {
        ++this.outstanding;
        if (this.publisherOrError.getKind() == PublisherOrError.Kind.ERROR) {
            throw this.publisherOrError.error();
        }
        ApiFuture future = this.publisherOrError.publisher().publish(Message.fromProto((PubSubMessage)message));
        final Consumer<Throwable> onFailure = t -> {
            PubsubLiteSink pubsubLiteSink = this;
            synchronized (pubsubLiteSink) {
                this.decrementOutstanding();
                this.errorsSinceLastFinish.push(ExtractStatus.toCanonical((Throwable)t));
            }
        };
        ApiFutures.addCallback((ApiFuture)future, (ApiFutureCallback)new ApiFutureCallback<PublishMetadata>(){

            public void onSuccess(PublishMetadata publishMetadata) {
                PubsubLiteSink.this.decrementOutstanding();
            }

            public void onFailure(Throwable t) {
                onFailure.accept(t);
            }
        }, (Executor)executor);
    }

    @DoFn.FinishBundle
    public synchronized void finishBundle() throws CheckedApiException, InterruptedException {
        while (this.outstanding > 0) {
            ((Object)((Object)this)).wait();
        }
        if (!this.errorsSinceLastFinish.isEmpty()) {
            CheckedApiException canonical = this.errorsSinceLastFinish.pop();
            while (!this.errorsSinceLastFinish.isEmpty()) {
                canonical.addSuppressed((Throwable)this.errorsSinceLastFinish.pop());
            }
            throw canonical;
        }
        if (this.publisherOrError.getKind() == PublisherOrError.Kind.ERROR) {
            throw this.publisherOrError.error();
        }
    }
}

