package org.apache.storm.daemon.worker;

import com.codahale.metrics.Meter;
import com.codahale.metrics.SharedMetricRegistries;
import java.io.File;
import java.io.IOException;
import java.net.UnknownHostException;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import javax.security.auth.Subject;
import org.apache.storm.Config;
import org.apache.storm.Constants;
import org.apache.storm.StormTimer;
import org.apache.storm.cluster.ClusterStateContext;
import org.apache.storm.cluster.ClusterUtils;
import org.apache.storm.cluster.DaemonType;
import org.apache.storm.cluster.IStateStorage;
import org.apache.storm.cluster.IStormClusterState;
import org.apache.storm.daemon.DaemonCommon;
import org.apache.storm.daemon.Shutdownable;
import org.apache.storm.daemon.StormCommon;
import org.apache.storm.executor.Executor;
import org.apache.storm.executor.ExecutorShutdown;
import org.apache.storm.executor.IRunningExecutor;
import org.apache.storm.executor.LocalExecutor;
import org.apache.storm.generated.Credentials;
import org.apache.storm.generated.ExecutorInfo;
import org.apache.storm.generated.LSWorkerHeartbeat;
import org.apache.storm.generated.SupervisorWorkerHeartbeat;
import org.apache.storm.messaging.IConnection;
import org.apache.storm.messaging.IContext;
import org.apache.storm.metrics2.StormMetricRegistry;
import org.apache.storm.multilang.JsonSerializer;
import org.apache.storm.security.auth.ClientAuthUtils;
import org.apache.storm.security.auth.IAutoCredentials;
import org.apache.storm.shade.com.google.common.base.Preconditions;
import org.apache.storm.shade.org.apache.commons.io.FileUtils;
import org.apache.storm.shade.org.apache.commons.lang.ObjectUtils;
import org.apache.storm.shade.uk.org.lidalia.sysoutslf4j.context.SysOutOverSLF4J;
import org.apache.storm.stats.ClientStatsUtil;
import org.apache.storm.utils.ConfigUtils;
import org.apache.storm.utils.LocalState;
import org.apache.storm.utils.NimbusClient;
import org.apache.storm.utils.ObjectReader;
import org.apache.storm.utils.SupervisorClient;
import org.apache.storm.utils.SupervisorIfaceFactory;
import org.apache.storm.utils.Time;
import org.apache.storm.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/storm/daemon/worker/Worker.class */
public class Worker implements Shutdownable, DaemonCommon {
    private static final Logger LOG = LoggerFactory.getLogger(Worker.class);
    private static final Pattern BLOB_VERSION_EXTRACTION = Pattern.compile(".*\\.([0-9]+)$");
    private final Map<String, Object> conf;
    private final Map<String, Object> topologyConf;
    private final IContext context;
    private final String topologyId;
    private final String assignmentId;
    private final int supervisorPort;
    private final int port;
    private final String workerId;
    private final LogConfigManager logConfigManager;
    private final StormMetricRegistry metricRegistry;
    private Meter heatbeatMeter;
    private WorkerState workerState;
    private AtomicReference<List<IRunningExecutor>> executorsAtom;
    private Thread transferThread;
    private Subject subject;
    private Collection<IAutoCredentials> autoCreds;
    private final Supplier<SupervisorIfaceFactory> supervisorIfaceSupplier;

    public Worker(Map<String, Object> map, IContext iContext, String str, String str2, int i, int i2, String str3, Supplier<SupervisorIfaceFactory> supplier) throws IOException {
        this.conf = map;
        this.context = iContext;
        this.topologyId = str;
        this.assignmentId = str2;
        this.supervisorPort = i;
        this.port = i2;
        this.workerId = str3;
        this.logConfigManager = new LogConfigManager();
        this.metricRegistry = new StormMetricRegistry();
        this.topologyConf = ConfigUtils.overrideLoginConfigWithSystemProperty(ConfigUtils.readSupervisorStormConf(map, str));
        this.topologyConf.put(Config.PACEMAKER_AUTH_METHOD, "NONE");
        map.put(Config.PACEMAKER_AUTH_METHOD, "NONE");
        if (supplier == null) {
            this.supervisorIfaceSupplier = () -> {
                try {
                    return SupervisorClient.getConfiguredClient(this.topologyConf, Utils.hostname(), i);
                } catch (UnknownHostException e) {
                    throw Utils.wrapInRuntime(e);
                }
            };
        } else {
            this.supervisorIfaceSupplier = supplier;
        }
    }

