/*
 * Decompiled with CFR 0.152.
 */
package ru.yandex.kikimr.persqueue.producer.async;

import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import ru.yandex.kikimr.persqueue.compression.CompressionCodec;
import ru.yandex.kikimr.persqueue.producer.AsyncProducer;
import ru.yandex.kikimr.persqueue.producer.ProducerStreamClosedException;
import ru.yandex.kikimr.persqueue.producer.async.AsyncProducerConfig;
import ru.yandex.kikimr.persqueue.producer.transport.ProducerMessageListener;
import ru.yandex.kikimr.persqueue.producer.transport.ProducerTransport;
import ru.yandex.kikimr.persqueue.producer.transport.message.inbound.ProducerInitResponse;
import ru.yandex.kikimr.persqueue.producer.transport.message.inbound.ProducerWriteResponse;
import ru.yandex.kikimr.persqueue.producer.transport.message.outbound.ProducerInitRequest;
import ru.yandex.kikimr.persqueue.producer.transport.message.outbound.ProducerWriteRequest;

/**
 * {@link AsyncProducer} implementation that talks to the broker through a
 * {@link ProducerTransport}.
 *
 * <p>Each write is registered in {@link #writeAcks} under its sequence number and its
 * future is completed when the matching ack is delivered to the inbound listener.
 *
 * <p>Locking protocol: {@link #write} holds the read lock of {@link #closeChannelLock}
 * while it checks the closed flags, registers the future and sends the request;
 * {@link #close} and the inbound-stream termination handler hold the write lock. As a
 * result the closed flags cannot flip, and pending futures cannot be failed, in the
 * middle of a write.
 */
