/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.standard.merge;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Predicate;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processors.standard.MergeContent;
import org.apache.nifi.processors.standard.MergeRecord;
import org.apache.nifi.processors.standard.merge.RecordBin;
import org.apache.nifi.processors.standard.merge.RecordBinThresholds;
import org.apache.nifi.serialization.RecordReader;

/**
 * Keeps track of the open {@link RecordBin}s, grouped by a caller-supplied group identifier,
 * and decides when bins are handed FlowFiles, completed, or discarded.
 *
 * <p>The group-to-bins map is only read or modified while {@link #lock} is held. The per-group
 * lists are {@link CopyOnWriteArrayList}s because {@link #add} iterates over them after the lock
 * has been released.</p>
 */
public class RecordBinManager {
    private final ProcessContext context;
    private final ProcessSessionFactory sessionFactory;
    private final ComponentLog logger;

    /** Value of {@code MergeRecord.MAX_BIN_COUNT}, or {@link Integer#MAX_VALUE} when that property yields null. */
    private final int maxBinCount;

    /** Age (in nanoseconds) used by {@link #completeExpiredBins()}; {@link Long#MAX_VALUE} when no age was set. */
    private final AtomicLong maxBinAgeNanos = new AtomicLong(Long.MAX_VALUE);

    /** Bins per group identifier. Every access in this class happens while holding {@link #lock}. */
    private final Map<String, List<RecordBin>> groupBinMap = new HashMap<>();
    private final Lock lock = new ReentrantLock();

    /** Number of bins currently registered in {@link #groupBinMap}. */
    private final AtomicInteger binCount = new AtomicInteger(0);

    /**
     * Creates a manager whose bin limit is read from {@code MergeRecord.MAX_BIN_COUNT}.
     *
     * @param context        source of the processor's property values
     * @param sessionFactory used to create a dedicated session for every new bin
     * @param logger         destination for debug messages
     */
    public RecordBinManager(ProcessContext context, ProcessSessionFactory sessionFactory, ComponentLog logger) {
        this.context = context;
        this.sessionFactory = sessionFactory;
        this.logger = logger;

        final Integer maxBins = context.getProperty(MergeRecord.MAX_BIN_COUNT).asInteger();
        this.maxBinCount = maxBins == null ? Integer.MAX_VALUE : maxBins;
    }

