/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.gcp.bigquery;

import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableRow;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.annotation.Nullable;
import org.apache.beam.runners.core.metrics.MetricsLogger;
import org.apache.beam.runners.core.metrics.MonitoringInfoConstants;
import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryOptions;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices;
import org.apache.beam.sdk.io.gcp.bigquery.ErrorContainer;
import org.apache.beam.sdk.io.gcp.bigquery.InsertRetryPolicy;
import org.apache.beam.sdk.io.gcp.bigquery.TableRowInfo;
import org.apache.beam.sdk.io.gcp.bigquery.TableRowInfoCoder;
import org.apache.beam.sdk.io.gcp.bigquery.TableRowJsonCoder;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.MetricsContainer;
import org.apache.beam.sdk.metrics.MetricsEnvironment;
import org.apache.beam.sdk.metrics.SinkMetrics;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupIntoBatches;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.transforms.windowing.Trigger;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.util.ShardedKey;
import org.apache.beam.sdk.values.FailsafeValueInSingleWindow;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.beam.sdk.values.ValueInSingleWindow;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class BatchedStreamingWrite<ErrorT, ElementT>
extends PTransform<PCollection<KV<String, TableRowInfo<ElementT>>>, PCollectionTuple> {
    private static final TupleTag<Void> mainOutputTag = new TupleTag("mainOutput");
    static final TupleTag<TableRow> SUCCESSFUL_ROWS_TAG = new TupleTag("successfulRows");
    private static final Logger LOG = LoggerFactory.getLogger(BatchedStreamingWrite.class);
    private final BigQueryServices bqServices;
    private final InsertRetryPolicy retryPolicy;
    private final TupleTag<ErrorT> failedOutputTag;
    private final AtomicCoder<ErrorT> failedOutputCoder;
    private final ErrorContainer<ErrorT> errorContainer;
    private final boolean skipInvalidRows;
    private final boolean ignoreUnknownValues;
    private final boolean ignoreInsertIds;
    private final SerializableFunction<ElementT, TableRow> toTableRow;
    private final SerializableFunction<ElementT, TableRow> toFailsafeTableRow;
    private final Set<String> allowedMetricUrns;
    private Counter byteCounter = SinkMetrics.bytesWritten();
    private final boolean batchViaStateful;
    private static final Duration BATCH_MAX_BUFFERING_DURATION = Duration.millis((long)200L);

    public BatchedStreamingWrite(BigQueryServices bqServices, InsertRetryPolicy retryPolicy, TupleTag<ErrorT> failedOutputTag, AtomicCoder<ErrorT> failedOutputCoder, ErrorContainer<ErrorT> errorContainer, boolean skipInvalidRows, boolean ignoreUnknownValues, boolean ignoreInsertIds, SerializableFunction<ElementT, TableRow> toTableRow, SerializableFunction<ElementT, TableRow> toFailsafeTableRow) {
        this.bqServices = bqServices;
        this.retryPolicy = retryPolicy;
        this.failedOutputTag = failedOutputTag;
        this.failedOutputCoder = failedOutputCoder;
        this.errorContainer = errorContainer;
        this.skipInvalidRows = skipInvalidRows;
        this.ignoreUnknownValues = ignoreUnknownValues;
        this.ignoreInsertIds = ignoreInsertIds;
        this.toTableRow = toTableRow;
        this.toFailsafeTableRow = toFailsafeTableRow;
        this.allowedMetricUrns = BatchedStreamingWrite.getAllowedMetricUrns();
        this.batchViaStateful = false;
    }

    private BatchedStreamingWrite(BigQueryServices bqServices, InsertRetryPolicy retryPolicy, TupleTag<ErrorT> failedOutputTag, AtomicCoder<ErrorT> failedOutputCoder, ErrorContainer<ErrorT> errorContainer, boolean skipInvalidRows, boolean ignoreUnknownValues, boolean ignoreInsertIds, SerializableFunction<ElementT, TableRow> toTableRow, SerializableFunction<ElementT, TableRow> toFailsafeTableRow, boolean batchViaStateful) {
        this.bqServices = bqServices;
        this.retryPolicy = retryPolicy;
        this.failedOutputTag = failedOutputTag;
        this.failedOutputCoder = failedOutputCoder;
        this.errorContainer = errorContainer;
        this.skipInvalidRows = skipInvalidRows;
        this.ignoreUnknownValues = ignoreUnknownValues;
        this.ignoreInsertIds = ignoreInsertIds;
        this.toTableRow = toTableRow;
        this.toFailsafeTableRow = toFailsafeTableRow;
        this.allowedMetricUrns = BatchedStreamingWrite.getAllowedMetricUrns();
        this.batchViaStateful = batchViaStateful;
    }

    private static Set<String> getAllowedMetricUrns() {
        ImmutableSet.Builder setBuilder = ImmutableSet.builder();
        setBuilder.add((Object)MonitoringInfoConstants.Urns.API_REQUEST_COUNT);
        setBuilder.add((Object)MonitoringInfoConstants.Urns.API_REQUEST_LATENCIES);
        return setBuilder.build();
    }

    public BatchedStreamingWrite<ErrorT, ElementT> viaDoFnFinalization() {
        return new BatchedStreamingWrite<ErrorT, ElementT>(this.bqServices, this.retryPolicy, this.failedOutputTag, this.failedOutputCoder, this.errorContainer, this.skipInvalidRows, this.ignoreUnknownValues, this.ignoreInsertIds, this.toTableRow, this.toFailsafeTableRow, false);
    }

    public BatchedStreamingWrite<ErrorT, ElementT> viaStateful() {
        return new BatchedStreamingWrite<ErrorT, ElementT>(this.bqServices, this.retryPolicy, this.failedOutputTag, this.failedOutputCoder, this.errorContainer, this.skipInvalidRows, this.ignoreUnknownValues, this.ignoreInsertIds, this.toTableRow, this.toFailsafeTableRow, true);
    }

    public PCollectionTuple expand(PCollection<KV<String, TableRowInfo<ElementT>>> input) {
        return this.batchViaStateful ? (PCollectionTuple)input.apply((PTransform)new ViaStateful()) : (PCollectionTuple)input.apply((PTransform)new ViaBundleFinalization());
    }

    private void flushRows(BigQueryServices.DatasetService datasetService, TableReference tableReference, List<FailsafeValueInSingleWindow<TableRow, TableRow>> tableRows, List<String> uniqueIds, List<ValueInSingleWindow<ErrorT>> failedInserts, List<ValueInSingleWindow<TableRow>> successfulInserts) throws InterruptedException {
        if (!tableRows.isEmpty()) {
            try {
                long totalBytes = datasetService.insertAll(tableReference, tableRows, uniqueIds, this.retryPolicy, failedInserts, this.errorContainer, this.skipInvalidRows, this.ignoreUnknownValues, this.ignoreInsertIds, successfulInserts);
                this.byteCounter.inc(totalBytes);
            }
            catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
    }

    private void reportStreamingApiLogging(BigQueryOptions options) {
        MetricsContainer processWideContainer = MetricsEnvironment.getProcessWideContainer();
        if (processWideContainer instanceof MetricsLogger) {
            MetricsLogger processWideMetricsLogger = (MetricsLogger)processWideContainer;
            processWideMetricsLogger.tryLoggingMetrics("API call Metrics: \n", this.allowedMetricUrns, (long)options.getBqStreamingApiLoggingFrequencySec().intValue() * 1000L);
        }
    }

    private class InsertBatchedElements
    extends DoFn<KV<ShardedKey<String>, Iterable<TableRowInfo<ElementT>>>, Void> {
        @Nullable
        private transient BigQueryServices.DatasetService datasetService;

        private InsertBatchedElements() {
        }

        private BigQueryServices.DatasetService getDatasetService(PipelineOptions pipelineOptions) throws IOException {
            if (this.datasetService == null) {
                this.datasetService = BatchedStreamingWrite.this.bqServices.getDatasetService((BigQueryOptions)pipelineOptions.as(BigQueryOptions.class));
            }
            return this.datasetService;
        }

        @DoFn.ProcessElement
        public void processElement(@DoFn.Element KV<ShardedKey<String>, Iterable<TableRowInfo<ElementT>>> input, BoundedWindow window, DoFn.ProcessContext context, DoFn.MultiOutputReceiver out) throws InterruptedException, IOException {
            ArrayList<FailsafeValueInSingleWindow> tableRows = new ArrayList<FailsafeValueInSingleWindow>();
            ArrayList<String> uniqueIds = new ArrayList<String>();
            for (TableRowInfo row : (Iterable)input.getValue()) {
                TableRow tableRow = (TableRow)BatchedStreamingWrite.this.toTableRow.apply(row.tableRow);
                TableRow failsafeTableRow = (TableRow)BatchedStreamingWrite.this.toFailsafeTableRow.apply(row.tableRow);
                tableRows.add(FailsafeValueInSingleWindow.of((Object)tableRow, (Instant)context.timestamp(), (BoundedWindow)window, (PaneInfo)context.pane(), (Object)failsafeTableRow));
                uniqueIds.add(row.uniqueId);
            }
            LOG.info("Writing to BigQuery using Auto-sharding. Flushing {} rows.", (Object)tableRows.size());
            BigQueryOptions options = (BigQueryOptions)context.getPipelineOptions().as(BigQueryOptions.class);
            TableReference tableReference = BigQueryHelpers.parseTableSpec((String)((ShardedKey)input.getKey()).getKey());
            ArrayList failedInserts = Lists.newArrayList();
            ArrayList successfulInserts = Lists.newArrayList();
            BatchedStreamingWrite.this.flushRows(this.getDatasetService(options), tableReference, tableRows, uniqueIds, failedInserts, successfulInserts);
            for (ValueInSingleWindow row : failedInserts) {
                out.get(BatchedStreamingWrite.this.failedOutputTag).output(row.getValue());
            }
            for (ValueInSingleWindow row : successfulInserts) {
                out.get(SUCCESSFUL_ROWS_TAG).output((Object)((TableRow)row.getValue()));
            }
            BatchedStreamingWrite.this.reportStreamingApiLogging(options);
        }

        @DoFn.Teardown
        public void onTeardown() {
            try {
                if (this.datasetService != null) {
                    this.datasetService.close();
                    this.datasetService = null;
                }
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    }

    private class ViaStateful
    extends PTransform<PCollection<KV<String, TableRowInfo<ElementT>>>, PCollectionTuple> {
        private ViaStateful() {
        }

        public PCollectionTuple expand(PCollection<KV<String, TableRowInfo<ElementT>>> input) {
            BigQueryOptions options = (BigQueryOptions)input.getPipeline().getOptions().as(BigQueryOptions.class);
            Duration maxBufferingDuration = options.getMaxBufferingDurationMilliSec() > 0 ? Duration.millis((long)options.getMaxBufferingDurationMilliSec().intValue()) : BATCH_MAX_BUFFERING_DURATION;
            KvCoder inputCoder = (KvCoder)input.getCoder();
            TableRowInfoCoder valueCoder = (TableRowInfoCoder)((Object)inputCoder.getCoderArguments().get(1));
            PCollectionTuple result = (PCollectionTuple)((PCollection)((PCollection)input.apply((PTransform)Window.into((WindowFn)new GlobalWindows()).triggering((Trigger)DefaultTrigger.of()).discardingFiredPanes())).apply((PTransform)GroupIntoBatches.ofSize((long)options.getMaxStreamingRowsToBatch()).withMaxBufferingDuration(maxBufferingDuration).withShardedKey())).setCoder((Coder)KvCoder.of((Coder)ShardedKey.Coder.of((Coder)StringUtf8Coder.of()), (Coder)IterableCoder.of((Coder)valueCoder))).apply((PTransform)ParDo.of((DoFn)new InsertBatchedElements()).withOutputTags(mainOutputTag, TupleTagList.of((TupleTag)BatchedStreamingWrite.this.failedOutputTag).and(SUCCESSFUL_ROWS_TAG)));
            result.get(BatchedStreamingWrite.this.failedOutputTag).setCoder((Coder)BatchedStreamingWrite.this.failedOutputCoder);
            result.get(SUCCESSFUL_ROWS_TAG).setCoder((Coder)TableRowJsonCoder.of());
            return result;
        }
    }

    @VisibleForTesting
    private class BatchAndInsertElements
    extends DoFn<KV<String, TableRowInfo<ElementT>>, Void> {
        private transient Map<String, List<FailsafeValueInSingleWindow<TableRow, TableRow>>> tableRows;
        private transient Map<String, List<String>> uniqueIdsForTableRows;
        @Nullable
        private transient BigQueryServices.DatasetService datasetService;

        private BatchAndInsertElements() {
        }

        private BigQueryServices.DatasetService getDatasetService(PipelineOptions pipelineOptions) throws IOException {
            if (this.datasetService == null) {
                this.datasetService = BatchedStreamingWrite.this.bqServices.getDatasetService((BigQueryOptions)pipelineOptions.as(BigQueryOptions.class));
            }
            return this.datasetService;
        }

        @DoFn.StartBundle
        public void startBundle() {
            this.tableRows = new HashMap<String, List<FailsafeValueInSingleWindow<TableRow, TableRow>>>();
            this.uniqueIdsForTableRows = new HashMap<String, List<String>>();
        }

        @DoFn.ProcessElement
        public void processElement(@DoFn.Element KV<String, TableRowInfo<ElementT>> element, @DoFn.Timestamp Instant timestamp, BoundedWindow window, PaneInfo pane) {
            String tableSpec = (String)element.getKey();
            TableRow tableRow = (TableRow)BatchedStreamingWrite.this.toTableRow.apply(((TableRowInfo)element.getValue()).tableRow);
            TableRow failsafeTableRow = (TableRow)BatchedStreamingWrite.this.toFailsafeTableRow.apply(((TableRowInfo)element.getValue()).tableRow);
            this.tableRows.computeIfAbsent(tableSpec, k -> new ArrayList()).add(FailsafeValueInSingleWindow.of((Object)tableRow, (Instant)timestamp, (BoundedWindow)window, (PaneInfo)pane, (Object)failsafeTableRow));
            this.uniqueIdsForTableRows.computeIfAbsent(tableSpec, k -> new ArrayList()).add(((TableRowInfo)element.getValue()).uniqueId);
        }

        @DoFn.FinishBundle
        public void finishBundle(DoFn.FinishBundleContext context) throws Exception {
            ArrayList failedInserts = Lists.newArrayList();
            ArrayList successfulInserts = Lists.newArrayList();
            BigQueryOptions options = (BigQueryOptions)context.getPipelineOptions().as(BigQueryOptions.class);
            for (Map.Entry<String, List<FailsafeValueInSingleWindow<TableRow, TableRow>>> entry : this.tableRows.entrySet()) {
                TableReference tableReference = BigQueryHelpers.parseTableSpec(entry.getKey());
                BatchedStreamingWrite.this.flushRows(this.getDatasetService(options), tableReference, entry.getValue(), this.uniqueIdsForTableRows.get(entry.getKey()), failedInserts, successfulInserts);
            }
            this.tableRows.clear();
            this.uniqueIdsForTableRows.clear();
            for (ValueInSingleWindow row : failedInserts) {
                context.output(BatchedStreamingWrite.this.failedOutputTag, row.getValue(), row.getTimestamp(), row.getWindow());
            }
            BatchedStreamingWrite.this.reportStreamingApiLogging(options);
        }

        @DoFn.Teardown
        public void onTeardown() {
            try {
                if (this.datasetService != null) {
                    this.datasetService.close();
                    this.datasetService = null;
                }
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    }

    private class ViaBundleFinalization
    extends PTransform<PCollection<KV<String, TableRowInfo<ElementT>>>, PCollectionTuple> {
        private ViaBundleFinalization() {
        }

        public PCollectionTuple expand(PCollection<KV<String, TableRowInfo<ElementT>>> input) {
            PCollectionTuple result = (PCollectionTuple)input.apply((PTransform)ParDo.of((DoFn)new BatchAndInsertElements()).withOutputTags(mainOutputTag, TupleTagList.of((TupleTag)BatchedStreamingWrite.this.failedOutputTag).and(SUCCESSFUL_ROWS_TAG)));
            result.get(BatchedStreamingWrite.this.failedOutputTag).setCoder((Coder)BatchedStreamingWrite.this.failedOutputCoder);
            result.get(SUCCESSFUL_ROWS_TAG).setCoder((Coder)TableRowJsonCoder.of());
            return result;
        }
    }
}

