/*
 * Decompiled with CFR 0.152.
 */
package org.apache.spark.sql.connect.planner;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.Serializable;
import java.util.Map;
import java.util.UUID;
import org.apache.spark.SparkException;
import org.apache.spark.api.python.PythonException;
import org.apache.spark.api.python.PythonFunction;
import org.apache.spark.api.python.PythonWorkerUtils$;
import org.apache.spark.api.python.SimplePythonFunction;
import org.apache.spark.api.python.SpecialLengths$;
import org.apache.spark.api.python.StreamingPythonRunner;
import org.apache.spark.api.python.StreamingPythonRunner$;
import org.apache.spark.internal.LogEntry;
import org.apache.spark.internal.LogEntry$;
import org.apache.spark.internal.LogKey;
import org.apache.spark.internal.LogKeys;
import org.apache.spark.internal.Logging;
import org.apache.spark.internal.MDC;
import org.apache.spark.internal.MessageWithContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.encoders.AgnosticEncoder;
import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders;
import org.apache.spark.sql.connect.common.ForeachWriterPacket;
import org.apache.spark.sql.connect.config.Connect$;
import org.apache.spark.sql.connect.planner.StreamingForeachBatchHelper;
import org.apache.spark.sql.connect.service.SessionHolder;
import org.apache.spark.sql.connect.service.SparkConnectService$;
import org.apache.spark.util.Utils$;
import org.slf4j.Logger;
import org.slf4j.event.Level;
import scala.Function0;
import scala.Function1;
import scala.Function2;
import scala.MatchError;
import scala.StringContext;
import scala.Tuple2;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.Seq;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.ObjectRef;
import scala.runtime.ScalaRunTime$;

