/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.impl;

import com.hazelcast.core.DistributedObject;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.IMap;
import com.hazelcast.jet.JetException;
import com.hazelcast.jet.JetInstance;
import com.hazelcast.jet.config.JobConfig;
import com.hazelcast.jet.config.ResourceConfig;
import com.hazelcast.jet.core.JobNotFoundException;
import com.hazelcast.jet.impl.JobExecutionRecord;
import com.hazelcast.jet.impl.JobRecord;
import com.hazelcast.jet.impl.JobResult;
import com.hazelcast.jet.impl.SnapshotValidationRecord;
import com.hazelcast.jet.impl.execution.init.JetInitDataSerializerHook;
import com.hazelcast.jet.impl.util.LoggingUtil;
import com.hazelcast.jet.impl.util.Util;
import com.hazelcast.logging.ILogger;
import com.hazelcast.map.EntryBackupProcessor;
import com.hazelcast.map.EntryProcessor;
import com.hazelcast.nio.IOUtil;
import com.hazelcast.nio.ObjectDataInput;
import com.hazelcast.nio.ObjectDataOutput;
import com.hazelcast.nio.serialization.IdentifiedDataSerializable;
import com.hazelcast.query.Predicate;
import com.hazelcast.spi.NodeEngine;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.io.BufferedInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URL;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.jar.JarEntry;
import java.util.jar.JarInputStream;
import java.util.stream.Collectors;
import java.util.zip.DeflaterOutputStream;
import javax.annotation.Nonnull;

public class JobRepository {
    /** Common name prefix shared by all the map names and prefixes declared below. */
    public static final String INTERNAL_JET_OBJECTS_PREFIX = "__jet.";
    /** Prefix used by {@link #exportedSnapshotMapName(String)} to build an exported-snapshot map name. */
    public static final String EXPORTED_SNAPSHOTS_PREFIX = "__jet.exportedSnapshot.";
    /** Name of the map holding {@link SnapshotValidationRecord}s keyed by snapshot name. */
    public static final String EXPORTED_SNAPSHOTS_DETAIL_CACHE = "__jet.exportedSnapshotsCache";
    /** Prefix of the per-job resources map name; the job id string is appended to it. */
    public static final String RESOURCES_MAP_NAME_PREFIX = "__jet.resources.";
    /** Name of the map in which generated job ids and execution ids are reserved. */
    public static final String RANDOM_IDS_MAP_NAME = "__jet.ids";
    /** Name of the map holding {@link JobRecord}s keyed by job id. */
    public static final String JOB_RECORDS_MAP_NAME = "__jet.records";
    /** Name of the map holding {@link JobExecutionRecord}s keyed by job id. */
    public static final String JOB_EXECUTION_RECORDS_MAP_NAME = "__jet.executionRecords";
    /** Name of the map holding {@link JobResult}s keyed by job id. */
    public static final String JOB_RESULTS_MAP_NAME = "__jet.results";
    /** Prefix of the snapshot data map names; see {@link #snapshotDataMapName(long, int)}. */
    public static final String SNAPSHOT_DATA_MAP_PREFIX = "__jet.snapshot.";
    /** Default age (2 hours, in milliseconds) after which an orphaned resources map is considered expired. */
    private static final long DEFAULT_RESOURCES_EXPIRATION_MILLIS = TimeUnit.HOURS.toMillis(2L);
    /** Length of the string form of an id; used to cut the job id out of a map name. */
    private static final int JOB_ID_STRING_LENGTH = com.hazelcast.jet.Util.idToString(0L).length();
    private final HazelcastInstance instance;
    private final ILogger logger;
    /** Reserved ids: a job id is stored mapped to itself, an execution id is stored mapped to its job id. */
    private final IMap<Long, Long> randomIds;
    private final IMap<Long, JobRecord> jobRecords;
    private final IMap<Long, JobExecutionRecord> jobExecutionRecords;
    private final IMap<Long, JobResult> jobResults;
    private final IMap<String, SnapshotValidationRecord> exportedSnapshotDetailsCache;
    /** Expiration threshold used by {@code cleanup}; overridable through {@code setResourcesExpirationMillis}. */
    private long resourcesExpirationMillis = DEFAULT_RESOURCES_EXPIRATION_MILLIS;

