/*
 * Decompiled with CFR 0.152.
 */
package ru.yandex.kikimr.persqueue.consumer.sync;

import com.google.common.util.concurrent.Uninterruptibles;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import ru.yandex.kikimr.persqueue.consumer.ConsumerSessionConfig;
import ru.yandex.kikimr.persqueue.consumer.SyncConsumer;
import ru.yandex.kikimr.persqueue.consumer.internal.ThreadBarrier;
import ru.yandex.kikimr.persqueue.consumer.internal.commit.Committer;
import ru.yandex.kikimr.persqueue.consumer.internal.commit.SyncCommitter;
import ru.yandex.kikimr.persqueue.consumer.internal.read.AsyncReader;
import ru.yandex.kikimr.persqueue.consumer.internal.read.Reader;
import ru.yandex.kikimr.persqueue.consumer.sync.ConsumerStreamClosedException;
import ru.yandex.kikimr.persqueue.consumer.sync.SyncConsumerConfig;
import ru.yandex.kikimr.persqueue.consumer.transport.ConsumerMessageListener;
import ru.yandex.kikimr.persqueue.consumer.transport.ConsumerTransport;
import ru.yandex.kikimr.persqueue.consumer.transport.message.CommitMessage;
import ru.yandex.kikimr.persqueue.consumer.transport.message.inbound.ConsumerInitResponse;
import ru.yandex.kikimr.persqueue.consumer.transport.message.inbound.ConsumerLockMessage;
import ru.yandex.kikimr.persqueue.consumer.transport.message.inbound.ConsumerReadResponse;
import ru.yandex.kikimr.persqueue.consumer.transport.message.inbound.ConsumerReleaseMessage;
import ru.yandex.kikimr.persqueue.consumer.transport.message.outbound.ConsumerInitRequest;