public final class StreamingForeachBatchHelper$
implements Logging {
    public static final StreamingForeachBatchHelper$ MODULE$ = new StreamingForeachBatchHelper$();
    private static transient Logger org$apache$spark$internal$Logging$$log_;

    static {
        Logging.$init$((Logging)MODULE$);
    }

    public String logName() {
        return Logging.logName$((Logging)this);
    }

    public Logger log() {
        return Logging.log$((Logging)this);
    }

    public Logging.LogStringContext LogStringContext(StringContext sc) {
        return Logging.LogStringContext$((Logging)this, (StringContext)sc);
    }

    public void withLogContext(Map<String, String> context, Function0<BoxedUnit> body) {
        Logging.withLogContext$((Logging)this, context, body);
    }

    public void logInfo(Function0<String> msg) {
        Logging.logInfo$((Logging)this, msg);
    }

    public void logInfo(LogEntry entry) {
        Logging.logInfo$((Logging)this, (LogEntry)entry);
    }

    public void logInfo(LogEntry entry, Throwable throwable) {
        Logging.logInfo$((Logging)this, (LogEntry)entry, (Throwable)throwable);
    }

    public void logDebug(Function0<String> msg) {
        Logging.logDebug$((Logging)this, msg);
    }

    public void logDebug(LogEntry entry) {
        Logging.logDebug$((Logging)this, (LogEntry)entry);
    }

    public void logDebug(LogEntry entry, Throwable throwable) {
        Logging.logDebug$((Logging)this, (LogEntry)entry, (Throwable)throwable);
    }

    public void logTrace(Function0<String> msg) {
        Logging.logTrace$((Logging)this, msg);
    }

    public void logTrace(LogEntry entry) {
        Logging.logTrace$((Logging)this, (LogEntry)entry);
    }

    public void logTrace(LogEntry entry, Throwable throwable) {
        Logging.logTrace$((Logging)this, (LogEntry)entry, (Throwable)throwable);
    }

    public void logWarning(Function0<String> msg) {
        Logging.logWarning$((Logging)this, msg);
    }

    public void logWarning(LogEntry entry) {
        Logging.logWarning$((Logging)this, (LogEntry)entry);
    }

    public void logWarning(LogEntry entry, Throwable throwable) {
        Logging.logWarning$((Logging)this, (LogEntry)entry, (Throwable)throwable);
    }

    public void logError(Function0<String> msg) {
        Logging.logError$((Logging)this, msg);
    }

    public void logError(LogEntry entry) {
        Logging.logError$((Logging)this, (LogEntry)entry);
    }

    public void logError(LogEntry entry, Throwable throwable) {
        Logging.logError$((Logging)this, (LogEntry)entry, (Throwable)throwable);
    }

    public void logInfo(Function0<String> msg, Throwable throwable) {
        Logging.logInfo$((Logging)this, msg, (Throwable)throwable);
    }

    public void logDebug(Function0<String> msg, Throwable throwable) {
        Logging.logDebug$((Logging)this, msg, (Throwable)throwable);
    }

    public void logTrace(Function0<String> msg, Throwable throwable) {
        Logging.logTrace$((Logging)this, msg, (Throwable)throwable);
    }

    public void logWarning(Function0<String> msg, Throwable throwable) {
        Logging.logWarning$((Logging)this, msg, (Throwable)throwable);
    }

    public void logError(Function0<String> msg, Throwable throwable) {
        Logging.logError$((Logging)this, msg, (Throwable)throwable);
    }

    public boolean isTraceEnabled() {
        return Logging.isTraceEnabled$((Logging)this);
    }

    public void logBasedOnLevel(Level level, Function0<MessageWithContext> f) {
        Logging.logBasedOnLevel$((Logging)this, (Level)level, f);
    }

    public void initializeLogIfNecessary(boolean isInterpreter) {
        Logging.initializeLogIfNecessary$((Logging)this, (boolean)isInterpreter);
    }

    public boolean initializeLogIfNecessary(boolean isInterpreter, boolean silent) {
        return Logging.initializeLogIfNecessary$((Logging)this, (boolean)isInterpreter, (boolean)silent);
    }

    public boolean initializeLogIfNecessary$default$2() {
        return Logging.initializeLogIfNecessary$default$2$((Logging)this);
    }

    public void initializeForcefully(boolean isInterpreter, boolean silent) {
        Logging.initializeForcefully$((Logging)this, (boolean)isInterpreter, (boolean)silent);
    }

    public Logger org$apache$spark$internal$Logging$$log_() {
        return org$apache$spark$internal$Logging$$log_;
    }

    public void org$apache$spark$internal$Logging$$log__$eq(Logger x$1) {
        org$apache$spark$internal$Logging$$log_ = x$1;
    }

    private Function2<Dataset<Row>, Object, BoxedUnit> dataFrameCachingWrapper(Function1<StreamingForeachBatchHelper.FnArgsWithId, BoxedUnit> fn, SessionHolder sessionHolder) {
        return (Function2 & Serializable)(df, batchId) -> {
            StreamingForeachBatchHelper$.$anonfun$dataFrameCachingWrapper$1(sessionHolder, fn, df, BoxesRunTime.unboxToLong((Object)batchId));
            return BoxedUnit.UNIT;
        };
    }

    public Function2<Dataset<Row>, Object, BoxedUnit> scalaForeachBatchWrapper(byte[] payloadBytes, SessionHolder sessionHolder) {
        ForeachWriterPacket foreachBatchPkt = (ForeachWriterPacket)Utils$.MODULE$.deserialize(payloadBytes, Utils$.MODULE$.getContextOrSparkClassLoader());
        Function2 fn = (Function2)foreachBatchPkt.foreachWriter();
        AgnosticEncoder<?> encoder = foreachBatchPkt.datasetEncoder();
        return this.dataFrameCachingWrapper((Function1<StreamingForeachBatchHelper.FnArgsWithId, BoxedUnit>)(Function1 & Serializable)args -> {
            StreamingForeachBatchHelper$.$anonfun$scalaForeachBatchWrapper$1(encoder, fn, args);
            return BoxedUnit.UNIT;
        }, sessionHolder);
    }

    public Tuple2<Function2<Dataset<Row>, Object, BoxedUnit>, AutoCloseable> pythonForeachBatchWrapper(SimplePythonFunction pythonFn, SessionHolder sessionHolder) {
        int port = SparkConnectService$.MODULE$.localPort();
        ObjectRef connectUrl = ObjectRef.create((Object)("sc://localhost:" + port + "/;user_id=" + sessionHolder.userId()));
        Connect$.MODULE$.getAuthenticateToken().foreach((Function1 & Serializable)token -> {
            connectUrl.elem = (String)connectUrl.elem + ";token=" + token;
            return BoxedUnit.UNIT;
        });
        StreamingPythonRunner runner = StreamingPythonRunner$.MODULE$.apply((PythonFunction)pythonFn, (String)connectUrl.elem, sessionHolder.sessionId(), "pyspark.sql.connect.streaming.worker.foreach_batch_worker");
        this.logInfo(LogEntry$.MODULE$.from((Function0 & Serializable)() -> MODULE$.LogStringContext(new StringContext((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new String[]{"[session: ", "] "}))).log((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new MDC[]{new MDC((LogKey)LogKeys.SESSION_ID$.MODULE$, (Object)sessionHolder.sessionId())})).$plus(MODULE$.LogStringContext(new StringContext((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new String[]{"[userId: ", "] Initializing Python runner, "}))).log((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new MDC[]{new MDC((LogKey)LogKeys.USER_ID$.MODULE$, (Object)sessionHolder.userId())}))).$plus(MODULE$.LogStringContext(new StringContext((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new String[]{"pythonExec: ", ")"}))).log((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new MDC[]{new MDC((LogKey)LogKeys.PYTHON_EXEC$.MODULE$, (Object)pythonFn.pythonExec())})))));
        Tuple2 tuple2 = runner.init();
        if (tuple2 == null) {
            throw new MatchError((Object)tuple2);
        }
        DataOutputStream dataOut = (DataOutputStream)tuple2._1();
        DataInputStream dataIn = (DataInputStream)tuple2._2();
        Tuple2 tuple22 = new Tuple2((Object)dataOut, (Object)dataIn);
        DataOutputStream dataOut2 = (DataOutputStream)tuple22._1();
        DataInputStream dataIn2 = (DataInputStream)tuple22._2();
        Function1 & Serializable foreachBatchRunnerFn = (Function1 & Serializable)args -> {
            StreamingForeachBatchHelper$.$anonfun$pythonForeachBatchWrapper$3(dataOut2, dataIn2, sessionHolder, args);
            return BoxedUnit.UNIT;
        };
        return new Tuple2(this.dataFrameCachingWrapper((Function1<StreamingForeachBatchHelper.FnArgsWithId, BoxedUnit>)foreachBatchRunnerFn, sessionHolder), (Object)new StreamingForeachBatchHelper.RunnerCleaner(runner));
    }

    public static final /* synthetic */ void $anonfun$dataFrameCachingWrapper$1(SessionHolder sessionHolder$1, Function1 fn$1, Dataset df, long batchId) {
        String dfId = UUID.randomUUID().toString();
        MODULE$.logInfo(LogEntry$.MODULE$.from((Function0 & Serializable)() -> MODULE$.LogStringContext(new StringContext((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new String[]{"[session: ", "] "}))).log((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new MDC[]{new MDC((LogKey)LogKeys.SESSION_ID$.MODULE$, (Object)sessionHolder$1.sessionId())})).$plus(MODULE$.LogStringContext(new StringContext((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new String[]{"Caching DataFrame with id ", ""}))).log((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new MDC[]{new MDC((LogKey)LogKeys.DATAFRAME_ID$.MODULE$, (Object)dfId)})))));
        sessionHolder$1.cacheDataFrameById(dfId, (Dataset<Row>)df);
        try {
            fn$1.apply((Object)new StreamingForeachBatchHelper.FnArgsWithId(dfId, (Dataset<Row>)df, batchId));
        }
        finally {
            MODULE$.logInfo(LogEntry$.MODULE$.from((Function0 & Serializable)() -> MODULE$.LogStringContext(new StringContext((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new String[]{"[session: ", "] "}))).log((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new MDC[]{new MDC((LogKey)LogKeys.SESSION_ID$.MODULE$, (Object)sessionHolder$1.sessionId())})).$plus(MODULE$.LogStringContext(new StringContext((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new String[]{"Removing DataFrame with id ", " from the cache"}))).log((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new MDC[]{new MDC((LogKey)LogKeys.DATAFRAME_ID$.MODULE$, (Object)dfId)})))));
            sessionHolder$1.removeCachedDataFrame(dfId);
        }
    }

    public static final /* synthetic */ void $anonfun$scalaForeachBatchWrapper$1(AgnosticEncoder encoder$1, Function2 fn$2, StreamingForeachBatchHelper.FnArgsWithId args) {
        try {
            Dataset ds = AgnosticEncoders.UnboundRowEncoder$.MODULE$.equals(encoder$1) ? args.df() : args.df().as((Encoder)encoder$1);
            fn$2.apply((Object)ds, (Object)BoxesRunTime.boxToLong((long)args.batchId()));
        }
        catch (Throwable t2) {
            MODULE$.logError((Function0<String>)(Function0 & Serializable)() -> "Calling foreachBatch fn failed", t2);
            throw t2;
        }
    }

    public static final /* synthetic */ void $anonfun$pythonForeachBatchWrapper$3(DataOutputStream dataOut$1, DataInputStream dataIn$1, SessionHolder sessionHolder$2, StreamingForeachBatchHelper.FnArgsWithId args) {
        PythonWorkerUtils$.MODULE$.writeUTF(args.dfId(), dataOut$1);
        dataOut$1.writeLong(args.batchId());
        dataOut$1.flush();
        try {
            int n = dataIn$1.readInt();
            if (0 != n) {
                if (SpecialLengths$.MODULE$.PYTHON_EXCEPTION_THROWN() == n) {
                    String msg = PythonWorkerUtils$.MODULE$.readUTF(dataIn$1);
                    throw new PythonException("[session: " + sessionHolder$2.sessionId() + "] [userId: " + sessionHolder$2.userId() + "] Found error inside foreachBatch Python process: " + msg, null);
                }
                throw new IllegalStateException("[session: " + sessionHolder$2.sessionId() + "] [userId: " + sessionHolder$2.userId() + "] Unexpected return value " + n + " from the Python worker.");
            }
            MODULE$.logInfo(LogEntry$.MODULE$.from((Function0 & Serializable)() -> MODULE$.LogStringContext(new StringContext((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new String[]{"[session: ", "] "}))).log((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new MDC[]{new MDC((LogKey)LogKeys.SESSION_ID$.MODULE$, (Object)sessionHolder$2.sessionId())})).$plus(MODULE$.LogStringContext(new StringContext((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new String[]{"[userId: ", "] "}))).log((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new MDC[]{new MDC((LogKey)LogKeys.USER_ID$.MODULE$, (Object)sessionHolder$2.userId())}))).$plus(MODULE$.LogStringContext(new StringContext((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new String[]{"Python foreach batch for dfId ", " "}))).log((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new MDC[]{new MDC((LogKey)LogKeys.DATAFRAME_ID$.MODULE$, (Object)args.dfId())}))).$plus(MODULE$.LogStringContext(new StringContext((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new String[]{"completed (ret: 0)"}))).log((Seq)Nil$.MODULE$))));
        }
        catch (EOFException eof) {
            throw new SparkException("[session: " + sessionHolder$2.sessionId() + "] [userId: " + sessionHolder$2.userId() + "] Python worker exited unexpectedly (crashed)", (Throwable)eof);
        }
    }

    private StreamingForeachBatchHelper$() {
    }
}