    public Worker(Map<String, Object> map, IContext iContext, String str, String str2, int i, int i2, String str3) throws IOException {
        this(map, iContext, str, str2, i, i2, str3, null);
    }

    public static void main(String[] strArr) throws Exception {
        Preconditions.checkArgument(strArr.length == 5, "Illegal number of arguments. Expected: 5, Actual: " + strArr.length);
        String str = strArr[0];
        String str2 = strArr[1];
        String str3 = strArr[2];
        String str4 = strArr[3];
        String str5 = strArr[4];
        Map<String, Object> readStormConfig = ConfigUtils.readStormConfig();
        Utils.setupWorkerUncaughtExceptionHandler();
        StormCommon.validateDistributedMode(readStormConfig);
        Worker worker = new Worker(readStormConfig, null, str, str2, Integer.parseInt(str3), Integer.parseInt(str4), str5);
        int intValue = ObjectReader.getInt(readStormConfig.get(Config.SUPERVISOR_WORKER_SHUTDOWN_SLEEP_SECS)).intValue();
        LOG.info("Adding shutdown hook with kill in {} secs", Integer.valueOf(intValue));
        worker.getClass();
        Utils.addShutdownHookWithDelayedForceKill(worker::shutdown, intValue);
        worker.start();
    }

    public void start() throws Exception {
        LOG.info("Launching worker for {} on {}:{} with id {} and conf {}", new Object[]{this.topologyId, this.assignmentId, Integer.valueOf(this.port), this.workerId, ConfigUtils.maskPasswords(this.conf)});
        if (!ConfigUtils.isLocalMode(this.conf)) {
            SysOutOverSLF4J.sendSystemOutAndErrToSLF4J();
            String processPid = Utils.processPid();
            FileUtils.touch(new File(ConfigUtils.workerPidPath(this.conf, this.workerId, processPid)));
            FileUtils.writeStringToFile(new File(ConfigUtils.workerArtifactsPidPath(this.conf, this.topologyId, Integer.valueOf(this.port))), processPid, Charset.forName(JsonSerializer.DEFAULT_CHARSET));
        }
        ClusterStateContext clusterStateContext = new ClusterStateContext(DaemonType.WORKER, this.topologyConf);
        IStateStorage mkStateStorage = ClusterUtils.mkStateStorage(this.conf, this.topologyConf, clusterStateContext);
        IStormClusterState mkStormClusterState = ClusterUtils.mkStormClusterState(mkStateStorage, null, clusterStateContext);
        this.metricRegistry.start(this.topologyConf, this.port);
        SharedMetricRegistries.add(Constants.WORKER_METRICS_REGISTRY, this.metricRegistry.getRegistry());
        Credentials credentials = mkStormClusterState.credentials(this.topologyId, null);
        HashMap hashMap = new HashMap();
        if (credentials != null) {
            hashMap.putAll(credentials.get_creds());
        }
        this.autoCreds = ClientAuthUtils.getAutoCredentials(this.topologyConf);
        this.subject = ClientAuthUtils.populateSubject(null, this.autoCreds, hashMap);
        Subject.doAs(this.subject, () -> {
            return loadWorker(mkStateStorage, mkStormClusterState, hashMap, credentials);
        });
    }

