/*
 * Decompiled with CFR 0.152.
 */
package org.apache.spark.sql.connect.pipelines;

import java.io.Serializable;
import java.lang.invoke.MethodHandle;
import java.lang.invoke.SerializedLambda;
import java.util.Map;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.spark.connect.proto.ExecutePlanResponse;
import org.apache.spark.connect.proto.PipelineEvent;
import org.apache.spark.connect.proto.PipelineEventResult;
import org.apache.spark.internal.LogEntry;
import org.apache.spark.internal.LogEntry$;
import org.apache.spark.internal.LogKey;
import org.apache.spark.internal.LogKeys;
import org.apache.spark.internal.Logging;
import org.apache.spark.internal.MDC;
import org.apache.spark.internal.MessageWithContext;
import org.apache.spark.sql.connect.service.SessionHolder;
import org.apache.spark.sql.internal.SQLConf$;
import org.apache.spark.sql.pipelines.common.FlowStatus$;
import org.apache.spark.sql.pipelines.logging.EventDetails;
import org.apache.spark.sql.pipelines.logging.FlowProgress;
import org.apache.spark.sql.pipelines.logging.PipelineEvent;
import org.apache.spark.sql.pipelines.logging.RunProgress;
import org.apache.spark.util.ThreadUtils$;
import org.slf4j.Logger;
import org.slf4j.event.Level;
import org.sparkproject.connect.grpc.stub.StreamObserver;
import org.sparkproject.connect.protobuf.Timestamp;
import scala.Function0;
import scala.Function1;
import scala.Option$;
import scala.Predef$;
import scala.StringContext;
import scala.collection.SeqOps;
import scala.collection.StringOps$;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.Seq;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.LambdaDeserialize;
import scala.runtime.ScalaRunTime$;
import scala.util.control.NonFatal$;

