/*
 * Decompiled with CFR 0.152.
 */
package ru.yandex.kikimr.persqueue.producer.transport;

import com.google.protobuf.ByteString;
import io.grpc.Channel;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.StreamObserver;
import java.util.Objects;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import ru.yandex.kikimr.persqueue.auth.Credentials;
import ru.yandex.kikimr.persqueue.producer.ProducerException;
import ru.yandex.kikimr.persqueue.producer.transport.ProducerMessageListener;
import ru.yandex.kikimr.persqueue.producer.transport.ProducerTransport;
import ru.yandex.kikimr.persqueue.producer.transport.message.inbound.ProducerInitResponse;
import ru.yandex.kikimr.persqueue.producer.transport.message.inbound.ProducerWriteResponse;
import ru.yandex.kikimr.persqueue.producer.transport.message.outbound.ProducerInitRequest;
import ru.yandex.kikimr.persqueue.producer.transport.message.outbound.ProducerWriteRequest;
import ru.yandex.kikimr.persqueue.proxy.ProxyConfig;
import ru.yandex.persqueue.PersQueueServiceGrpc;
import ru.yandex.ydb.persqueue.Persqueue;

public class LogbrokerProducerTransport
implements ProducerTransport {
    private static final Logger logger = LoggerFactory.getLogger(LogbrokerProducerTransport.class);
    private final ProxyConfig proxyConfig;
    private final Supplier<Credentials> credentialsProvider;
    private String sessionId;
    private Credentials lastCredentials;
    private ManagedChannel channel;
    private PersQueueServiceGrpc.PersQueueServiceStub serverStub;
    private StreamObserver<Persqueue.WriteRequest> outboundStreamObserver;

    public LogbrokerProducerTransport(ProxyConfig proxyConfig, Supplier<Credentials> credentialsProvider) {
        this.proxyConfig = proxyConfig;
        this.credentialsProvider = credentialsProvider;
    }

    @Override
    public synchronized void openConnection(ProducerMessageListener listener) {
        logger.debug("Establishing grpc channel");
        this.channel = ManagedChannelBuilder.forAddress((String)this.proxyConfig.getProxyHost(), (int)this.proxyConfig.getProxyPort()).usePlaintext().build();
        this.serverStub = PersQueueServiceGrpc.newStub((Channel)this.channel);
        logger.debug("Establishing write session");
        this.outboundStreamObserver = this.serverStub.writeSession((StreamObserver)new InboundStreamObserver(listener));
    }

    @Override
    public synchronized void sendInit(ProducerInitRequest init) {
        Persqueue.WriteRequest.Init.Builder initRequestBuilder = Persqueue.WriteRequest.Init.newBuilder().setTopic(init.getTopic()).setSourceId(ByteString.copyFrom((byte[])init.getSourceId())).setPartitionGroup(init.getGroup()).setProxyCookie(this.proxyConfig.getProxyCookie());
        Persqueue.WriteRequest request = this.addCredentials(Persqueue.WriteRequest.newBuilder()).setInit(initRequestBuilder).build();
        logger.debug("Sending init {}", (Object)request);
        this.outboundStreamObserver.onNext((Object)request);
    }

    @Override
    public synchronized void sendData(ProducerWriteRequest data) {
        Persqueue.WriteRequest.Data tData = Persqueue.WriteRequest.Data.newBuilder().setSeqNo(data.getSeqNo()).setData(ByteString.copyFrom((byte[])data.getData())).setCreateTimeMs(data.getCreateTimeMs()).setCodec(data.getCodec().toECodec()).build();
        Persqueue.WriteRequest request = this.addCredentials(Persqueue.WriteRequest.newBuilder()).setData(tData).build();
        logger.debug("Sending data for session {}", (Object)this.sessionId);
        logger.trace("Data: {}", (Object)request);
        this.outboundStreamObserver.onNext((Object)request);
    }

    private Persqueue.WriteRequest.Builder addCredentials(Persqueue.WriteRequest.Builder builder) {
        Credentials credentials;
        if (this.credentialsProvider != null && !Objects.equals(this.lastCredentials, credentials = this.credentialsProvider.get())) {
            this.lastCredentials = credentials;
            return builder.setCredentials(LogbrokerProducerTransport.toPersqueueCredentials(credentials));
        }
        return builder;
    }

    private static Persqueue.Credentials toPersqueueCredentials(Credentials credentials) {
        ByteString value = ByteString.copyFromUtf8((String)credentials.getValue());
        switch (credentials.getType()) {
            case NONE: {
                return Persqueue.Credentials.newBuilder().build();
            }
            case OAUTH: {
                return Persqueue.Credentials.newBuilder().setOauthToken(value).build();
            }
            case TVM: {
                return Persqueue.Credentials.newBuilder().setTvmServiceTicket(value).build();
            }
        }
        throw new IllegalStateException("Unknown credentials type");
    }

    @Override
    public synchronized void closeConnection() {
        logger.debug("Closing outbound connection for session {}", (Object)this.sessionId);
        this.outboundStreamObserver.onCompleted();
        this.channel.shutdown();
    }

    private class InboundStreamObserver
    implements StreamObserver<Persqueue.WriteResponse> {
        private final ProducerMessageListener listener;

        InboundStreamObserver(ProducerMessageListener listener) {
            this.listener = listener;
        }

        public void onNext(Persqueue.WriteResponse value) {
            logger.debug("Got message {}", (Object)value);
            switch (value.getResponseCase()) {
                case INIT: {
                    ProducerInitResponse init = this.convertInit(value.getInit());
                    LogbrokerProducerTransport.this.sessionId = init.getSessionId();
                    this.listener.onInit(init);
                    break;
                }
                case ACK: {
                    this.listener.onAck(this.convertAck(value.getAck()));
                    break;
                }
                case ERROR: {
                    throw this.convertError(value.getError());
                }
            }
        }

        public void onError(Throwable t) {
            logger.error("Producer stream (sessionId: " + LogbrokerProducerTransport.this.sessionId + ") closed with error", t);
            this.listener.onError(t);
        }

        public void onCompleted() {
            logger.info("Producer stream (sessionId: " + LogbrokerProducerTransport.this.sessionId + ") closed");
            this.listener.onCompleted();
        }

        private ProducerInitResponse convertInit(Persqueue.WriteResponse.Init tInit) {
            return new ProducerInitResponse(tInit.getMaxSeqNo(), tInit.getTopic(), tInit.getPartition(), tInit.getSessionId());
        }

        private ProducerWriteResponse convertAck(Persqueue.WriteResponse.Ack tAck) {
            return new ProducerWriteResponse(tAck.getSeqNo(), tAck.getOffset(), tAck.getAlreadyWritten());
        }

        private ProducerException convertError(Persqueue.Error tError) {
            return new ProducerException(tError.getCode(), tError.getDescription());
        }
    }
}