    /**
     * Creates a repository backed by the Hazelcast instance of the given Jet
     * instance and looks up all the maps this class works with.
     *
     * @param jetInstance the Jet instance whose Hazelcast instance is used
     */
    public JobRepository(JetInstance jetInstance) {
        HazelcastInstance hz = jetInstance.getHazelcastInstance();
        this.instance = hz;
        this.logger = hz.getLoggingService().getLogger(this.getClass());
        this.randomIds = hz.getMap(RANDOM_IDS_MAP_NAME);
        this.jobRecords = hz.getMap(JOB_RECORDS_MAP_NAME);
        this.jobExecutionRecords = hz.getMap(JOB_EXECUTION_RECORDS_MAP_NAME);
        this.jobResults = hz.getMap(JOB_RESULTS_MAP_NAME);
        this.exportedSnapshotDetailsCache = hz.getMap(EXPORTED_SNAPSHOTS_DETAIL_CACHE);
    }

    /**
     * Overrides the age threshold after which a resources map without a job
     * record or job result is treated as expired by {@code cleanup}.
     *
     * @param resourcesExpirationMillis the new threshold in milliseconds
     */
    void setResourcesExpirationMillis(final long resourcesExpirationMillis) {
        this.resourcesExpirationMillis = resourcesExpirationMillis;
    }

    /**
     * Generates a new job id and uploads the resources listed in the job
     * config into the resources map of that job.
     * <p>
     * All resources are first read and compressed into a temporary local map
     * and only then stored with a single {@code putAll}. Archive resources are
     * expanded entry by entry; other resources are stored under their id.
     *
     * @param jobConfig the config whose resource configs are uploaded
     * @return the newly generated job id
     * @throws JetException if reading or storing any resource fails; the
     *         resources map of the job is destroyed before throwing
     */
    public long uploadJobResources(JobConfig jobConfig) {
        long jobId = this.newJobId();
        IMap<String, byte[]> jobResourcesMap = this.getJobResources(jobId);
        Map<String, byte[]> tmpMap = new HashMap<>();
        try {
            for (ResourceConfig rc : jobConfig.getResourceConfigs()) {
                if (rc.isArchive()) {
                    this.loadJar(tmpMap, rc.getUrl());
                    continue;
                }
                // The stream is opened here, so it has to be closed here as well,
                // both on success and when reading/compressing fails.
                try (InputStream in = rc.getUrl().openStream()) {
                    this.readStreamAndPutCompressedToMap(rc.getId(), tmpMap, in);
                }
            }
            jobResourcesMap.putAll(tmpMap);
        } catch (Exception e) {
            jobResourcesMap.destroy();
            throw new JetException("Job resource upload failed", e);
        }
        return jobId;
    }

    /**
     * Draws random ids until one can be reserved in the {@code randomIds}
     * map, where a job id is stored mapped to itself.
     *
     * @return the reserved job id
     */
    private long newJobId() {
        long candidate;
        do {
            candidate = Util.secureRandomNextLong();
        } while (this.randomIds.putIfAbsent(candidate, candidate) != null);
        return candidate;
    }

    /**
     * Reads the JAR at the given URL and stores every non-directory entry,
     * compressed, in the map under the entry's name.
     *
     * @param map the target map of resource name to compressed bytes
     * @param url the location of the JAR
     * @throws IOException if the JAR cannot be opened or read
     */
    private void loadJar(Map<String, byte[]> map, URL url) throws IOException {
        try (JarInputStream jar = new JarInputStream(new BufferedInputStream(url.openStream()))) {
            for (JarEntry entry = jar.getNextJarEntry(); entry != null; entry = jar.getNextJarEntry()) {
                if (!entry.isDirectory()) {
                    this.readStreamAndPutCompressedToMap(entry.getName(), map, jar);
                }
            }
        }
    }

    /**
     * Drains the stream through a deflater and stores the compressed bytes in
     * the map under the given name. Does nothing (and reads nothing from the
     * stream) if the map already contains that name.
     *
     * @param resourceName the key to store the data under
     * @param map the target map of resource name to compressed bytes
     * @param in the stream to read; it is not closed by this method
     * @throws IOException if reading or compressing fails
     */
    private void readStreamAndPutCompressedToMap(String resourceName, Map<String, byte[]> map, InputStream in) throws IOException {
        if (!map.containsKey(resourceName)) {
            ByteArrayOutputStream compressedBytes = new ByteArrayOutputStream();
            // closing the deflater flushes the remaining compressed data into the buffer
            try (OutputStream deflater = new DeflaterOutputStream(compressedBytes)) {
                IOUtil.drainTo(in, deflater);
            }
            map.put(resourceName, compressedBytes.toByteArray());
        }
    }

