/*
 * Decompiled with CFR 0.152.
 */
package org.tikv.cdc;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;
import org.apache.flink.shaded.guava31.com.google.common.base.Preconditions;
import org.apache.flink.shaded.guava31.com.google.common.collect.Range;
import org.apache.flink.shaded.guava31.com.google.common.collect.TreeMultiset;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tikv.cdc.CDCConfig;
import org.tikv.cdc.CDCEvent;
import org.tikv.cdc.RegionCDCClient;
import org.tikv.common.TiSession;
import org.tikv.common.key.Key;
import org.tikv.common.region.TiRegion;
import org.tikv.common.util.RangeSplitter;
import org.tikv.kvproto.Cdcpb;
import org.tikv.kvproto.Coprocessor;
import org.tikv.kvproto.Kvrpcpb;
import org.tikv.shade.io.grpc.ManagedChannel;

/**
 * Change-data-capture client for a single key range.
 *
 * <p>The key range is split into regions; one {@link RegionCDCClient} is kept per region and
 * all of them push their events into a bounded buffer that is drained through {@link #get()}.
 * For every tracked region the latest resolved timestamp is recorded, and the minimum/maximum
 * over all regions is exposed through {@link #getMinResolvedTs()} / {@link #getMaxResolvedTs()}.
 *
 * <p>All methods that touch the region bookkeeping are {@code synchronized} on this instance.
 */
public class CDCClient implements AutoCloseable {
    private static final Logger LOGGER = LoggerFactory.getLogger(CDCClient.class);
    private final TiSession session;
    private final Coprocessor.KeyRange keyRange;
    private final CDCConfig config;
    /** Bounded hand-off queue between the region clients (producers) and {@link #get()}. */
    private final BlockingQueue<CDCEvent> eventsBuffer;
    /** Region id -> client currently registered for that region. */
    private final ConcurrentHashMap<Long, RegionCDCClient> regionClients = new ConcurrentHashMap<>();
    /** Region id -> timestamp currently accounted for that region in {@link #resolvedTsSet}. */
    private final Map<Long, Long> regionToResolvedTs = new HashMap<Long, Long>();
    /** One entry per tracked region; kept in lock-step with {@link #regionToResolvedTs}. */
    private final TreeMultiset<Long> resolvedTsSet = TreeMultiset.create();
    private boolean started = false;
    private Consumer<CDCEvent> eventConsumer;

    /**
     * Creates a client for {@code keyRange} using a default {@link CDCConfig}.
     *
     * @param session  session used to resolve regions, stores and channels
     * @param keyRange key range to capture changes for
     */
    public CDCClient(TiSession session, Coprocessor.KeyRange keyRange) {
        this(session, keyRange, new CDCConfig());
    }

    /**
     * Creates a client for {@code keyRange}.
     *
     * @param session  session used to resolve regions, stores and channels
     * @param keyRange key range to capture changes for
     * @param config   configuration; its event buffer size bounds the internal queue
     * @throws IllegalStateException if the session's isolation level is not {@code SI}
     */
    public CDCClient(TiSession session, Coprocessor.KeyRange keyRange, CDCConfig config) {
        Preconditions.checkState(session.getConf().getIsolationLevel().equals(Kvrpcpb.IsolationLevel.SI), "Unsupported Isolation Level");
        this.session = session;
        this.keyRange = keyRange;
        this.config = config;
        this.eventsBuffer = new LinkedBlockingQueue<CDCEvent>(config.getEventBufferSize());
        this.eventConsumer = event -> {
            // Two non-blocking attempts first, then fall back to a blocking put.
            for (int i = 0; i < 2; ++i) {
                if (this.eventsBuffer.offer(event)) {
                    return;
                }
            }
            try {
                this.eventsBuffer.put(event);
            }
            catch (InterruptedException e) {
                // Keep the interrupt visible to the producing thread instead of swallowing it.
                Thread.currentThread().interrupt();
                LOGGER.error("interrupted while buffering a CDC event; the event was not enqueued", e);
            }
        };
    }

    /**
     * Starts capturing changes for the configured key range.
     *
     * @param startTs timestamp handed to every region client as its start timestamp
     * @throws IllegalStateException if the client has already been started
     */
    public synchronized void start(long startTs) {
        Preconditions.checkState(!this.started, "Client is already started");
        this.applyKeyRange(this.keyRange, startTs);
        this.started = true;
    }

