/*
 * Decompiled with CFR 0.152.
 */
package ru.yandex.kikimr.persqueue.consumer.internal.commit;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import ru.yandex.kikimr.persqueue.consumer.internal.ThreadBarrier;
import ru.yandex.kikimr.persqueue.consumer.internal.commit.Committer;
import ru.yandex.kikimr.persqueue.consumer.internal.commit.CommitterConfig;
import ru.yandex.kikimr.persqueue.consumer.transport.ConsumerTransport;
import ru.yandex.kikimr.persqueue.consumer.transport.message.CommitMessage;
import ru.yandex.kikimr.persqueue.consumer.transport.message.inbound.ConsumerReadResponse;

public class SyncCommitter
implements Committer {
    private final ThreadBarrier readThreadBarrier;
    private final ConcurrentHashMap<Long, CompletableFuture<Void>> inflightCommits;
    private final CommitterConfig config;
    private final ConsumerTransport messageSender;
    private final AtomicInteger uncommittedReadsCount;

    public SyncCommitter(ConsumerTransport messageSender, ThreadBarrier readThreadBarrier, CommitterConfig config) {
        this.messageSender = messageSender;
        this.readThreadBarrier = readThreadBarrier;
        this.config = config;
        this.inflightCommits = new ConcurrentHashMap();
        this.uncommittedReadsCount = new AtomicInteger(0);
    }

    @Override
    public void dataReceived(ConsumerReadResponse data) {
        if (this.uncommittedReadsCount.incrementAndGet() == this.config.getMaxUncommittedReads()) {
            this.readThreadBarrier.close();
        }
    }

    @Override
    public void commitAckReceived(CommitMessage commit) {
        for (Long cookie : commit.getCookies()) {
            CompletableFuture<Void> commitAckFuture = this.inflightCommits.remove(cookie);
            if (commitAckFuture == null) {
                throw new IllegalStateException("ConsumerMessage with cookie " + cookie + " received ack that wasn't expected");
            }
            commitAckFuture.complete(null);
        }
        int commitsCount = commit.getCookies().size();
        int previousUncommittedReadsCount = this.uncommittedReadsCount.getAndAdd(-commitsCount);
        int currentUncommittedReadsCount = previousUncommittedReadsCount - commitsCount;
        if (previousUncommittedReadsCount >= this.config.getMaxUncommittedReads() && currentUncommittedReadsCount < this.config.getMaxUncommittedReads()) {
            this.readThreadBarrier.open();
        }
    }

    @Override
    public List<CompletableFuture<Void>> commitData(List<Long> cookies) {
        ArrayList<CompletableFuture<Void>> results = new ArrayList<CompletableFuture<Void>>(cookies.size());
        for (Long cookie : cookies) {
            CompletableFuture commitAckFuture = new CompletableFuture();
            results.add(commitAckFuture);
            this.inflightCommits.put(cookie, commitAckFuture);
        }
        this.messageSender.sendCommit(new CommitMessage(cookies));
        return results;
    }

    @Override
    public void start() {
    }

    @Override
    public void stop() {
    }
}

