/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.impl;

import com.hazelcast.core.EntryView;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.IMap;
import com.hazelcast.jet.JetException;
import com.hazelcast.jet.JetInstance;
import com.hazelcast.jet.config.JobConfig;
import com.hazelcast.jet.config.ResourceConfig;
import com.hazelcast.jet.core.JobNotFoundException;
import com.hazelcast.jet.impl.JobRecord;
import com.hazelcast.jet.impl.JobResult;
import com.hazelcast.jet.impl.SnapshotRepository;
import com.hazelcast.jet.impl.execution.init.JetInitDataSerializerHook;
import com.hazelcast.jet.impl.util.Util;
import com.hazelcast.map.EntryBackupProcessor;
import com.hazelcast.map.EntryProcessor;
import com.hazelcast.nio.IOUtil;
import com.hazelcast.nio.ObjectDataInput;
import com.hazelcast.nio.ObjectDataOutput;
import com.hazelcast.nio.serialization.IdentifiedDataSerializable;
import com.hazelcast.query.Predicate;
import java.io.BufferedInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URL;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.jar.JarEntry;
import java.util.jar.JarInputStream;
import java.util.stream.Collectors;
import java.util.zip.DeflaterOutputStream;
import javax.annotation.Nullable;

public class JobRepository {
    /** Prefix of the per-job resources map name; the job id string is appended to it. */
    public static final String RESOURCES_MAP_NAME_PREFIX = "__jet.resources.";
    /** Name of the map holding the generated job ids and execution ids. */
    public static final String RANDOM_IDS_MAP_NAME = "__jet.ids";
    /** Name of the map holding job records, keyed by job id. */
    public static final String JOB_RECORDS_MAP_NAME = "__jet.records";
    /** Name of the map holding job results, keyed by job id. */
    public static final String JOB_RESULTS_MAP_NAME = "__jet.results";
    /** Key inside a job's resources map under which a timestamp (millis) is stored. */
    private static final String RESOURCE_MARKER = "__jet.resourceMarker";
    /** Default age (two hours, in millis) after which a resource marker counts as expired. */
    private static final long DEFAULT_RESOURCES_EXPIRATION_MILLIS = TimeUnit.HOURS.toMillis(2L);
    private final HazelcastInstance instance;
    /** May be null; snapshot deletion is skipped in that case. */
    private final SnapshotRepository snapshotRepository;
    /** Job-id entries map an id to itself; execution-id entries map the execution id to its job id. */
    private final IMap<Long, Long> randomIds;
    private final IMap<Long, JobRecord> jobRecords;
    private final IMap<Long, JobResult> jobResults;
    /** Marker age threshold used by the cleanup pass; replaceable through the setter. */
    private long resourcesExpirationMillis = DEFAULT_RESOURCES_EXPIRATION_MILLIS;
    /** Ids of jobs this repository instance has already deleted; used to skip repeated deletion. */
    private final Set<Long> deletedJobs = Collections.newSetFromMap(new ConcurrentHashMap());

    /**
     * Creates a repository on top of the Hazelcast instance that backs the given
     * Jet instance and looks up the ids, records and results maps by name.
     *
     * @param jetInstance        Jet instance whose Hazelcast instance supplies the maps
     * @param snapshotRepository repository used to delete job snapshots, or {@code null}
     */
    JobRepository(JetInstance jetInstance, @Nullable SnapshotRepository snapshotRepository) {
        HazelcastInstance hz = jetInstance.getHazelcastInstance();
        this.instance = hz;
        this.snapshotRepository = snapshotRepository;
        this.randomIds = hz.getMap(RANDOM_IDS_MAP_NAME);
        this.jobRecords = hz.getMap(JOB_RECORDS_MAP_NAME);
        this.jobResults = hz.getMap(JOB_RESULTS_MAP_NAME);
    }

    /**
     * Overrides the age at which a resource marker is treated as expired.
     *
     * @param resourcesExpirationMillis new threshold, in milliseconds
     */
    void setResourcesExpirationMillis(long resourcesExpirationMillis) {
        this.resourcesExpirationMillis = resourcesExpirationMillis;
    }

