/*
 * Decompiled with CFR 0.152.
 */
package stream.runtime;

import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import stream.Context;
import stream.Data;
import stream.Process;
import stream.ProcessContext;
import stream.Processor;
import stream.StatefulProcessor;
import stream.io.Sink;
import stream.io.Source;
import stream.runtime.ApplicationContext;
import stream.runtime.Priority;
import stream.runtime.ProcessContextImpl;

/**
 * Base implementation of {@link Process}: reads items from a {@link Source},
 * passes each one through an ordered chain of {@link Processor}s and, if a
 * {@link Sink} is attached, writes the surviving result to it.
 *
 * <p>The error policy is controlled by {@link #setOnError(String)}. The value
 * {@code "continue"} (compared case-insensitively) makes {@link #execute()}
 * log and skip items whose processing fails; any other value makes the
 * failure propagate to the caller.
 */
public abstract class AbstractProcess implements Process {
    static Logger log = LoggerFactory.getLogger(AbstractProcess.class);

    /** Identifier of this process; a random UUID string unless replaced via {@link #setId(String)}. */
    private String id = UUID.randomUUID().toString();
    /** Context handed to {@link #init(ApplicationContext)}. */
    protected Context parentContext;
    /** Per-process context created in {@link #init(ApplicationContext)}. */
    protected ProcessContext processContext;
    protected Source source;
    protected Sink sink;
    /** Processor chain, applied in insertion order. */
    protected final List<Processor> processors = new ArrayList<Processor>();
    protected final Map<String, String> properties = new LinkedHashMap<String, String>();
    protected Priority priority = new Priority();
    /** Error policy; only the value "continue" is given special treatment by {@link #execute()}. */
    protected String onError = "exit";

    /** Sets the source that {@link #execute()} reads items from. */
    public void setInput(Source ds) {
        this.source = ds;
    }

    /** @return the source items are read from, or {@code null} if none was set */
    public Source getInput() {
        return this.source;
    }

    /** Sets the sink that processed items are written to; may be {@code null}. */
    public void setOutput(Sink sink) {
        this.sink = sink;
    }

    /** @return the sink processed items are written to, or {@code null} if none was set */
    public Sink getOutput() {
        return this.sink;
    }

    /** @return the identifier of this process */
    public String getId() {
        return this.id;
    }

    /** Replaces the identifier of this process. */
    public void setId(String id) {
        this.id = id;
    }

    /**
     * Runs a single item through the processor chain.
     *
     * @param data the item to process
     * @return the output of the last processor, or {@code null} as soon as any
     *         processor returns {@code null} (the remaining processors are skipped)
     */
    public Data process(Data data) {
        log.trace("{}: processing data {}", this, data);
        Data current = data;
        for (Processor proc : this.processors) {
            current = proc.process(current);
            if (current == null) {
                return null;
            }
        }
        return current;
    }

    /**
     * Stores the given context, creates the process context for this process
     * and initializes every {@link StatefulProcessor} in the chain with it.
     *
     * @param context the application context this process runs in
     * @throws Exception if a stateful processor fails to initialize
     */
    public void init(ApplicationContext context) throws Exception {
        this.parentContext = context;
        this.processContext = new ProcessContextImpl(this.getId(), context);
        for (Processor proc : this.processors) {
            if (proc instanceof StatefulProcessor) {
                ((StatefulProcessor) proc).init(this.processContext);
            }
        }
        // The original message ended in "processors: " without a placeholder;
        // the processor list is now actually appended.
        log.debug("Process {} (source: {}) initialized, processors: {}",
                new Object[] {this, this.getInput(), this.processors});
    }