    private Object loadWorker(IStateStorage iStateStorage, IStormClusterState iStormClusterState, Map<String, String> map, Credentials credentials) throws Exception {
        this.workerState = new WorkerState(this.conf, this.context, this.topologyId, this.assignmentId, this.supervisorIfaceSupplier, this.port, this.workerId, this.topologyConf, iStateStorage, iStormClusterState, this.autoCreds, this.metricRegistry, credentials);
        this.heatbeatMeter = this.metricRegistry.meter("doHeartbeat-calls", this.workerState.getWorkerTopologyContext(), "__system", -1);
        doHeartBeat();
        this.executorsAtom = new AtomicReference<>(null);
        this.workerState.heartbeatTimer.scheduleRecurring(0, ((Integer) this.conf.get(Config.WORKER_HEARTBEAT_FREQUENCY_SECS)).intValue(), () -> {
            try {
                doHeartBeat();
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        });
        this.workerState.executorHeartbeatTimer.scheduleRecurring(0, (this.workerState.stormClusterState.isPacemakerStateStore() ? (Integer) this.conf.get(Config.TASK_HEARTBEAT_FREQUENCY_SECS) : (Integer) this.conf.get(Config.EXECUTOR_METRICS_FREQUENCY_SECS)).intValue(), this::doExecutorHeartbeats);
        this.workerState.refreshConnections();
        this.workerState.activateWorkerWhenAllConnectionsReady();
        this.workerState.refreshStormActive(null);
        this.workerState.runWorkerStartHooks();
        ArrayList arrayList = new ArrayList();
        for (List<Long> list : this.workerState.getLocalExecutors()) {
            if (ConfigUtils.isLocalMode(this.conf)) {
                Executor mkExecutor = LocalExecutor.mkExecutor(this.workerState, list, map);
                arrayList.add(mkExecutor);
                for (int i = 0; i < mkExecutor.getTaskIds().size(); i++) {
                    this.workerState.localReceiveQueues.put(mkExecutor.getTaskIds().get(i), mkExecutor.getReceiveQueue());
                }
            } else {
                Executor mkExecutor2 = Executor.mkExecutor(this.workerState, list, map);
                for (int i2 = 0; i2 < mkExecutor2.getTaskIds().size(); i2++) {
                    this.workerState.localReceiveQueues.put(mkExecutor2.getTaskIds().get(i2), mkExecutor2.getReceiveQueue());
                }
                arrayList.add(mkExecutor2);
            }
        }
        ArrayList arrayList2 = new ArrayList();
        Iterator it = arrayList.iterator();
        while (it.hasNext()) {
            arrayList2.add(((Executor) it.next()).execute());
        }
        this.executorsAtom.set(arrayList2);
        if (this.workerState.hasRemoteOutboundTasks()) {
            this.transferThread = this.workerState.makeTransferThread();
            this.transferThread.setName("Worker-Transfer");
        }
        establishLogSettingCallback();
        int[] iArr = new int[1];
        this.workerState.refreshCredentialsTimer.scheduleRecurring(0, ((Integer) this.conf.get(Config.TASK_CREDENTIALS_POLL_SECS)).intValue(), () -> {
            try {
                checkCredentialsChanged();
                iArr[0] = 0;
            } catch (Exception e) {
                iArr[0] = iArr[0] + 1;
                if (iArr[0] <= 10) {
                    LOG.warn("Ignoring {} of {} consecutive exceptions when checking for credential change", new Object[]{Integer.valueOf(iArr[0]), 10, e});
                } else {
                    LOG.error("Received {} consecutive exceptions, {} tolerated, when checking for credential change", new Object[]{Integer.valueOf(iArr[0]), 10, e});
                    throw e;
                }
            }
        });
        this.workerState.checkForUpdatedBlobsTimer.scheduleRecurring(0, ((Integer) this.conf.getOrDefault(Config.WORKER_BLOB_UPDATE_POLL_INTERVAL_SECS, 10)).intValue(), () -> {
            try {
                LOG.debug("Checking if blobs have updated");
                updateBlobUpdates();
            } catch (IOException e) {
                LOG.error(e.getStackTrace().toString());
            }
        });
        if (!((Boolean) this.topologyConf.get(Config.TOPOLOGY_DISABLE_LOADAWARE_MESSAGING)).booleanValue()) {
            this.workerState.refreshLoadTimer.scheduleRecurringWithJitter(0, 1, 500, this::doRefreshLoad);
        }
        StormTimer stormTimer = this.workerState.refreshConnectionsTimer;
        int intValue = ((Integer) this.conf.get(Config.TASK_REFRESH_POLL_SECS)).intValue();
        WorkerState workerState = this.workerState;
        workerState.getClass();
        stormTimer.scheduleRecurring(0, intValue, workerState::refreshConnections);
        StormTimer stormTimer2 = this.workerState.resetLogLevelsTimer;
        int intValue2 = ((Integer) this.conf.get(Config.WORKER_LOG_LEVEL_RESET_POLL_SECS)).intValue();
        LogConfigManager logConfigManager = this.logConfigManager;
        logConfigManager.getClass();
        stormTimer2.scheduleRecurring(0, intValue2, logConfigManager::resetLogLevels);
        StormTimer stormTimer3 = this.workerState.refreshActiveTimer;
        int intValue3 = ((Integer) this.conf.get(Config.TASK_REFRESH_POLL_SECS)).intValue();
        WorkerState workerState2 = this.workerState;
        workerState2.getClass();
        stormTimer3.scheduleRecurring(0, intValue3, workerState2::refreshStormActive);
        setupFlushTupleTimer(this.topologyConf, arrayList2);
        setupBackPressureCheckTimer(this.topologyConf);
        LOG.info("Worker has topology config {}", ConfigUtils.maskPasswords(this.topologyConf));
        LOG.info("Worker {} for storm {} on {}:{}  has finished loading", new Object[]{this.workerId, this.topologyId, this.assignmentId, Integer.valueOf(this.port)});
        return this;
    }

    private void setupFlushTupleTimer(Map<String, Object> map, List<IRunningExecutor> list) {
        Integer num = ObjectReader.getInt(map.get(Config.TOPOLOGY_PRODUCER_BATCH_SIZE));
        Integer num2 = ObjectReader.getInt(map.get(Config.TOPOLOGY_TRANSFER_BATCH_SIZE));
        Long l = ObjectReader.getLong(map.get(Config.TOPOLOGY_BATCH_FLUSH_INTERVAL_MILLIS));
        if ((num.intValue() == 1 && num2.intValue() == 1) || l.longValue() == 0) {
            LOG.info("Flush Tuple generation disabled. producerBatchSize={}, xferBatchSize={}, flushIntervalMillis={}", new Object[]{num, num2, l});
        } else {
            this.workerState.flushTupleTimer.scheduleRecurringMs(l.longValue(), l.longValue(), () -> {
                for (int i = 0; i < list.size(); i++) {
                    IRunningExecutor iRunningExecutor = (IRunningExecutor) list.get(i);
                    if (iRunningExecutor.getExecutorId().get(0).longValue() != -1) {
                        iRunningExecutor.publishFlushTuple();
                    }
                }
            });
            LOG.info("Flush tuple will be generated every {} millis", l);
        }
    }

    private void setupBackPressureCheckTimer(Map<String, Object> map) {
        if (this.workerState.isSingleWorker()) {
            LOG.info("BackPressure change checking is disabled as there is only one worker");
            return;
        }
        Long l = ObjectReader.getLong(map.get(Config.TOPOLOGY_BACKPRESSURE_CHECK_MILLIS));
        this.workerState.backPressureCheckTimer.scheduleRecurringMs(l.longValue(), l.longValue(), () -> {
            this.workerState.refreshBackPressureStatus();
        });
        LOG.info("BackPressure status change checking will be performed every {} millis", l);
    }

    public void doRefreshLoad() {
        this.workerState.refreshLoad(this.executorsAtom.get());
        Iterator<IRunningExecutor> it = this.executorsAtom.get().iterator();
        while (it.hasNext()) {
            it.next().loadChanged(this.workerState.getLoadMapping());
        }
    }

    public void doHeartBeat() throws IOException {
        LocalState workerState = ConfigUtils.workerState(this.workerState.conf, this.workerState.workerId);
        LSWorkerHeartbeat lSWorkerHeartbeat = new LSWorkerHeartbeat(Time.currentTimeSecs(), this.workerState.topologyId, (List) this.workerState.localExecutors.stream().map(list -> {
            return new ExecutorInfo(((Long) list.get(0)).intValue(), ((Long) list.get(1)).intValue());
        }).collect(Collectors.toList()), this.workerState.port);
        workerState.setWorkerHeartBeat(lSWorkerHeartbeat);
        workerState.cleanup(60);
        if (!this.workerState.stormClusterState.isPacemakerStateStore()) {
            LOG.debug("The pacemaker is not used, send heartbeat to master.");
            heartbeatToMasterIfLocalbeatFail(lSWorkerHeartbeat);
        }
        this.heatbeatMeter.mark();
    }

    public void doExecutorHeartbeats() {
        List<IRunningExecutor> list = this.executorsAtom.get();
        try {
            this.workerState.stormClusterState.workerHeartbeat(this.workerState.topologyId, this.workerState.assignmentId, Long.valueOf(this.workerState.port), ClientStatsUtil.thriftifyZkWorkerHb(ClientStatsUtil.mkZkWorkerHb(this.workerState.topologyId, null == list ? ClientStatsUtil.mkEmptyExecutorZkHbs(this.workerState.localExecutors) : ClientStatsUtil.convertExecutorZkHbs((Map) list.stream().collect(Collectors.toMap((v0) -> {
                return v0.getExecutorId();
            }, (v0) -> {
                return v0.renderStats();
            }))), Integer.valueOf(this.workerState.uptime.upTime()))));
        } catch (Exception e) {
            LOG.error("Worker failed to write heartbeats to ZK or Pacemaker...will retry", e);
        }
    }

    public Map<String, Long> getCurrentBlobVersions() throws IOException {
        HashMap hashMap = new HashMap();
        Map map = (Map) this.workerState.getTopologyConf().get(Config.TOPOLOGY_BLOBSTORE_MAP);
        if (map != null) {
            String supervisorStormDistRoot = ConfigUtils.supervisorStormDistRoot(this.workerState.getTopologyConf(), this.workerState.getTopologyId());
            for (Map.Entry entry : map.entrySet()) {
                String str = (String) entry.getKey();
                Map map2 = (Map) entry.getValue();
                if (map2 != null && map2.containsKey("localname")) {
                    str = (String) map2.get("localname");
                }
                Matcher matcher = BLOB_VERSION_EXTRACTION.matcher(new File(supervisorStormDistRoot, str).getCanonicalFile().getName());
                if (matcher.matches()) {
                    hashMap.put(str, Long.valueOf(matcher.group(1)));
                }
            }
        }
        return hashMap;
    }

    public void updateBlobUpdates() throws IOException {
        Map<String, Long> currentBlobVersions = getCurrentBlobVersions();
        this.workerState.blobToLastKnownVersion.putAll(currentBlobVersions);
        LOG.debug("Latest versions for blobs {}", currentBlobVersions);
    }

    public void checkCredentialsChanged() {
        Credentials credentials = this.workerState.stormClusterState.credentials(this.topologyId, null);
        if (ObjectUtils.equals(credentials, this.workerState.getCredentials())) {
            return;
        }
        ClientAuthUtils.updateSubject(this.subject, this.autoCreds, null == credentials ? null : credentials.get_creds());
        this.workerState.setCredentials(credentials);
        Iterator<IRunningExecutor> it = this.executorsAtom.get().iterator();
        while (it.hasNext()) {
            it.next().credentialsChanged(credentials);
        }
    }

    public void checkLogConfigChanged() {
        this.logConfigManager.processLogConfigChange(this.workerState.stormClusterState.topologyLogConfig(this.topologyId, null));
        establishLogSettingCallback();
    }

    public void establishLogSettingCallback() {
        this.workerState.stormClusterState.topologyLogConfig(this.topologyId, this::checkLogConfigChanged);
    }

    private void heartbeatToMasterIfLocalbeatFail(LSWorkerHeartbeat lSWorkerHeartbeat) {
        if (ConfigUtils.isLocalMode(this.conf)) {
            return;
        }
        SupervisorWorkerHeartbeat supervisorWorkerHeartbeat = new SupervisorWorkerHeartbeat(lSWorkerHeartbeat.get_topology_id(), lSWorkerHeartbeat.get_executors(), lSWorkerHeartbeat.get_time_secs());
        try {
            SupervisorIfaceFactory supervisorIfaceFactory = this.supervisorIfaceSupplier.get();
            Throwable th = null;
            try {
                try {
                    supervisorIfaceFactory.getIface().sendSupervisorWorkerHeartbeat(supervisorWorkerHeartbeat);
                    if (supervisorIfaceFactory != null) {
                        if (0 != 0) {
                            try {
                                supervisorIfaceFactory.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            supervisorIfaceFactory.close();
                        }
                    }
                } catch (Throwable th3) {
                    th = th3;
                    throw th3;
                }
            } finally {
            }
        } catch (Exception e) {
            LOG.warn("Exception when send heartbeat to local supervisor", e.getMessage());
            try {
                NimbusClient configuredClient = NimbusClient.getConfiguredClient(this.topologyConf);
                Throwable th4 = null;
                try {
                    try {
                        configuredClient.getClient().sendSupervisorWorkerHeartbeat(supervisorWorkerHeartbeat);
                        if (configuredClient != null) {
                            if (0 != 0) {
                                try {
                                    configuredClient.close();
                                } catch (Throwable th5) {
                                    th4.addSuppressed(th5);
                                }
                            } else {
                                configuredClient.close();
                            }
                        }
                    } catch (Throwable th6) {
                        th4 = th6;
                        throw th6;
                    }
                } finally {
                }
            } catch (Exception e2) {
                LOG.error("Exception when send heartbeat to master", e2.getMessage());
            }
        }
    }

    @Override // org.apache.storm.daemon.Shutdownable
    public void shutdown() {
        try {
            LOG.info("Shutting down worker {} {} {}", new Object[]{this.topologyId, this.assignmentId, Integer.valueOf(this.port)});
            if (this.workerState != null) {
                Iterator<IConnection> it = this.workerState.cachedNodeToPortSocket.get().values().iterator();
                while (it.hasNext()) {
                    it.next().close();
                }
                LOG.info("Terminating messaging context");
                LOG.info("Shutting down executors");
                Iterator<IRunningExecutor> it2 = this.executorsAtom.get().iterator();
                while (it2.hasNext()) {
                    ((ExecutorShutdown) it2.next()).shutdown();
                }
                LOG.info("Shut down executors");
                LOG.info("Shutting down transfer thread");
                this.workerState.haltWorkerTransfer();
                if (this.transferThread != null) {
                    this.transferThread.interrupt();
                    this.transferThread.join();
                    LOG.info("Shut down transfer thread");
                }
                this.workerState.heartbeatTimer.close();
                this.workerState.refreshConnectionsTimer.close();
                this.workerState.refreshCredentialsTimer.close();
                this.workerState.checkForUpdatedBlobsTimer.close();
                this.workerState.refreshActiveTimer.close();
                this.workerState.executorHeartbeatTimer.close();
                this.workerState.userTimer.close();
                this.workerState.refreshLoadTimer.close();
                this.workerState.resetLogLevelsTimer.close();
                this.workerState.flushTupleTimer.close();
                this.workerState.backPressureCheckTimer.close();
                this.workerState.mqContext.term();
                this.workerState.closeResources();
                LOG.info("Trigger any worker shutdown hooks");
                this.workerState.runWorkerShutdownHooks();
                this.workerState.stormClusterState.removeWorkerHeartbeat(this.topologyId, this.assignmentId, Long.valueOf(this.port));
                LOG.info("Disconnecting from storm cluster state context");
                this.workerState.stormClusterState.disconnect();
                this.workerState.stateStorage.close();
            } else {
                LOG.error("workerState is null");
            }
            this.metricRegistry.stop();
            SharedMetricRegistries.remove(Constants.WORKER_METRICS_REGISTRY);
            LOG.info("Shut down worker {} {} {}", new Object[]{this.topologyId, this.assignmentId, Integer.valueOf(this.port)});
        } catch (Exception e) {
            throw Utils.wrapInRuntime(e);
        }
    }

    @Override // org.apache.storm.daemon.DaemonCommon
    public boolean isWaiting() {
        return this.workerState.heartbeatTimer.isTimerWaiting() && this.workerState.refreshConnectionsTimer.isTimerWaiting() && this.workerState.refreshLoadTimer.isTimerWaiting() && this.workerState.refreshCredentialsTimer.isTimerWaiting() && this.workerState.checkForUpdatedBlobsTimer.isTimerWaiting() && this.workerState.refreshActiveTimer.isTimerWaiting() && this.workerState.executorHeartbeatTimer.isTimerWaiting() && this.workerState.userTimer.isTimerWaiting() && this.workerState.flushTupleTimer.isTimerWaiting();
    }
}
