/*
 * Decompiled with CFR 0.152.
 */
package com.marklogic.hub.step.impl;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.marklogic.client.DatabaseClient;
import com.marklogic.client.io.InputStreamHandle;
import com.marklogic.hub.HubClient;
import com.marklogic.hub.HubClientConfig;
import com.marklogic.hub.dataservices.StepRunnerService;
import com.marklogic.hub.impl.HubClientImpl;
import com.marklogic.hub.util.DiskQueue;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class SourceQueryCollector {
    private final HubClient hubClient;
    private final String sourceDatabase;
    private static HubClientConfig hubClientConfig;
    private static final Collector<CharSequence, ?, String> newLineCollector;

    public SourceQueryCollector(HubClient hubClient, String sourceDatabase) {
        this.hubClient = hubClient;
        this.sourceDatabase = sourceDatabase;
        hubClientConfig = ((HubClientImpl)hubClient).getHubClientConfig();
    }

    public DiskQueue<String> run(String flow, String step, Map<String, Object> options) {
        DatabaseClient dbClient = this.sourceDatabase.toUpperCase().contains("FINAL") ? this.hubClient.getFinalClient() : this.hubClient.getStagingClient();
        StepRunnerService stepRunnerService = StepRunnerService.on(dbClient);
        String optionsStr = null;
        if (options != null) {
            ObjectMapper objectMapper = new ObjectMapper();
            try {
                optionsStr = objectMapper.writeValueAsString(options);
            }
            catch (JsonProcessingException ex) {
                throw new RuntimeException(String.format("Unexpected Json Processing Exception when collecting items to process for flow %s and step %s; cause: %s", new Object[]{flow, step, ex}));
            }
        }
        try {
            Stream<InputStreamHandle> collectedIDs = stepRunnerService.collector(stepRunnerService.newSessionState(), flow, step, optionsStr, this.sourceDatabase);
            return SourceQueryCollector.readItems(collectedIDs);
        }
        catch (IOException ex) {
            throw new RuntimeException(String.format("Unexpected IO exception when collecting items to process for flow %s and step %s; cause: %s", flow, step, ex));
        }
        catch (Exception ex) {
            throw new RuntimeException(String.format("Unable to collect items to process for flow %s and step %s; cause: %s", flow, step, ex.getMessage()));
        }
    }

    private static DiskQueue<String> readItems(Stream<InputStreamHandle> response) throws IOException {
        DiskQueue<String> results = new DiskQueue<String>(hubClientConfig);
        response.forEach(inputStreamHandle -> {
            try (BufferedReader reader = new BufferedReader(new InputStreamReader(inputStreamHandle.get(), StandardCharsets.UTF_8));){
                String text = reader.lines().collect(newLineCollector);
                results.add(text);
            }
            catch (IOException iOException) {
            }
            finally {
                inputStreamHandle.close();
            }
        });
        return results;
    }

    static {
        newLineCollector = Collectors.joining("\n");
    }
}

