/*
 * Decompiled with CFR 0.152.
 */
package ru.yandex.kikimr.persqueue.consumer.transport;

import com.google.protobuf.ByteString;
import io.grpc.Channel;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.StreamObserver;
import java.util.ArrayList;
import java.util.Objects;
import java.util.function.Supplier;
import javax.annotation.concurrent.ThreadSafe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import ru.yandex.kikimr.persqueue.auth.Credentials;
import ru.yandex.kikimr.persqueue.compression.CompressionCodec;
import ru.yandex.kikimr.persqueue.consumer.ConsumerException;
import ru.yandex.kikimr.persqueue.consumer.transport.ConsumerMessageListener;
import ru.yandex.kikimr.persqueue.consumer.transport.ConsumerTransport;
import ru.yandex.kikimr.persqueue.consumer.transport.message.CommitMessage;
import ru.yandex.kikimr.persqueue.consumer.transport.message.inbound.ConsumerInitResponse;
import ru.yandex.kikimr.persqueue.consumer.transport.message.inbound.ConsumerLockMessage;
import ru.yandex.kikimr.persqueue.consumer.transport.message.inbound.ConsumerReadResponse;
import ru.yandex.kikimr.persqueue.consumer.transport.message.inbound.ConsumerReleaseMessage;
import ru.yandex.kikimr.persqueue.consumer.transport.message.inbound.data.MessageBatch;
import ru.yandex.kikimr.persqueue.consumer.transport.message.inbound.data.MessageData;
import ru.yandex.kikimr.persqueue.consumer.transport.message.inbound.data.MessageMeta;
import ru.yandex.kikimr.persqueue.consumer.transport.message.outbound.ConsumerInitRequest;
import ru.yandex.kikimr.persqueue.consumer.transport.message.outbound.ConsumerLockedMessage;
import ru.yandex.kikimr.persqueue.consumer.transport.message.outbound.ConsumerReadRequest;
import ru.yandex.kikimr.persqueue.proxy.ProxyConfig;
import ru.yandex.persqueue.PersQueueServiceGrpc;
import ru.yandex.ydb.persqueue.Persqueue;