    /**
     * Rolls back every tracked bin, forgets all groups, and resets the bin count to zero.
     */
    public void purge() {
        lock.lock();
        try {
            for (final List<RecordBin> binList : groupBinMap.values()) {
                for (final RecordBin bin : binList) {
                    bin.rollback();
                }
            }
            groupBinMap.clear();
            binCount.set(0);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Sets the age after which {@link #completeExpiredBins()} completes a bin.
     *
     * @param timePeriod the maximum age, or {@code null} to disable the age limit
     * @param timeUnit   unit of {@code timePeriod}; only consulted when {@code timePeriod} is non-null
     */
    public void setMaxBinAge(Long timePeriod, TimeUnit timeUnit) {
        if (timePeriod == null) {
            maxBinAgeNanos.set(Long.MAX_VALUE);
        } else {
            maxBinAgeNanos.set(timeUnit.toNanos(timePeriod));
        }
    }

    /**
     * @return the number of bins currently tracked
     */
    public int getBinCount() {
        return binCount.get();
    }

    /**
     * Offers the FlowFile to each existing bin of the group, and creates a new bin for it when none accepts.
     *
     * <p>A bin that reports itself complete after accepting the FlowFile is removed from tracking.
     * A newly created bin is registered only if it is not already complete; if registering it pushes
     * the count above the configured maximum, the oldest bin is completed.</p>
     *
     * @param groupIdentifier key of the group whose bins should be tried
     * @param flowFile        the FlowFile to bin
     * @param reader          reader for the FlowFile's records
     * @param session         session that currently owns the FlowFile; rolled back if a fresh bin rejects it
     * @param block           passed through to {@code RecordBin.offer} for existing bins
     *                        (a new bin is always offered with {@code true})
     * @throws IOException      propagated from the bin operations
     * @throws RuntimeException if a newly created bin refuses the FlowFile
     */
    public void add(String groupIdentifier, FlowFile flowFile, RecordReader reader, ProcessSession session, boolean block) throws IOException {
        final List<RecordBin> currentBins;
        lock.lock();
        try {
            currentBins = groupBinMap.computeIfAbsent(groupIdentifier, grpId -> new CopyOnWriteArrayList<>());
        } finally {
            lock.unlock();
        }

        // The lock is intentionally not held while offering, so other threads can use the manager meanwhile.
        RecordBin acceptedBin = null;
        for (final RecordBin bin : currentBins) {
            if (bin.offer(flowFile, reader, session, block)) {
                acceptedBin = bin;
                logger.debug("Transferred id={} to {}", new Object[]{flowFile.getId(), bin});
                break;
            }
        }

        if (acceptedBin != null) {
            if (acceptedBin.isComplete()) {
                removeBins(groupIdentifier, List.of(acceptedBin));
            }
            return;
        }

        // No existing bin took the FlowFile: start a new bin backed by its own session.
        final RecordBin bin = new RecordBin(context, sessionFactory.createSession(), logger, createThresholds(flowFile));
        final boolean binAccepted = bin.offer(flowFile, reader, session, true);
        if (!binAccepted) {
            session.rollback();
            throw new RuntimeException("Attempted to add " + flowFile + " to a new bin but failed. This is unexpected. Will roll back session and try again.");
        }

        logger.debug("Transferred id={} to {}", new Object[]{flowFile.getId(), bin});

        if (!bin.isComplete()) {
            final int updatedBinCount = binCount.incrementAndGet();

            lock.lock();
            try {
                // Look the list up again rather than reusing currentBins: the lock was released above,
                // so the group's list may have been removed from the map in the meantime.
                final List<RecordBin> bins = groupBinMap.computeIfAbsent(groupIdentifier, grpId -> new CopyOnWriteArrayList<>());
                bins.add(bin);
            } finally {
                lock.unlock();
            }

            if (updatedBinCount > maxBinCount) {
                completeOldestBin();
            }
        }
    }

    /**
     * Builds the thresholds for a new bin from the processor's current property values.
     *
     * <p>With the Defragment merge strategy the minimum record count is forced to
     * {@link Integer#MAX_VALUE} and the fragment-count attribute name is supplied; otherwise the
     * fragment-count attribute is {@code null}.</p>
     *
     * @param flowfile the FlowFile that triggers creation of the bin (not consulted by this method)
     * @return thresholds for the new bin
     */
    private RecordBinThresholds createThresholds(FlowFile flowfile) {
        int minRecords = context.getProperty(MergeRecord.MIN_RECORDS).evaluateAttributeExpressions().asInteger();
        final int maxRecords = context.getProperty(MergeRecord.MAX_RECORDS).evaluateAttributeExpressions().asInteger();
        final long minBytes = context.getProperty(MergeRecord.MIN_SIZE).asDataSize(DataUnit.B).longValue();

        final PropertyValue maxSizeValue = context.getProperty(MergeRecord.MAX_SIZE);
        final long maxBytes = maxSizeValue.isSet() ? maxSizeValue.asDataSize(DataUnit.B).longValue() : Long.MAX_VALUE;

        final PropertyValue maxMillisValue = context.getProperty(MergeRecord.MAX_BIN_AGE);
        final String maxBinAge = maxMillisValue.getValue();
        final long maxBinMillis = maxMillisValue.isSet() ? maxMillisValue.asTimePeriod(TimeUnit.MILLISECONDS) : Long.MAX_VALUE;

        final String mergeStrategy = context.getProperty(MergeRecord.MERGE_STRATEGY).getValue();
        final String fragmentCountAttribute;
        if (MergeRecord.MERGE_STRATEGY_DEFRAGMENT.getValue().equals(mergeStrategy)) {
            fragmentCountAttribute = MergeContent.FRAGMENT_COUNT_ATTRIBUTE;
            minRecords = Integer.MAX_VALUE;
        } else {
            fragmentCountAttribute = null;
        }

        return new RecordBinThresholds(minRecords, maxRecords, minBytes, maxBytes, maxBinMillis, maxBinAge, fragmentCountAttribute);
    }

    /**
     * Finds the oldest tracked bin across all groups, stops tracking it, and completes it.
     * Does nothing when no bins are tracked.
     *
     * @throws IOException propagated from completing the bin
     */
    public void completeOldestBin() throws IOException {
        RecordBin oldestBin = null;

        lock.lock();
        try {
            String oldestBinGroup = null;
            for (final Map.Entry<String, List<RecordBin>> group : groupBinMap.entrySet()) {
                for (final RecordBin bin : group.getValue()) {
                    if (oldestBin == null || bin.isOlderThan(oldestBin)) {
                        oldestBin = bin;
                        oldestBinGroup = group.getKey();
                    }
                }
            }

            if (oldestBin == null) {
                return;
            }

            removeBins(oldestBinGroup, List.of(oldestBin));
        } finally {
            lock.unlock();
        }

        // Completion happens after the lock has been released.
        logger.debug("Completing Bin {} because the maximum number of bins has been exceeded", new Object[]{oldestBin});
        oldestBin.complete("Maximum number of bins has been exceeded");
    }

    /**
     * Completes every bin that is older than the configured maximum bin age.
     *
     * @return the number of bins completed
     * @throws IOException propagated from completing a bin
     */
    public int completeExpiredBins() throws IOException {
        final long maxNanos = maxBinAgeNanos.get();
        return handleCompletedBins(bin -> bin.isOlderThan(maxNanos, TimeUnit.NANOSECONDS), "Bin has reached Max Bin Age");
    }

    /**
     * Completes every bin for which {@code RecordBin.isFullEnough()} is true.
     *
     * @return the number of bins completed
     * @throws IOException propagated from completing a bin
     */
    public int completeFullEnoughBins() throws IOException {
        return handleCompletedBins(RecordBin::isFullEnough, "Bin is full enough");
    }

    /**
     * Completes every bin for which {@code RecordBin.isFull()} is true.
     *
     * @return the number of bins completed
     * @throws IOException propagated from completing a bin
     */
    public int completeFullBins() throws IOException {
        return handleCompletedBins(RecordBin::isFull, "Bin is completely full");
    }

    /**
     * Selects the bins matching {@code completionTest} while holding the lock, then completes them
     * and stops tracking them after the lock has been released.
     *
     * @param completionTest   decides whether a bin should be completed
     * @param completionReason explanation passed to {@code RecordBin.complete} and written to the debug log
     * @return the number of bins completed
     * @throws IOException propagated from completing a bin
     */
    private int handleCompletedBins(Predicate<RecordBin> completionTest, String completionReason) throws IOException {
        final Map<String, List<RecordBin>> completedBinMap = new HashMap<>();

        lock.lock();
        try {
            for (final Map.Entry<String, List<RecordBin>> entry : groupBinMap.entrySet()) {
                final String key = entry.getKey();
                for (final RecordBin bin : entry.getValue()) {
                    if (completionTest.test(bin)) {
                        completedBinMap.computeIfAbsent(key, ignore -> new ArrayList<>()).add(bin);
                    }
                }
            }
        } finally {
            lock.unlock();
        }

        int completed = 0;
        for (final Map.Entry<String, List<RecordBin>> entry : completedBinMap.entrySet()) {
            final String key = entry.getKey();
            final List<RecordBin> completeBins = entry.getValue();

            for (final RecordBin bin : completeBins) {
                logger.debug("Completing Bin {} because {}", new Object[]{bin, completionReason});
                bin.complete(completionReason);
                ++completed;
            }

            removeBins(key, completeBins);
        }

        return completed;
    }

    /**
     * Stops tracking the given bins of one group and drops the group once it has no bins left.
     *
     * <p>The bin count is reduced by the number of bins actually removed from the list, so bins
     * that were no longer tracked are not counted twice.</p>
     *
     * @param key  group identifier the bins belong to
     * @param bins bins to remove
     */
    private void removeBins(String key, List<RecordBin> bins) {
        lock.lock();
        try {
            final List<RecordBin> list = groupBinMap.get(key);
            if (list == null) {
                return;
            }

            final int initialSize = list.size();
            list.removeAll(bins);
            final int removedCount = initialSize - list.size();
            binCount.addAndGet(-removedCount);

            if (list.isEmpty()) {
                groupBinMap.remove(key);
            }
        } finally {
            lock.unlock();
        }
    }
}