    /**
     * Calls {@code finish()} on every {@link StatefulProcessor} in the chain.
     * A failure of one processor is logged and does not prevent the remaining
     * processors from being finished.
     *
     * @throws Exception declared for subclasses; failures of individual
     *         processors are caught and logged here
     */
    public void finish() throws Exception {
        log.debug("Finishing process {} (source: {})...", this, this.getInput());
        for (Processor proc : this.processors) {
            if (!(proc instanceof StatefulProcessor)) {
                continue;
            }
            try {
                log.debug("Finishing processor {}", proc);
                ((StatefulProcessor) proc).finish();
            } catch (Exception e) {
                // Trailing throwable: SLF4J logs its stack trace through the
                // logging backend instead of printing it to stderr.
                log.error("Failed to finish processor '{}': {}",
                        new Object[] {proc, e.getMessage(), e});
            }
        }
    }

    /**
     * Reads items from the input until it returns {@code null}, processing and
     * forwarding each one, then calls {@link #finish()}.
     *
     * <p>Failures while processing or writing an item are handled according to
     * the {@code onError} policy. Failures while reading from the input always
     * propagate. If the read/process loop fails, the exception is logged and
     * rethrown and {@link #finish()} is not invoked by this method. An
     * exception thrown by {@link #finish()} itself is logged and suppressed.
     *
     * @throws IOException if no input has been set
     * @throws Exception if reading fails, or if processing fails and the
     *         error policy is not "continue"
     */
    public void execute() throws Exception {
        try {
            if (this.getInput() == null) {
                log.error("Could not read from input!");
                throw new IOException("Can't read from input");
            }
            Data item = this.getInput().read();
            while (item != null) {
                this.processAndForward(item);
                // Always advance to the next item, also after a skipped failure.
                // Previously a failing item was retried forever in "continue" mode
                // because the loop continued without reading.
                log.debug("Reading next item from input '{}'", this.getInput());
                item = this.getInput().read();
            }
            log.debug("No more items could be read, exiting process {}", this.getId());
        } catch (Exception e) {
            log.error("Error occurred in process '{}': {}", this.getId(), e.getMessage());
            throw e;
        }
        try {
            this.finish();
        } catch (Exception e) {
            log.warn("Error while finishing process: {}", e.getMessage(), e);
        }
    }

    /**
     * Processes one item and writes a non-null result to the sink, if any.
     * Applies the {@code onError} policy: "continue" logs and swallows the
     * failure, anything else rethrows it.
     */
    private void processAndForward(Data item) throws Exception {
        try {
            Data result = this.process(item);
            if (result != null && this.getOutput() != null) {
                log.trace("Sending process output to connected sink {}", this.getOutput());
                this.getOutput().write(result);
            }
        } catch (Exception e) {
            if (!"continue".equalsIgnoreCase(this.onError)) {
                throw e;
            }
            log.error("Error while processing data: {}", e.getMessage());
            log.error("   continuing with next item...");
        }
    }

    /** @return the process context created by {@link #init(ApplicationContext)}, or {@code null} before that */
    public ProcessContext getContext() {
        return this.processContext;
    }

    /** Appends a processor to the end of the chain. */
    public void add(Processor p) {
        this.processors.add(p);
    }

    /** Removes the given processor from the chain, if present. */
    public void remove(Processor p) {
        this.processors.remove(p);
    }

    /** @return the live, mutable list backing the processor chain */
    public List<Processor> getProcessors() {
        return this.processors;
    }

    /** @return the live, mutable property map of this process */
    public Map<String, String> getProperties() {
        return this.properties;
    }

    public Priority getPriority() {
        return this.priority;
    }

    public void setPriority(Priority priority) {
        this.priority = priority;
    }

    /** @return the current error policy string */
    public String getOnError() {
        return this.onError;
    }

    /** Sets the error policy; "continue" (any case) makes {@link #execute()} skip failing items. */
    public void setOnError(String onError) {
        this.onError = onError;
    }

    /** @return the simple class name followed by the id, or by the default object string if the id is {@code null} */
    @Override
    public String toString() {
        String label = this.id != null ? this.id : super.toString();
        return this.getClass().getSimpleName() + "['" + label + "']";
    }
}