    /**
     * Stores the job record unless a record for the same job id already
     * exists. An existing record with an equal DAG is tolerated silently.
     *
     * @param jobRecord the record to store
     * @throws IllegalStateException if a record with a different DAG already
     *         exists for the job id
     */
    void putNewJobRecord(JobRecord jobRecord) {
        long jobId = jobRecord.getJobId();
        JobRecord existing = this.jobRecords.putIfAbsent(jobId, jobRecord);
        if (existing == null) {
            return;
        }
        if (existing.getDag().equals(jobRecord.getDag())) {
            return;
        }
        throw new IllegalStateException("Cannot put job record for job " + com.hazelcast.jet.Util.idToString(jobId) + " because it already exists with a different DAG");
    }

    /**
     * Runs an entry processor on the execution record of the given job which
     * calls {@code setLargerQuorumSize(newQuorumSize)} on it. A missing record
     * is left missing.
     *
     * @param jobId the job whose execution record is updated
     * @param newQuorumSize the quorum size passed to the record
     */
    void updateJobQuorumSizeIfSmaller(long jobId, int newQuorumSize) {
        this.jobExecutionRecords.executeOnKey(jobId, Util.entryProcessor((key, record) -> {
            if (record != null) {
                record.setLargerQuorumSize(newQuorumSize);
            }
            // returns null for a missing record, the updated record otherwise
            return record;
        }));
    }

    /**
     * Draws random ids until one can be reserved in the {@code randomIds}
     * map, where an execution id is stored mapped to its job id.
     *
     * @param jobId the job the execution id belongs to
     * @return the reserved execution id
     */
    long newExecutionId(long jobId) {
        long candidate;
        do {
            candidate = Util.secureRandomNextLong();
        } while (this.randomIds.putIfAbsent(candidate, jobId) != null);
        return candidate;
    }

    /**
     * Returns the number of {@code randomIds} entries matching
     * {@link FilterExecutionIdByJobIdPredicate} for the given job id.
     *
     * @param jobId the job whose execution ids are counted
     * @return the number of matching entries
     */
    long getExecutionIdCount(long jobId) {
        Collection<Long> executionEntries = this.randomIds.values(new FilterExecutionIdByJobIdPredicate(jobId));
        return executionEntries.size();
    }

    /**
     * Stores a {@link JobResult} for the job and then deletes the job's
     * records and data through {@link #deleteJob(long)}.
     *
     * @param jobId the completed job
     * @param coordinator the coordinator value recorded in the result
     * @param completionTime the completion time recorded in the result
     * @param error the failure, or {@code null}; its {@code toString()} is
     *        recorded in the result
     * @throws JobNotFoundException if there is no job record for the id
     * @throws IllegalStateException if a result is already stored for the id
     */
    void completeJob(long jobId, String coordinator, long completionTime, Throwable error) {
        JobRecord record = this.getJobRecord(jobId);
        if (record == null) {
            throw new JobNotFoundException(jobId);
        }
        JobConfig config = record.getConfig();
        long creationTime = record.getCreationTime();
        String failureText = null;
        if (error != null) {
            failureText = error.toString();
        }
        JobResult result = new JobResult(jobId, config, coordinator, creationTime, completionTime, failureText);
        JobResult existing = this.jobResults.putIfAbsent(jobId, result);
        if (existing != null) {
            throw new IllegalStateException("Job result already exists in the " + this.jobResults.getName() + " map:\nprevious record: " + existing + "\nnew record: " + result);
        }
        this.deleteJob(jobId);
    }

    /**
     * Removes the job's execution record and job record, removes the
     * {@code randomIds} entries matching
     * {@link FilterExecutionIdByJobIdPredicate}, and destroys the two snapshot
     * data maps (indexes 0 and 1) and the resources map of the job.
     *
     * @param jobId the job to delete
     */
    void deleteJob(long jobId) {
        this.jobExecutionRecords.remove(jobId);
        this.jobRecords.remove(jobId);
        this.randomIds.removeAll(new FilterExecutionIdByJobIdPredicate(jobId));
        for (int dataMapIndex = 0; dataMapIndex < 2; dataMapIndex++) {
            String snapshotMapName = JobRepository.snapshotDataMapName(jobId, dataMapIndex);
            this.instance.getMap(snapshotMapName).destroy();
        }
        this.getJobResources(jobId).destroy();
    }

