/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.source.coordinator;

import java.util.concurrent.ThreadFactory;
import javax.annotation.Nullable;
import org.apache.flink.api.common.eventtime.WatermarkAlignmentParams;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator;
import org.apache.flink.runtime.source.coordinator.SourceCoordinator;
import org.apache.flink.runtime.source.coordinator.SourceCoordinatorContext;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SourceCoordinatorProvider<SplitT extends SourceSplit>
extends RecreateOnResetOperatorCoordinator.Provider {
    private static final long serialVersionUID = -1921681440009738462L;
    private final String operatorName;
    private final Source<?, SplitT, ?> source;
    private final int numWorkerThreads;
    private final WatermarkAlignmentParams alignmentParams;
    @Nullable
    private final String coordinatorListeningID;

    public SourceCoordinatorProvider(String operatorName, OperatorID operatorID, Source<?, SplitT, ?> source, int numWorkerThreads, WatermarkAlignmentParams alignmentParams, @Nullable String coordinatorListeningID) {
        super(operatorID);
        this.operatorName = operatorName;
        this.source = source;
        this.numWorkerThreads = numWorkerThreads;
        this.alignmentParams = alignmentParams;
        this.coordinatorListeningID = coordinatorListeningID;
    }

    @Override
    public OperatorCoordinator getCoordinator(OperatorCoordinator.Context context) {
        String coordinatorThreadName = "SourceCoordinator-" + this.operatorName;
        CoordinatorExecutorThreadFactory coordinatorThreadFactory = new CoordinatorExecutorThreadFactory(coordinatorThreadName, context);
        SimpleVersionedSerializer splitSerializer = this.source.getSplitSerializer();
        SourceCoordinatorContext sourceCoordinatorContext = new SourceCoordinatorContext(context.getJobID(), coordinatorThreadFactory, this.numWorkerThreads, context, splitSerializer, context.isConcurrentExecutionAttemptsSupported());
        return new SourceCoordinator(context.getJobID(), this.operatorName, this.source, sourceCoordinatorContext, context.getCoordinatorStore(), this.alignmentParams, this.coordinatorListeningID);
    }

    public static class CoordinatorExecutorThreadFactory
    implements ThreadFactory,
    Thread.UncaughtExceptionHandler {
        private static final Logger LOG = LoggerFactory.getLogger(SourceCoordinatorProvider.class);
        private final String coordinatorThreadName;
        private final ClassLoader cl;
        private final Thread.UncaughtExceptionHandler errorHandler;
        @Nullable
        private Thread t;

        CoordinatorExecutorThreadFactory(String coordinatorThreadName, OperatorCoordinator.Context context) {
            this(coordinatorThreadName, context.getUserCodeClassloader(), (t, e) -> {
                LOG.error("Thread '{}' produced an uncaught exception. Failing the job.", (Object)t.getName(), (Object)e);
                context.failJob(e);
            });
        }

        CoordinatorExecutorThreadFactory(String coordinatorThreadName, ClassLoader contextClassLoader, Thread.UncaughtExceptionHandler errorHandler) {
            this.coordinatorThreadName = coordinatorThreadName;
            this.cl = contextClassLoader;
            this.errorHandler = errorHandler;
        }

        @Override
        public synchronized Thread newThread(Runnable r) {
            Preconditions.checkState((this.t == null ? 1 : 0) != 0, (Object)"Please using the new CoordinatorExecutorThreadFactory, this factory cannot new multiple threads.");
            this.t = new Thread(r, this.coordinatorThreadName);
            this.t.setContextClassLoader(this.cl);
            this.t.setUncaughtExceptionHandler(this);
            return this.t;
        }

        @Override
        public synchronized void uncaughtException(Thread t, Throwable e) {
            this.errorHandler.uncaughtException(t, e);
        }

        String getCoordinatorThreadName() {
            return this.coordinatorThreadName;
        }

        boolean isCurrentThreadCoordinatorThread() {
            return Thread.currentThread() == this.t;
        }
    }
}

