/*
 * Decompiled with CFR 0.152.
 */
package cocaine.session;

import cocaine.ServiceException;
import cocaine.UnexpectedServiceMessageException;
import cocaine.api.TransactionTree;
import cocaine.session.CocainePayloadDeserializer;
import cocaine.session.protocol.CocaineProtocol;
import cocaine.session.protocol.IdentityProtocol;
import java.io.IOException;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.log4j.Logger;
import org.msgpack.type.Value;

/**
 * Receiving side of a service session: buffers incoming messages in a queue and hands
 * them out, decoded, through {@link #get()}.
 *
 * <p>Messages are pushed in via {@link #onRead(int, Value)} and pulled out via
 * {@link #get()}. The hand-off goes through a {@link BlockingQueue}; the {@code completed}
 * flag is {@code volatile} because it is written by {@link #onCompleted()} and read on
 * both the producing and the consuming side.
 *
 * <p>NOTE(review): with {@code readTimeoutInMs == 0} a reader blocks in {@code take()}
 * until a message arrives; if the channel is completed (or an error was already consumed)
 * while the queue is empty, nothing will wake that reader up — confirm callers always use
 * a timeout or guarantee a final message.
 *
 * <p>NOTE(review): only the {@code (String, String)} constructor of {@code ServiceException}
 * is visible from this file, so causes are reported by message rather than chained.
 *
 * @param <T> type produced by the payload deserializer
 */
public class ReceiveChannel<T> {
    private static final Logger logger = Logger.getLogger(ReceiveChannel.class);

    /** Current position in the receive transaction tree; advanced by {@link #onRead}. */
    private TransactionTree rxTree;
    private final BlockingQueue<ResultMessage> queue;
    private final CocaineProtocol protocol;
    private final CocainePayloadDeserializer<T> deserializer;
    private final String serviceName;
    /** Read timeout in milliseconds; {@code 0} means block without a timeout. */
    private final long readTimeoutInMs;
    /** Set once no further messages are expected; read by both producer and consumer paths. */
    private volatile boolean completed = false;
    /** Set after an error message has been offered; only touched in {@link #putMessageInQueue}. */
    private boolean hasError = false;

    /**
     * Creates a channel with an empty, unbounded message queue.
     *
     * @param serviceName     service name used in exception and log messages
     * @param rxTree          initial receive transaction tree used to resolve message types
     * @param protocol        protocol that pre-processes each payload before deserialization
     * @param deserializer    converts a protocol-level payload into {@code T}
     * @param readTimeoutInMs read timeout in milliseconds, {@code 0} to wait indefinitely
     */
    public ReceiveChannel(String serviceName, TransactionTree rxTree, CocaineProtocol protocol, CocainePayloadDeserializer<T> deserializer, long readTimeoutInMs) {
        this.serviceName = serviceName;
        this.rxTree = rxTree;
        this.queue = new LinkedBlockingQueue<>();
        this.protocol = protocol;
        this.deserializer = deserializer;
        this.readTimeoutInMs = readTimeoutInMs;
    }

    /**
     * Returns the next value from the channel.
     *
     * <p>When the protocol is an {@link IdentityProtocol}, the channel is marked completed
     * and {@code null} is returned without touching the queue. Otherwise the next queued
     * message is passed through the protocol and, if the protocol yields a non-null
     * payload, deserialized.
     *
     * @return the deserialized value, or {@code null} when the protocol is an
     *         {@code IdentityProtocol} or yields a {@code null} payload
     * @throws ServiceException if the channel is completed with an empty queue, the read
     *                          times out, the next message is an error, the wait is
     *                          interrupted, or deserialization fails with an
     *                          {@link IOException}
     */
    public T get() {
        if (this.protocol instanceof IdentityProtocol) {
            this.onCompleted();
            return null;
        }
        PayloadResultMessage msg = this.getNextPayloadMessage();
        Value payload = this.protocol.handle(this.serviceName, msg.messageType, msg.payload);
        if (payload == null) {
            return null;
        }
        try {
            return this.deserializer.deserialize(msg.messageType, payload);
        }
        catch (IOException e) {
            logger.error("Couldn't deserialize result of message " + msg.messageType + ", " + e.getMessage(), e);
            throw new ServiceException(this.serviceName, e.getMessage());
        }
    }

