/*
 * Decompiled with CFR 0.152.
 */
package org.apache.edgent.oplet.core;

import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.apache.edgent.execution.mbeans.PeriodMXBean;
import org.apache.edgent.execution.services.ControlService;
import org.apache.edgent.oplet.OpletContext;
import org.apache.edgent.oplet.OutputPortContext;
import org.apache.edgent.oplet.core.Source;

/**
 * Source oplet that invokes {@link #fetchTuples()} at a fixed rate.
 *
 * <p>The task is scheduled on the {@link ScheduledExecutorService} obtained
 * from the oplet context when {@link #start()} is called. The period can be
 * changed afterwards through the {@code setPeriod} methods; a change cancels
 * the current schedule and creates a new one.
 *
 * <p>All accessors and mutators of the period state synchronize on this
 * instance.
 *
 * @param <T> type of the tuples produced by this source
 */
public abstract class PeriodicSource<T>
extends Source<T>
implements Runnable,
PeriodMXBean {
    /** Control type used when registering this source with the control service. */
    private static final String TSTREAM_TYPE = "stream";

    /** Current polling period, expressed in {@link #unit}. */
    private long period;

    /** Time unit of {@link #period}. */
    private TimeUnit unit;

    /** Handle of the currently scheduled task; {@code null} until {@link #start()} has scheduled it. */
    private ScheduledFuture<?> future;

    /**
     * Creates a source with the given initial period.
     *
     * @param period initial period between invocations of {@link #fetchTuples()}
     * @param unit time unit of {@code period}
     */
    protected PeriodicSource(long period, TimeUnit unit) {
        this.period = period;
        this.unit = unit;
    }

    /**
     * Initializes the oplet by delegating to the superclass.
     *
     * @param context the oplet context
     */
    @Override
    public void initialize(OpletContext<Void, T> context) {
        super.initialize(context);
    }

    /**
     * Registers this instance as a {@link PeriodMXBean} control when a
     * {@link ControlService} is available from the oplet context, then
     * schedules the periodic task with no initial delay.
     */
    @Override
    public synchronized void start() {
        ControlService cs = this.getOpletContext().getService(ControlService.class);
        if (cs != null) {
            cs.registerControl(
                    TSTREAM_TYPE,
                    this.getOpletContext().uniquify(this.getClass().getSimpleName()),
                    this.getAlias(),
                    PeriodMXBean.class,
                    this);
        }
        this.schedule(false);
    }

    /**
     * Returns the alias of this oplet's first output port.
     */
    private String getAlias() {
        OutputPortContext oc = this.getOpletContext().getOutputContext().get(0);
        return oc.getAlias();
    }

    /**
     * Schedules {@link #getRunnable()} at a fixed rate using the current
     * period and unit, and remembers the resulting future.
     *
     * @param delay when {@code true} the first run happens one full period
     *        from now; when {@code false} it happens immediately
     */
    private synchronized void schedule(boolean delay) {
        ScheduledExecutorService scheduler =
                this.getOpletContext().getService(ScheduledExecutorService.class);
        long initialDelay = delay ? this.getPeriod() : 0L;
        this.future = scheduler.scheduleAtFixedRate(
                this.getRunnable(), initialDelay, this.getPeriod(), this.getUnit());
    }

    /**
     * Returns the task handed to the scheduler. The default is this instance.
     *
     * @return the runnable to execute periodically
     */
    protected Runnable getRunnable() {
        return this;
    }

    /**
     * Fetches tuples; called once per period.
     *
     * @throws Exception on any failure; {@link #run()} wraps it in a
     *         {@link RuntimeException}
     */
    protected abstract void fetchTuples() throws Exception;

    /**
     * Calls {@link #fetchTuples()}, rethrowing any exception wrapped in a
     * {@link RuntimeException} with the original as its cause.
     */
    @Override
    public void run() {
        try {
            this.fetchTuples();
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * @return the current period, in units of {@link #getUnit()}
     */
    public synchronized long getPeriod() {
        return this.period;
    }

    /**
     * @return the time unit of the current period
     */
    public synchronized TimeUnit getUnit() {
        return this.unit;
    }

    /**
     * Changes the period, keeping the current time unit.
     *
     * @param period the new period; must be greater than zero
     * @throws IllegalArgumentException if {@code period <= 0}
     */
    public synchronized void setPeriod(long period) {
        this.setPeriod(period, this.getUnit());
    }

    /**
     * Changes the period and its time unit.
     *
     * <p>If the values differ from the current ones and a task is already
     * scheduled, the current schedule is cancelled (without interrupting a
     * run in progress) and a new one is created whose first run is one new
     * period from now. If nothing has been scheduled yet, the new values are
     * only recorded and are picked up when {@link #start()} schedules the task.
     *
     * @param period the new period; must be greater than zero
     * @param unit the time unit of {@code period}; must not be {@code null}
     * @throws IllegalArgumentException if {@code period <= 0}
     * @throws NullPointerException if {@code unit} is {@code null}
     */
    public synchronized void setPeriod(long period, TimeUnit unit) {
        if (period <= 0L) {
            throw new IllegalArgumentException("period must be greater than zero: " + period);
        }
        // Validate before touching any state: a null unit would otherwise be
        // rejected by the scheduler only after the running task was cancelled.
        if (unit == null) {
            throw new NullPointerException("unit");
        }
        if (this.period == period && this.unit == unit) {
            return;
        }
        ScheduledFuture<?> current = this.future;
        if (current != null) {
            current.cancel(false);
        }
        this.period = period;
        this.unit = unit;
        // Only reschedule when a schedule existed; before start() there is
        // nothing to replace and start() will use the values stored above.
        if (current != null) {
            this.schedule(true);
        }
    }
}