    /**
     * Destroys snapshot data maps and job resources maps that no longer
     * belong to a job with a job record.
     * <ul>
     * <li>A snapshot data map is destroyed when its job has no job record.</li>
     * <li>A resources map of a job without a job record is destroyed when a
     * job result exists for the job, or otherwise when the map's creation
     * time is older than the resources expiration threshold.</li>
     * </ul>
     *
     * @param nodeEngine used to list the distributed map objects
     */
    void cleanup(NodeEngine nodeEngine) {
        // The map list is fetched before the job record keys are read; this order is kept as-is.
        Collection<DistributedObject> allMaps = nodeEngine.getProxyService().getDistributedObjects("hz:impl:mapService");
        Set<Long> liveJobIds = this.jobRecords.keySet();
        for (DistributedObject candidate : allMaps) {
            String mapName = candidate.getName();
            if (mapName.startsWith(SNAPSHOT_DATA_MAP_PREFIX)) {
                long ownerId = this.jobIdFromMapName(mapName, SNAPSHOT_DATA_MAP_PREFIX);
                if (!liveJobIds.contains(ownerId)) {
                    LoggingUtil.logFine(this.logger, "Deleting snapshot data map '%s' because job already finished", mapName);
                    candidate.destroy();
                }
            } else if (mapName.startsWith(RESOURCES_MAP_NAME_PREFIX)) {
                long ownerId = this.jobIdFromMapName(mapName, RESOURCES_MAP_NAME_PREFIX);
                if (liveJobIds.contains(ownerId)) {
                    // the job still has a record, leave its resources alone
                    continue;
                }
                if (this.jobResults.containsKey(ownerId)) {
                    LoggingUtil.logFine(this.logger, "Deleting job resource map '%s' because job is already finished", mapName);
                    candidate.destroy();
                } else {
                    // neither record nor result: only remove the map once it is old enough
                    IMap resourceMap = (IMap) candidate;
                    long createdAt = resourceMap.getLocalMapStats().getCreationTime();
                    if (this.isResourceMapExpired(createdAt)) {
                        this.logger.fine("Deleting job resource map " + mapName + " because the map was created long ago and job record or result still doesn't exist");
                        resourceMap.destroy();
                    }
                }
            }
        }
    }

    /**
     * Parses the job id that directly follows the prefix in a map name.
     *
     * @param map the full map name
     * @param prefix the prefix the map name starts with
     * @return the id parsed from the {@code JOB_ID_STRING_LENGTH} characters
     *         after the prefix
     */
    private long jobIdFromMapName(String map, String prefix) {
        int start = prefix.length();
        int end = start + JOB_ID_STRING_LENGTH;
        return com.hazelcast.jet.Util.idFromString(map.substring(start, end));
    }

    /**
     * Tells whether a map created at the given time has reached the
     * configured resources expiration age.
     *
     * @param creationTime the creation time in epoch milliseconds
     * @return {@code true} if the age is at least {@code resourcesExpirationMillis}
     */
    private boolean isResourceMapExpired(long creationTime) {
        long ageMillis = System.currentTimeMillis() - creationTime;
        return ageMillis >= this.resourcesExpirationMillis;
    }

    /**
     * Returns the union of the ids that have a job record and the ids that
     * have a job result.
     *
     * @return a new mutable set of job ids
     */
    Set<Long> getAllJobIds() {
        Set<Long> allIds = new HashSet<>();
        allIds.addAll(this.jobRecords.keySet());
        allIds.addAll(this.jobResults.keySet());
        return allIds;
    }

    /**
     * Returns the values of the job records map.
     *
     * @return all stored job records
     */
    Collection<JobRecord> getJobRecords() {
        Collection<JobRecord> records = this.jobRecords.values();
        return records;
    }

    /**
     * Looks up the job record stored for the given job id.
     *
     * @param jobId the job id
     * @return the value stored in the job records map for the id
     */
    public JobRecord getJobRecord(long jobId) {
        JobRecord record = this.jobRecords.get(jobId);
        return record;
    }

    /**
     * Looks up the job execution record stored for the given job id.
     *
     * @param jobId the job id
     * @return the value stored in the job execution records map for the id
     */
    public JobExecutionRecord getJobExecutionRecord(long jobId) {
        JobExecutionRecord executionRecord = this.jobExecutionRecords.get(jobId);
        return executionRecord;
    }