public class LogbrokerAsyncProducer
implements AsyncProducer {
    private static final Logger logger = LoggerFactory.getLogger(LogbrokerAsyncProducer.class);
    /** Set once the inbound stream has reported an error or completion. */
    private volatile boolean readChannelClosed;
    /** Set once {@link #close()} has been called. */
    private volatile boolean writeChannelClosed;
    private final ReadWriteLock closeChannelLock;
    private final AsyncProducerConfig asyncProducerConfig;
    private final CompletableFuture<ProducerInitResponse> initFuture;
    private final CompletableFuture<Void> closeFuture;
    /** Futures of writes that were sent but not yet acknowledged, keyed by seqNo. */
    private final Map<Long, CompletableFuture<ProducerWriteResponse>> writeAcks;
    private final ProducerTransport transport;

    /**
     * Creates a producer; no connection is opened until {@link #init()} is called.
     *
     * @param asyncProducerConfig source of topic, source id, group and compression codec
     * @param transport transport used to open the connection and send requests
     */
    public LogbrokerAsyncProducer(AsyncProducerConfig asyncProducerConfig, ProducerTransport transport) {
        this.transport = transport;
        this.asyncProducerConfig = asyncProducerConfig;
        this.readChannelClosed = false;
        this.writeChannelClosed = false;
        this.writeAcks = new ConcurrentHashMap<>();
        this.initFuture = new CompletableFuture<>();
        this.closeFuture = new CompletableFuture<>();
        this.closeChannelLock = new ReentrantReadWriteLock();
    }

    /**
     * Opens the connection with this producer's inbound listener and sends the init
     * request built from the configured topic, source id and group.
     *
     * @return future completed with the init response, or completed exceptionally with
     *         {@link ProducerStreamClosedException} if the inbound stream terminates first
     */
    @Override
    public CompletableFuture<ProducerInitResponse> init() {
        this.transport.openConnection(new InboundMessageListener());
        this.transport.sendInit(new ProducerInitRequest(this.asyncProducerConfig.getTopic(), this.asyncProducerConfig.getSourceId(), this.asyncProducerConfig.getGroup()));
        return this.initFuture;
    }

    /**
     * Compresses {@code data} with the configured codec and sends it as a write request.
     *
     * <p>NOTE(review): a write whose {@code seqNo} equals that of a still-pending write
     * replaces the earlier future in the pending map; the earlier future is then no
     * longer reachable from the ack or close paths. Confirm callers never reuse a
     * seqNo while it is in flight.
     *
     * @param data payload to send (uncompressed)
     * @param seqNo sequence number used to match the ack to this write
     * @param timestamp timestamp passed through to the write request
     * @return future completed with the ack; completed exceptionally with
     *         {@link ProducerStreamClosedException} if the producer is already closed or
     *         the stream closes before the ack, or with the thrown exception if
     *         compression or sending fails
     */
    @Override
    public CompletableFuture<ProducerWriteResponse> write(byte[] data, long seqNo, long timestamp) {
        CompletableFuture<ProducerWriteResponse> writeAckFuture = new CompletableFuture<>();
        this.closeChannelLock.readLock().lock();
        try {
            if (this.writeChannelClosed || this.readChannelClosed) {
                writeAckFuture.completeExceptionally(new ProducerStreamClosedException());
                return writeAckFuture;
            }
            // Register before sending so that an ack arriving immediately finds the future.
            this.writeAcks.put(seqNo, writeAckFuture);
            try {
                CompressionCodec codec = this.asyncProducerConfig.getCodec();
                byte[] compressedData = codec.compressData(data);
                ProducerWriteRequest message = new ProducerWriteRequest(seqNo, compressedData, timestamp, codec);
                this.transport.sendData(message);
            }
            catch (Exception e) {
                // Conditional remove: only drop the mapping if it is still OUR future. The
                // entry may already be gone (ack raced with the failure) or may belong to
                // another write with the same seqNo; an unconditional remove would then
                // throw NullPointerException or fail the wrong future.
                this.writeAcks.remove(seqNo, writeAckFuture);
                // No-op if an ack has already completed the future.
                writeAckFuture.completeExceptionally(e);
            }
        }
        finally {
            this.closeChannelLock.readLock().unlock();
        }
        return writeAckFuture;
    }

    /**
     * Returns the future that tracks termination of the inbound stream.
     *
     * @return future completed normally when the inbound stream completes, or
     *         exceptionally with {@link ProducerStreamClosedException} when it fails
     */
    @Override
    public CompletableFuture<Void> closeFuture() {
        return this.closeFuture;
    }

    /**
     * Rejects all further writes and asks the transport to close the connection.
     *
     * <p>This method does not complete {@link #closeFuture()} itself; that happens when
     * the inbound listener is notified of stream completion or error.
     * NOTE(review): presumably the transport reports that after
     * {@code closeConnection()} - confirm.
     */
    @Override
    public void close() {
        this.closeChannelLock.writeLock().lock();
        try {
            this.writeChannelClosed = true;
            this.transport.closeConnection();
        }
        finally {
            this.closeChannelLock.writeLock().unlock();
        }
    }

    /**
     * Completes every outstanding future after the inbound stream has terminated.
     * Must be called while holding the write lock so that no write can register a new
     * future concurrently.
     *
     * @param cause the stream error, or {@code null} if the stream completed normally
     */
    private void completeFutures(Throwable cause) {
        ProducerStreamClosedException exception = new ProducerStreamClosedException(cause);
        if (!this.initFuture.isDone()) {
            this.initFuture.completeExceptionally(exception);
        }
        if (!this.closeFuture.isDone()) {
            if (cause == null) {
                this.closeFuture.complete(null);
            } else {
                this.closeFuture.completeExceptionally(exception);
            }
        }
        for (CompletableFuture<ProducerWriteResponse> writeAck : this.writeAcks.values()) {
            writeAck.completeExceptionally(exception);
        }
        // Drop the references: these futures are finished and no further writes are
        // accepted once the read channel is closed, so keeping them only retains memory.
        this.writeAcks.clear();
    }

    /** Receives inbound transport events and routes them to the producer's futures. */
    private class InboundMessageListener
    implements ProducerMessageListener {
        private InboundMessageListener() {
        }

        /** Completes the init future with the received response. */
        @Override
        public void onInit(ProducerInitResponse init) {
            LogbrokerAsyncProducer.this.initFuture.complete(init);
        }

        /**
         * Completes the pending write future registered under the ack's seqNo; logs an
         * error if no such write is pending.
         */
        @Override
        public void onAck(ProducerWriteResponse ack) {
            CompletableFuture<ProducerWriteResponse> writeAckHolder = LogbrokerAsyncProducer.this.writeAcks.remove(ack.getSeqNo());
            if (writeAckHolder != null) {
                writeAckHolder.complete(ack);
            } else {
                logger.error("Got unexpected ACK with seqNo {}", ack.getSeqNo());
            }
        }

        /** Treats a stream error as termination of the inbound stream with that cause. */
        @Override
        public void onError(Throwable t) {
            this.closeInboundStream(t);
        }

        /** Treats stream completion as termination of the inbound stream without a cause. */
        @Override
        public void onCompleted() {
            this.closeInboundStream(null);
        }

        /**
         * Marks the read channel closed and completes all outstanding futures, under the
         * write lock so that no write is in progress while this happens.
         *
         * @param cause the stream error, or {@code null} on normal completion
         */
        private void closeInboundStream(Throwable cause) {
            LogbrokerAsyncProducer.this.closeChannelLock.writeLock().lock();
            try {
                LogbrokerAsyncProducer.this.readChannelClosed = true;
                LogbrokerAsyncProducer.this.completeFutures(cause);
            }
            finally {
                LogbrokerAsyncProducer.this.closeChannelLock.writeLock().unlock();
            }
        }
    }
}