    /**
     * Allocates a new job id and stores the resources listed in {@code jobConfig}
     * in that job's resources map.
     * <p>
     * An archive resource is unpacked and every non-directory entry is stored under
     * its entry name; any other resource is stored as a single entry under the
     * resource's id. Payloads are deflate-compressed before being stored. After all
     * resources have been stored, the resource marker entry is written with the
     * current time.
     * <p>
     * If reading a resource fails, the job's resources map is destroyed (together
     * with the job's snapshots when a snapshot repository is present), the job id
     * is removed from the ids map and the failure is rethrown.
     *
     * @param jobConfig configuration listing the resources to upload
     * @return the newly allocated job id
     * @throws JetException if a resource cannot be read
     */
    public long uploadJobResources(JobConfig jobConfig) {
        long jobId = this.newJobId();
        IMap<String, Object> jobResourcesMap = this.getJobResources(jobId);
        for (ResourceConfig rc : jobConfig.getResourceConfigs()) {
            // Collect locally first so that a half-read resource is never published.
            Map<String, byte[]> tmpMap = new HashMap<>();
            try {
                if (rc.isArchive()) {
                    this.loadJar(tmpMap, rc.getUrl());
                } else {
                    // Only non-archive resources are stored as a single blob under their id;
                    // the stream is closed here because the reader does not own it.
                    try (InputStream in = rc.getUrl().openStream()) {
                        this.readStreamAndPutCompressedToMap(rc.getId(), tmpMap, in);
                    }
                }
            } catch (IOException e) {
                this.cleanupJobResourcesAndSnapshots(jobId, jobResourcesMap);
                this.randomIds.remove(jobId);
                throw new JetException("Job resource upload failed", e);
            }
            jobResourcesMap.putAll(tmpMap);
        }
        // Written last: the cleanup pass uses this timestamp to age out unused resources.
        jobResourcesMap.put(RESOURCE_MARKER, System.currentTimeMillis());
        return jobId;
    }

    /**
     * Draws random ids until one can be registered in the ids map, mapped to itself.
     *
     * @return the id that was registered
     */
    private long newJobId() {
        for (;;) {
            long candidate = Util.secureRandomNextLong();
            // A null return means no previous mapping existed, i.e. the id is now ours.
            if (randomIds.putIfAbsent(candidate, candidate) == null) {
                return candidate;
            }
        }
    }

    /**
     * Reads the jar at {@code url} and adds each non-directory entry to {@code map},
     * compressed, under the entry's name.
     *
     * @param map destination for the compressed entries
     * @param url location of the jar
     * @throws IOException if the jar cannot be opened or read
     */
    private void loadJar(Map<String, byte[]> map, URL url) throws IOException {
        // The raw stream is its own resource so it is still closed when wrapping it
        // in a JarInputStream fails (that constructor can throw IOException).
        try (InputStream raw = url.openStream();
             JarInputStream jis = new JarInputStream(new BufferedInputStream(raw))) {
            JarEntry jarEntry;
            while ((jarEntry = jis.getNextJarEntry()) != null) {
                if (jarEntry.isDirectory()) {
                    continue;
                }
                this.readStreamAndPutCompressedToMap(jarEntry.getName(), map, jis);
            }
        }
    }

    /**
     * Drains {@code in} through a deflater and stores the compressed bytes in
     * {@code map} under {@code resourceName}. Does nothing, and reads nothing from
     * the stream, when the map already contains that key. This method itself does
     * not call {@code close()} on {@code in}.
     *
     * @param resourceName key to store the bytes under
     * @param map          destination map
     * @param in           source of the uncompressed bytes
     * @throws IOException if reading or compressing fails
     */
    private void readStreamAndPutCompressedToMap(String resourceName, Map<String, byte[]> map, InputStream in) throws IOException {
        if (!map.containsKey(resourceName)) {
            ByteArrayOutputStream compressed = new ByteArrayOutputStream();
            // Closing the deflater finishes the compressed stream before the bytes are taken.
            try (DeflaterOutputStream deflater = new DeflaterOutputStream(compressed)) {
                IOUtil.drainTo(in, deflater);
            }
            map.put(resourceName, compressed.toByteArray());
        }
    }

    /**
     * Deletes all snapshots of the job, if a snapshot repository was supplied, and
     * then destroys the given resources map.
     *
     * @param jobId           id of the job whose snapshots are deleted
     * @param jobResourcesMap resources map to destroy
     */
    private void cleanupJobResourcesAndSnapshots(long jobId, IMap<String, Object> jobResourcesMap) {
        SnapshotRepository snapshots = snapshotRepository;
        if (snapshots != null) {
            snapshots.deleteAllSnapshots(jobId);
        }
        jobResourcesMap.destroy();
    }