    /**
     * Returns the resources map of the given job. Its name is
     * {@code RESOURCES_MAP_NAME_PREFIX} followed by the string form of the id.
     *
     * @param jobId the job id
     * @param <T> the value type the caller expects in the map
     * @return the map obtained from the Hazelcast instance under that name
     */
    <T> IMap<String, T> getJobResources(long jobId) {
        String resourcesMapName = RESOURCES_MAP_NAME_PREFIX + com.hazelcast.jet.Util.idToString(jobId);
        return this.instance.getMap(resourcesMapName);
    }

    /**
     * Looks up the job result stored for the given job id.
     *
     * @param jobId the job id
     * @return the value stored in the job results map for the id
     */
    public JobResult getJobResult(long jobId) {
        JobResult result = this.jobResults.get(jobId);
        return result;
    }

    /**
     * Returns the values of the job results map.
     *
     * @return all stored job results
     */
    Collection<JobResult> getJobResults() {
        Collection<JobResult> results = this.jobResults.values();
        return results;
    }

    /**
     * Returns the job results matching {@link FilterJobResultByNamePredicate}
     * for the given name, ordered by creation time, latest first.
     *
     * @param name the job name to filter by
     * @return the matching results as a list
     */
    List<JobResult> getJobResults(String name) {
        Collection<JobResult> matching = this.jobResults.values(new FilterJobResultByNamePredicate(name));
        Comparator<JobResult> latestFirst = Comparator.comparing(JobResult::getCreationTime).reversed();
        return matching.stream()
                .sorted(latestFirst)
                .collect(Collectors.toList());
    }

    /**
     * Updates the timestamp of the record and writes it through an
     * {@link UpdateJobExecutionRecordEntryProcessor}. If the processor returns
     * a message (the update was ignored), it is logged at fine level.
     *
     * @param jobId the job the record belongs to
     * @param record the record to write
     * @param canCreate whether the write may create a missing entry
     */
    void writeJobExecutionRecord(long jobId, JobExecutionRecord record, boolean canCreate) {
        record.updateTimestamp();
        UpdateJobExecutionRecordEntryProcessor processor = new UpdateJobExecutionRecordEntryProcessor(jobId, record, canCreate);
        Object outcome = this.jobExecutionRecords.executeOnKey(jobId, processor);
        String ignoredReason = (String) outcome;
        if (ignoredReason != null) {
            this.logger.fine(ignoredReason);
        }
    }

    /**
     * Builds the name of a snapshot data map:
     * {@code SNAPSHOT_DATA_MAP_PREFIX}, the string form of the job id, a dot
     * and the data map index.
     *
     * @param jobId the job id
     * @param dataMapIndex the index of the data map
     * @return the map name
     */
    public static String snapshotDataMapName(long jobId, int dataMapIndex) {
        String jobPart = com.hazelcast.jet.Util.idToString(jobId);
        return new StringBuilder(SNAPSHOT_DATA_MAP_PREFIX)
                .append(jobPart)
                .append('.')
                .append(dataMapIndex)
                .toString();
    }

    /**
     * Builds the name of an exported snapshot map by prepending
     * {@code EXPORTED_SNAPSHOTS_PREFIX} to the given name.
     *
     * @param name the snapshot name
     * @return the map name
     */
    public static String exportedSnapshotMapName(String name) {
        return new StringBuilder(EXPORTED_SNAPSHOTS_PREFIX).append(name).toString();
    }

    /**
     * Clears the snapshot data map of the given job and index. Any exception
     * is logged as a warning and not propagated.
     *
     * @param jobId the job id
     * @param dataMapIndex the index of the data map to clear
     */
    void clearSnapshotData(long jobId, int dataMapIndex) {
        final String snapshotMapName = JobRepository.snapshotDataMapName(jobId, dataMapIndex);
        try {
            IMap<Object, Object> snapshotMap = this.instance.getMap(snapshotMapName);
            snapshotMap.clear();
            LoggingUtil.logFine(this.logger, "Cleared snapshot data map %s", snapshotMapName);
        } catch (Exception logged) {
            String warning = "Cannot delete old snapshot data  " + com.hazelcast.jet.Util.idToString(jobId);
            this.logger.warning(warning, logged);
        }
    }

    /**
     * Stores the validation record in the exported snapshot details cache
     * under the snapshot name.
     *
     * @param snapshotName the key
     * @param validationRecord the value to store
     */
    void cacheValidationRecord(
            @Nonnull String snapshotName,
            @Nonnull SnapshotValidationRecord validationRecord
    ) {
        this.exportedSnapshotDetailsCache.set(snapshotName, validationRecord);
    }