    /**
     * Accepts an incoming message and advances the receive transaction tree.
     *
     * <p>An unknown {@code type} is queued as an error and logged. A known type is queued
     * as a payload message; then, unless the follow-up tree is a cycle, the channel is
     * either completed (empty follow-up tree) or moved on to the follow-up tree.
     *
     * @param type    numeric message type, resolved against the current transaction tree
     * @param payload raw message payload
     */
    void onRead(int type, Value payload) {
        Optional<TransactionTree.TransactionInfo> info = this.rxTree.getInfo(type);
        if (!info.isPresent()) {
            this.putMessageInQueue(new ErrorResultMessage(new UnexpectedServiceMessageException(this.serviceName, type)));
            logger.error("Unknown message type: " + type + ", for service " + this.serviceName);
            return;
        }
        TransactionTree.TransactionInfo transaction = info.get();
        // Queue the payload before flipping `completed`, otherwise the final message would be dropped.
        this.putMessageInQueue(new PayloadResultMessage(transaction.getMessageName(), payload));
        TransactionTree tree = transaction.getTree();
        if (tree.isCycle()) {
            // A cyclic tree keeps the channel at its current position.
            return;
        }
        if (tree.isEmpty()) {
            this.onCompleted();
            logger.info("Last message received");
        } else {
            this.rxTree = tree;
        }
    }

    /**
     * Takes the next message off the queue and unwraps it.
     *
     * @return the next payload message
     * @throws ServiceException if the channel is completed and drained, the next message
     *                          is an error, the poll times out, or the wait is interrupted
     */
    private PayloadResultMessage getNextPayloadMessage() {
        try {
            if (this.completed && this.queue.isEmpty()) {
                throw new ServiceException(this.serviceName, "Read channel is completed and has empty queue");
            }
            ResultMessage nextMessage = this.pollTheQueue();
            if (nextMessage.isErrorMessage()) {
                Exception error = ((ErrorResultMessage)nextMessage).error;
                throw new ServiceException(this.serviceName, error.getClass().getName() + ": " + error.getMessage());
            }
            return (PayloadResultMessage)nextMessage;
        }
        catch (InterruptedException e) {
            // Restore the interrupt status so code further up the stack can still observe it.
            Thread.currentThread().interrupt();
            throw new ServiceException(this.serviceName, "Reading interrupted, " + e.getMessage());
        }
    }

    /**
     * Waits for the next queued message, honouring the configured read timeout.
     *
     * @return the next message, never {@code null}
     * @throws InterruptedException if the waiting thread is interrupted
     * @throws ServiceException     if a non-zero timeout elapses with no message
     */
    private ResultMessage pollTheQueue() throws InterruptedException {
        if (this.readTimeoutInMs == 0L) {
            // Zero means "no timeout": block until something arrives.
            return this.queue.take();
        }
        ResultMessage result = this.queue.poll(this.readTimeoutInMs, TimeUnit.MILLISECONDS);
        if (result == null) {
            throw new ServiceException(this.serviceName, "Read timeout occurred in receive channel, timeout = " + this.readTimeoutInMs + " ms");
        }
        return result;
    }

    /**
     * Enqueues a message unless the channel is already completed or has seen an error,
     * then records whether this message was an error.
     *
     * @param resultMessage message to enqueue
     * @throws ServiceException if the thread is interrupted while enqueueing
     */
    private void putMessageInQueue(ResultMessage resultMessage) {
        try {
            if (!this.completed && !this.hasError) {
                this.queue.put(resultMessage);
            }
            this.hasError = this.hasError || resultMessage.isErrorMessage();
        }
        catch (InterruptedException e) {
            // Restore the interrupt status so code further up the stack can still observe it.
            Thread.currentThread().interrupt();
            throw new ServiceException(this.serviceName, "Putting message in queue interrupted, " + e.getMessage());
        }
    }

    /** Marks the channel as completed; messages offered afterwards are not queued. */
    public void onCompleted() {
        this.completed = true;
    }

    /** Queue entry carrying an error to be rethrown to the reader. */
    private static class ErrorResultMessage
    extends ResultMessage {
        private final Exception error;

        public ErrorResultMessage(Exception error) {
            this.error = error;
        }

        @Override
        public boolean isErrorMessage() {
            return true;
        }
    }

    /** Queue entry carrying a message name and its raw payload. */
    private static class PayloadResultMessage
    extends ResultMessage {
        private final String messageType;
        private final Value payload;

        public PayloadResultMessage(String messageType, Value payload) {
            this.messageType = messageType;
            this.payload = payload;
        }

        @Override
        public boolean isErrorMessage() {
            return false;
        }
    }

    /** Base type for queue entries; only the two nested subclasses can extend it. */
    private static abstract class ResultMessage {
        private ResultMessage() {
        }

        /** @return {@code true} if this entry represents an error rather than a payload */
        public abstract boolean isErrorMessage();
    }
}