    /**
     * Stores the record under its job id unless a record is already present.
     * An already-present record with an equal DAG is accepted silently.
     *
     * @param jobRecord record to store
     * @throws IllegalStateException if a record with a different DAG already exists
     */
    void putNewJobRecord(JobRecord jobRecord) {
        long jobId = jobRecord.getJobId();
        JobRecord existing = jobRecords.putIfAbsent(jobId, jobRecord);
        if (existing == null || existing.getDag().equals(jobRecord.getDag())) {
            return;
        }
        throw new IllegalStateException("Cannot put job record for job " + Util.idToString(jobId) + " because it already exists with a different dag");
    }

    /**
     * Runs a quorum-update entry processor on the job's record.
     *
     * @param jobId         id of the job record to update
     * @param newQuorumSize quorum size to apply if it exceeds the stored one
     * @return the processor's result: {@code true} if the record was replaced
     */
    boolean updateJobQuorumSizeIfLargerThanCurrent(long jobId, int newQuorumSize) {
        UpdateJobRecordQuorumEntryProcessor processor = new UpdateJobRecordQuorumEntryProcessor(newQuorumSize);
        Object outcome = jobRecords.executeOnKey(jobId, processor);
        return (Boolean) outcome;
    }

    /**
     * Draws random ids until one can be registered in the ids map, mapped to the
     * given job id.
     *
     * @param jobId job the execution belongs to
     * @return the execution id that was registered
     */
    long newExecutionId(long jobId) {
        for (;;) {
            long candidate = Util.secureRandomNextLong();
            // A null return means the id was free and is now bound to this job.
            if (randomIds.putIfAbsent(candidate, jobId) == null) {
                return candidate;
            }
        }
    }

    /**
     * Counts the entries of the ids map that are execution ids of the given job.
     *
     * @param jobId job whose execution ids are counted
     * @return number of matching entries
     */
    long getExecutionIdCount(long jobId) {
        FilterExecutionIdByJobIdPredicate byJob = new FilterExecutionIdByJobIdPredicate(jobId);
        Collection<Long> matches = randomIds.values(byJob);
        return matches.size();
    }

    /**
     * Records the result of a finished job and then deletes the job's record,
     * execution ids, resources and snapshots.
     *
     * @param jobId          id of the completed job
     * @param coordinator    coordinator value stored in the result
     * @param completionTime completion time stored in the result
     * @param error          failure stored in the result
     * @throws JobNotFoundException  if no job record exists for {@code jobId}
     * @throws IllegalStateException if a result is already stored for {@code jobId}
     */
    void completeJob(long jobId, String coordinator, long completionTime, Throwable error) {
        JobRecord record = getJobRecord(jobId);
        if (record == null) {
            throw new JobNotFoundException(jobId);
        }

        JobResult jobResult = new JobResult(
                jobId, record.getConfig(), coordinator, record.getCreationTime(), completionTime, error);

        JobResult prev = jobResults.putIfAbsent(jobId, jobResult);
        if (prev != null) {
            throw new IllegalStateException("Job result already exists in the " + jobResults.getName() + " map:\nprevious record: " + prev + "\nnew record: " + jobResult);
        }

        deleteJob(jobId);
    }

    /**
     * Removes the job's record, its execution ids, its resources map and its
     * snapshots. Ids already handled by this repository instance are skipped.
     *
     * @param jobId id of the job to delete
     */
    private void deleteJob(long jobId) {
        if (!deletedJobs.contains(jobId)) {
            jobRecords.remove(jobId);
            // The predicate excludes the entry whose key is the job id itself.
            randomIds.removeAll(new FilterExecutionIdByJobIdPredicate(jobId));
            cleanupJobResourcesAndSnapshots(jobId, getJobResources(jobId));
            // Remembered only after everything above succeeded.
            deletedJobs.add(jobId);
        }
    }

    /**
     * Deletes every job that has a stored result, then inspects the resources of
     * each job id that is neither completed, nor running, nor backed by a job
     * record. For such an id, a missing resource marker is created with the current
     * time; an expired marker causes the resources and snapshots to be removed.
     *
     * @param runningJobIds ids of jobs that must be treated as still in use
     */
    void cleanup(Set<Long> runningJobIds) {
        Set<Long> completedJobIds = jobResults.keySet();
        for (Long completedId : completedJobIds) {
            deleteJob(completedId);
        }

        HashSet<Long> validJobIds = new HashSet<Long>();
        validJobIds.addAll(completedJobIds);
        validJobIds.addAll(runningJobIds);
        validJobIds.addAll(jobRecords.keySet());

        for (Long candidateId : randomIds.keySet(new FilterJobIdPredicate())) {
            if (validJobIds.contains(candidateId)) {
                continue;
            }
            IMap<String, Object> resources = getJobResources(candidateId);
            EntryView<String, Object> marker = resources.getEntryView(RESOURCE_MARKER);
            if (marker == null) {
                // No marker yet: start the expiration clock now.
                resources.putIfAbsent(RESOURCE_MARKER, System.currentTimeMillis());
            } else if (isMarkerExpired(marker)) {
                cleanupJobResourcesAndSnapshots(candidateId, resources);
            }
        }
    }