@ThreadSafe
public class LogbrokerConsumerTransport
implements ConsumerTransport {
    private static final Logger logger = LoggerFactory.getLogger(LogbrokerConsumerTransport.class);
    private String sessionId;
    private Credentials lastCredentials;
    private final ProxyConfig proxyConfig;
    private final Supplier<Credentials> credentialsProvider;
    private StreamObserver<Persqueue.ReadRequest> outboundStreamObserver;
    private ManagedChannel channel;
    private PersQueueServiceGrpc.PersQueueServiceStub serverStub;

    public LogbrokerConsumerTransport(ProxyConfig proxyConfig, Supplier<Credentials> credentialsProvider) {
        this.proxyConfig = proxyConfig;
        this.credentialsProvider = credentialsProvider;
    }

    @Override
    public synchronized void openConnection(ConsumerMessageListener listener) {
        logger.debug("Establishing grpc channel");
        this.channel = ManagedChannelBuilder.forAddress((String)this.proxyConfig.getProxyHost(), (int)this.proxyConfig.getProxyPort()).usePlaintext().build();
        this.serverStub = PersQueueServiceGrpc.newStub((Channel)this.channel);
        logger.debug("Establishing read session");
        this.outboundStreamObserver = this.serverStub.readSession((StreamObserver)new InboundStreamObserver(listener));
    }

    @Override
    public synchronized void sendInit(ConsumerInitRequest init) {
        Persqueue.ReadRequest.Init.Builder initBuilder = Persqueue.ReadRequest.Init.newBuilder().addAllTopics(init.getTopics()).setReadOnlyLocal(init.isReadOnlyLocal()).addAllDataCenters(init.getDataCenters()).setClientId(init.getClientId()).setClientsideLocksAllowed(init.isClientSideLocksAllowed()).setProxyCookie(this.proxyConfig.getProxyCookie()).setBalancePartitionRightNow(init.isForceBalancePartitions()).addAllPartitionGroups(init.getGroups()).setIdleTimeoutSec(init.getIdleTimeoutSec());
        Persqueue.ReadRequest initRequest = this.addCredentials(Persqueue.ReadRequest.newBuilder()).setInit(initBuilder).build();
        logger.debug("Sending init message {}", (Object)initRequest);
        this.outboundStreamObserver.onNext((Object)initRequest);
    }

    @Override
    public synchronized void sendRead(ConsumerReadRequest read) {
        Persqueue.ReadRequest.Read.Builder readBuilder = Persqueue.ReadRequest.Read.newBuilder().setMaxCount(read.getMaxCount()).setMaxSize(read.getMaxSize()).setPartitionsAtOnce(read.getPartitionsAtOnce()).setMaxTimeLagMs(read.getMaxTimeLagMs());
        Persqueue.ReadRequest readRequest = this.addCredentials(Persqueue.ReadRequest.newBuilder()).setRead(readBuilder).build();
        logger.debug("Sending read message {}", (Object)readRequest);
        this.outboundStreamObserver.onNext((Object)readRequest);
    }

    @Override
    public synchronized void sendCommit(CommitMessage commit) {
        Persqueue.ReadRequest.Commit.Builder commitBuilder = Persqueue.ReadRequest.Commit.newBuilder().addAllCookie(commit.getCookies());
        Persqueue.ReadRequest commitRequest = this.addCredentials(Persqueue.ReadRequest.newBuilder()).setCommit(commitBuilder).build();
        logger.debug("Sending commit message {}", (Object)commitRequest);
        this.outboundStreamObserver.onNext((Object)commitRequest);
    }

    @Override
    public synchronized void sendLocked(ConsumerLockedMessage locked) {
        Persqueue.ReadRequest.StartRead.Builder lockedBuilder = Persqueue.ReadRequest.StartRead.newBuilder().setTopic(locked.getTopic()).setPartition(locked.getPartition()).setReadOffset(locked.getReadOffset()).setVerifyReadOffset(locked.isVerifyOffset()).setGeneration(locked.getGeneration());
        Persqueue.ReadRequest lockedRequest = this.addCredentials(Persqueue.ReadRequest.newBuilder()).setStartRead(lockedBuilder).build();
        logger.debug("Sending locked message {}", (Object)lockedRequest);
        this.outboundStreamObserver.onNext((Object)lockedRequest);
    }

    private Persqueue.ReadRequest.Builder addCredentials(Persqueue.ReadRequest.Builder builder) {
        Credentials credentials;
        if (this.credentialsProvider != null && !Objects.equals(this.lastCredentials, credentials = this.credentialsProvider.get())) {
            this.lastCredentials = credentials;
            builder = builder.setCredentials(LogbrokerConsumerTransport.toPersqueueCredentials(credentials));
        }
        return builder;
    }

    private static Persqueue.Credentials toPersqueueCredentials(Credentials credentials) {
        ByteString value = ByteString.copyFromUtf8((String)credentials.getValue());
        switch (credentials.getType()) {
            case NONE: {
                return Persqueue.Credentials.newBuilder().build();
            }
            case OAUTH: {
                return Persqueue.Credentials.newBuilder().setOauthToken(value).build();
            }
            case TVM: {
                return Persqueue.Credentials.newBuilder().setTvmServiceTicket(value).build();
            }
        }
        throw new IllegalStateException("Unknown credentials type");
    }

    @Override
    public synchronized void closeConnection() {
        logger.debug("Closing outbound connection for session {}", (Object)this.sessionId);
        this.outboundStreamObserver.onCompleted();
        this.channel.shutdown();
    }

    public class InboundStreamObserver
    implements StreamObserver<Persqueue.ReadResponse> {
        private final ConsumerMessageListener listener;

        InboundStreamObserver(ConsumerMessageListener listener) {
            this.listener = listener;
        }

        public void onNext(Persqueue.ReadResponse value) {
            logger.trace("Got inbound message {}", (Object)value);
            switch (value.getResponseCase()) {
                case INIT: {
                    ConsumerInitResponse init = this.convertInit(value.getInit());
                    LogbrokerConsumerTransport.this.sessionId = init.getSessionId();
                    this.listener.onInit(init);
                    break;
                }
                case DATA: {
                    this.listener.onData(this.convertData(value.getData()));
                    break;
                }
                case COMMIT: {
                    this.listener.onCommit(this.convertCommit(value.getCommit()));
                    break;
                }
                case LOCK: {
                    this.listener.onLock(this.convertLock(value.getLock()));
                    break;
                }
                case RELEASE: {
                    this.listener.onRelease(this.convertRelease(value.getRelease()));
                    break;
                }
                case ERROR: {
                    if ("Reads done signal - closing everything".equals(value.getError().getDescription())) break;
                    throw this.convertError(value.getError());
                }
            }
        }

        public void onError(Throwable t) {
            logger.error("Consumer stream (sessionId: " + LogbrokerConsumerTransport.this.sessionId + ") closed with error", t);
            this.listener.onError(t);
        }

        public void onCompleted() {
            logger.info("Consumer stream (sessionId: " + LogbrokerConsumerTransport.this.sessionId + ") closed");
            this.listener.onCompleted();
        }

        private ConsumerInitResponse convertInit(Persqueue.ReadResponse.Init tInit) {
            return new ConsumerInitResponse(tInit.getSessionId());
        }

        private ConsumerReadResponse convertData(Persqueue.ReadResponse.Data tData) {
            ArrayList<MessageBatch> messageBatches = new ArrayList<MessageBatch>(tData.getMessageBatchCount());
            for (Persqueue.ReadResponse.Data.MessageBatch tMessageBatch : tData.getMessageBatchList()) {
                ArrayList<MessageData> messageData = new ArrayList<MessageData>(tMessageBatch.getMessageCount());
                for (Persqueue.ReadResponse.Data.Message tMessage : tMessageBatch.getMessageList()) {
                    Persqueue.MessageMeta tMeta = tMessage.getMeta();
                    MessageMeta meta = new MessageMeta(tMeta.getSourceId().toByteArray(), tMeta.getSeqNo(), tMeta.getCreateTimeMs(), tMeta.getWriteTimeMs(), tMeta.getIp(), CompressionCodec.fromECodec(tMeta.getCodec()));
                    MessageData data = new MessageData(tMessage.getData().toByteArray(), tMessage.getOffset(), meta);
                    messageData.add(data);
                }
                MessageBatch messageBatch = new MessageBatch(tMessageBatch.getTopic(), tMessageBatch.getPartition(), messageData);
                messageBatches.add(messageBatch);
            }
            return new ConsumerReadResponse(messageBatches, tData.getCookie());
        }

        private CommitMessage convertCommit(Persqueue.ReadResponse.Commit tCommit) {
            return new CommitMessage(tCommit.getCookieList());
        }

        private ConsumerLockMessage convertLock(Persqueue.ReadResponse.Lock tLock) {
            return new ConsumerLockMessage(tLock.getTopic(), tLock.getPartition(), tLock.getReadOffset(), tLock.getEndOffset(), tLock.getGeneration());
        }

        private ConsumerReleaseMessage convertRelease(Persqueue.ReadResponse.Release tRelease) {
            return new ConsumerReleaseMessage(tRelease.getTopic(), tRelease.getPartition(), tRelease.getCanCommit(), tRelease.getGeneration());
        }

        private ConsumerException convertError(Persqueue.Error tError) {
            return new ConsumerException(tError.getCode(), tError.getDescription());
        }
    }
}

