/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.couchbase;

import com.couchbase.client.core.CouchbaseException;
import com.couchbase.client.deps.io.netty.buffer.ByteBuf;
import com.couchbase.client.deps.io.netty.buffer.ByteBufInputStream;
import com.couchbase.client.java.Bucket;
import com.couchbase.client.java.document.BinaryDocument;
import com.couchbase.client.java.document.RawJsonDocument;
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.DeprecationNotice;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.couchbase.AbstractCouchbaseLookupService;
import org.apache.nifi.couchbase.CouchbaseConfigurationProperties;
import org.apache.nifi.couchbase.DocumentType;
import org.apache.nifi.lookup.LookupFailureException;
import org.apache.nifi.lookup.RecordLookupService;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.util.Tuple;

@Tags(value={"lookup", "enrich", "couchbase"})
@CapabilityDescription(value="Lookup a record from Couchbase Server associated with the specified key. The coordinates that are passed to the lookup must contain the key 'key'.")
@DeprecationNotice(reason="This component is deprecated and will be removed in NiFi 2.x.")
public class CouchbaseRecordLookupService
extends AbstractCouchbaseLookupService
implements RecordLookupService {
    private volatile RecordReaderFactory readerFactory;
    private volatile DocumentType documentType;
    private static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder().name("record-reader").displayName("Record Reader").description("The Record Reader to use for parsing fetched document from Couchbase Server.").identifiesControllerService(RecordReaderFactory.class).required(true).build();

    @Override
    protected void addProperties(List<PropertyDescriptor> properties) {
        properties.add(CouchbaseConfigurationProperties.DOCUMENT_TYPE);
        properties.add(RECORD_READER);
    }

    @Override
    @OnEnabled
    public void onEnabled(ConfigurationContext context) throws InitializationException {
        super.onEnabled(context);
        this.readerFactory = (RecordReaderFactory)context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
        this.documentType = DocumentType.valueOf((String)context.getProperty(CouchbaseConfigurationProperties.DOCUMENT_TYPE).getValue());
    }

    public Optional<Record> lookup(Map<String, Object> coordinates) throws LookupFailureException {
        Optional<InputStream> inputStream;
        Bucket bucket = this.couchbaseClusterService.openBucket(this.bucketName);
        Optional<String> docId = Optional.ofNullable(coordinates.get("key")).map(Object::toString);
        try {
            switch (this.documentType) {
                case Binary: {
                    inputStream = docId.map(key -> (BinaryDocument)bucket.get(key, BinaryDocument.class)).map(doc -> new ByteBufInputStream((ByteBuf)doc.content()));
                    break;
                }
                case Json: {
                    inputStream = docId.map(key -> (RawJsonDocument)bucket.get(key, RawJsonDocument.class)).map(doc -> new ByteArrayInputStream(((String)doc.content()).getBytes(StandardCharsets.UTF_8)));
                    break;
                }
                default: {
                    return Optional.empty();
                }
            }
        }
        catch (CouchbaseException e) {
            throw new LookupFailureException("Failed to lookup from Couchbase using this coordinates: " + coordinates);
        }
        Optional<Tuple> errOrReader = inputStream.map(in -> {
            try {
                HashMap recordReaderVariables = new HashMap(coordinates.size());
                coordinates.keySet().forEach(k -> {
                    Object value = coordinates.get(k);
                    if (value != null) {
                        recordReaderVariables.put(k, value.toString());
                    }
                });
                return new Tuple(null, (Object)this.readerFactory.createRecordReader(recordReaderVariables, in, -1L, this.getLogger()));
            }
            catch (Exception e) {
                return new Tuple((Object)e, null);
            }
        });
        if (!errOrReader.isPresent()) {
            return Optional.empty();
        }
        Exception exception = (Exception)errOrReader.get().getKey();
        if (exception != null) {
            throw new LookupFailureException(String.format("Failed to lookup with %s", coordinates), (Throwable)exception);
        }
        try {
            return Optional.ofNullable(((RecordReader)errOrReader.get().getValue()).nextRecord());
        }
        catch (Exception e) {
            throw new LookupFailureException(String.format("Failed to read Record when looking up with %s", coordinates), (Throwable)e);
        }
    }
}