@ScalaSignature(bytes="\u0006\u0005\u0005]a\u0001\u0002\t\u0012\u0001yA\u0001b\r\u0001\u0003\u0002\u0003\u0006I\u0001\u000e\u0005\t\u000b\u0002\u0011\t\u0011)A\u0005\r\")A\n\u0001C\u0001\u001b\"9!\u000b\u0001b\u0001\n\u001b\u0019\u0006BB,\u0001A\u00035A\u000bC\u0004Y\u0001\t\u0007I\u0011B-\t\r\t\u0004\u0001\u0015!\u0003[\u0011\u001d\u0019\u0007A1A\u0005\n\u0011Daa\u001b\u0001!\u0002\u0013)\u0007\"\u00027\u0001\t\u0003i\u0007\"\u0002>\u0001\t\u0013Y\bbBA\u0001\u0001\u0011\u0005\u00131\u0001\u0005\b\u0003\u000b\u0001A\u0011AA\u0002\u0011!\t9\u0001\u0001C\u0001'\u0005%\u0001bBA\u0007\u0001\u0011%\u0011q\u0002\u0002\u0014!&\u0004X\r\\5oK\u00163XM\u001c;TK:$WM\u001d\u0006\u0003%M\t\u0011\u0002]5qK2Lg.Z:\u000b\u0005Q)\u0012aB2p]:,7\r\u001e\u0006\u0003-]\t1a]9m\u0015\tA\u0012$A\u0003ta\u0006\u00148N\u0003\u0002\u001b7\u00051\u0011\r]1dQ\u0016T\u0011\u0001H\u0001\u0004_J<7\u0001A\n\u0005\u0001})3\u0006\u0005\u0002!G5\t\u0011EC\u0001#\u0003\u0015\u00198-\u00197b\u0013\t!\u0013E\u0001\u0004B]f\u0014VM\u001a\t\u0003M%j\u0011a\n\u0006\u0003Q]\t\u0001\"\u001b8uKJt\u0017\r\\\u0005\u0003U\u001d\u0012q\u0001T8hO&tw\r\u0005\u0002-c5\tQF\u0003\u0002/_\u0005!A.\u00198h\u0015\u0005\u0001\u0014\u0001\u00026bm\u0006L!AM\u0017\u0003\u001b\u0005+Ho\\\"m_N,\u0017M\u00197f\u0003A\u0011Xm\u001d9p]N,wJY:feZ,'\u000fE\u00026yyj\u0011A\u000e\u0006\u0003oa\nAa\u001d;vE*\u0011\u0011HO\u0001\u0005OJ\u00048MC\u0001<\u0003\tIw.\u0003\u0002>m\tq1\u000b\u001e:fC6|%m]3sm\u0016\u0014\bCA D\u001b\u0005\u0001%BA!C\u0003\u0015\u0001(o\u001c;p\u0015\t!r#\u0003\u0002E\u0001\n\u0019R\t_3dkR,\u0007\u000b\\1o%\u0016\u001c\bo\u001c8tK\u0006i1/Z:tS>t\u0007j\u001c7eKJ\u0004\"a\u0012&\u000e\u0003!S!!S\n\u0002\u000fM,'O^5dK&\u00111\n\u0013\u0002\u000e'\u0016\u001c8/[8o\u0011>dG-\u001a:\u0002\rqJg.\u001b;?)\rq\u0005+\u0015\t\u0003\u001f\u0002i\u0011!\u0005\u0005\u0006g\r\u0001\r\u0001\u000e\u0005\u0006\u000b\u000e\u0001\rAR\u0001\u000ecV,W/Z\"ba\u0006\u001c\u0017\u000e^=\u0016\u0003Q\u0003\"\u0001I+\n\u0005Y\u000b#aA%oi\u0006q\u0011/^3vK\u000e\u000b\u0007/Y2jif\u0004\u0013\u0001C3yK\u000e,Ho\u001c:\u0016\u0003i\u0003\"a\u00171\u000e\u0003qS!!\u00180\u0002\u0015\r|gnY;se\u0016tGO\u0003\u0002`_\u0005!Q\u000f^5m\u0013\t\tGL\u0001\nUQJ,\u0017\r\u001a)p_2,\u00050Z2vi>\u0014\u0018!C3yK\u000e,Ho\u001c:!\u0003)I7o\u00155vi\u0012|wO\\\u000b\u0002KB\u0011a-[\u0007\u0002O*\u0011\u0001\u000eX\u0001\u0007CR|W.[2\n\u0005)<'!D!u_6L7MQ8pY\u0016\fg.A\u0006jgNCW\u000f\u001e3po:\u0004\u0013!C:f]\u0012,e/\u001a8u)\tq\u0017\u000f\u0005\u0002!_&\u0011\u0001/\t\u0002\u0005+:LG\u000fC\u0003s\u0015\u0001\u00071/A\u0003fm\u0016tG\u000f\u0005\u0002uq6\tQO\u0003\u0002wo\u00069An\\4hS:<'B\u0001\n\u0016\u0013\tIXOA\u0007QSB,G.\u001b8f\u000bZ,g\u000e^\u0001\u0013g\"|W\u000f\u001c3F]F,X-^3Fm\u0016tG\u000f\u0006\u0002}\u007fB\u0011\u0001%`\u0005\u0003}\u0006\u0012qAQ8pY\u0016\fg\u000eC\u0003s\u0017\u0001\u00071/A\u0003dY>\u001cX\rF\u0001o\u0003!\u0019\b.\u001e;e_^t\u0017!E:f]\u0012,e/\u001a8u)>\u001cE.[3oiR\u0019a.a\u0003\t\u000bIt\u0001\u0019A:\u0002'\r|gn\u001d;sk\u000e$\bK]8u_\u00163XM\u001c;\u0015\t\u0005E\u0011Q\u0003\t\u0004\u007f\u0005M\u0011BA=A\u0011\u0015\u0011x\u00021\u0001t\u0001")
public class PipelineEventSender
implements Logging,
AutoCloseable {
    private final StreamObserver<ExecutePlanResponse> responseObserver;
    private final SessionHolder sessionHolder;
    private final int queueCapacity;
    private final ThreadPoolExecutor executor;
    private final AtomicBoolean isShutdown;
    private transient Logger org$apache$spark$internal$Logging$$log_;

    public String logName() {
        return Logging.logName$((Logging)this);
    }

    public Logger log() {
        return Logging.log$((Logging)this);
    }

    public Logging.LogStringContext LogStringContext(StringContext sc) {
        return Logging.LogStringContext$((Logging)this, (StringContext)sc);
    }

    public void withLogContext(Map<String, String> context, Function0<BoxedUnit> body) {
        Logging.withLogContext$((Logging)this, context, body);
    }

    public MDC MDC(LogKey key, Object value) {
        return Logging.MDC$((Logging)this, (LogKey)key, (Object)value);
    }

    public void logInfo(Function0<String> msg) {
        Logging.logInfo$((Logging)this, msg);
    }

    public void logInfo(LogEntry entry) {
        Logging.logInfo$((Logging)this, (LogEntry)entry);
    }

    public void logInfo(LogEntry entry, Throwable throwable) {
        Logging.logInfo$((Logging)this, (LogEntry)entry, (Throwable)throwable);
    }

    public void logDebug(Function0<String> msg) {
        Logging.logDebug$((Logging)this, msg);
    }

    public void logDebug(LogEntry entry) {
        Logging.logDebug$((Logging)this, (LogEntry)entry);
    }

    public void logDebug(LogEntry entry, Throwable throwable) {
        Logging.logDebug$((Logging)this, (LogEntry)entry, (Throwable)throwable);
    }

    public void logTrace(Function0<String> msg) {
        Logging.logTrace$((Logging)this, msg);
    }

    public void logTrace(LogEntry entry) {
        Logging.logTrace$((Logging)this, (LogEntry)entry);
    }

    public void logTrace(LogEntry entry, Throwable throwable) {
        Logging.logTrace$((Logging)this, (LogEntry)entry, (Throwable)throwable);
    }

    public void logWarning(Function0<String> msg) {
        Logging.logWarning$((Logging)this, msg);
    }

    public void logWarning(LogEntry entry) {
        Logging.logWarning$((Logging)this, (LogEntry)entry);
    }

    public void logWarning(LogEntry entry, Throwable throwable) {
        Logging.logWarning$((Logging)this, (LogEntry)entry, (Throwable)throwable);
    }

    public void logError(Function0<String> msg) {
        Logging.logError$((Logging)this, msg);
    }

    public void logError(LogEntry entry) {
        Logging.logError$((Logging)this, (LogEntry)entry);
    }

    public void logError(LogEntry entry, Throwable throwable) {
        Logging.logError$((Logging)this, (LogEntry)entry, (Throwable)throwable);
    }

    public void logInfo(Function0<String> msg, Throwable throwable) {
        Logging.logInfo$((Logging)this, msg, (Throwable)throwable);
    }

    public void logDebug(Function0<String> msg, Throwable throwable) {
        Logging.logDebug$((Logging)this, msg, (Throwable)throwable);
    }

    public void logTrace(Function0<String> msg, Throwable throwable) {
        Logging.logTrace$((Logging)this, msg, (Throwable)throwable);
    }

    public void logWarning(Function0<String> msg, Throwable throwable) {
        Logging.logWarning$((Logging)this, msg, (Throwable)throwable);
    }

    public void logError(Function0<String> msg, Throwable throwable) {
        Logging.logError$((Logging)this, msg, (Throwable)throwable);
    }

    public boolean isTraceEnabled() {
        return Logging.isTraceEnabled$((Logging)this);
    }

    public void logBasedOnLevel(Level level, Function0<MessageWithContext> f) {
        Logging.logBasedOnLevel$((Logging)this, (Level)level, f);
    }

    public void initializeLogIfNecessary(boolean isInterpreter) {
        Logging.initializeLogIfNecessary$((Logging)this, (boolean)isInterpreter);
    }

    public boolean initializeLogIfNecessary(boolean isInterpreter, boolean silent) {
        return Logging.initializeLogIfNecessary$((Logging)this, (boolean)isInterpreter, (boolean)silent);
    }

    public boolean initializeLogIfNecessary$default$2() {
        return Logging.initializeLogIfNecessary$default$2$((Logging)this);
    }

    public void initializeForcefully(boolean isInterpreter, boolean silent) {
        Logging.initializeForcefully$((Logging)this, (boolean)isInterpreter, (boolean)silent);
    }

    public Logger org$apache$spark$internal$Logging$$log_() {
        return this.org$apache$spark$internal$Logging$$log_;
    }

    public void org$apache$spark$internal$Logging$$log__$eq(Logger x$1) {
        this.org$apache$spark$internal$Logging$$log_ = x$1;
    }

    private final int queueCapacity() {
        return this.queueCapacity;
    }

    private ThreadPoolExecutor executor() {
        return this.executor;
    }

    private AtomicBoolean isShutdown() {
        return this.isShutdown;
    }

    public void sendEvent(PipelineEvent event) {
        PipelineEventSender pipelineEventSender = this;
        synchronized (pipelineEventSender) {
            if (!this.isShutdown().get()) {
                Object object = this.shouldEnqueueEvent(event) ? this.executor().submit(new Runnable(this, event){
                    private final /* synthetic */ PipelineEventSender $outer;
                    private final PipelineEvent event$1;

                    public void run() {
                        try {
                            this.$outer.sendEventToClient(this.event$1);
                        }
                        catch (Throwable throwable) {
                            Throwable throwable2;
                            Throwable throwable3 = throwable;
                            if (throwable3 != null && NonFatal$.MODULE$.apply(throwable2 = throwable3)) {
                                this.$outer.logError(LogEntry$.MODULE$.from((Function0 & Serializable)() -> $this.$outer.LogStringContext(new StringContext((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new String[]{"Failed to send pipeline event to client: "}))).log((Seq)Nil$.MODULE$).$plus($this.$outer.LogStringContext(new StringContext((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new String[]{"", ""}))).log((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new MDC[]{$this.$outer.MDC((LogKey)LogKeys.ERROR, $this.event$1.message())})))), throwable2);
                            }
                            throw throwable;
                        }
                    }
                    {
                        if ($outer == null) {
                            throw null;
                        }
                        this.$outer = $outer;
                        this.event$1 = event$1;
                    }

                    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
                        return LambdaDeserialize.bootstrap("lambdaDeserialize", new MethodHandle[]{$anonfun$run$1(org.apache.spark.sql.connect.pipelines.PipelineEventSender$$anon$1 )}, serializedLambda);
                    }
                }) : BoxedUnit.UNIT;
            } else {
                throw new IllegalStateException("Cannot send event after shutdown for session " + this.sessionHolder.sessionId());
            }
        }
    }

    private boolean shouldEnqueueEvent(PipelineEvent event) {
        FlowProgress flowProgress;
        EventDetails eventDetails = event.details();
        if (eventDetails instanceof RunProgress) {
            return true;
        }
        if (eventDetails instanceof FlowProgress && FlowStatus$.MODULE$.isTerminal((flowProgress = (FlowProgress)eventDetails).status())) {
            return true;
        }
        return this.executor().getQueue().size() < this.queueCapacity();
    }

    @Override
    public void close() {
        this.shutdown();
    }

    public void shutdown() {
        if (this.isShutdown().compareAndSet(false, true)) {
            this.executor().shutdown();
            if (!this.executor().awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS)) {
                this.logError(LogEntry$.MODULE$.from((Function0 & Serializable)() -> this.LogStringContext(new StringContext((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new String[]{"Pipeline event sender for session "}))).log((Seq)Nil$.MODULE$).$plus(this.LogStringContext(new StringContext((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new String[]{"", " failed to terminate"}))).log((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new MDC[]{this.MDC((LogKey)LogKeys.SESSION_ID, $this.sessionHolder.sessionId())})))));
                v0 = this.executor().shutdownNow();
            } else {
                v0 = BoxedUnit.UNIT;
            }
            this.logInfo(LogEntry$.MODULE$.from((Function0 & Serializable)() -> this.LogStringContext(new StringContext((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new String[]{"Pipeline event sender shutdown completed for session "}))).log((Seq)Nil$.MODULE$).$plus(this.LogStringContext(new StringContext((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new String[]{"", ""}))).log((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new MDC[]{this.MDC((LogKey)LogKeys.SESSION_ID, $this.sessionHolder.sessionId())})))));
            return;
        }
    }

    public void sendEventToClient(PipelineEvent event) {
        try {
            org.apache.spark.connect.proto.PipelineEvent protoEvent = this.constructProtoEvent(event);
            this.responseObserver.onNext(ExecutePlanResponse.newBuilder().setSessionId(this.sessionHolder.sessionId()).setServerSideSessionId(this.sessionHolder.serverSessionId()).setPipelineEventResult(PipelineEventResult.newBuilder().setEvent(protoEvent).build()).build());
        }
        catch (Throwable throwable) {
            Throwable throwable2;
            Throwable throwable3 = throwable;
            if (throwable3 != null && NonFatal$.MODULE$.apply(throwable2 = throwable3)) {
                this.logError(LogEntry$.MODULE$.from((Function0 & Serializable)() -> this.LogStringContext(new StringContext((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new String[]{"Failed to send pipeline event to client: "}))).log((Seq)Nil$.MODULE$).$plus(this.LogStringContext(new StringContext((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new String[]{"", ""}))).log((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new MDC[]{this.MDC((LogKey)LogKeys.ERROR, event.message())})))), throwable2);
            }
            throw throwable;
        }
    }

    private org.apache.spark.connect.proto.PipelineEvent constructProtoEvent(PipelineEvent event) {
        String string;
        if (event.error().nonEmpty()) {
            Seq errorMessages = PipelineEventSender.getExceptionMessages$1((Throwable)event.error().get());
            string = StringOps$.MODULE$.stripMargin$extension(Predef$.MODULE$.augmentString(event.message() + "\n         |Error: " + errorMessages.mkString("\n")));
        } else {
            string = event.message();
        }
        String message = string;
        PipelineEvent.Builder protoEventBuilder = org.apache.spark.connect.proto.PipelineEvent.newBuilder().setTimestamp(Timestamp.newBuilder().setSeconds(event.timestamp().getTime() / 1000L).setNanos(event.timestamp().getNanos()).build()).setMessage(message);
        return protoEventBuilder.build();
    }

    private static final Seq getExceptionMessages$1(Throwable throwable2) {
        String string = throwable2.getMessage();
        return (Seq)((SeqOps)Option$.MODULE$.apply((Object)throwable2.getCause()).map((Function1 & Serializable)throwable -> PipelineEventSender.getExceptionMessages$1(throwable)).getOrElse((Function0 & Serializable)() -> Nil$.MODULE$)).$plus$colon((Object)string);
    }

    public PipelineEventSender(StreamObserver<ExecutePlanResponse> responseObserver, SessionHolder sessionHolder) {
        this.responseObserver = responseObserver;
        this.sessionHolder = sessionHolder;
        Logging.$init$((Logging)this);
        this.queueCapacity = StringOps$.MODULE$.toInt$extension(Predef$.MODULE$.augmentString(sessionHolder.session().conf().get(SQLConf$.MODULE$.PIPELINES_EVENT_QUEUE_CAPACITY().key())));
        this.executor = ThreadUtils$.MODULE$.newDaemonSingleThreadExecutor("PipelineEventSender-" + sessionHolder.sessionId());
        this.isShutdown = new AtomicBoolean(false);
    }
}

