/*
 * Decompiled with CFR 0.152.
 */
package com.hurence.opc.da;

import com.hurence.opc.OpcData;
import com.hurence.opc.OpcSession;
import com.hurence.opc.OperationStatus;
import com.hurence.opc.SubscriptionConfiguration;
import com.hurence.opc.da.JIVariantMarshaller;
import com.hurence.opc.da.OpcDaQualityExtractor;
import com.hurence.opc.da.OpcDaSessionProfile;
import com.hurence.opc.da.OpcDaTemplate;
import com.hurence.opc.exception.OpcException;
import java.lang.ref.WeakReference;
import java.time.Duration;
import java.util.AbstractMap;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.jinterop.dcom.common.JIException;
import org.jinterop.dcom.core.JIVariant;
import org.openscada.opc.dcom.common.KeyedResult;
import org.openscada.opc.dcom.common.KeyedResultSet;
import org.openscada.opc.dcom.common.Result;
import org.openscada.opc.dcom.common.ResultSet;
import org.openscada.opc.dcom.da.OPCDATASOURCE;
import org.openscada.opc.dcom.da.OPCITEMDEF;
import org.openscada.opc.dcom.da.OPCITEMRESULT;
import org.openscada.opc.dcom.da.WriteRequest;
import org.openscada.opc.dcom.da.impl.OPCGroupStateMgt;
import org.openscada.opc.dcom.da.impl.OPCItemMgt;
import org.openscada.opc.dcom.da.impl.OPCServer;
import org.openscada.opc.dcom.da.impl.OPCSyncIO;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link OpcSession} implementation backed by a single OPC-DA group.
 *
 * <p>Tags are lazily registered on the group the first time they are read or written; the
 * resulting (server handle, client handle) pair is cached per tag. Once {@link #cleanup(OPCServer)}
 * has run, the session is detached and {@link #read}, {@link #write} and {@link #stream} fail with
 * an {@link OpcException}.
 */
public class OpcDaSession
implements OpcSession {
    private static final Logger logger = LoggerFactory.getLogger(OpcDaSession.class);
    /** Source of client handles, shared by every session (used for both groups and items). */
    private static final AtomicInteger clientHandleCounter = new AtomicInteger();

    private OPCGroupStateMgt group;
    /** Tag name to (server handle, client handle). Set to {@code null} once the session is detached. */
    private Map<String, Map.Entry<Integer, Integer>> handlesMap = new HashMap<>();
    private OPCSyncIO syncIO;
    private OPCItemMgt opcItemMgt;
    private OPCDATASOURCE datasource;
    /** Weakly held so that an open session does not keep its creating template reachable. */
    private final WeakReference<OpcDaTemplate> creatingOperations;
    /** Per-tag requested data type; tags without an entry are requested with type {@code 0}. */
    private final Map<String, Short> dataTypeMap;

    private OpcDaSession(OpcDaTemplate creatingOperations, OPCGroupStateMgt group, OPCDATASOURCE datasource, Map<String, Short> dataTypeMap) throws JIException {
        this.group = group;
        this.opcItemMgt = group.getItemManagement();
        this.syncIO = group.getSyncIO();
        this.datasource = datasource;
        this.creatingOperations = new WeakReference<>(creatingOperations);
        this.dataTypeMap = dataTypeMap;
    }

    /**
     * Creates a session by adding a new active group to the server.
     *
     * @param server             the server the group is added to.
     * @param sessionProfile     supplies the refresh interval, the direct-read flag (device vs. cache
     *                           data source) and the data type override map.
     * @param creatingOperations the template that will be asked to release the session on {@link #close()}.
     * @return the new session.
     * @throws OpcException if anything goes wrong while creating the group.
     */
    static OpcDaSession create(OPCServer server, OpcDaSessionProfile sessionProfile, OpcDaTemplate creatingOperations) {
        try {
            OPCDATASOURCE source = sessionProfile.isDirectRead() ? OPCDATASOURCE.OPC_DS_DEVICE : OPCDATASOURCE.OPC_DS_CACHE;
            OPCGroupStateMgt newGroup = server.addGroup(null, true, (int)sessionProfile.getRefreshInterval().toMillis(), clientHandleCounter.incrementAndGet(), null, null, 0);
            return new OpcDaSession(creatingOperations, newGroup, source, sessionProfile.getDataTypeOverrideMap());
        }
        catch (Exception e) {
            throw new OpcException("Unable to create an OPC-DA session", e);
        }
    }

    /**
     * Removes the group from the server and detaches this session.
     *
     * <p>The local state is dropped whether or not the server-side removal succeeds, so that a
     * cleaned session is always reported as detached by subsequent operations.
     *
     * @param opcServer the server owning the group.
     */
    public void cleanup(OPCServer opcServer) {
        logger.info("Cleaning session");
        try {
            opcServer.removeGroup(this.group, true);
        }
        catch (JIException e) {
            logger.warn("Unable to properly remove group from opc server", (Throwable)e);
        }
        finally {
            // Must run on success as well: the null group is what marks the session as detached.
            if (this.handlesMap != null) {
                this.handlesMap.clear();
            }
            this.handlesMap = null;
            this.group = null;
            this.opcItemMgt = null;
            this.syncIO = null;
        }
    }

    /**
     * Synchronously reads the given tags from the configured data source.
     *
     * @param tags the tags to read; duplicates are read once.
     * @return one {@link OpcData} per returned item whose client handle maps to a requested tag.
     * @throws OpcException if the session is detached, a tag cannot be registered or the read fails.
     */
    @Override
    public List<OpcData> read(String ... tags) {
        if (this.group == null) {
            throw new OpcException("Unable to read tags. Session has been detached!");
        }
        // distinct(): toMap rejects duplicate keys, and reading a tag twice is pointless anyway.
        Map<String, Map.Entry<Integer, Integer>> handlesByTag = Arrays.stream(tags)
                .distinct()
                .collect(Collectors.toMap(Function.identity(), this::resolveItemHandles));
        // Results are identified by client handle, so build the reverse lookup.
        Map<Integer, String> tagsByClientHandle = handlesByTag.entrySet().stream()
                .collect(Collectors.toMap(e -> e.getValue().getValue(), Map.Entry::getKey));
        Integer[] serverHandles = handlesByTag.values().stream()
                .map(Map.Entry::getKey)
                .toArray(Integer[]::new);
        try {
            return this.syncIO.read(this.datasource, serverHandles).stream()
                    .map(Result::getValue)
                    .filter(state -> tagsByClientHandle.containsKey(state.getClientHandle()))
                    .<OpcData>map(state -> {
                        try {
                            return new OpcData<Object>(tagsByClientHandle.get(state.getClientHandle()), state.getTimestamp().asBigDecimalCalendar().toInstant(), OpcDaQualityExtractor.quality(state.getQuality()), JIVariantMarshaller.toJavaType(state.getValue()), OpcDaQualityExtractor.operationStatus(state.getQuality()));
                        }
                        catch (JIException e) {
                            throw new OpcException("Unable to read tag " + state, e);
                        }
                    })
                    .collect(Collectors.toList());
        }
        catch (JIException e) {
            throw new OpcException("Unable to read tags", e);
        }
    }

    /**
     * Synchronously writes the given values.
     *
     * @param data the tag/value pairs to write.
     * @return the operation status of each write result returned by the server.
     * @throws OpcException if the session is detached or the write cannot be performed.
     */
    @Override
    public List<OperationStatus> write(OpcData ... data) {
        if (this.group == null) {
            throw new OpcException("Unable to write tags. Session has been detached!");
        }
        try {
            WriteRequest[] requests = Arrays.stream(data)
                    .map(d -> new WriteRequest(this.resolveItemHandles(d.getTag()).getKey().intValue(), JIVariant.makeVariant(d.getValue())))
                    .toArray(WriteRequest[]::new);
            return this.syncIO.write(requests).stream()
                    .map(OpcDaQualityExtractor::operationStatus)
                    .collect(Collectors.toList());
        }
        catch (Exception e) {
            throw new OpcException("Unable to write data", e);
        }
    }

    /**
     * Continuously polls the given tags and exposes changes as a stream.
     *
     * <p>One background task reads all tags at the group's revised update rate and keeps the most
     * recent sample per tag. For each distinct sampling interval, another task forwards the latest
     * sample of its tags to the stream whenever it differs from the one previously forwarded.
     * The returned stream must be closed to stop the background tasks.
     *
     * @param subscriptionConfiguration supplies the sampling interval of each tag.
     * @param tags                      the tags to stream; when empty, an empty stream is returned.
     * @return a stream of samples that never ends on its own (unless {@code tags} is empty) and
     *         throws {@link OpcException} once a background task has failed.
     * @throws OpcException if the session is detached or the revised refresh interval cannot be read.
     */
    @Override
    public Stream<OpcData> stream(SubscriptionConfiguration subscriptionConfiguration, String ... tags) {
        if (this.group == null) {
            throw new OpcException("Unable to read tags. Session has been detached!");
        }
        if (tags.length == 0) {
            // Nothing to poll: do not start any thread.
            return Stream.empty();
        }
        final long refreshRate;
        try {
            refreshRate = this.group.getState().getUpdateRate();
        }
        catch (JIException e) {
            throw new OpcException("Unable to get revised refresh interval", e);
        }
        // Resolve the whole configuration before starting any thread, so that an invalid
        // argument cannot leave a running scheduler behind.
        Map<Duration, List<String>> tagsBySamplingInterval = Arrays.stream(tags)
                .collect(Collectors.groupingBy(subscriptionConfiguration::samplingIntervalForTag));
        // Non-empty because tags is non-empty.
        long minPollingTime = tagsBySamplingInterval.keySet().stream().mapToLong(Duration::toNanos).min().getAsLong();

        AtomicBoolean inError = new AtomicBoolean();
        LinkedTransferQueue<OpcData> transferQueue = new LinkedTransferQueue<>();
        Map<String, OpcData> latestReads = Collections.synchronizedMap(new HashMap<>());
        Map<String, OpcData> latestWrites = Collections.synchronizedMap(new HashMap<>());
        ScheduledExecutorService readScheduler = Executors.newSingleThreadScheduledExecutor();
        ScheduledExecutorService writeScheduler = Executors.newSingleThreadScheduledExecutor();
        try {
            readScheduler.scheduleWithFixedDelay(() -> {
                try {
                    // Keep only the most recent sample of each tag.
                    this.read(tags).forEach(opcData -> latestReads.compute(opcData.getTag(), (k, oldValue) -> oldValue == null || oldValue.getTimestamp().isBefore(opcData.getTimestamp()) ? opcData : oldValue));
                }
                catch (Exception e) {
                    inError.set(true);
                    throw new OpcException("Error while reading tags. Stream aborted", e);
                }
            }, 0L, refreshRate, TimeUnit.MILLISECONDS);
            tagsBySamplingInterval.forEach((period, tagList) -> writeScheduler.scheduleAtFixedRate(() -> tagList.forEach(tag -> {
                try {
                    OpcData lastWrite = latestWrites.get(tag);
                    OpcData lastRead = latestReads.get(tag);
                    // Forward only samples that differ from the last forwarded one.
                    if (lastRead != null && !lastRead.equals(lastWrite)) {
                        latestWrites.put(tag, lastRead);
                        transferQueue.add(lastRead);
                    }
                }
                catch (Exception e) {
                    inError.set(true);
                    throw new OpcException("Unable to buffer data", e);
                }
            }), 0L, period.toNanos(), TimeUnit.NANOSECONDS));
        }
        catch (RuntimeException e) {
            // E.g. a non-positive period: the caller never gets a stream to close, so stop here.
            readScheduler.shutdownNow();
            writeScheduler.shutdownNow();
            throw e;
        }
        return Stream.<OpcData>generate(() -> {
            if (inError.get()) {
                throw new OpcException("EOF reading from the steam.");
            }
            try {
                // May time out and return null; such elements are filtered out below.
                return transferQueue.poll(minPollingTime, TimeUnit.NANOSECONDS);
            }
            catch (InterruptedException ie) {
                // Restore the flag so that the consumer can still observe the interruption.
                Thread.currentThread().interrupt();
                throw new OpcException("Interrupted reading from the steam.", ie);
            }
        }).filter(opcData -> opcData != null).onClose(() -> {
            readScheduler.shutdown();
            writeScheduler.shutdown();
        });
    }

    /**
     * Returns the handles of a tag, registering the tag on the group on first use.
     *
     * @param tag the item id.
     * @return an entry whose key is the server handle and whose value is the client handle.
     * @throws OpcException if the item cannot be added or the server returns an invalid handle.
     */
    private synchronized Map.Entry<Integer, Integer> resolveItemHandles(String tag) {
        Map.Entry<Integer, Integer> handles = this.handlesMap.get(tag);
        if (handles != null) {
            return handles;
        }
        OPCITEMDEF opcitemdef = new OPCITEMDEF();
        opcitemdef.setActive(true);
        opcitemdef.setClientHandle(clientHandleCounter.incrementAndGet());
        opcitemdef.setItemID(tag);
        opcitemdef.setRequestedDataType(this.dataTypeMap.getOrDefault(tag, (short)0).shortValue());
        try {
            Integer serverHandle = ((OPCITEMRESULT)((KeyedResult)this.opcItemMgt.add(new OPCITEMDEF[]{opcitemdef}).get(0)).getValue()).getServerHandle();
            if (serverHandle == null || serverHandle == 0) {
                throw new OpcException("Received invalid handle from OPC server.");
            }
            handles = new AbstractMap.SimpleEntry<>(serverHandle, opcitemdef.getClientHandle());
        }
        catch (Exception e) {
            throw new OpcException("Unable to add item " + tag, e);
        }
        this.handlesMap.put(tag, handles);
        return handles;
    }

    /**
     * Hands the session back to the template that created it, if that template is still reachable.
     * Subsequent calls are no-ops because the reference is cleared.
     */
    @Override
    public void close() {
        // Read the weak reference once: a second get() could return null after a GC.
        OpcDaTemplate template = this.creatingOperations.get();
        if (template != null) {
            try {
                template.releaseSession(this);
            }
            finally {
                this.creatingOperations.clear();
            }
        }
    }
}

