/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.gcp.bigquery;

import com.google.api.services.bigquery.model.TableSchema;
import com.google.cloud.bigquery.storage.v1beta2.ProtoRows;
import com.google.cloud.bigquery.storage.v1beta2.WriteStream;
import com.google.protobuf.ByteString;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import javax.annotation.Nullable;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryOptions;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices;
import org.apache.beam.sdk.io.gcp.bigquery.CreateTableHelpers;
import org.apache.beam.sdk.io.gcp.bigquery.RetryManager;
import org.apache.beam.sdk.io.gcp.bigquery.StorageApiDynamicDestinations;
import org.apache.beam.sdk.io.gcp.bigquery.StorageApiFinalizeWritesDoFn;
import org.apache.beam.sdk.io.gcp.bigquery.StorageApiWriteUnshardedRecords;
import org.apache.beam.sdk.io.gcp.bigquery.TableDestination;
import org.apache.beam.sdk.io.gcp.bigquery.TwoLevelMessageConverterCache;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Reshuffle;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Supplier;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StorageApiWriteUnshardedRecords<DestinationT, ElementT>
extends PTransform<PCollection<KV<DestinationT, ElementT>>, PCollection<Void>> {
    // Logger used by this transform and by the nested DoFn's append callbacks.
    private static final Logger LOG = LoggerFactory.getLogger(StorageApiWriteUnshardedRecords.class);
    // Resolves the table, schema, side inputs and message converter for each destination key.
    private final StorageApiDynamicDestinations<ElementT, DestinationT> dynamicDestinations;
    // Forwarded unchanged to CreateTableHelpers.possiblyCreateTable.
    private final BigQueryIO.Write.CreateDisposition createDisposition;
    // Forwarded unchanged to CreateTableHelpers.possiblyCreateTable.
    private final String kmsKey;
    // Source of the DatasetService; also handed to StorageApiFinalizeWritesDoFn in expand().
    private final BigQueryServices bqServices;
    // Forwarded unchanged to CreateTableHelpers.possiblyCreateTable.
    private final Coder<DestinationT> destinationCoder;
    // Created on first use by initializeDatasetService(); null until then.
    @Nullable
    private BigQueryServices.DatasetService datasetService = null;
    // Shared cached thread pool on which invalidated append clients are closed asynchronously.
    private static final ExecutorService closeWriterExecutor = Executors.newCachedThreadPool();

    /**
     * Creates the transform. Every argument is stored as-is; nothing is contacted or validated here.
     *
     * @param dynamicDestinations resolves table, schema and message converter per destination
     * @param createDisposition passed on when a destination table may need to be created
     * @param kmsKey passed on when a destination table may need to be created
     * @param bqServices factory for the BigQuery dataset service used while writing
     * @param destinationCoder coder for destination keys, passed on to table creation
     */
    public StorageApiWriteUnshardedRecords(
            StorageApiDynamicDestinations<ElementT, DestinationT> dynamicDestinations,
            BigQueryIO.Write.CreateDisposition createDisposition,
            String kmsKey,
            BigQueryServices bqServices,
            Coder<DestinationT> destinationCoder) {
        this.bqServices = bqServices;
        this.dynamicDestinations = dynamicDestinations;
        this.destinationCoder = destinationCoder;
        this.createDisposition = createDisposition;
        this.kmsKey = kmsKey;
    }

    /**
     * Lazily obtains the dataset service from {@code bqServices}; a no-op once it has been set.
     *
     * @param pipelineOptions options viewed as {@link BigQueryOptions} to build the service
     */
    private void initializeDatasetService(PipelineOptions pipelineOptions) {
        if (this.datasetService != null) {
            return;
        }
        BigQueryOptions bigQueryOptions = pipelineOptions.as(BigQueryOptions.class);
        this.datasetService = this.bqServices.getDatasetService(bigQueryOptions);
    }

    /**
     * Builds the write pipeline: a ParDo that appends records and emits (table URN, stream name)
     * pairs, a reshuffle of those pairs, and a ParDo that finalizes the written streams.
     *
     * @param input records keyed by destination
     * @return the output of the finalize step
     */
    @Override
    public PCollection<Void> expand(PCollection<KV<DestinationT, ElementT>> input) {
        String operationName = input.getName() + "/" + this.getName();

        // Stage 1: append the records; each bundle reports the streams it wrote to.
        PCollection<KV<String, String>> writtenStreams =
            input
                .apply(
                    "Write Records",
                    ParDo.of(new WriteRecordsDoFn(operationName))
                        .withSideInputs(this.dynamicDestinations.getSideInputs()))
                .setCoder(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()));

        // Stage 2: reshuffle the (table URN, stream name) pairs, then finalize them.
        return writtenStreams
            .apply("Reshuffle", Reshuffle.of())
            .apply("Finalize writes", ParDo.of(new StorageApiFinalizeWritesDoFn(this.bqServices)));
    }

    /**
     * Submits {@code task} to {@code executor} and discards any exception it throws.
     *
     * @param executor executor that runs the task
     * @param task work to run on a best-effort basis
     */
    private static void runAsyncIgnoreFailure(ExecutorService executor, ThrowingRunnable task) {
        Runnable bestEffort = () -> {
            try {
                task.run();
            } catch (Exception ignored) {
                // Best-effort by contract: a failure of the task is intentionally dropped.
            }
        };
        executor.submit(bestEffort);
    }

    class WriteRecordsDoFn
    extends DoFn<KV<DestinationT, ElementT>, KV<String, String>> {
        // Per-bundle write state keyed by destination; startBundle() replaces it with a fresh map.
        // The value type is spelled with its source-level name: the decompiler emitted the binary
        // name ("Outer$Inner.DestinationState"), which does not resolve when compiling from source.
        private Map<DestinationT, DestinationState> destinations = Maps.newHashMap();
        // Cache of message converters, looked up per destination in createDestinationState().
        private final TwoLevelMessageConverterCache<DestinationT, ElementT> messageConverters;

        /**
         * Creates the DoFn with a new message-converter cache.
         *
         * @param operationName name handed to the converter cache
         */
        WriteRecordsDoFn(String operationName) {
            messageConverters = new TwoLevelMessageConverterCache<>(operationName);
        }

        /** Starts every bundle with an empty destination-state map. */
        @DoFn.StartBundle
        public void startBundle() throws IOException {
            destinations = Maps.newHashMap();
        }

        /**
         * Builds the write state for a destination seen for the first time in this bundle.
         *
         * <p>Resolves the destination's table, lets {@code CreateTableHelpers.possiblyCreateTable}
         * handle table creation, and looks up the message converter for the destination.
         *
         * <p>The return type uses the source-level name {@code DestinationState}; the decompiled
         * binary name ({@code Outer$Inner.DestinationState}) does not resolve in a source build.
         *
         * @param c process context, forwarded to table creation
         * @param destination destination key of the element being processed
         * @return a new state with no stream opened yet
         * @throws IllegalArgumentException if the dynamic destinations return no table
         * @throws RuntimeException wrapping any failure to obtain the message converter
         */
        DestinationState createDestinationState(DoFn.ProcessContext c, DestinationT destination) {
            TableDestination tableDestination1 = StorageApiWriteUnshardedRecords.this.dynamicDestinations.getTable(destination);
            Preconditions.checkArgument(
                tableDestination1 != null,
                "DynamicDestinations.getTable() may not return null, but %s returned null for destination %s",
                StorageApiWriteUnshardedRecords.this.dynamicDestinations,
                destination);
            // The schema is only fetched if table creation asks the supplier for it.
            Supplier<TableSchema> schemaSupplier = () -> StorageApiWriteUnshardedRecords.this.dynamicDestinations.getSchema(destination);
            TableDestination createdTable = CreateTableHelpers.possiblyCreateTable(
                c,
                tableDestination1,
                schemaSupplier,
                StorageApiWriteUnshardedRecords.this.createDisposition,
                StorageApiWriteUnshardedRecords.this.destinationCoder,
                StorageApiWriteUnshardedRecords.this.kmsKey,
                StorageApiWriteUnshardedRecords.this.bqServices);
            StorageApiDynamicDestinations.MessageConverter<ElementT> messageConverter;
            try {
                messageConverter = this.messageConverters.get(destination, StorageApiWriteUnshardedRecords.this.dynamicDestinations);
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
            return new DestinationState(createdTable.getTableUrn(), messageConverter, StorageApiWriteUnshardedRecords.this.datasetService);
        }

        /**
         * Buffers one element in the state of its destination, creating that state on first use.
         *
         * @param c process context, used for side inputs and table creation
         * @param pipelineOptions options used to lazily create the dataset service
         * @param element destination key and record to write
         * @throws Exception if flushing buffered rows fails
         */
        @DoFn.ProcessElement
        public void process(DoFn.ProcessContext c, PipelineOptions pipelineOptions, @DoFn.Element KV<DestinationT, ElementT> element) throws Exception {
            initializeDatasetService(pipelineOptions);
            dynamicDestinations.setSideInputAccessorFromProcessContext(c);

            DestinationT destination = element.getKey();
            DestinationState destinationState =
                destinations.computeIfAbsent(destination, key -> createDestinationState(c, key));

            // Drain an already-full buffer before adding the new record.
            if (destinationState.shouldFlush()) {
                destinationState.flush();
            }
            ElementT record = element.getValue();
            destinationState.addMessage(record);
        }

        /**
         * Flushes every destination touched in this bundle, emits its (table URN, stream name)
         * pair, and releases its append client.
         *
         * <p>The clients are closed here because startBundle() discards the state map; without
         * this, only the states of the last bundle would ever be closed (by teardown) and every
         * earlier bundle would leak its clients. Closing is best-effort: by this point the rows
         * have been flushed and the output emitted, so a close failure is logged, not rethrown.
         *
         * @param context bundle context used to emit the written streams
         * @throws Exception if flushing buffered rows fails
         */
        @DoFn.FinishBundle
        public void finishBundle(DoFn.FinishBundleContext context) throws Exception {
            for (DestinationState destinationState : this.destinations.values()) {
                destinationState.flush();
                context.output((Object)KV.of((Object)destinationState.tableUrn, (Object)destinationState.streamName), BoundedWindow.TIMESTAMP_MAX_VALUE.minus(1L), (BoundedWindow)GlobalWindow.INSTANCE);
                try {
                    destinationState.close();
                }
                catch (RuntimeException e) {
                    LOG.warn("Failed to close append client for stream {}", destinationState.streamName, e);
                }
            }
            // Everything has been flushed, reported and closed; nothing is left for teardown().
            this.destinations.clear();
        }

        /**
         * Closes the append client of every remaining destination state.
         *
         * <p>Every state is attempted even if an earlier close fails, so one bad client cannot
         * leave the others open. The first failure is rethrown afterwards with any later
         * failures attached as suppressed exceptions.
         *
         * @throws RuntimeException the first close failure, if any
         */
        @DoFn.Teardown
        public void teardown() {
            RuntimeException firstFailure = null;
            for (DestinationState destinationState : this.destinations.values()) {
                try {
                    destinationState.close();
                }
                catch (RuntimeException e) {
                    if (firstFailure == null) {
                        firstFailure = e;
                    } else {
                        firstFailure.addSuppressed(e);
                    }
                }
            }
            this.destinations.clear();
            if (firstFailure != null) {
                throw firstFailure;
            }
        }

        class DestinationState {
            // URN of the table the write streams are created on.
            private final String tableUrn;
            // Converts elements to messages and supplies the schema descriptor sent with appends.
            private final StorageApiDynamicDestinations.MessageConverter<ElementT> messageConverter;
            // Name of the most recently created write stream; empty until getWriteStream() creates one.
            private String streamName = "";
            // Append client for streamName; null until created and again after close/invalidate.
            @Nullable
            private BigQueryServices.StreamAppendClient streamAppendClient = null;
            // Offset passed to the next append; reset to 0 whenever a new stream is created.
            private long currentOffset = 0L;
            // Serialized rows buffered since the last flush.
            private List<ByteString> pendingMessages;
            // Service used to create write streams and append clients; checked for null on use.
            @Nullable
            private BigQueryServices.DatasetService datasetService;

            /**
             * Creates the state for one destination with an empty buffer and no stream opened.
             *
             * @param tableUrn URN of the destination table
             * @param messageConverter converter for this destination's elements
             * @param datasetService service used later to create streams and append clients
             */
            public DestinationState(
                    String tableUrn,
                    StorageApiDynamicDestinations.MessageConverter<ElementT> messageConverter,
                    BigQueryServices.DatasetService datasetService) {
                this.datasetService = datasetService;
                this.tableUrn = tableUrn;
                this.messageConverter = messageConverter;
                this.pendingMessages = Lists.newArrayList();
            }

            /**
             * Closes the append client synchronously, if there is one.
             *
             * <p>The client reference is only cleared when the close succeeds.
             *
             * @throws RuntimeException wrapping any exception thrown by the client's close
             */
            void close() {
                if (this.streamAppendClient == null) {
                    return;
                }
                try {
                    this.streamAppendClient.close();
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
                this.streamAppendClient = null;
            }

            /**
             * Returns the current append client, creating a PENDING write stream and a client
             * for it on first use.
             *
             * <p>Creating a stream also records its name and resets {@code currentOffset} to 0.
             *
             * @return the append client for the current stream
             * @throws RuntimeException wrapping any failure, including a missing dataset service
             */
            BigQueryServices.StreamAppendClient getWriteStream() {
                if (this.streamAppendClient != null) {
                    return this.streamAppendClient;
                }
                try {
                    BigQueryServices.DatasetService service = Preconditions.checkNotNull(this.datasetService);
                    this.streamName = service.createWriteStream(this.tableUrn, WriteStream.Type.PENDING).getName();
                    this.streamAppendClient = service.getStreamAppendClient(this.streamName);
                    this.currentOffset = 0L;
                    return this.streamAppendClient;
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            }

            /**
             * Discards the current append client so the next {@code getWriteStream()} call
             * creates a new stream; the old client is closed asynchronously, ignoring failures.
             *
             * <p>Safe to call when no client exists. The previous form evaluated
             * {@code this.streamAppendClient::close} unconditionally, which throws a
             * NullPointerException when the field is null, because a bound method reference
             * null-checks its receiver when it is evaluated.
             *
             * @throws RuntimeException wrapping any failure to schedule the asynchronous close
             */
            void invalidateWriteStream() {
                BigQueryServices.StreamAppendClient staleClient = this.streamAppendClient;
                if (staleClient == null) {
                    // Nothing to invalidate: no stream was opened, or it is already discarded.
                    return;
                }
                try {
                    StorageApiWriteUnshardedRecords.runAsyncIgnoreFailure(closeWriterExecutor, staleClient::close);
                    this.streamAppendClient = null;
                }
                catch (Exception e) {
                    throw new RuntimeException(e);
                }
            }

            /**
             * Serializes {@code element}, buffers it, and flushes if the buffer is now over the
             * threshold.
             *
             * @param element record to convert and buffer
             * @throws Exception if conversion or the flush fails
             */
            void addMessage(ElementT element) throws Exception {
                this.pendingMessages.add(this.messageConverter.toMessage(element).toByteString());
                if (!this.shouldFlush()) {
                    return;
                }
                this.flush();
            }

            /**
             * Reports whether the buffer is large enough to flush.
             *
             * @return {@code true} when more than 100 rows are buffered
             */
            boolean shouldFlush() {
                final int pendingCount = this.pendingMessages.size();
                return 100 < pendingCount;
            }

            /**
             * Appends all buffered rows to the write stream as one batch and waits for the
             * retry manager to finish. Does nothing when the buffer is empty.
             *
             * <p>On a failed append the current stream is invalidated and the operation is
             * retried, which opens a new stream.
             *
             * <p>The append offset is taken only after {@code getWriteStream()} returns.
             * Creating a stream resets {@code currentOffset} to 0, so reading and advancing the
             * offset first (as before) had the reset wipe out the advance: the second flush on
             * a new stream reused offset 0, and a retry on a replacement stream appended at the
             * stale offset of the discarded one.
             *
             * @throws Exception if the retry manager gives up or an append cannot be issued
             */
            void flush() throws Exception {
                if (this.pendingMessages.isEmpty()) {
                    return;
                }
                ProtoRows.Builder inserts = ProtoRows.newBuilder();
                for (ByteString m : this.pendingMessages) {
                    inserts.addSerializedRows(m);
                }
                ProtoRows protoRows = inserts.build();
                long rowCount = protoRows.getSerializedRowsCount();
                // The batch now lives in protoRows, so the buffer can be reused immediately.
                this.pendingMessages.clear();
                // NOTE(review): arguments look like initial backoff, max backoff and max retries — confirm against RetryManager.
                RetryManager retryManager = new RetryManager(Duration.standardSeconds((long)1L), Duration.standardMinutes((long)1L), 5);
                retryManager.addOperation(c -> {
                    try {
                        // Resolve the stream first: opening a new stream resets currentOffset.
                        BigQueryServices.StreamAppendClient writeStream = this.getWriteStream();
                        long offset = this.currentOffset;
                        this.currentOffset += rowCount;
                        return writeStream.appendRows(offset, protoRows, this.messageConverter.getSchemaDescriptor());
                    }
                    catch (Exception e) {
                        throw new RuntimeException(e);
                    }
                }, contexts -> {
                    LOG.info("Append to stream " + this.streamName + " failed with error " + ((RetryManager.Operation.Context)Iterables.getFirst((Iterable)contexts, null)).getError());
                    // Drop the failed stream; the retry goes to a newly created one.
                    this.invalidateWriteStream();
                    return RetryManager.RetryType.RETRY_ALL_OPERATIONS;
                }, response -> LOG.info("Append to stream {} succeeded.", (Object)this.streamName), new RetryManager.Operation.Context());
                retryManager.run(true);
            }
        }
    }

    private static interface ThrowingRunnable {
        public void run() throws Exception;
    }
}