    /**
     * Tells whether the timestamp stored in the marker entry is at least
     * {@code resourcesExpirationMillis} in the past.
     *
     * @param record marker entry whose value is a {@code Long} timestamp in millis
     * @return {@code true} if the marker has reached the expiration age
     */
    private boolean isMarkerExpired(EntryView<String, Object> record) {
        long now = System.currentTimeMillis();
        long markedAt = (Long) record.getValue();
        long age = now - markedAt;
        return age >= resourcesExpirationMillis;
    }

    /**
     * Returns the job records whose configured name equals {@code name}, ordered
     * by creation time, latest first.
     *
     * @param name job name to match
     * @return matching records, newest first
     */
    List<JobRecord> getJobRecords(String name) {
        Collection<JobRecord> matching = jobRecords.values(new FilterJobRecordByNamePredicate(name));
        Comparator<JobRecord> newestFirst = Comparator.comparing(JobRecord::getCreationTime).reversed();
        return matching.stream()
                .sorted(newestFirst)
                .collect(Collectors.toList());
    }

    /**
     * Returns the union of the key sets of the job records map and the job results map.
     *
     * @return a new set with every job id found in either map
     */
    Set<Long> getAllJobIds() {
        HashSet<Long> allIds = new HashSet<Long>(jobRecords.keySet());
        allIds.addAll(jobResults.keySet());
        return allIds;
    }

    /**
     * Returns the values of the job records map.
     *
     * @return all stored job records
     */
    Collection<JobRecord> getJobRecords() {
        Collection<JobRecord> allRecords = jobRecords.values();
        return allRecords;
    }

    /**
     * Looks up the job record stored under the given id.
     *
     * @param jobId id of the job
     * @return the value stored in the job records map for that id
     */
    public JobRecord getJobRecord(long jobId) {
        return jobRecords.get(jobId);
    }

    /**
     * Obtains the resources map of a job. The map name is the resources prefix
     * followed by the string form of the job id.
     *
     * @param jobId id of the job
     * @param <T>   value type the caller expects in the map
     * @return the map returned by the Hazelcast instance for that name
     */
    <T> IMap<String, T> getJobResources(long jobId) {
        String mapName = RESOURCES_MAP_NAME_PREFIX + Util.idToString(jobId);
        return instance.getMap(mapName);
    }

    /**
     * Looks up the job result stored under the given id.
     *
     * @param jobId id of the job
     * @return the value stored in the job results map for that id
     */
    public JobResult getJobResult(long jobId) {
        return jobResults.get(jobId);
    }

    /**
     * Returns the job results whose configured job name equals {@code name},
     * ordered by creation time, latest first.
     *
     * @param name job name to match
     * @return matching results, newest first
     */
    List<JobResult> getJobResults(String name) {
        Collection<JobResult> matching = jobResults.values(new FilterJobResultByNamePredicate(name));
        Comparator<JobResult> newestFirst = Comparator.comparing(JobResult::getCreationTime).reversed();
        return matching.stream()
                .sorted(newestFirst)
                .collect(Collectors.toList());
    }

    /**
     * Serializable predicate that accepts job result entries whose job config name
     * equals a given name.
     */
    public static class FilterJobResultByNamePredicate
            implements Predicate<Long, JobResult>, IdentifiedDataSerializable {

        /** Name to match; written and read as the only serialized field. */
        private String name;

        /** No-arg constructor; {@code name} stays unset until {@code readData} fills it. */
        public FilterJobResultByNamePredicate() {
        }

        /**
         * @param name job name an entry must have to be accepted
         */
        public FilterJobResultByNamePredicate(String name) {
            this.name = name;
        }

        /**
         * @param entry job results map entry to test
         * @return {@code true} if the entry's job config name equals the wanted name
         */
        public boolean apply(Map.Entry<Long, JobResult> entry) {
            JobResult result = entry.getValue();
            String candidate = result.getJobConfig().getName();
            return name.equals(candidate);
        }

        /** @return the factory id declared by {@code JetInitDataSerializerHook} */
        public int getFactoryId() {
            return JetInitDataSerializerHook.FACTORY_ID;
        }

        /** @return the class id of this type, 23 */
        public int getId() {
            return 23;
        }

        /** Writes the name as a UTF string. */
        public void writeData(ObjectDataOutput out) throws IOException {
            out.writeUTF(name);
        }

        /** Reads the name back as a UTF string. */
        public void readData(ObjectDataInput in) throws IOException {
            name = in.readUTF();
        }
    }

