/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.gcp.bigquery;

import com.google.api.services.bigquery.model.TableRow;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.StructuredCoder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.io.gcp.bigquery.TableRowWriter;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.ShardedKey;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;

class WriteBundlesToFiles<DestinationT, ElementT>
extends DoFn<KV<DestinationT, ElementT>, Result<DestinationT>> {
    // Sharding factor for spilled records. NOTE(review): processElement keys spilled records by
    // spilledShardNumber modulo 10, which is presumably this constant — confirm.
    private static final int SPILLED_RECORD_SHARDING_FACTOR = 10;
    // Writer currently registered for each destination. Transient: rebuilt in startBundle.
    private transient Map<DestinationT, TableRowWriter> writers;
    // Window that was passed in when each destination's current writer was created; finishBundle
    // uses it as the output window (and its maxTimestamp as the output timestamp).
    private transient Map<DestinationT, BoundedWindow> writerWindows;
    // Side input read for every element; its value is handed to the TableRowWriter constructor.
    private final PCollectionView<String> tempFilePrefixView;
    // Output tag that receives records which were spilled instead of being written to a file.
    private final TupleTag<KV<ShardedKey<DestinationT>, ElementT>> unwrittenRecordsTag;
    // Limit on the number of open writers in a bundle; once it is reached, records for
    // destinations that do not yet have a writer are spilled.
    private final int maxNumWritersPerBundle;
    // Compared against TableRowWriter.getByteSize(); a writer above this size is closed and replaced.
    private final long maxFileSize;
    // Converts an element value into the TableRow that is written.
    private final SerializableFunction<ElementT, TableRow> toRowFunction;
    // Shard counter for spilled records: seeded randomly in startBundle, advanced per spilled record.
    private int spilledShardNumber;

    /**
     * Captures the configuration used while processing bundles. No per-bundle state is created
     * here; the writer maps are initialised in {@link #startBundle()}.
     *
     * @param tempFilePrefixView side input whose value is passed to every new {@code TableRowWriter}
     * @param unwrittenRecordsTag output tag that receives spilled (unwritten) records
     * @param maxNumWritersPerBundle limit on open writers per bundle before new destinations spill
     * @param maxFileSize size threshold, compared against {@code TableRowWriter.getByteSize()},
     *     above which a destination's current file is closed and a new one is opened
     * @param toRowFunction converts an element value into the {@code TableRow} to write
     */
    WriteBundlesToFiles(
            PCollectionView<String> tempFilePrefixView,
            TupleTag<KV<ShardedKey<DestinationT>, ElementT>> unwrittenRecordsTag,
            int maxNumWritersPerBundle,
            long maxFileSize,
            SerializableFunction<ElementT, TableRow> toRowFunction) {
        // Where records come from and where spilled ones go.
        this.tempFilePrefixView = tempFilePrefixView;
        this.unwrittenRecordsTag = unwrittenRecordsTag;

        // Limits that decide when to spill and when to roll a file over.
        this.maxNumWritersPerBundle = maxNumWritersPerBundle;
        this.maxFileSize = maxFileSize;

        // Element-to-row conversion.
        this.toRowFunction = toRowFunction;
    }

    /**
     * Resets the per-bundle state: fresh, empty writer and writer-window maps, and a randomly
     * chosen starting value for the spilled-record shard counter.
     */
    @DoFn.StartBundle
    public void startBundle() {
        this.writers = Maps.newHashMap();
        this.writerWindows = Maps.newHashMap();
        // Seed within [0, SPILLED_RECORD_SHARDING_FACTOR) using the named constant rather than a
        // duplicated literal, so the seed range and the shard modulus cannot drift apart.
        // Presumably the random start spreads spilled records from different bundles across shards.
        this.spilledShardNumber = ThreadLocalRandom.current().nextInt(SPILLED_RECORD_SHARDING_FACTOR);
    }

    /**
     * Opens a new {@code TableRowWriter} and registers it, together with {@code window}, as the
     * current entry for {@code destination}. An entry already registered for the same destination
     * is overwritten in both maps; this method does not close the previous writer.
     *
     * @param destination key under which the writer and window are stored
     * @param tempFilePrefix value passed to the {@code TableRowWriter} constructor
     * @param window window recorded for the destination alongside the writer
     * @return the newly created writer
     * @throws Exception if constructing the writer fails
     */
    TableRowWriter createAndInsertWriter(
            DestinationT destination, String tempFilePrefix, BoundedWindow window) throws Exception {
        final TableRowWriter freshWriter = new TableRowWriter(tempFilePrefix);
        this.writers.put(destination, freshWriter);
        this.writerWindows.put(destination, window);
        return freshWriter;
    }

    /**
     * Writes one element to the file of its destination, or spills it when the bundle already
     * holds the maximum number of open writers.
     *
     * <ul>
     *   <li>If the destination already has a writer, that writer is used.</li>
     *   <li>Otherwise a writer is created as long as fewer than {@code maxNumWritersPerBundle}
     *       writers are open; if the limit is reached the record is emitted to
     *       {@code unwrittenRecordsTag}, keyed by destination and a rotating shard number, and
     *       nothing is written.</li>
     *   <li>If the chosen writer's byte size exceeds {@code maxFileSize}, it is closed, its result
     *       is emitted to the main output, and a new writer replaces it before the write.</li>
     * </ul>
     *
     * @param c process context used for the side input and for emitting outputs
     * @param element the destination/value pair being processed
     * @param window window of the element; recorded when a new writer is created
     * @throws Exception if creating, closing or writing to a writer fails; on a write failure the
     *     writer is closed first and any close failure is attached as a suppressed exception
     */
    @DoFn.ProcessElement
    public void processElement(ProcessContext c, @DoFn.Element KV<DestinationT, ElementT> element, BoundedWindow window) throws Exception {
        String tempFilePrefix = c.sideInput(this.tempFilePrefixView);
        DestinationT destination = c.element().getKey();

        // Writers are never stored as null, so a single lookup replaces containsKey + get.
        TableRowWriter writer = this.writers.get(destination);
        if (writer == null) {
            // Strictly-less-than keeps the number of open writers at or below the configured
            // maximum (the previous "<=" allowed one writer more than the limit).
            if (this.writers.size() >= this.maxNumWritersPerBundle) {
                // Keep the counter inside [0, SPILLED_RECORD_SHARDING_FACTOR) so it can never
                // overflow into negative shard numbers; the shard sequence itself is unchanged.
                this.spilledShardNumber = (this.spilledShardNumber + 1) % SPILLED_RECORD_SHARDING_FACTOR;
                c.output(
                        this.unwrittenRecordsTag,
                        KV.of(ShardedKey.of(destination, this.spilledShardNumber), element.getValue()));
                return;
            }
            writer = this.createAndInsertWriter(destination, tempFilePrefix, window);
        }

        if (writer.getByteSize() > this.maxFileSize) {
            // Current file is over the size limit: finish it, report it, and start a new one.
            writer.close();
            TableRowWriter.Result result = writer.getResult();
            c.output(new Result<>(result.resourceId.toString(), result.byteSize, destination));
            writer = this.createAndInsertWriter(destination, tempFilePrefix, window);
        }

        try {
            writer.write(this.toRowFunction.apply(element.getValue()));
        } catch (Exception e) {
            try {
                writer.close();
            } catch (Exception closeException) {
                // Do not let the close failure mask the original write failure.
                e.addSuppressed(closeException);
            }
            throw e;
        }
    }

    /**
     * Closes every writer still registered for the bundle and emits one {@code Result} per writer.
     *
     * <p>All writers are closed first; if any close fails, an {@link IOException} carrying every
     * close failure as a suppressed exception is thrown and nothing is emitted. Otherwise each
     * writer's result is output in the window recorded for its destination, timestamped at that
     * window's {@code maxTimestamp()}. Failures while emitting are collected and, after the
     * per-bundle maps have been cleared, rethrown as a single {@link IOException} instead of
     * being silently dropped.
     *
     * @param c finish-bundle context used to emit the results
     * @throws Exception if closing any writer or emitting any result fails
     */
    @DoFn.FinishBundle
    public void finishBundle(FinishBundleContext c) throws Exception {
        List<Exception> failures = new ArrayList<>();
        for (TableRowWriter writer : this.writers.values()) {
            try {
                writer.close();
            } catch (Exception e) {
                failures.add(e);
            }
        }
        throwIfAnyFailed(failures, "Failed to close some writers");

        for (Map.Entry<DestinationT, TableRowWriter> entry : this.writers.entrySet()) {
            try {
                DestinationT destination = entry.getKey();
                TableRowWriter.Result result = entry.getValue().getResult();
                BoundedWindow window = this.writerWindows.get(destination);
                c.output(
                        new Result<>(result.resourceId.toString(), result.byteSize, destination),
                        window.maxTimestamp(),
                        window);
            } catch (Exception e) {
                // Keep going so the remaining results are still attempted; reported below.
                failures.add(e);
            }
        }
        this.writers.clear();
        this.writerWindows.clear();
        // A result that could not be emitted means a written file would otherwise go unreported.
        throwIfAnyFailed(failures, "Failed to emit results for some writers");
    }

    /** Throws an IOException with every collected failure suppressed; no-op when the list is empty. */
    private static void throwIfAnyFailed(List<Exception> failures, String message) throws IOException {
        if (failures.isEmpty()) {
            return;
        }
        IOException aggregate = new IOException(message);
        for (Exception failure : failures) {
            aggregate.addSuppressed(failure);
        }
        throw aggregate;
    }

    public static class ResultCoder<DestinationT>
    extends StructuredCoder<Result<DestinationT>> {
        private static final StringUtf8Coder stringCoder = StringUtf8Coder.of();
        private static final VarLongCoder longCoder = VarLongCoder.of();
        private final Coder<DestinationT> destinationCoder;

        public static <DestinationT> ResultCoder<DestinationT> of(Coder<DestinationT> destinationCoder) {
            return new ResultCoder<DestinationT>(destinationCoder);
        }

        ResultCoder(Coder<DestinationT> destinationCoder) {
            this.destinationCoder = destinationCoder;
        }

        public void encode(Result<DestinationT> value, OutputStream outStream) throws IOException {
            if (value == null) {
                throw new CoderException("cannot encode a null value");
            }
            stringCoder.encode(value.filename, outStream);
            longCoder.encode(value.fileByteSize, outStream);
            this.destinationCoder.encode(value.destination, outStream);
        }

        public Result<DestinationT> decode(InputStream inStream) throws IOException {
            String filename = stringCoder.decode(inStream);
            long fileByteSize = longCoder.decode(inStream);
            Object destination = this.destinationCoder.decode(inStream);
            return new Result<Object>(filename, fileByteSize, destination);
        }

        public List<? extends Coder<?>> getCoderArguments() {
            return Collections.singletonList(this.destinationCoder);
        }

        public void verifyDeterministic() {
        }
    }

    static final class Result<DestinationT>
    implements Serializable {
        private static final long serialVersionUID = 1L;
        public final String filename;
        public final Long fileByteSize;
        public final DestinationT destination;

        public Result(String filename, Long fileByteSize, DestinationT destination) {
            Preconditions.checkNotNull(destination);
            this.filename = filename;
            this.fileByteSize = fileByteSize;
            this.destination = destination;
        }

        public boolean equals(Object other) {
            if (other instanceof Result) {
                Result o = (Result)other;
                return Objects.equals(this.filename, o.filename) && Objects.equals(this.fileByteSize, o.fileByteSize) && Objects.equals(this.destination, o.destination);
            }
            return false;
        }

        public int hashCode() {
            return Objects.hash(this.filename, this.fileByteSize, this.destination);
        }

        public String toString() {
            return "Result{filename='" + this.filename + '\'' + ", fileByteSize=" + this.fileByteSize + ", destination=" + this.destination + '}';
        }
    }
}

