/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.gcp.bigquery;

import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableRow;
import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryOptions;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices;
import org.apache.beam.sdk.io.gcp.bigquery.ShardedKey;
import org.apache.beam.sdk.io.gcp.bigquery.TableRowInfo;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.SinkMetrics;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.util.SystemDoFnInternal;
import org.apache.beam.sdk.values.KV;

@SystemDoFnInternal
@VisibleForTesting
class StreamingWriteFn
extends DoFn<KV<ShardedKey<String>, TableRowInfo>, Void> {
    private final BigQueryServices bqServices;
    private transient Map<String, List<TableRow>> tableRows;
    private transient Map<String, List<String>> uniqueIdsForTableRows;
    private Counter byteCounter = SinkMetrics.bytesWritten();

    StreamingWriteFn(BigQueryServices bqServices) {
        this.bqServices = bqServices;
    }

    @DoFn.StartBundle
    public void startBundle() {
        this.tableRows = new HashMap<String, List<TableRow>>();
        this.uniqueIdsForTableRows = new HashMap<String, List<String>>();
    }

    @DoFn.ProcessElement
    public void processElement(DoFn.ProcessContext context) {
        String tableSpec = (String)((ShardedKey)((KV)context.element()).getKey()).getKey();
        List<TableRow> rows = BigQueryHelpers.getOrCreateMapListValue(this.tableRows, tableSpec);
        List<String> uniqueIds = BigQueryHelpers.getOrCreateMapListValue(this.uniqueIdsForTableRows, tableSpec);
        rows.add(((TableRowInfo)((KV)context.element()).getValue()).tableRow);
        uniqueIds.add(((TableRowInfo)((KV)context.element()).getValue()).uniqueId);
    }

    @DoFn.FinishBundle
    public void finishBundle(DoFn.FinishBundleContext context) throws Exception {
        BigQueryOptions options = (BigQueryOptions)context.getPipelineOptions().as(BigQueryOptions.class);
        for (Map.Entry<String, List<TableRow>> entry : this.tableRows.entrySet()) {
            TableReference tableReference = BigQueryHelpers.parseTableSpec(entry.getKey());
            this.flushRows(tableReference, entry.getValue(), this.uniqueIdsForTableRows.get(entry.getKey()), options);
        }
        this.tableRows.clear();
        this.uniqueIdsForTableRows.clear();
    }

    public void populateDisplayData(DisplayData.Builder builder) {
        super.populateDisplayData(builder);
    }

    private void flushRows(TableReference tableReference, List<TableRow> tableRows, List<String> uniqueIds, BigQueryOptions options) throws InterruptedException {
        if (!tableRows.isEmpty()) {
            try {
                long totalBytes = this.bqServices.getDatasetService(options).insertAll(tableReference, tableRows, uniqueIds);
                this.byteCounter.inc(totalBytes);
            }
            catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
    }
}