    /**
     * Serializable predicate that accepts job record entries whose config name
     * equals a given name.
     */
    public static class FilterJobRecordByNamePredicate
            implements Predicate<Long, JobRecord>, IdentifiedDataSerializable {

        /** Name to match; written and read as the only serialized field. */
        private String name;

        /** No-arg constructor; {@code name} stays unset until {@code readData} fills it. */
        public FilterJobRecordByNamePredicate() {
        }

        /**
         * @param name job name an entry must have to be accepted
         */
        public FilterJobRecordByNamePredicate(String name) {
            this.name = name;
        }

        /**
         * @param entry job records map entry to test
         * @return {@code true} if the entry's config name equals the wanted name
         */
        public boolean apply(Map.Entry<Long, JobRecord> entry) {
            JobRecord record = entry.getValue();
            String candidate = record.getConfig().getName();
            return name.equals(candidate);
        }

        /** @return the factory id declared by {@code JetInitDataSerializerHook} */
        public int getFactoryId() {
            return JetInitDataSerializerHook.FACTORY_ID;
        }

        /** @return the class id of this type, 22 */
        public int getId() {
            return 22;
        }

        /** Writes the name as a UTF string. */
        public void writeData(ObjectDataOutput out) throws IOException {
            out.writeUTF(name);
        }

        /** Reads the name back as a UTF string. */
        public void readData(ObjectDataInput in) throws IOException {
            name = in.readUTF();
        }
    }

    /**
     * Backup processor that replaces an entry's job record with a copy carrying
     * the new quorum size. Unlike the primary processor it applies the value
     * unconditionally whenever a record is present.
     */
    public static class UpdateJobRecordQuorumEntryBackupProcessor
            implements EntryBackupProcessor<Long, JobRecord>, IdentifiedDataSerializable {

        /** Quorum size to apply; the only serialized field. */
        private int newQuorumSize;

        /** No-arg constructor; the quorum size is filled in by {@code readData}. */
        public UpdateJobRecordQuorumEntryBackupProcessor() {
        }

        /**
         * @param newQuorumSize quorum size to write into the record
         */
        UpdateJobRecordQuorumEntryBackupProcessor(int newQuorumSize) {
            this.newQuorumSize = newQuorumSize;
        }

        /**
         * Replaces the entry's record with a copy that has the new quorum size;
         * entries without a value are left untouched.
         *
         * @param entry job records map entry to update
         */
        public void processBackup(Map.Entry<Long, JobRecord> entry) {
            JobRecord current = entry.getValue();
            if (current != null) {
                entry.setValue(new JobRecord(
                        current.getJobId(),
                        current.getCreationTime(),
                        current.getDag(),
                        current.getConfig(),
                        newQuorumSize));
            }
        }

        /** @return the factory id declared by {@code JetInitDataSerializerHook} */
        public int getFactoryId() {
            return JetInitDataSerializerHook.FACTORY_ID;
        }

        /** @return the class id of this type, 19 */
        public int getId() {
            return 19;
        }

        /** Writes the quorum size as an int. */
        public void writeData(ObjectDataOutput out) throws IOException {
            out.writeInt(newQuorumSize);
        }

        /** Reads the quorum size back as an int. */
        public void readData(ObjectDataInput in) throws IOException {
            newQuorumSize = in.readInt();
        }
    }