    /**
     * Polls one buffered event without blocking and processes it.
     *
     * <p>Row events are returned to the caller. Resolved-timestamp and error events are handled
     * internally and yield {@code null}, so {@code null} means either "buffer empty" or
     * "a non-row event was consumed".
     *
     * @return the row of the polled event, or {@code null} as described above
     * @throws InterruptedException declared for API compatibility; the current implementation
     *                              uses a non-blocking poll
     */
    public synchronized Cdcpb.Event.Row get() throws InterruptedException {
        CDCEvent event = this.eventsBuffer.poll();
        if (event != null) {
            switch (event.eventType) {
                case ROW: {
                    return event.row;
                }
                case RESOLVED_TS: {
                    this.handleResolvedTs(event.regionId, event.resolvedTs);
                    break;
                }
                case ERROR: {
                    this.handleErrorEvent(event.regionId, event.error, event.resolvedTs);
                    break;
                }
            }
        }
        return null;
    }

    /**
     * Returns the smallest timestamp currently tracked over all regions.
     *
     * @return the minimum tracked timestamp
     * @throws NullPointerException if no region is currently tracked
     */
    public synchronized long getMinResolvedTs() {
        return this.resolvedTsSet.firstEntry().getElement();
    }

    /**
     * Returns the largest timestamp currently tracked over all regions.
     *
     * @return the maximum tracked timestamp
     * @throws NullPointerException if no region is currently tracked
     */
    public synchronized long getMaxResolvedTs() {
        return this.resolvedTsSet.lastEntry().getElement();
    }

    /** Closes and unregisters every region client. */
    @Override
    public synchronized void close() {
        this.removeRegions(this.regionClients.keySet());
    }

    /**
     * Reconciles the registered region clients with the regions that currently cover
     * {@code keyRange}: clients for vanished regions are removed, clients that are no longer
     * running are replaced, and clients are added for regions that have none.
     *
     * @param keyRange  key range to split into regions
     * @param timestamp start timestamp for every client that gets (re)created
     */
    private synchronized void applyKeyRange(Coprocessor.KeyRange keyRange, long timestamp) {
        RangeSplitter splitter = RangeSplitter.newSplitter(this.session.getRegionManager());
        Iterator<TiRegion> newRegionsIterator = splitter.splitRangeByRegion(Arrays.asList(keyRange)).stream()
                .map(RangeSplitter.RegionTask::getRegion)
                .sorted((a, b) -> Long.compare(a.getId(), b.getId()))
                .iterator();
        // The merge below walks both sequences in ascending region-id order. ConcurrentHashMap
        // gives no ordering guarantee, so the existing clients must be sorted explicitly;
        // otherwise healthy regions can be misclassified and needlessly restarted.
        Iterator<RegionCDCClient> oldRegionsIterator = this.regionClients.values().stream()
                .sorted((a, b) -> Long.compare(a.getRegion().getId(), b.getRegion().getId()))
                .iterator();
        ArrayList<TiRegion> regionsToAdd = new ArrayList<TiRegion>();
        ArrayList<Long> regionsToRemove = new ArrayList<Long>();
        TiRegion newRegion = CDCClient.nextOrNull(newRegionsIterator);
        RegionCDCClient oldRegionClient = CDCClient.nextOrNull(oldRegionsIterator);
        while (newRegion != null && oldRegionClient != null) {
            long newId = newRegion.getId();
            long oldId = oldRegionClient.getRegion().getId();
            if (newId == oldId) {
                // Same region on both sides: only replace the client if it stopped running.
                if (!oldRegionClient.isRunning()) {
                    regionsToRemove.add(newId);
                    regionsToAdd.add(newRegion);
                }
                newRegion = CDCClient.nextOrNull(newRegionsIterator);
                oldRegionClient = CDCClient.nextOrNull(oldRegionsIterator);
            } else if (newId < oldId) {
                // Region without a client yet.
                regionsToAdd.add(newRegion);
                newRegion = CDCClient.nextOrNull(newRegionsIterator);
            } else {
                // Client whose region is no longer part of the split.
                regionsToRemove.add(oldId);
                oldRegionClient = CDCClient.nextOrNull(oldRegionsIterator);
            }
        }
        while (newRegion != null) {
            regionsToAdd.add(newRegion);
            newRegion = CDCClient.nextOrNull(newRegionsIterator);
        }
        while (oldRegionClient != null) {
            regionsToRemove.add(oldRegionClient.getRegion().getId());
            oldRegionClient = CDCClient.nextOrNull(oldRegionsIterator);
        }
        // Remove first so that a replaced region is closed before its new client is registered.
        this.removeRegions(regionsToRemove);
        this.addRegions(regionsToAdd, timestamp);
        LOGGER.info("keyRange applied");
    }

    /** Returns the iterator's next element, or {@code null} when it is exhausted. */
    private static <T> T nextOrNull(Iterator<T> iterator) {
        return iterator.hasNext() ? iterator.next() : null;
    }