public class LogbrokerSyncConsumer
implements SyncConsumer {
    private static final Logger logger = LoggerFactory.getLogger(LogbrokerSyncConsumer.class);
    private final CompletableFuture<ConsumerInitResponse> initFuture;
    private final CompletableFuture<Void> closeFuture;
    private final ReadWriteLock closeStreamLock;
    private final Reader reader;
    private final Committer committer;
    private final ConsumerTransport transport;
    private final BlockingQueue<ConsumerReadResponse> readBuffer;
    private final Map<Long, CompletableFuture<Void>> inflightCommits;
    private final SyncConsumerConfig config;

    public LogbrokerSyncConsumer(SyncConsumerConfig syncConsumerConfig, ConsumerTransport transport) {
        this.config = syncConsumerConfig;
        if (this.config.getSessionConfig().isClientSideLocksAllowed()) {
            throw new IllegalArgumentException("Client side locks is not supported by sync consumer");
        }
        if (this.config.getReadBufferSize() < this.config.getReaderConfig().getMaxUnconsumedReads()) {
            throw new IllegalArgumentException("Read buffer size is less than max permitted unconsumed reads count, that could lead to deadlocks or data loss.");
        }
        this.transport = transport;
        this.initFuture = new CompletableFuture();
        this.closeFuture = new CompletableFuture();
        this.closeStreamLock = new ReentrantReadWriteLock();
        this.readBuffer = new LinkedBlockingQueue<ConsumerReadResponse>(this.config.getReadBufferSize());
        this.inflightCommits = new ConcurrentHashMap<Long, CompletableFuture<Void>>();
        ThreadBarrier readThreadBarrier = new ThreadBarrier(true);
        this.reader = new AsyncReader(transport, readThreadBarrier, syncConsumerConfig.getReaderConfig());
        this.committer = new SyncCommitter(transport, readThreadBarrier, syncConsumerConfig.getCommitterConfig());
    }

    @Override
    public ConsumerInitResponse init() throws InterruptedException, TimeoutException {
        this.transport.openConnection(new InternalConsumerMessageListener());
        ConsumerSessionConfig sessionConfig = this.config.getSessionConfig();
        this.transport.sendInit(new ConsumerInitRequest(sessionConfig.getTopics(), sessionConfig.isReadOnlyLocal(), sessionConfig.getDataCenters(), sessionConfig.getClientId(), sessionConfig.isClientSideLocksAllowed(), sessionConfig.isForceBalancePartitions(), sessionConfig.getGroups(), sessionConfig.getIdleTimeoutSec()));
        try {
            return this.initFuture.get(this.config.getInitTimeout(), this.config.getInitTimeoutUnit());
        }
        catch (ExecutionException e) {
            throw new RuntimeException(e.getCause());
        }
    }

    @Override
    public ConsumerReadResponse read() throws InterruptedException {
        this.checkNotClosed();
        ConsumerReadResponse response = this.readBuffer.poll(this.config.getReadDataTimeout(), this.config.getReadDataTimeoutUnit());
        if (response != null) {
            this.reader.dataConsumed(response);
        }
        return response;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void commit(List<Long> cookies) throws InterruptedException, TimeoutException {
        List<CompletableFuture<Void>> futures;
        this.closeStreamLock.readLock().lock();
        try {
            this.checkNotClosed();
            futures = this.committer.commitData(cookies);
            for (int i = 0; i < cookies.size(); ++i) {
                this.inflightCommits.put(cookies.get(i), futures.get(i));
            }
        }
        finally {
            this.closeStreamLock.readLock().unlock();
        }
        CompletableFuture<Void> future = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
        try {
            future.get(this.config.getCommitTimeout(), this.config.getCommitTimeoutUnit());
        }
        catch (ExecutionException e) {
            throw new RuntimeException(e.getCause());
        }
        finally {
            cookies.forEach(this.inflightCommits::remove);
        }
    }

    @Override
    public boolean isClosed() {
        return this.closeFuture.isDone();
    }

    private void checkNotClosed() {
        if (this.closeFuture.isDone()) {
            if (this.closeFuture.isCompletedExceptionally()) {
                this.closeFuture.getNow(null);
            } else {
                throw new ConsumerStreamClosedException();
            }
        }
    }

    @Override
    public void close() {
        this.stopThreads();
        this.transport.closeConnection();
        try {
            Uninterruptibles.getUninterruptibly(this.closeFuture);
        }
        catch (ExecutionException e) {
            if (e.getCause() instanceof RuntimeException) {
                throw (RuntimeException)e.getCause();
            }
            throw new RuntimeException(e.getCause());
        }
    }

    private void onInboundChannelClose(Throwable cause) {
        if (!this.inflightCommits.isEmpty()) {
            ConsumerStreamClosedException exception = new ConsumerStreamClosedException(cause);
            for (CompletableFuture<Void> future : this.inflightCommits.values()) {
                future.completeExceptionally(exception);
            }
        }
    }

    private void stopThreads() {
        this.reader.stop();
        this.committer.stop();
    }

    private class InternalConsumerMessageListener
    implements ConsumerMessageListener {
        private InternalConsumerMessageListener() {
        }

        @Override
        public void onInit(ConsumerInitResponse init) {
            LogbrokerSyncConsumer.this.initFuture.complete(init);
            LogbrokerSyncConsumer.this.reader.start();
            LogbrokerSyncConsumer.this.committer.start();
        }

        @Override
        public void onData(ConsumerReadResponse read) {
            LogbrokerSyncConsumer.this.reader.dataReceived(read);
            LogbrokerSyncConsumer.this.committer.dataReceived(read);
            LogbrokerSyncConsumer.this.readBuffer.add(read);
        }

        @Override
        public void onCommit(CommitMessage commit) {
            LogbrokerSyncConsumer.this.committer.commitAckReceived(commit);
        }

        @Override
        public void onLock(ConsumerLockMessage lock) {
            logger.warn("Got unexpected LOCK message");
        }

        @Override
        public void onRelease(ConsumerReleaseMessage release) {
            logger.warn("Got unexpected RELEASE message");
        }

        @Override
        public void onError(Throwable t) {
            LogbrokerSyncConsumer.this.stopThreads();
            LogbrokerSyncConsumer.this.closeStreamLock.writeLock().lock();
            try {
                LogbrokerSyncConsumer.this.onInboundChannelClose(t);
                LogbrokerSyncConsumer.this.closeFuture.completeExceptionally(t);
            }
            finally {
                LogbrokerSyncConsumer.this.closeStreamLock.writeLock().unlock();
            }
        }

        @Override
        public void onCompleted() {
            LogbrokerSyncConsumer.this.stopThreads();
            LogbrokerSyncConsumer.this.closeStreamLock.writeLock().lock();
            try {
                LogbrokerSyncConsumer.this.onInboundChannelClose(null);
                LogbrokerSyncConsumer.this.closeFuture.complete(null);
            }
            finally {
                LogbrokerSyncConsumer.this.closeStreamLock.writeLock().unlock();
            }
        }
    }
}

