/*
 * Decompiled with CFR 0.152.
 */
package ru.yandex.kikimr.persqueue.consumer.internal.read;

import com.google.common.util.concurrent.Uninterruptibles;
import java.util.concurrent.Semaphore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import ru.yandex.kikimr.persqueue.consumer.internal.ThreadBarrier;
import ru.yandex.kikimr.persqueue.consumer.internal.read.Reader;
import ru.yandex.kikimr.persqueue.consumer.internal.read.ReaderConfig;
import ru.yandex.kikimr.persqueue.consumer.transport.ConsumerTransport;
import ru.yandex.kikimr.persqueue.consumer.transport.message.inbound.ConsumerReadResponse;
import ru.yandex.kikimr.persqueue.consumer.transport.message.outbound.ConsumerReadRequest;

public class AsyncReader
implements Reader {
    private static final Logger logger = LoggerFactory.getLogger(AsyncReader.class);
    private final Thread thread;
    private final Semaphore inflightReads;
    private final Semaphore unconsumedReads;
    private final ThreadBarrier readThreadBarrier;
    private final ReaderConfig config;

    public AsyncReader(ConsumerTransport messageSender, ThreadBarrier readThreadBarrier, ReaderConfig config) {
        this.config = config;
        this.inflightReads = new Semaphore(config.getMaxInflightReads());
        this.unconsumedReads = new Semaphore(config.getMaxUnconsumedReads());
        this.readThreadBarrier = readThreadBarrier;
        this.thread = new Thread(new ReadRunnable(messageSender));
    }

    @Override
    public void dataReceived(ConsumerReadResponse data) {
        this.inflightReads.release();
    }

    @Override
    public void dataConsumed(ConsumerReadResponse data) {
        this.unconsumedReads.release();
    }

    @Override
    public void start() {
        this.thread.start();
    }

    @Override
    public void stop() {
        this.thread.interrupt();
        Uninterruptibles.joinUninterruptibly((Thread)this.thread);
    }

    private class ReadRunnable
    implements Runnable {
        private final ConsumerTransport transport;

        ReadRunnable(ConsumerTransport transport) {
            this.transport = transport;
        }

        @Override
        public void run() {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    AsyncReader.this.inflightReads.acquire();
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
                try {
                    AsyncReader.this.unconsumedReads.acquire();
                }
                catch (InterruptedException e) {
                    AsyncReader.this.inflightReads.release();
                    Thread.currentThread().interrupt();
                    break;
                }
                try {
                    AsyncReader.this.readThreadBarrier.passThrough();
                }
                catch (InterruptedException e) {
                    AsyncReader.this.unconsumedReads.release();
                    AsyncReader.this.inflightReads.release();
                    Thread.currentThread().interrupt();
                    break;
                }
                try {
                    this.transport.sendRead(new ConsumerReadRequest(AsyncReader.this.config.getMaxCount(), AsyncReader.this.config.getMaxSize(), AsyncReader.this.config.getPartitionsAtOnce(), AsyncReader.this.config.getMaxTimeLagMs()));
                }
                catch (Exception e) {
                    AsyncReader.this.inflightReads.release();
                    AsyncReader.this.unconsumedReads.release();
                    logger.error("Error sending read", (Throwable)e);
                }
            }
        }
    }
}