    /**
     * Creates, registers and starts a client for each given region that overlaps the key range.
     *
     * @param regions   candidate regions
     * @param timestamp start timestamp, also recorded as the region's initial tracked timestamp
     * @throws RuntimeException wrapping any exception raised while creating or starting a client
     */
    private synchronized void addRegions(Iterable<TiRegion> regions, long timestamp) {
        LOGGER.info("add regions: {}, timestamp: {}", regions, timestamp);
        for (TiRegion region : regions) {
            if (!this.overlapWithRegion(region)) {
                continue;
            }
            String address = this.session.getRegionManager().getStoreById(region.getLeader().getStoreId()).getStore().getAddress();
            ManagedChannel channel = this.session.getChannelFactory().getChannel(address, this.session.getPDClient().getHostMapping());
            try {
                RegionCDCClient client = new RegionCDCClient(region, this.keyRange, channel, this.eventConsumer, this.config);
                this.regionClients.put(region.getId(), client);
                this.regionToResolvedTs.put(region.getId(), timestamp);
                this.resolvedTsSet.add(timestamp);
                client.start(timestamp);
            }
            catch (Exception e) {
                LOGGER.error("failed to add region(regionId: {}, reason: {})", region.getId(), e);
                throw new RuntimeException(e);
            }
        }
    }

    /**
     * Unregisters and closes the clients of the given regions and drops their tracked timestamps.
     * Ids without a registered client are skipped; a failure to close a client is logged and
     * does not prevent its bookkeeping from being cleaned up.
     *
     * @param regionIds ids of the regions to remove
     */
    private synchronized void removeRegions(Iterable<Long> regionIds) {
        LOGGER.info("remove regions: {}", regionIds);
        for (long regionId : regionIds) {
            RegionCDCClient regionClient = this.regionClients.remove(regionId);
            if (regionClient == null) {
                continue;
            }
            try {
                regionClient.close();
            }
            catch (Exception e) {
                LOGGER.error("failed to close region client, region id: {}, error: {}", regionId, e);
            }
            finally {
                Long trackedTs = this.regionToResolvedTs.remove(regionId);
                if (trackedTs != null) {
                    this.resolvedTsSet.remove(trackedTs);
                }
            }
        }
    }

    /**
     * Tells whether the region's key span shares at least one key with the client's key range.
     * Both spans are treated as closed-open ranges over raw keys.
     *
     * <p>NOTE(review): {@code Range.closedOpen} rejects a lower bound greater than the upper
     * bound; how an unbounded (empty) end key compares is not visible here - confirm.
     */
    private boolean overlapWithRegion(TiRegion region) {
        Range<Key> regionRange = Range.closedOpen(Key.toRawKey(region.getStartKey()), Key.toRawKey(region.getEndKey()));
        Range<Key> clientRange = Range.closedOpen(Key.toRawKey(this.keyRange.getStart()), Key.toRawKey(this.keyRange.getEnd()));
        // intersection() throws for ranges that are not connected; those simply do not overlap.
        return regionRange.isConnected(clientRange) && !regionRange.intersection(clientRange).isEmpty();
    }

    /**
     * Replaces the tracked timestamp of a region with a newly reported resolved timestamp.
     * Events for regions that are not tracked (any more) are ignored, so that no timestamp is
     * added to the set without a region owning it.
     */
    private void handleResolvedTs(long regionId, long resolvedTs) {
        LOGGER.info("handle resolvedTs: {}, regionId: {}", resolvedTs, regionId);
        Long previousTs = this.regionToResolvedTs.replace(regionId, resolvedTs);
        if (previousTs == null) {
            LOGGER.info("ignore resolvedTs for untracked region, regionId: {}", regionId);
            return;
        }
        this.resolvedTsSet.remove(previousTs);
        this.resolvedTsSet.add(resolvedTs);
    }

    /**
     * Reacts to an error reported for a region: reports the failure to the region manager,
     * drops the region's client and re-applies the whole key range starting from
     * {@code resolvedTs}. An error for a region that has no registered client is ignored.
     *
     * @param regionId   id of the region that reported the error
     * @param error      the reported error (only logged)
     * @param resolvedTs timestamp used as start timestamp for clients created while re-applying
     */
    public synchronized void handleErrorEvent(long regionId, Throwable error, long resolvedTs) {
        LOGGER.info("handle error: {}, regionId: {}", error, regionId);
        RegionCDCClient failedClient = this.regionClients.get(regionId);
        if (failedClient == null) {
            // Stale event: the client was already removed, e.g. by an earlier error event.
            LOGGER.info("ignore error for untracked region, regionId: {}", regionId);
            return;
        }
        TiRegion region = failedClient.getRegion();
        this.session.getRegionManager().onRequestFail(region);
        this.removeRegions(Arrays.asList(regionId));
        this.applyKeyRange(this.keyRange, resolvedTs);
    }
}