    public static class FilterJobResultByNamePredicate
    implements Predicate<Long, JobResult>,
    IdentifiedDataSerializable {
        private String name;

        public FilterJobResultByNamePredicate() {
        }

        FilterJobResultByNamePredicate(String name) {
            this.name = name;
        }

        public boolean apply(Map.Entry<Long, JobResult> entry) {
            return this.name.equals(entry.getValue().getJobConfig().getName());
        }

        public int getFactoryId() {
            return JetInitDataSerializerHook.FACTORY_ID;
        }

        public int getId() {
            return 21;
        }

        public void writeData(ObjectDataOutput out) throws IOException {
            out.writeUTF(this.name);
        }

        public void readData(ObjectDataInput in) throws IOException {
            this.name = in.readUTF();
        }
    }

    public static class FilterJobIdPredicate
    implements Predicate<Long, Long>,
    IdentifiedDataSerializable {
        public boolean apply(Map.Entry<Long, Long> mapEntry) {
            return mapEntry.getKey().equals(mapEntry.getValue());
        }

        public int getFactoryId() {
            return JetInitDataSerializerHook.FACTORY_ID;
        }

        public int getId() {
            return 14;
        }

        public void writeData(ObjectDataOutput out) throws IOException {
        }

        public void readData(ObjectDataInput in) throws IOException {
        }
    }

    public static class FilterExecutionIdByJobIdPredicate
    implements Predicate<Long, Long>,
    IdentifiedDataSerializable {
        private long jobId;

        public FilterExecutionIdByJobIdPredicate() {
        }

        FilterExecutionIdByJobIdPredicate(long jobId) {
            this.jobId = jobId;
        }

        public boolean apply(Map.Entry<Long, Long> mapEntry) {
            return mapEntry.getKey() != this.jobId && mapEntry.getValue() == this.jobId;
        }

        public int getFactoryId() {
            return JetInitDataSerializerHook.FACTORY_ID;
        }

        public int getId() {
            return 13;
        }

        public void writeData(ObjectDataOutput out) throws IOException {
            out.writeLong(this.jobId);
        }

        public void readData(ObjectDataInput in) throws IOException {
            this.jobId = in.readLong();
        }
    }

    public static final class UpdateJobExecutionRecordEntryProcessor
    implements EntryProcessor<Long, JobExecutionRecord>,
    EntryBackupProcessor<Long, JobExecutionRecord>,
    IdentifiedDataSerializable {
        private long jobId;
        @SuppressFBWarnings(value={"SE_BAD_FIELD"}, justification="this class is not going to be java-serialized")
        private JobExecutionRecord jobExecutionRecord;
        private boolean canCreate;

        public UpdateJobExecutionRecordEntryProcessor() {
        }

        UpdateJobExecutionRecordEntryProcessor(long jobId, JobExecutionRecord jobExecutionRecord, boolean canCreate) {
            this.jobId = jobId;
            this.jobExecutionRecord = jobExecutionRecord;
            this.canCreate = canCreate;
        }

        public Object process(Map.Entry<Long, JobExecutionRecord> entry) {
            if (entry.getValue() == null && !this.canCreate) {
                return "Update to JobRecord for job " + com.hazelcast.jet.Util.idToString(this.jobId) + " ignored, oldValue == null";
            }
            if (entry.getValue() != null && entry.getValue().getTimestamp() >= this.jobExecutionRecord.getTimestamp()) {
                return "Update to JobRecord for job " + com.hazelcast.jet.Util.idToString(this.jobId) + " ignored, newer timestamp found. Stored timestamp=" + entry.getValue().getTimestamp() + ", timestamp of the update=" + this.jobExecutionRecord.getTimestamp();
            }
            entry.setValue(this.jobExecutionRecord);
            return null;
        }

        public EntryBackupProcessor<Long, JobExecutionRecord> getBackupProcessor() {
            return this;
        }

        public void processBackup(Map.Entry<Long, JobExecutionRecord> entry) {
            this.process(entry);
        }

        public int getFactoryId() {
            return JetInitDataSerializerHook.FACTORY_ID;
        }

        public int getId() {
            return 18;
        }

        public void writeData(ObjectDataOutput out) throws IOException {
            out.writeLong(this.jobId);
            out.writeObject((Object)this.jobExecutionRecord);
            out.writeBoolean(this.canCreate);
        }

        public void readData(ObjectDataInput in) throws IOException {
            this.jobId = in.readLong();
            this.jobExecutionRecord = (JobExecutionRecord)in.readObject();
            this.canCreate = in.readBoolean();
        }
    }
}

