/*
 * Decompiled with CFR 0.152.
 */
package ru.yandex.kikimr.persqueue.consumer.stream;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import ru.yandex.kikimr.persqueue.consumer.ConsumerSessionConfig;
import ru.yandex.kikimr.persqueue.consumer.StreamConsumer;
import ru.yandex.kikimr.persqueue.consumer.StreamListener;
import ru.yandex.kikimr.persqueue.consumer.internal.ThreadBarrier;
import ru.yandex.kikimr.persqueue.consumer.internal.commit.Committer;
import ru.yandex.kikimr.persqueue.consumer.internal.commit.SyncCommitter;
import ru.yandex.kikimr.persqueue.consumer.internal.read.AsyncReader;
import ru.yandex.kikimr.persqueue.consumer.internal.read.Reader;
import ru.yandex.kikimr.persqueue.consumer.stream.StreamConsumerConfig;
import ru.yandex.kikimr.persqueue.consumer.transport.ConsumerMessageListener;
import ru.yandex.kikimr.persqueue.consumer.transport.ConsumerTransport;
import ru.yandex.kikimr.persqueue.consumer.transport.message.CommitMessage;
import ru.yandex.kikimr.persqueue.consumer.transport.message.inbound.ConsumerInitResponse;
import ru.yandex.kikimr.persqueue.consumer.transport.message.inbound.ConsumerLockMessage;
import ru.yandex.kikimr.persqueue.consumer.transport.message.inbound.ConsumerReadResponse;
import ru.yandex.kikimr.persqueue.consumer.transport.message.inbound.ConsumerReleaseMessage;
import ru.yandex.kikimr.persqueue.consumer.transport.message.outbound.ConsumerInitRequest;
import ru.yandex.kikimr.persqueue.consumer.transport.message.outbound.ConsumerLockedMessage;

public class LogbrokerStreamConsumer
implements StreamConsumer {
    private static final Logger logger = LoggerFactory.getLogger(LogbrokerStreamConsumer.class);
    private final Reader reader;
    private final Committer committer;
    private final ConsumerTransport transport;
    private final StreamConsumerConfig config;

    public LogbrokerStreamConsumer(StreamConsumerConfig streamConsumerConfig, ConsumerTransport transport) {
        this.transport = transport;
        this.config = streamConsumerConfig;
        ThreadBarrier readThreadBarrier = new ThreadBarrier(true);
        this.reader = new AsyncReader(transport, readThreadBarrier, streamConsumerConfig.getReaderConfig());
        this.committer = new SyncCommitter(transport, readThreadBarrier, streamConsumerConfig.getCommitterConfig());
    }

    @Override
    public void startConsume(StreamListener listener) {
        this.transport.openConnection(new InternalConsumerMessageListener(listener));
        ConsumerSessionConfig sessionConfig = this.config.getSessionConfig();
        this.transport.sendInit(new ConsumerInitRequest(sessionConfig.getTopics(), sessionConfig.isReadOnlyLocal(), sessionConfig.getDataCenters(), sessionConfig.getClientId(), sessionConfig.isClientSideLocksAllowed(), sessionConfig.isForceBalancePartitions(), sessionConfig.getGroups(), sessionConfig.getIdleTimeoutSec()));
        this.reader.start();
        this.committer.start();
    }

    @Override
    public void stopConsume() {
        this.reader.stop();
        this.committer.stop();
        this.transport.closeConnection();
    }

    private class InternalConsumerMessageListener
    implements ConsumerMessageListener {
        private final StreamListener listener;

        InternalConsumerMessageListener(StreamListener listener) {
            this.listener = listener;
        }

        @Override
        public void onInit(ConsumerInitResponse init) {
            this.listener.onInit(init);
        }

        @Override
        public void onData(ConsumerReadResponse read) {
            LogbrokerStreamConsumer.this.reader.dataReceived(read);
            LogbrokerStreamConsumer.this.committer.dataReceived(read);
            LogbrokerStreamConsumer.this.config.getExecutor().submit(() -> {
                long cookie = read.getCookie();
                try {
                    this.listener.onRead(read, () -> LogbrokerStreamConsumer.this.committer.commitData(cookie));
                }
                finally {
                    LogbrokerStreamConsumer.this.reader.dataConsumed(read);
                }
            });
        }

        @Override
        public void onCommit(CommitMessage commit) {
            LogbrokerStreamConsumer.this.committer.commitAckReceived(commit);
            LogbrokerStreamConsumer.this.config.getExecutor().submit(() -> this.listener.onCommit(commit));
        }

        @Override
        public void onLock(ConsumerLockMessage lock) {
            LogbrokerStreamConsumer.this.config.getExecutor().submit(() -> this.listener.onLock(lock, (readOffset, verifyOffset) -> LogbrokerStreamConsumer.this.transport.sendLocked(new ConsumerLockedMessage(lock.getTopic(), lock.getPartition(), readOffset, verifyOffset, lock.getGeneration()))));
        }

        @Override
        public void onRelease(ConsumerReleaseMessage release) {
            LogbrokerStreamConsumer.this.config.getExecutor().submit(() -> this.listener.onRelease(release));
        }

        @Override
        public void onError(Throwable t) {
            this.listener.onError(t);
        }

        @Override
        public void onCompleted() {
            this.listener.onClose();
        }
    }
}

