/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.gcp.spanner;

import com.google.cloud.spanner.AbortedException;
import com.google.cloud.spanner.DatabaseClient;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Iterables;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.io.gcp.spanner.MutationGroup;
import org.apache.beam.sdk.io.gcp.spanner.MutationSizeEstimator;
import org.apache.beam.sdk.io.gcp.spanner.SpannerAccessor;
import org.apache.beam.sdk.io.gcp.spanner.SpannerIO;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.util.BackOff;
import org.apache.beam.sdk.util.BackOffUtils;
import org.apache.beam.sdk.util.FluentBackoff;
import org.apache.beam.sdk.util.Sleeper;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@VisibleForTesting
class SpannerWriteGroupFn
extends DoFn<MutationGroup, Void> {
    private static final Logger LOG = LoggerFactory.getLogger(SpannerWriteGroupFn.class);
    private final SpannerIO.Write spec;
    private List<MutationGroup> mutations;
    private long batchSizeBytes = 0L;
    private static final int MAX_RETRIES = 5;
    private static final FluentBackoff BUNDLE_WRITE_BACKOFF = FluentBackoff.DEFAULT.withMaxRetries(5).withInitialBackoff(Duration.standardSeconds((long)5L));
    private transient SpannerAccessor spannerAccessor;

    @VisibleForTesting
    SpannerWriteGroupFn(SpannerIO.Write spec) {
        this.spec = spec;
    }

    @DoFn.Setup
    public void setup() throws Exception {
        this.spannerAccessor = this.spec.getSpannerConfig().connectToSpanner();
        this.mutations = new ArrayList<MutationGroup>();
        this.batchSizeBytes = 0L;
    }

    @DoFn.Teardown
    public void teardown() throws Exception {
        this.spannerAccessor.close();
    }

    @DoFn.ProcessElement
    public void processElement(DoFn.ProcessContext c) throws Exception {
        MutationGroup m = (MutationGroup)c.element();
        this.mutations.add(m);
        this.batchSizeBytes += MutationSizeEstimator.sizeOf(m);
        if (this.batchSizeBytes >= this.spec.getBatchSizeBytes()) {
            this.flushBatch();
        }
    }

    @DoFn.FinishBundle
    public void finishBundle() throws Exception {
        if (!this.mutations.isEmpty()) {
            this.flushBatch();
        }
    }

    private void flushBatch() throws AbortedException, IOException, InterruptedException {
        LOG.debug("Writing batch of {} mutations", (Object)this.mutations.size());
        Sleeper sleeper = Sleeper.DEFAULT;
        BackOff backoff = BUNDLE_WRITE_BACKOFF.backoff();
        DatabaseClient databaseClient = this.spannerAccessor.getDatabaseClient();
        while (true) {
            try {
                databaseClient.writeAtLeastOnce(Iterables.concat(this.mutations));
            }
            catch (AbortedException exception) {
                LOG.error("Error writing to Spanner ({}): {}", (Object)exception.getCode(), (Object)exception.getMessage());
                if (BackOffUtils.next((Sleeper)sleeper, (BackOff)backoff)) continue;
                LOG.error("Aborting after {} retries.", (Object)5);
                throw exception;
            }
            break;
        }
        LOG.debug("Successfully wrote {} mutations", (Object)this.mutations.size());
        this.mutations = new ArrayList<MutationGroup>();
        this.batchSizeBytes = 0L;
    }

    public void populateDisplayData(DisplayData.Builder builder) {
        super.populateDisplayData(builder);
        this.spec.populateDisplayData(builder);
    }
}