    /**
     * Entry processor that raises the quorum size of a job record when the new
     * value is strictly larger than the stored one.
     */
    public static class UpdateJobRecordQuorumEntryProcessor
            implements EntryProcessor<Long, JobRecord>, IdentifiedDataSerializable {

        /** Quorum size to apply; the only serialized field. */
        private int newQuorumSize;
        /** Outcome of the last {@code process} call that saw a record; not serialized. */
        private boolean updated;

        /** No-arg constructor; the quorum size is filled in by {@code readData}. */
        public UpdateJobRecordQuorumEntryProcessor() {
        }

        /**
         * @param newQuorumSize quorum size to apply if larger than the current one
         */
        UpdateJobRecordQuorumEntryProcessor(int newQuorumSize) {
            this.newQuorumSize = newQuorumSize;
        }

        /**
         * Replaces the entry's record with a copy carrying the new quorum size when
         * that size exceeds the record's current one.
         *
         * @param entry job records map entry to process
         * @return boxed {@code true} if the record was replaced, boxed {@code false}
         *         if the entry has no value or the new size is not larger
         */
        public Object process(Map.Entry<Long, JobRecord> entry) {
            JobRecord current = entry.getValue();
            if (current == null) {
                // The updated flag is deliberately left as it was.
                return false;
            }
            updated = newQuorumSize > current.getQuorumSize();
            if (!updated) {
                return false;
            }
            entry.setValue(new JobRecord(
                    current.getJobId(),
                    current.getCreationTime(),
                    current.getDag(),
                    current.getConfig(),
                    newQuorumSize));
            return true;
        }

        /**
         * @return a backup processor carrying the new quorum size if the last
         *         {@code process} call replaced the record, otherwise {@code null}
         */
        public EntryBackupProcessor<Long, JobRecord> getBackupProcessor() {
            if (updated) {
                return new UpdateJobRecordQuorumEntryBackupProcessor(newQuorumSize);
            }
            return null;
        }

        /** @return the factory id declared by {@code JetInitDataSerializerHook} */
        public int getFactoryId() {
            return JetInitDataSerializerHook.FACTORY_ID;
        }

        /** @return the class id of this type, 18 */
        public int getId() {
            return 18;
        }

        /** Writes the quorum size as an int. */
        public void writeData(ObjectDataOutput out) throws IOException {
            out.writeInt(newQuorumSize);
        }

        /** Reads the quorum size back as an int. */
        public void readData(ObjectDataInput in) throws IOException {
            newQuorumSize = in.readInt();
        }
    }

    /**
     * Stateless serializable predicate that accepts entries whose key equals
     * their value.
     */
    public static class FilterJobIdPredicate
            implements Predicate<Long, Long>, IdentifiedDataSerializable {

        /**
         * @param mapEntry entry to test
         * @return {@code true} if the entry's key equals its value
         */
        public boolean apply(Map.Entry<Long, Long> mapEntry) {
            Long key = mapEntry.getKey();
            return key.equals(mapEntry.getValue());
        }

        /** @return the factory id declared by {@code JetInitDataSerializerHook} */
        public int getFactoryId() {
            return JetInitDataSerializerHook.FACTORY_ID;
        }

        /** @return the class id of this type, 14 */
        public int getId() {
            return 14;
        }

        /** Nothing to write: the predicate has no state. */
        public void writeData(ObjectDataOutput out) throws IOException {
        }

        /** Nothing to read: the predicate has no state. */
        public void readData(ObjectDataInput in) throws IOException {
        }
    }

    /**
     * Serializable predicate that accepts entries whose value is a given job id
     * and whose key is a different id.
     */
    public static class FilterExecutionIdByJobIdPredicate
            implements Predicate<Long, Long>, IdentifiedDataSerializable {

        /** Job id to match against entry values; the only serialized field. */
        private long jobId;

        /** No-arg constructor; the job id is filled in by {@code readData}. */
        public FilterExecutionIdByJobIdPredicate() {
        }

        /**
         * @param jobId job id an entry's value must equal to be accepted
         */
        FilterExecutionIdByJobIdPredicate(long jobId) {
            this.jobId = jobId;
        }

        /**
         * @param mapEntry entry to test
         * @return {@code true} if the value equals the job id and the key does not
         */
        public boolean apply(Map.Entry<Long, Long> mapEntry) {
            long key = mapEntry.getKey();
            if (key == jobId) {
                // This is the entry whose key is the job id itself; reject it.
                return false;
            }
            long value = mapEntry.getValue();
            return value == jobId;
        }

        /** @return the factory id declared by {@code JetInitDataSerializerHook} */
        public int getFactoryId() {
            return JetInitDataSerializerHook.FACTORY_ID;
        }

        /** @return the class id of this type, 13 */
        public int getId() {
            return 13;
        }

        /** Writes the job id as a long. */
        public void writeData(ObjectDataOutput out) throws IOException {
            out.writeLong(jobId);
        }

        /** Reads the job id back as a long. */
        public void readData(ObjectDataInput in) throws IOException {
            jobId = in.readLong();
        }
    }
}

