/*
 * Decompiled with CFR 0.152.
 */
package org.apache.spark.sql.execution.streaming.continuous;

import java.io.Serializable;
import org.apache.spark.Partition;
import org.apache.spark.Partitioner;
import org.apache.spark.SparkEnv$;
import org.apache.spark.TaskContext;
import org.apache.spark.rdd.RDD;
import org.apache.spark.rpc.RpcEndpointRef;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.connector.write.DataWriter;
import org.apache.spark.sql.connector.write.WriterCommitMessage;
import org.apache.spark.sql.connector.write.streaming.StreamingDataWriterFactory;
import org.apache.spark.sql.execution.streaming.continuous.CommitPartitionEpoch;
import org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution$;
import org.apache.spark.sql.execution.streaming.continuous.EpochCoordinatorRef$;
import org.apache.spark.sql.execution.streaming.continuous.EpochTracker$;
import org.apache.spark.util.Utils$;
import scala.Function0;
import scala.Option;
import scala.Predef$;
import scala.collection.Iterator;
import scala.collection.Seq;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.StringOps;
import scala.package$;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.ObjectRef;
import scala.runtime.java8.JFunction0;

@ScalaSignature(bytes="\u0006\u0001=4Aa\u0003\u0007\u00017!A\u0001\u0006\u0001BA\u0002\u0013\u0005\u0011\u0006\u0003\u00052\u0001\t\u0005\r\u0011\"\u00013\u0011!)\u0004A!A!B\u0013Q\u0003\u0002\u0003\u001c\u0001\u0005\u0003\u0005\u000b\u0011B\u001c\t\u000b\u0001\u0003A\u0011A!\t\u000f\u0019\u0003!\u0019!C!\u000f\"1q\n\u0001Q\u0001\n!CQ\u0001\u0015\u0001\u0005BECQ\u0001\u0017\u0001\u0005BeCQ!\u001c\u0001\u0005B9\u0014!cQ8oi&tWo\\;t/JLG/\u001a*E\t*\u0011QBD\u0001\u000bG>tG/\u001b8v_V\u001c(BA\b\u0011\u0003%\u0019HO]3b[&twM\u0003\u0002\u0012%\u0005IQ\r_3dkRLwN\u001c\u0006\u0003'Q\t1a]9m\u0015\t)b#A\u0003ta\u0006\u00148N\u0003\u0002\u00181\u00051\u0011\r]1dQ\u0016T\u0011!G\u0001\u0004_J<7\u0001A\n\u0003\u0001q\u00012!\b\u0011#\u001b\u0005q\"BA\u0010\u0015\u0003\r\u0011H\rZ\u0005\u0003Cy\u00111A\u0015#E!\t\u0019c%D\u0001%\u0015\u0005)\u0013!B:dC2\f\u0017BA\u0014%\u0005\u0011)f.\u001b;\u0002\tA\u0014XM^\u000b\u0002UA\u0019Q\u0004I\u0016\u0011\u00051zS\"A\u0017\u000b\u00059\u0012\u0012\u0001C2bi\u0006d\u0017p\u001d;\n\u0005Aj#aC%oi\u0016\u0014h.\u00197S_^\f\u0001\u0002\u001d:fm~#S-\u001d\u000b\u0003EMBq\u0001\u000e\u0002\u0002\u0002\u0003\u0007!&A\u0002yIE\nQ\u0001\u001d:fm\u0002\nQb\u001e:ji\u0016\u0014h)Y2u_JL\bC\u0001\u001d?\u001b\u0005I$BA\b;\u0015\tYD(A\u0003xe&$XM\u0003\u0002>%\u0005I1m\u001c8oK\u000e$xN]\u0005\u0003\u007fe\u0012!d\u0015;sK\u0006l\u0017N\\4ECR\fwK]5uKJ4\u0015m\u0019;pef\fa\u0001P5oSRtDc\u0001\"E\u000bB\u00111\tA\u0007\u0002\u0019!)\u0001&\u0002a\u0001U!)a'\u0002a\u0001o\u0005Y\u0001/\u0019:uSRLwN\\3s+\u0005A\u0005cA\u0012J\u0017&\u0011!\n\n\u0002\u0007\u001fB$\u0018n\u001c8\u0011\u00051kU\"\u0001\u000b\n\u00059#\"a\u0003)beRLG/[8oKJ\fA\u0002]1si&$\u0018n\u001c8fe\u0002\nQbZ3u!\u0006\u0014H/\u001b;j_:\u001cX#\u0001*\u0011\u0007\r\u001aV+\u0003\u0002UI\t)\u0011I\u001d:bsB\u0011AJV\u0005\u0003/R\u0011\u0011\u0002U1si&$\u0018n\u001c8\u0002\u000f\r|W\u000e];uKR\u0019!L\u001a5\u0011\u0007m\u001b'E\u0004\u0002]C:\u0011Q\fY\u0007\u0002=*\u0011qLG\u0001\u0007yI|w\u000e\u001e \n\u0003\u0015J!A\u0019\u0013\u0002\u000fA\f7m[1hK&\u0011A-\u001a\u0002\t\u0013R,'/\u0019;pe*\u0011!\r\n\u0005\u0006O&\u0001\r!V\u0001\u0006gBd\u0017\u000e\u001e\u0005\u0006S&\u0001\rA[\u0001\bG>tG/\u001a=u!\ta5.\u0003\u0002m)\tYA+Y:l\u0007>tG/\u001a=u\u0003E\u0019G.Z1s\t\u0016\u0004XM\u001c3f]\u000eLWm\u001d\u000b\u0002E\u0001")
public class ContinuousWriteRDD
extends RDD<BoxedUnit> {
    private RDD<InternalRow> prev;
    private final StreamingDataWriterFactory writerFactory;
    private final Option<Partitioner> partitioner;

    public RDD<InternalRow> prev() {
        return this.prev;
    }

    public void prev_$eq(RDD<InternalRow> x$1) {
        this.prev = x$1;
    }

    public Option<Partitioner> partitioner() {
        return this.partitioner;
    }

    public Partition[] getPartitions() {
        return this.prev().partitions();
    }

    public Iterator<BoxedUnit> compute(Partition split, TaskContext context) {
        RpcEndpointRef epochCoordinator = EpochCoordinatorRef$.MODULE$.get(context.getLocalProperty(ContinuousExecution$.MODULE$.EPOCH_COORDINATOR_ID_KEY()), SparkEnv$.MODULE$.get());
        EpochTracker$.MODULE$.initializeCurrentEpoch(new StringOps(Predef$.MODULE$.augmentString(context.getLocalProperty(ContinuousExecution$.MODULE$.START_EPOCH_KEY()))).toLong());
        while (!context.isInterrupted() && !context.isCompleted()) {
            ObjectRef dataWriter = ObjectRef.create(null);
            Utils$.MODULE$.tryWithSafeFinallyAndFailureCallbacks((Function0)(JFunction0.mcV.sp & Serializable & scala.Serializable)() -> {
                try {
                    Iterator dataIterator = this.prev().compute(split, context);
                    dataWriter$1.elem = $this.writerFactory.createWriter(context.partitionId(), context.taskAttemptId(), BoxesRunTime.unboxToLong((Object)EpochTracker$.MODULE$.getCurrentEpoch().get()));
                    while (dataIterator.hasNext()) {
                        ((DataWriter)dataWriter$1.elem).write(dataIterator.next());
                    }
                    this.logInfo((Function0 & Serializable & scala.Serializable)() -> new StringBuilder(46).append("Writer for partition ").append(context.partitionId()).append(" ").append("in epoch ").append(EpochTracker$.MODULE$.getCurrentEpoch().get()).append(" is committing.").toString());
                    WriterCommitMessage msg = ((DataWriter)dataWriter$1.elem).commit();
                    epochCoordinator.send((Object)new CommitPartitionEpoch(context.partitionId(), BoxesRunTime.unboxToLong((Object)EpochTracker$.MODULE$.getCurrentEpoch().get()), msg));
                    this.logInfo((Function0 & Serializable & scala.Serializable)() -> new StringBuilder(42).append("Writer for partition ").append(context.partitionId()).append(" ").append("in epoch ").append(EpochTracker$.MODULE$.getCurrentEpoch().get()).append(" committed.").toString());
                    EpochTracker$.MODULE$.incrementCurrentEpoch();
                }
                catch (InterruptedException interruptedException) {}
            }, (Function0)(JFunction0.mcV.sp & Serializable & scala.Serializable)() -> {
                this.logError((Function0 & Serializable & scala.Serializable)() -> new StringBuilder(34).append("Writer for partition ").append(context.partitionId()).append(" is aborting.").toString());
                if ((DataWriter)dataWriter$1.elem != null) {
                    ((DataWriter)dataWriter$1.elem).abort();
                }
                this.logError((Function0 & Serializable & scala.Serializable)() -> new StringBuilder(30).append("Writer for partition ").append(context.partitionId()).append(" aborted.").toString());
            }, (Function0)(JFunction0.mcV.sp & Serializable & scala.Serializable)() -> ((DataWriter)dataWriter$1.elem).close());
        }
        return package$.MODULE$.Iterator().apply((Seq)Nil$.MODULE$);
    }

    public void clearDependencies() {
        super.clearDependencies();
        this.prev_$eq(null);
    }

    public ContinuousWriteRDD(RDD<InternalRow> prev, StreamingDataWriterFactory writerFactory) {
        this.prev = prev;
        this.writerFactory = writerFactory;
        super(prev, ClassTag$.MODULE$.Unit());
        this.partitioner = this.prev().partitioner();
    }
}

