# Copyright 2012, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
#     * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
#     * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
#     * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.


"""This file provides classes and helper functions for multiplexing extension.

Specification:
http://tools.ietf.org/html/draft-ietf-hybi-websocket-multiplexing-06
"""


import collections
import copy
import email
import email.parser
import logging
import math
import struct
import threading
import traceback

from mod_pywebsocket import common
from mod_pywebsocket import handshake
from mod_pywebsocket import util
from mod_pywebsocket._stream_base import BadOperationException
from mod_pywebsocket._stream_base import ConnectionTerminatedException
from mod_pywebsocket._stream_base import InvalidFrameException
from mod_pywebsocket._stream_hybi import Frame
from mod_pywebsocket._stream_hybi import Stream
from mod_pywebsocket._stream_hybi import StreamOptions
from mod_pywebsocket._stream_hybi import create_binary_frame
from mod_pywebsocket._stream_hybi import create_closing_handshake_body
from mod_pywebsocket._stream_hybi import create_header
from mod_pywebsocket._stream_hybi import create_length_header
from mod_pywebsocket._stream_hybi import parse_frame
from mod_pywebsocket.handshake import hybi


_CONTROL_CHANNEL_ID = 0
_DEFAULT_CHANNEL_ID = 1

_MUX_OPCODE_ADD_CHANNEL_REQUEST = 0
_MUX_OPCODE_ADD_CHANNEL_RESPONSE = 1
_MUX_OPCODE_FLOW_CONTROL = 2
_MUX_OPCODE_DROP_CHANNEL = 3
_MUX_OPCODE_NEW_CHANNEL_SLOT = 4

_MAX_CHANNEL_ID = 2 ** 29 - 1

_INITIAL_NUMBER_OF_CHANNEL_SLOTS = 64
_INITIAL_QUOTA_FOR_CLIENT = 8 * 1024

_HANDSHAKE_ENCODING_IDENTITY = 0
_HANDSHAKE_ENCODING_DELTA = 1

# We need only these status code for now.
_HTTP_BAD_RESPONSE_MESSAGES = {
    common.HTTP_STATUS_BAD_REQUEST: 'Bad Request',
}

# DropChannel reason code
# TODO(bashi): Define all reason code defined in -05 draft.
_DROP_CODE_NORMAL_CLOSURE = 1000

_DROP_CODE_INVALID_ENCAPSULATING_MESSAGE = 2001
_DROP_CODE_CHANNEL_ID_TRUNCATED = 2002
_DROP_CODE_ENCAPSULATED_FRAME_IS_TRUNCATED = 2003
_DROP_CODE_UNKNOWN_MUX_OPCODE = 2004
_DROP_CODE_INVALID_MUX_CONTROL_BLOCK = 2005
_DROP_CODE_CHANNEL_ALREADY_EXISTS = 2006
_DROP_CODE_NEW_CHANNEL_SLOT_VIOLATION = 2007
_DROP_CODE_UNKNOWN_REQUEST_ENCODING = 2010

_DROP_CODE_SEND_QUOTA_VIOLATION = 3005
_DROP_CODE_SEND_QUOTA_OVERFLOW = 3006
_DROP_CODE_ACKNOWLEDGED = 3008
_DROP_CODE_BAD_FRAGMENTATION = 3009


class MuxUnexpectedException(Exception):

    """Exception in handling multiplexing extension."""

    pass


# Temporary
class MuxNotImplementedException(Exception):

    """Raised when a flow enters unimplemented code path."""

    pass


class LogicalConnectionClosedException(Exception):

    """Raised when logical connection is gracefully closed."""

    pass


class PhysicalConnectionError(Exception):

    """Raised when there is a physical connection error."""

    def __init__(self, drop_code, message=''):
        super(PhysicalConnectionError, self).__init__(
            'code=%d, message=%r' % (drop_code, message))
        self.drop_code = drop_code
        self.message = message


class LogicalChannelError(Exception):

    """Raised when there is a logical channel error."""

    def __init__(self, channel_id, drop_code, message=''):
        """Initialize the error with a status message."""
        super(LogicalChannelError, self).__init__(
            'channel_id=%d, code=%d, message=%r' % (
                channel_id, drop_code, message))
        self.channel_id = channel_id
        self.drop_code = drop_code
        self.message = message


def _encode_channel_id(channel_id):
    if channel_id < 0:
        raise ValueError('Channel id %d must not be negative' % channel_id)

    if channel_id < 2 ** 7:
        return chr(channel_id)
    if channel_id < 2 ** 14:
        return struct.pack('!H', 0x8000 + channel_id)
    if channel_id < 2 ** 21:
        first = chr(0xc0 + (channel_id >> 16))
        return first + struct.pack('!H', channel_id & 0xffff)
    if channel_id < 2 ** 29:
        return struct.pack('!L', 0xe0000000 + channel_id)

    raise ValueError('Channel id %d is too large' % channel_id)


def _encode_number(number):
    return create_length_header(number, False)


def _create_add_channel_response(channel_id, encoded_handshake,
                                 encoding=0, rejected=False):
    if encoding != 0 and encoding != 1:
        raise ValueError('Invalid encoding %d' % encoding)

    first_byte = ((_MUX_OPCODE_ADD_CHANNEL_RESPONSE << 5) |
                  (rejected << 4) | encoding)
    block = (chr(first_byte) +
             _encode_channel_id(channel_id) +
             _encode_number(len(encoded_handshake)) +
             encoded_handshake)
    return block


def _create_drop_channel(channel_id, code=None, message=''):
    if len(message) > 0 and code is None:
        raise ValueError('Code must be specified if message is specified')

    first_byte = _MUX_OPCODE_DROP_CHANNEL << 5
    block = chr(first_byte) + _encode_channel_id(channel_id)
    if code is None:
        block += _encode_number(0)  # Reason size
    else:
        reason = struct.pack('!H', code) + message
        reason_size = _encode_number(len(reason))
        block += reason_size + reason

    return block


def _create_flow_control(channel_id, replenished_quota):
    first_byte = _MUX_OPCODE_FLOW_CONTROL << 5
    block = (chr(first_byte) +
             _encode_channel_id(channel_id) +
             _encode_number(replenished_quota))
    return block


def _create_new_channel_slot(slots, send_quota):
    if slots < 0 or send_quota < 0:
        raise ValueError('slots and send_quota must be non-negative.')
    first_byte = _MUX_OPCODE_NEW_CHANNEL_SLOT << 5
    block = (chr(first_byte) +
             _encode_number(slots) +
             _encode_number(send_quota))
    return block


def _create_fallback_new_channel_slot():
    first_byte = (_MUX_OPCODE_NEW_CHANNEL_SLOT << 5) | 1  # Set the F flag
    block = (chr(first_byte) + _encode_number(0) + _encode_number(0))
    return block


def _parse_request_text(request_text):
    request_line, header_lines = request_text.split('\r\n', 1)

    words = request_line.split(' ')
    if len(words) != 3:
        raise ValueError('Bad Request-Line syntax %r' % request_line)
    [command, path, version] = words
    if version != 'HTTP/1.1':
        raise ValueError('Bad request version %r' % version)

    # email.parser.Parser() parses RFC 2822 (RFC 822) style headers.
    # RFC 6455 refers RFC 2616 for handshake parsing, and RFC 2616 refers
    # RFC 822.
    headers = email.parser.Parser().parsestr(header_lines)
    return command, path, version, headers


class _ControlBlock(object):

    """A structure that holds parsing result of multiplexing control block.

    Control block specific attributes will be added by _MuxFramePayloadParser.
    (e.g. encoded_handshake will be added for AddChannelRequest and
    AddChannelResponse)
    """

    def __init__(self, opcode):
        self.opcode = opcode


class _MuxFramePayloadParser(object):

    """A class that parses multiplexed frame payload."""

    def __init__(self, payload):
        self._data = payload
        self._read_position = 0
        self._logger = util.get_class_logger(self)

    def read_channel_id(self):
        """Read channel id.

        Raises:
            ValueError: when the payload doesn't contain
                valid channel id.
        """
        remaining_length = len(self._data) - self._read_position
        pos = self._read_position
        if remaining_length == 0:
            raise ValueError('Invalid channel id format')

        channel_id = ord(self._data[pos])
        channel_id_length = 1
        if channel_id & 0xe0 == 0xe0:
            if remaining_length < 4:
                raise ValueError('Invalid channel id format')
            channel_id = struct.unpack('!L',
                                       self._data[pos:pos+4])[0] & 0x1fffffff
            channel_id_length = 4
        elif channel_id & 0xc0 == 0xc0:
            if remaining_length < 3:
                raise ValueError('Invalid channel id format')
            channel_id = (((channel_id & 0x1f) << 16) +
                          struct.unpack('!H', self._data[pos+1:pos+3])[0])
            channel_id_length = 3
        elif channel_id & 0x80 == 0x80:
            if remaining_length < 2:
                raise ValueError('Invalid channel id format')
            channel_id = struct.unpack('!H',
                                       self._data[pos:pos+2])[0] & 0x3fff
            channel_id_length = 2
        self._read_position += channel_id_length

        return channel_id

    def read_inner_frame(self):
        """Read an inner frame.

        Raises:
            PhysicalConnectionError: when the inner frame is invalid.
        """
        if len(self._data) == self._read_position:
            raise PhysicalConnectionError(
                _DROP_CODE_ENCAPSULATED_FRAME_IS_TRUNCATED)

        bits = ord(self._data[self._read_position])
        self._read_position += 1
        fin = (bits & 0x80) == 0x80
        rsv1 = (bits & 0x40) == 0x40
        rsv2 = (bits & 0x20) == 0x20
        rsv3 = (bits & 0x10) == 0x10
        opcode = bits & 0xf
        payload = self.remaining_data()
        # Consume rest of the message which is payload data of the original
        # frame.
        self._read_position = len(self._data)
        return fin, rsv1, rsv2, rsv3, opcode, payload

    def _read_number(self):
        if self._read_position + 1 > len(self._data):
            raise ValueError(
                'Cannot read the first byte of number field')

        number = ord(self._data[self._read_position])
        if number & 0x80 == 0x80:
            raise ValueError(
                'The most significant bit of the first byte of number should '
                'be unset')
        self._read_position += 1
        pos = self._read_position
        if number == 127:
            if pos + 8 > len(self._data):
                raise ValueError('Invalid number field')
            self._read_position += 8
            number = struct.unpack('!Q', self._data[pos:pos+8])[0]
            if number > 0x7FFFFFFFFFFFFFFF:
                raise ValueError('Encoded number(%d) >= 2^63' % number)
            if number <= 0xFFFF:
                raise ValueError(
                    '%d should not be encoded by 9 bytes encoding' % number)
            return number
        if number == 126:
            if pos + 2 > len(self._data):
                raise ValueError('Invalid number field')
            self._read_position += 2
            number = struct.unpack('!H', self._data[pos:pos+2])[0]
            if number <= 125:
                raise ValueError(
                    '%d should not be encoded by 3 bytes encoding' % number)
        return number

    def _read_size_and_contents(self):
        """Read data that consists of the following:
            - the size of the contents encoded the same way as payload length
              of the WebSocket Protocol with 1 bit padding at the head.
            - the contents.
        """
        try:
            size = self._read_number()
        except ValueError, e:
            raise PhysicalConnectionError(_DROP_CODE_INVALID_MUX_CONTROL_BLOCK,
                                          str(e))
        pos = self._read_position
        if pos + size > len(self._data):
            raise PhysicalConnectionError(
                _DROP_CODE_INVALID_MUX_CONTROL_BLOCK,
                'Cannot read %d bytes data' % size)

        self._read_position += size
        return self._data[pos:pos+size]

    def _read_add_channel_request(self, first_byte, control_block):
        reserved = (first_byte >> 2) & 0x7
        if reserved != 0:
            raise PhysicalConnectionError(
                _DROP_CODE_INVALID_MUX_CONTROL_BLOCK,
                'Reserved bits must be unset')

        # Invalid encoding will be handled by MuxHandler.
        encoding = first_byte & 0x3
        try:
            control_block.channel_id = self.read_channel_id()
        except ValueError, e:
            raise PhysicalConnectionError(_DROP_CODE_INVALID_MUX_CONTROL_BLOCK)
        control_block.encoding = encoding
        encoded_handshake = self._read_size_and_contents()
        control_block.encoded_handshake = encoded_handshake
        return control_block

    def _read_add_channel_response(self, first_byte, control_block):
        reserved = (first_byte >> 2) & 0x3
        if reserved != 0:
            raise PhysicalConnectionError(
                _DROP_CODE_INVALID_MUX_CONTROL_BLOCK,
                'Reserved bits must be unset')

        control_block.accepted = (first_byte >> 4) & 1
        control_block.encoding = first_byte & 0x3
        try:
            control_block.channel_id = self.read_channel_id()
        except ValueError, e:
            raise PhysicalConnectionError(_DROP_CODE_INVALID_MUX_CONTROL_BLOCK)
        control_block.encoded_handshake = self._read_size_and_contents()
        return control_block

    def _read_flow_control(self, first_byte, control_block):
        reserved = first_byte & 0x1f
        if reserved != 0:
            raise PhysicalConnectionError(
                _DROP_CODE_INVALID_MUX_CONTROL_BLOCK,
                'Reserved bits must be unset')

        try:
            control_block.channel_id = self.read_channel_id()
            control_block.send_quota = self._read_number()
        except ValueError, e:
            raise PhysicalConnectionError(_DROP_CODE_INVALID_MUX_CONTROL_BLOCK,
                                          str(e))

        return control_block

    def _read_drop_channel(self, first_byte, control_block):
        reserved = first_byte & 0x1f
        if reserved != 0:
            raise PhysicalConnectionError(
                _DROP_CODE_INVALID_MUX_CONTROL_BLOCK,
                'Reserved bits must be unset')

        try:
            control_block.channel_id = self.read_channel_id()
        except ValueError, e:
            raise PhysicalConnectionError(_DROP_CODE_INVALID_MUX_CONTROL_BLOCK)
        reason = self._read_size_and_contents()
        if len(reason) == 0:
            control_block.drop_code = None
            control_block.drop_message = ''
        elif len(reason) >= 2:
            control_block.drop_code = struct.unpack('!H', reason[:2])[0]
            control_block.drop_message = reason[2:]
        else:
            raise PhysicalConnectionError(
                _DROP_CODE_INVALID_MUX_CONTROL_BLOCK,
                'Received DropChannel that conains only 1-byte reason')
        return control_block

    def _read_new_channel_slot(self, first_byte, control_block):
        reserved = first_byte & 0x1e
        if reserved != 0:
            raise PhysicalConnectionError(
                _DROP_CODE_INVALID_MUX_CONTROL_BLOCK,
                'Reserved bits must be unset')
        control_block.fallback = first_byte & 1
        try:
            control_block.slots = self._read_number()
            control_block.send_quota = self._read_number()
        except ValueError, e:
            raise PhysicalConnectionError(_DROP_CODE_INVALID_MUX_CONTROL_BLOCK,
                                          str(e))
        return control_block

    def read_control_blocks(self):
        """Read control block(s).

        Raises:
           PhysicalConnectionError: when the payload contains invalid control
               block(s).
           StopIteration: when no control blocks left.
        """
        while self._read_position < len(self._data):
            first_byte = ord(self._data[self._read_position])
            self._read_position += 1
            opcode = (first_byte >> 5) & 0x7
            control_block = _ControlBlock(opcode=opcode)
            if opcode == _MUX_OPCODE_ADD_CHANNEL_REQUEST:
                yield self._read_add_channel_request(first_byte, control_block)
            elif opcode == _MUX_OPCODE_ADD_CHANNEL_RESPONSE:
                yield self._read_add_channel_response(
                    first_byte, control_block)
            elif opcode == _MUX_OPCODE_FLOW_CONTROL:
                yield self._read_flow_control(first_byte, control_block)
            elif opcode == _MUX_OPCODE_DROP_CHANNEL:
                yield self._read_drop_channel(first_byte, control_block)
            elif opcode == _MUX_OPCODE_NEW_CHANNEL_SLOT:
                yield self._read_new_channel_slot(first_byte, control_block)
            else:
                raise PhysicalConnectionError(
                    _DROP_CODE_UNKNOWN_MUX_OPCODE,
                    'Invalid opcode %d' % opcode)

        assert self._read_position == len(self._data)
        raise StopIteration

    def remaining_data(self):
        """Return remaining data."""
        return self._data[self._read_position:]


class _LogicalRequest(object):

    """Mimics mod_python request."""

    def __init__(self, channel_id, command, path, protocol, headers,
                 connection):
        """Construct an instance.

        Args:
            channel_id: the channel id of the logical channel.
            command: HTTP request command.
            path: HTTP request path.
            headers: HTTP headers.
            connection: _LogicalConnection instance.
        """
        self.channel_id = channel_id
        self.method = command
        self.uri = path
        self.protocol = protocol
        self.headers_in = headers
        self.connection = connection
        self.server_terminated = False
        self.client_terminated = False

    def is_https(self):
        """Mimic request.is_https().

        Returns False because this method is used only by old protocols
        (hixie and hybi00).
        """
        return False


class _LogicalConnection(object):

    """Mimics mod_python mp_conn."""

    # For details, see the comment of set_read_state().
    STATE_ACTIVE = 1
    STATE_GRACEFULLY_CLOSED = 2
    STATE_TERMINATED = 3

    def __init__(self, mux_handler, channel_id):
        """Construct an instance.

        Args:
            mux_handler: _MuxHandler instance.
            channel_id: channel id of this connection.
        """
        self._mux_handler = mux_handler
        self._channel_id = channel_id
        self._incoming_data = ''

        # - Protects _waiting_write_completion
        # - Signals the thread waiting for completion of write by mux handler
        self._write_condition = threading.Condition()
        self._waiting_write_completion = False

        self._read_condition = threading.Condition()
        self._read_state = self.STATE_ACTIVE

    def get_local_addr(self):
        """Getter to mimic mp_conn.local_addr."""
        return self._mux_handler.physical_connection.get_local_addr()
    local_addr = property(get_local_addr)

    def get_remote_addr(self):
        """Getter to mimic mp_conn.remote_addr."""
        return self._mux_handler.physical_connection.get_remote_addr()
    remote_addr = property(get_remote_addr)

    def get_memorized_lines(self):
        """Get memorized lines. Not supported."""
        raise MuxUnexpectedException('_LogicalConnection does not support '
                                     'get_memorized_lines')

    def write(self, data):
        """Write data. mux_handler sends data asynchronously.

        The caller will be suspended until write done.

        Args:
            data: data to be written.

        Raises:
            MuxUnexpectedException: when called before finishing the previous
                write.
        """
        try:
            self._write_condition.acquire()
            if self._waiting_write_completion:
                raise MuxUnexpectedException(
                    'Logical connection %d is already waiting the completion '
                    'of write' % self._channel_id)

            self._waiting_write_completion = True
            self._mux_handler.send_data(self._channel_id, data)
            self._write_condition.wait()
            # TODO(tyoshino): Raise an exception if woke up by on_writer_done.
        finally:
            self._write_condition.release()

    def write_control_data(self, data):
        """Write data via the control channel.

        Don't wait finishing write because this method can be called by
        mux dispatcher.

        Args:
            data: data to be written.
        """
        self._mux_handler.send_control_data(data)

    def on_write_data_done(self):
        """Called when sending data is completed."""
        try:
            self._write_condition.acquire()
            if not self._waiting_write_completion:
                raise MuxUnexpectedException(
                    'Invalid call of on_write_data_done for logical '
                    'connection %d' % self._channel_id)
            self._waiting_write_completion = False
            self._write_condition.notify()
        finally:
            self._write_condition.release()

    def on_writer_done(self):
        """Called by the mux handler when the writer thread has finished."""
        try:
            self._write_condition.acquire()
            self._waiting_write_completion = False
            self._write_condition.notify()
        finally:
            self._write_condition.release()

    def append_frame_data(self, frame_data):
        """Append incoming frame data.

        Called when mux_handler dispatches frame data to the corresponding
        application.

        Args:
            frame_data: incoming frame data.
        """
        self._read_condition.acquire()
        self._incoming_data += frame_data
        self._read_condition.notify()
        self._read_condition.release()

    def read(self, length):
        """Read data.

        Blocks until enough data has arrived via physical connection.

        Args:
            length: length of data to be read.
        Raises:
            LogicalConnectionClosedException: when closing handshake for this
                logical channel has been received.
            ConnectionTerminatedException: when the physical connection has
                closed, or an error is caused on the reader thread.
        """
        self._read_condition.acquire()
        while (self._read_state == self.STATE_ACTIVE and
               len(self._incoming_data) < length):
            self._read_condition.wait()

        try:
            if self._read_state == self.STATE_GRACEFULLY_CLOSED:
                raise LogicalConnectionClosedException(
                    'Logical channel %d has closed.' % self._channel_id)
            elif self._read_state == self.STATE_TERMINATED:
                raise ConnectionTerminatedException(
                    'Receiving %d byte failed. Logical channel (%d) closed' %
                    (length, self._channel_id))

            value = self._incoming_data[:length]
            self._incoming_data = self._incoming_data[length:]
        finally:
            self._read_condition.release()

        return value

    def set_read_state(self, new_state):
        """Set the state of this connection.

        Called when an event for this connection has occurred.
        Args:
            new_state: state to be set. new_state must be one of followings:
            - STATE_GRACEFULLY_CLOSED: when closing handshake for this
                connection has been received.
            - STATE_TERMINATED: when the physical connection has closed or
                DropChannel of this connection has received.
        """
        self._read_condition.acquire()
        self._read_state = new_state
        self._read_condition.notify()
        self._read_condition.release()


class _InnerMessage(object):

    """Hold the result of _InnerMessageBuilder.build()."""

    def __init__(self, opcode, payload):
        self.opcode = opcode
        self.payload = payload


class _InnerMessageBuilder(object):

    """Class to build an _InnerMessage.

    A class that holds the context of inner message fragmentation and
    builds a message from fragmented inner frame(s).
    """

    def __init__(self):
        self._control_opcode = None
        self._pending_control_fragments = []
        self._message_opcode = None
        self._pending_message_fragments = []
        self._frame_handler = self._handle_first

    def _handle_first(self, frame):
        if frame.opcode == common.OPCODE_CONTINUATION:
            raise InvalidFrameException('Sending invalid continuation opcode')

        if common.is_control_opcode(frame.opcode):
            return self._process_first_fragmented_control(frame)
        else:
            return self._process_first_fragmented_message(frame)

    def _process_first_fragmented_control(self, frame):
        self._control_opcode = frame.opcode
        self._pending_control_fragments.append(frame.payload)
        if not frame.fin:
            self._frame_handler = self._handle_fragmented_control
            return None
        return self._reassemble_fragmented_control()

    def _process_first_fragmented_message(self, frame):
        self._message_opcode = frame.opcode
        self._pending_message_fragments.append(frame.payload)
        if not frame.fin:
            self._frame_handler = self._handle_fragmented_message
            return None
        return self._reassemble_fragmented_message()

    def _handle_fragmented_control(self, frame):
        if frame.opcode != common.OPCODE_CONTINUATION:
            raise InvalidFrameException(
                'Sending invalid opcode %d while sending fragmented control '
                'message' % frame.opcode)
        self._pending_control_fragments.append(frame.payload)
        if not frame.fin:
            return None
        return self._reassemble_fragmented_control()

    def _reassemble_fragmented_control(self):
        opcode = self._control_opcode
        payload = ''.join(self._pending_control_fragments)
        self._control_opcode = None
        self._pending_control_fragments = []
        if self._message_opcode is not None:
            self._frame_handler = self._handle_fragmented_message
        else:
            self._frame_handler = self._handle_first
        return _InnerMessage(opcode, payload)

    def _handle_fragmented_message(self, frame):
        # Sender can interleave a control message while sending fragmented
        # messages.
        if common.is_control_opcode(frame.opcode):
            if self._control_opcode is not None:
                raise MuxUnexpectedException(
                    'Should not reach here(Bug in builder)')
            return self._process_first_fragmented_control(frame)

        if frame.opcode != common.OPCODE_CONTINUATION:
            raise InvalidFrameException(
                'Sending invalid opcode %d while sending fragmented message' %
                frame.opcode)
        self._pending_message_fragments.append(frame.payload)
        if not frame.fin:
            return None
        return self._reassemble_fragmented_message()

    def _reassemble_fragmented_message(self):
        opcode = self._message_opcode
        payload = ''.join(self._pending_message_fragments)
        self._message_opcode = None
        self._pending_message_fragments = []
        self._frame_handler = self._handle_first
        return _InnerMessage(opcode, payload)

    def build(self, frame):
        """Build an inner message.

        Returns an _InnerMessage instance when the given frame is the last
        fragmented frame. Returns None otherwise.

        Args:
            frame: an inner frame.
        Raises:
            InvalidFrameException: when received invalid opcode. (e.g.
                receiving non continuation data opcode but the fin flag of
                the previous inner frame was not set.)
        """
        return self._frame_handler(frame)


class _LogicalStream(Stream):

    """Mimics the Stream class.

    This class interprets multiplexed WebSocket frames.
    """

    def __init__(self, request, stream_options, send_quota, receive_quota):
        """Construct an instance.

        Args:
            request: _LogicalRequest instance.
            stream_options: StreamOptions instance.
            send_quota: Initial send quota.
            receive_quota: Initial receive quota.
        """
        # Physical stream is responsible for masking.
        stream_options.unmask_receive = False
        Stream.__init__(self, request, stream_options)

        self._send_closed = False
        self._send_quota = send_quota
        # - Protects _send_closed and _send_quota
        # - Signals the thread waiting for send quota replenished
        self._send_condition = threading.Condition()

        # The opcode of the first frame in messages.
        self._message_opcode = common.OPCODE_TEXT
        # True when the last message was fragmented.
        self._last_message_was_fragmented = False

        self._receive_quota = receive_quota
        self._write_inner_frame_semaphore = threading.Semaphore()

        self._inner_message_builder = _InnerMessageBuilder()

    def _create_inner_frame(self, opcode, payload, end=True):
        frame = Frame(fin=end, opcode=opcode, payload=payload)
        for frame_filter in self._options.outgoing_frame_filters:
            frame_filter.filter(frame)

        if len(payload) != len(frame.payload):
            raise MuxUnexpectedException(
                'Mux extension must not be used after extensions which change '
                ' frame boundary')

        first_byte = ((frame.fin << 7) | (frame.rsv1 << 6) |
                      (frame.rsv2 << 5) | (frame.rsv3 << 4) | frame.opcode)
        return chr(first_byte) + frame.payload

    def _write_inner_frame(self, opcode, payload, end=True):
        payload_length = len(payload)
        write_position = 0

        try:
            # An inner frame will be fragmented if there is no enough send
            # quota. This semaphore ensures that fragmented inner frames are
            # sent in order on the logical channel.
            # Note that frames that come from other logical channels or
            # multiplexing control blocks can be inserted between fragmented
            # inner frames on the physical channel.
            self._write_inner_frame_semaphore.acquire()

            # Consume an octet quota when this is the first fragmented frame.
            if opcode != common.OPCODE_CONTINUATION:
                try:
                    self._send_condition.acquire()
                    while (not self._send_closed) and self._send_quota == 0:
                        self._send_condition.wait()

                    if self._send_closed:
                        raise BadOperationException(
                            'Logical connection %d is closed' %
                            self._request.channel_id)

                    self._send_quota -= 1
                finally:
                    self._send_condition.release()

            while write_position < payload_length:
                try:
                    self._send_condition.acquire()
                    while (not self._send_closed) and self._send_quota == 0:
                        self._logger.debug(
                            'No quota. Waiting FlowControl message for %d.' %
                            self._request.channel_id)
                        self._send_condition.wait()

                    if self._send_closed:
                        raise BadOperationException(
                            'Logical connection %d is closed' %
                            self.request._channel_id)

                    remaining = payload_length - write_position
                    write_length = min(self._send_quota, remaining)
                    inner_frame_end = (
                        end and
                        (write_position + write_length == payload_length))

                    inner_frame = self._create_inner_frame(
                        opcode,
                        payload[write_position:write_position+write_length],
                        inner_frame_end)
                    self._send_quota -= write_length
                    self._logger.debug('Consumed quota=%d, remaining=%d' %
                                       (write_length, self._send_quota))
                finally:
                    self._send_condition.release()

                # Writing data will block the worker so we need to release
                # _send_condition before writing.
                self._logger.debug('Sending inner frame: %r' % inner_frame)
                self._request.connection.write(inner_frame)
                write_position += write_length

                opcode = common.OPCODE_CONTINUATION

        except ValueError, e:
            raise BadOperationException(e)
        finally:
            self._write_inner_frame_semaphore.release()

    def replenish_send_quota(self, send_quota):
        """Replenish send quota."""
        try:
            self._send_condition.acquire()
            if self._send_quota + send_quota > 0x7FFFFFFFFFFFFFFF:
                self._send_quota = 0
                raise LogicalChannelError(
                    self._request.channel_id, _DROP_CODE_SEND_QUOTA_OVERFLOW)
            self._send_quota += send_quota
            self._logger.debug('Replenished send quota for channel id %d: %d' %
                               (self._request.channel_id, self._send_quota))
        finally:
            self._send_condition.notify()
            self._send_condition.release()

    def consume_receive_quota(self, amount):
        """Consume receive quota. Returns False on failure."""
        if self._receive_quota < amount:
            self._logger.debug('Violate quota on channel id %d: %d < %d' %
                               (self._request.channel_id,
                                self._receive_quota, amount))
            return False
        self._receive_quota -= amount
        return True

    def send_message(self, message, end=True, binary=False):
        """Override Stream.send_message."""
        if self._request.server_terminated:
            raise BadOperationException(
                'Requested send_message after sending out a closing handshake')

        if binary and isinstance(message, unicode):
            raise BadOperationException(
                'Message for binary frame must be instance of str')

        if binary:
            opcode = common.OPCODE_BINARY
        else:
            opcode = common.OPCODE_TEXT
            message = message.encode('utf-8')

        for message_filter in self._options.outgoing_message_filters:
            message = message_filter.filter(message, end, binary)

        if self._last_message_was_fragmented:
            if opcode != self._message_opcode:
                raise BadOperationException('Message types are different in '
                                            'frames for the same message')
            opcode = common.OPCODE_CONTINUATION
        else:
            self._message_opcode = opcode

        self._write_inner_frame(opcode, message, end)
        self._last_message_was_fragmented = not end

    def _receive_frame(self):
        """Override Stream._receive_frame.

        In addition to call Stream._receive_frame, this method adds the amount
        of payload to receiving quota and sends FlowControl to the client.
        We need to do it here because Stream.receive_message() handles
        control frames internally.
        """
        opcode, payload, fin, rsv1, rsv2, rsv3 = Stream._receive_frame(self)
        amount = len(payload)
        # Replenish extra one octet when receiving the first fragmented frame.
        if opcode != common.OPCODE_CONTINUATION:
            amount += 1
        self._receive_quota += amount
        frame_data = _create_flow_control(self._request.channel_id,
                                          amount)
        self._logger.debug('Sending flow control for %d, replenished=%d' %
                           (self._request.channel_id, amount))
        self._request.connection.write_control_data(frame_data)
        return opcode, payload, fin, rsv1, rsv2, rsv3

    def _get_message_from_frame(self, frame):
        """Override Stream._get_message_from_frame."""
        try:
            inner_message = self._inner_message_builder.build(frame)
        except InvalidFrameException:
            raise LogicalChannelError(
                self._request.channel_id, _DROP_CODE_BAD_FRAGMENTATION)

        if inner_message is None:
            return None
        self._original_opcode = inner_message.opcode
        return inner_message.payload

    def receive_message(self):
        """Override Stream.receive_message."""
        # Just call Stream.receive_message(), but catch
        # LogicalConnectionClosedException, which is raised when the logical
        # connection has closed gracefully.
        try:
            return Stream.receive_message(self)
        except LogicalConnectionClosedException, e:
            self._logger.debug('%s', e)
            return None

    def _send_closing_handshake(self, code, reason):
        """Override Stream._send_closing_handshake."""
        body = create_closing_handshake_body(code, reason)
        self._logger.debug('Sending closing handshake for %d: (%r, %r)' %
                           (self._request.channel_id, code, reason))
        self._write_inner_frame(common.OPCODE_CLOSE, body, end=True)

        self._request.server_terminated = True

    def send_ping(self, body=''):
        """Override Stream.send_ping."""
        self._logger.debug('Sending ping on logical channel %d: %r' %
                           (self._request.channel_id, body))
        self._write_inner_frame(common.OPCODE_PING, body, end=True)

        self._ping_queue.append(body)

    def _send_pong(self, body):
        """Override Stream._send_pong."""
        self._logger.debug('Sending pong on logical channel %d: %r' %
                           (self._request.channel_id, body))
        self._write_inner_frame(common.OPCODE_PONG, body, end=True)

    def close_connection(self, code=common.STATUS_NORMAL_CLOSURE, reason=''):
        """Override Stream.close_connection."""
        # TODO(bashi): Implement
        self._logger.debug('Closing logical connection %d' %
                           self._request.channel_id)
        self._request.server_terminated = True

    def stop_sending(self):
        """Stop accepting new send operation (_write_inner_frame)."""
        self._send_condition.acquire()
        self._send_closed = True
        self._send_condition.notify()
        self._send_condition.release()


class _OutgoingData(object):

    """Simple data/channel container.

    A structure that holds data to be sent via physical connection and
    origin of the data.
    """

    def __init__(self, channel_id, data):
        self.channel_id = channel_id
        self.data = data


class _PhysicalConnectionWriter(threading.Thread):

    """A thread that is responsible for writing data to physical connection.

    TODO(bashi): Make sure there is no thread-safety problem when the reader
    thread reads data from the same socket at a time.
    """

    def __init__(self, mux_handler):
        """Construct an instance.

        Args:
            mux_handler: _MuxHandler instance.
        """
        threading.Thread.__init__(self)
        self._logger = util.get_class_logger(self)
        self._mux_handler = mux_handler
        self.setDaemon(True)

        # When set, make this thread stop accepting new data, flush pending
        # data and exit.
        self._stop_requested = False
        # The close code of the physical connection.
        self._close_code = common.STATUS_NORMAL_CLOSURE
        # Deque for passing write data. It's protected by _deque_condition
        # until _stop_requested is set.
        self._deque = collections.deque()
        # - Protects _deque, _stop_requested and _close_code
        # - Signals threads waiting for them to be available
        self._deque_condition = threading.Condition()

    def put_outgoing_data(self, data):
        """Put outgoing data.

        Args:
            data: _OutgoingData instance.
        Raises:
            BadOperationException: when the thread has been requested to
                terminate.
        """
        try:
            self._deque_condition.acquire()
            if self._stop_requested:
                raise BadOperationException('Cannot write data anymore')

            self._deque.append(data)
            self._deque_condition.notify()
        finally:
            self._deque_condition.release()

    def _write_data(self, outgoing_data):
        message = (_encode_channel_id(outgoing_data.channel_id) +
                   outgoing_data.data)
        try:
            self._mux_handler.physical_stream.send_message(
                message=message, end=True, binary=True)
        except Exception, e:
            util.prepend_message_to_exception(
                'Failed to send message to %r: ' %
                (self._mux_handler.physical_connection.remote_addr,), e)
            raise

        # TODO(bashi): It would be better to block the thread that sends
        # control data as well.
        if outgoing_data.channel_id != _CONTROL_CHANNEL_ID:
            self._mux_handler.notify_write_data_done(outgoing_data.channel_id)

    def run(self):
        try:
            self._deque_condition.acquire()
            while not self._stop_requested:
                if len(self._deque) == 0:
                    self._deque_condition.wait()
                    continue

                outgoing_data = self._deque.popleft()

                self._deque_condition.release()
                self._write_data(outgoing_data)
                self._deque_condition.acquire()

            # Flush deque.
            #
            # At this point, self._deque_condition is always acquired.
            try:
                while len(self._deque) > 0:
                    outgoing_data = self._deque.popleft()
                    self._write_data(outgoing_data)
            finally:
                self._deque_condition.release()

            # Close physical connection.
            try:
                # Don't wait the response here. The response will be read
                # by the reader thread.
                self._mux_handler.physical_stream.close_connection(
                    self._close_code, wait_response=False)
            except Exception, e:
                util.prepend_message_to_exception(
                    'Failed to close the physical connection: %r' % e)
                raise
        finally:
            self._mux_handler.notify_writer_done()

    def stop(self, close_code=common.STATUS_NORMAL_CLOSURE):
        """Stop the writer thread."""
        self._deque_condition.acquire()
        self._stop_requested = True
        self._close_code = close_code
        self._deque_condition.notify()
        self._deque_condition.release()


class _PhysicalConnectionReader(threading.Thread):

    """A thread that is responsible for reading data from physical connection.
    """

    def __init__(self, mux_handler):
        """Construct an instance.

        Args:
            mux_handler: _MuxHandler instance.
        """
        threading.Thread.__init__(self)
        self._logger = util.get_class_logger(self)
        self._mux_handler = mux_handler
        self.setDaemon(True)

    def run(self):
        while True:
            try:
                physical_stream = self._mux_handler.physical_stream
                message = physical_stream.receive_message()
                if message is None:
                    break
                # Below happens only when a data message is received.
                opcode = physical_stream.get_last_received_opcode()
                if opcode != common.OPCODE_BINARY:
                    self._mux_handler.fail_physical_connection(
                        _DROP_CODE_INVALID_ENCAPSULATING_MESSAGE,
                        'Received a text message on physical connection')
                    break

            except ConnectionTerminatedException, e:
                self._logger.debug('%s', e)
                break

            try:
                self._mux_handler.dispatch_message(message)
            except PhysicalConnectionError, e:
                self._mux_handler.fail_physical_connection(
                    e.drop_code, e.message)
                break
            except LogicalChannelError, e:
                self._mux_handler.fail_logical_channel(
                    e.channel_id, e.drop_code, e.message)
            except Exception, e:
                self._logger.debug(traceback.format_exc())
                break

        self._mux_handler.notify_reader_done()


class _Worker(threading.Thread):

    """A thread that is responsible for running the corresponding application
    handler.
    """

    def __init__(self, mux_handler, request):
        """Construct an instance.

        Args:
            mux_handler: _MuxHandler instance.
            request: _LogicalRequest instance.
        """
        threading.Thread.__init__(self)
        self._logger = util.get_class_logger(self)
        self._mux_handler = mux_handler
        self._request = request
        self.setDaemon(True)

    def run(self):
        self._logger.debug('Logical channel worker started. (id=%d)' %
                           self._request.channel_id)
        try:
            # Non-critical exceptions will be handled by dispatcher.
            self._mux_handler.dispatcher.transfer_data(self._request)
        except LogicalChannelError, e:
            self._mux_handler.fail_logical_channel(
                e.channel_id, e.drop_code, e.message)
        finally:
            self._mux_handler.notify_worker_done(self._request.channel_id)


class _MuxHandshaker(hybi.Handshaker):

    """Opening handshake processor for multiplexing."""

    _DUMMY_WEBSOCKET_KEY = 'dGhlIHNhbXBsZSBub25jZQ=='

    def __init__(self, request, dispatcher, send_quota, receive_quota):
        """Construct an instance.

        Args:
            request: _LogicalRequest instance.
            dispatcher: Dispatcher instance (dispatch.Dispatcher).
            send_quota: Initial send quota.
            receive_quota: Initial receive quota.
        """
        hybi.Handshaker.__init__(self, request, dispatcher)
        self._send_quota = send_quota
        self._receive_quota = receive_quota

        # Append headers which should not be included in handshake field of
        # AddChannelRequest.
        # TODO(bashi): Make sure whether we should raise exception when
        #     these headers are included already.
        request.headers_in[common.UPGRADE_HEADER] = (
            common.WEBSOCKET_UPGRADE_TYPE)
        request.headers_in[common.SEC_WEBSOCKET_VERSION_HEADER] = (
            str(common.VERSION_HYBI_LATEST))
        request.headers_in[common.SEC_WEBSOCKET_KEY_HEADER] = (
            self._DUMMY_WEBSOCKET_KEY)

    def _create_stream(self, stream_options):
        """Override hybi.Handshaker._create_stream."""
        self._logger.debug('Creating logical stream for %d' %
                           self._request.channel_id)
        return _LogicalStream(
            self._request, stream_options, self._send_quota,
            self._receive_quota)

    def _create_handshake_response(self, accept):
        """Override hybi._create_handshake_response."""
        response = []

        response.append('HTTP/1.1 101 Switching Protocols\r\n')

        # Upgrade and Sec-WebSocket-Accept should be excluded.
        response.append('%s: %s\r\n' % (
            common.CONNECTION_HEADER, common.UPGRADE_CONNECTION_TYPE))
        if self._request.ws_protocol is not None:
            response.append('%s: %s\r\n' % (
                common.SEC_WEBSOCKET_PROTOCOL_HEADER,
                self._request.ws_protocol))
        if (self._request.ws_extensions is not None and
            len(self._request.ws_extensions) != 0):
            response.append('%s: %s\r\n' % (
                common.SEC_WEBSOCKET_EXTENSIONS_HEADER,
                common.format_extensions(self._request.ws_extensions)))
        response.append('\r\n')

        return ''.join(response)

    def _send_handshake(self, accept):
        """Override hybi.Handshaker._send_handshake."""
        # Don't send handshake response for the default channel
        if self._request.channel_id == _DEFAULT_CHANNEL_ID:
            return

        handshake_response = self._create_handshake_response(accept)
        frame_data = _create_add_channel_response(
                         self._request.channel_id,
                         handshake_response)
        self._logger.debug('Sending handshake response for %d: %r' %
                           (self._request.channel_id, frame_data))
        self._request.connection.write_control_data(frame_data)


class _LogicalChannelData(object):

    """A structure that holds information about logical channel."""

    def __init__(self, request, worker):
        self.request = request
        self.worker = worker
        self.drop_code = _DROP_CODE_NORMAL_CLOSURE
        self.drop_message = ''


class _HandshakeDeltaBase(object):
    """A class that holds information for delta-encoded handshake."""

    def __init__(self, headers):
        self._headers = headers

    def create_headers(self, delta=None):
        """Creates request headers for an AddChannelRequest that has
        delta-encoded handshake.

        Args:
            delta: headers should be overridden.
        """

        headers = copy.copy(self._headers)
        if delta:
            for key, value in delta.items():
                # The spec requires that a header with an empty value is
                # removed from the delta base.
                if len(value) == 0 and headers.has_key(key):
                    del headers[key]
                else:
                    headers[key] = value
        return headers


class _MuxHandler(object):
    """Multiplexing handler. When a handler starts, it launches three
    threads; the reader thread, the writer thread, and a worker thread.

    The reader thread reads data from the physical stream, i.e., the
    ws_stream object of the underlying websocket connection. The reader
    thread interprets multiplexed frames and dispatches them to logical
    channels. Methods of this class are mostly called by the reader thread.

    The writer thread sends multiplexed frames which are created by
    logical channels via the physical connection.

    The worker thread launched at the starting point handles the
    "Implicitly Opened Connection". If multiplexing handler receives
    an AddChannelRequest and accepts it, the handler will launch a new worker
    thread and dispatch the request to it.
    """

    def __init__(self, request, dispatcher):
        """Constructs an instance.

        Args:
            request: mod_python request of the physical connection.
            dispatcher: Dispatcher instance (dispatch.Dispatcher).
        """

        self.original_request = request
        self.dispatcher = dispatcher
        self.physical_connection = request.connection
        self.physical_stream = request.ws_stream
        self._logger = util.get_class_logger(self)
        self._logical_channels = {}
        self._logical_channels_condition = threading.Condition()
        # Holds client's initial quota
        self._channel_slots = collections.deque()
        self._handshake_base = None
        self._worker_done_notify_received = False
        self._reader = None
        self._writer = None

    def start(self):
        """Starts the handler.

        Raises:
            MuxUnexpectedException: when the handler already started, or when
                opening handshake of the default channel fails.
        """

        if self._reader or self._writer:
            raise MuxUnexpectedException('MuxHandler already started')

        self._reader = _PhysicalConnectionReader(self)
        self._writer = _PhysicalConnectionWriter(self)
        self._reader.start()
        self._writer.start()

        # Create "Implicitly Opened Connection".
        logical_connection = _LogicalConnection(self, _DEFAULT_CHANNEL_ID)
        headers = copy.copy(self.original_request.headers_in)
        # Add extensions for logical channel.
        headers[common.SEC_WEBSOCKET_EXTENSIONS_HEADER] = (
            common.format_extensions(
                self.original_request.mux_processor.extensions()))
        self._handshake_base = _HandshakeDeltaBase(headers)
        logical_request = _LogicalRequest(
            _DEFAULT_CHANNEL_ID,
            self.original_request.method,
            self.original_request.uri,
            self.original_request.protocol,
            self._handshake_base.create_headers(),
            logical_connection)
        # Client's send quota for the implicitly opened connection is zero,
        # but we will send FlowControl later so set the initial quota to
        # _INITIAL_QUOTA_FOR_CLIENT.
        self._channel_slots.append(_INITIAL_QUOTA_FOR_CLIENT)
        send_quota = self.original_request.mux_processor.quota()
        if not self._do_handshake_for_logical_request(
            logical_request, send_quota=send_quota):
            raise MuxUnexpectedException(
                'Failed handshake on the default channel id')
        self._add_logical_channel(logical_request)

        # Send FlowControl for the implicitly opened connection.
        frame_data = _create_flow_control(_DEFAULT_CHANNEL_ID,
                                          _INITIAL_QUOTA_FOR_CLIENT)
        logical_request.connection.write_control_data(frame_data)

    def add_channel_slots(self, slots, send_quota):
        """Adds channel slots.

        Args:
            slots: number of slots to be added.
            send_quota: initial send quota for slots.
        """

        self._channel_slots.extend([send_quota] * slots)
        # Send NewChannelSlot to client.
        frame_data = _create_new_channel_slot(slots, send_quota)
        self.send_control_data(frame_data)

    def wait_until_done(self, timeout=None):
        """Waits until all workers are done. Returns False when timeout has
        occurred. Returns True on success.

        Args:
            timeout: timeout in sec.
        """

        self._logical_channels_condition.acquire()
        try:
            while len(self._logical_channels) > 0:
                self._logger.debug('Waiting workers(%d)...' %
                                   len(self._logical_channels))
                self._worker_done_notify_received = False
                self._logical_channels_condition.wait(timeout)
                if not self._worker_done_notify_received:
                    self._logger.debug('Waiting worker(s) timed out')
                    return False
        finally:
            self._logical_channels_condition.release()

        # Flush pending outgoing data
        self._writer.stop()
        self._writer.join()

        return True

    def notify_write_data_done(self, channel_id):
        """Called by the writer thread when a write operation has done.

        Args:
            channel_id: objective channel id.
        """

        try:
            self._logical_channels_condition.acquire()
            if channel_id in self._logical_channels:
                channel_data = self._logical_channels[channel_id]
                channel_data.request.connection.on_write_data_done()
            else:
                self._logger.debug('Seems that logical channel for %d has gone'
                                   % channel_id)
        finally:
            self._logical_channels_condition.release()

    def send_control_data(self, data):
        """Sends data via the control channel.

        Args:
            data: data to be sent.
        """

        self._writer.put_outgoing_data(_OutgoingData(
                channel_id=_CONTROL_CHANNEL_ID, data=data))

    def send_data(self, channel_id, data):
        """Sends data via given logical channel. This method is called by
        worker threads.

        Args:
            channel_id: objective channel id.
            data: data to be sent.
        """

        self._writer.put_outgoing_data(_OutgoingData(
                channel_id=channel_id, data=data))

    def _send_drop_channel(self, channel_id, code=None, message=''):
        frame_data = _create_drop_channel(channel_id, code, message)
        self._logger.debug(
            'Sending drop channel for channel id %d' % channel_id)
        self.send_control_data(frame_data)

    def _send_error_add_channel_response(self, channel_id, status=None):
        if status is None:
            status = common.HTTP_STATUS_BAD_REQUEST

        if status in _HTTP_BAD_RESPONSE_MESSAGES:
            message = _HTTP_BAD_RESPONSE_MESSAGES[status]
        else:
            self._logger.debug('Response message for %d is not found' % status)
            message = '???'

        response = 'HTTP/1.1 %d %s\r\n\r\n' % (status, message)
        frame_data = _create_add_channel_response(channel_id,
                                                  encoded_handshake=response,
                                                  encoding=0, rejected=True)
        self.send_control_data(frame_data)

    def _create_logical_request(self, block):
        if block.channel_id == _CONTROL_CHANNEL_ID:
            # TODO(bashi): Raise PhysicalConnectionError with code 2006
            # instead of MuxUnexpectedException.
            raise MuxUnexpectedException(
                'Received the control channel id (0) as objective channel '
                'id for AddChannel')

        if block.encoding > _HANDSHAKE_ENCODING_DELTA:
            raise PhysicalConnectionError(
                _DROP_CODE_UNKNOWN_REQUEST_ENCODING)

        method, path, version, headers = _parse_request_text(
            block.encoded_handshake)
        if block.encoding == _HANDSHAKE_ENCODING_DELTA:
            headers = self._handshake_base.create_headers(headers)

        connection = _LogicalConnection(self, block.channel_id)
        request = _LogicalRequest(block.channel_id, method, path, version,
                                  headers, connection)
        return request

    def _do_handshake_for_logical_request(self, request, send_quota=0):
        try:
            receive_quota = self._channel_slots.popleft()
        except IndexError:
            raise LogicalChannelError(
                request.channel_id, _DROP_CODE_NEW_CHANNEL_SLOT_VIOLATION)

        handshaker = _MuxHandshaker(request, self.dispatcher,
                                    send_quota, receive_quota)
        try:
            handshaker.do_handshake()
        except handshake.VersionException, e:
            self._logger.info('%s', e)
            self._send_error_add_channel_response(
                request.channel_id, status=common.HTTP_STATUS_BAD_REQUEST)
            return False
        except handshake.HandshakeException, e:
            # TODO(bashi): Should we _Fail the Logical Channel_ with 3001
            # instead?
            self._logger.info('%s', e)
            self._send_error_add_channel_response(request.channel_id,
                                                  status=e.status)
            return False
        except handshake.AbortedByUserException, e:
            self._logger.info('%s', e)
            self._send_error_add_channel_response(request.channel_id)
            return False

        return True

    def _add_logical_channel(self, logical_request):
        try:
            self._logical_channels_condition.acquire()
            if logical_request.channel_id in self._logical_channels:
                self._logger.debug('Channel id %d already exists' %
                                   logical_request.channel_id)
                raise PhysicalConnectionError(
                    _DROP_CODE_CHANNEL_ALREADY_EXISTS,
                    'Channel id %d already exists' %
                    logical_request.channel_id)
            worker = _Worker(self, logical_request)
            channel_data = _LogicalChannelData(logical_request, worker)
            self._logical_channels[logical_request.channel_id] = channel_data
            worker.start()
        finally:
            self._logical_channels_condition.release()

    def _process_add_channel_request(self, block):
        try:
            logical_request = self._create_logical_request(block)
        except ValueError, e:
            self._logger.debug('Failed to create logical request: %r' % e)
            self._send_error_add_channel_response(
                block.channel_id, status=common.HTTP_STATUS_BAD_REQUEST)
            return
        if self._do_handshake_for_logical_request(logical_request):
            if block.encoding == _HANDSHAKE_ENCODING_IDENTITY:
                # Update handshake base.
                # TODO(bashi): Make sure this is the right place to update
                # handshake base.
                self._handshake_base = _HandshakeDeltaBase(
                    logical_request.headers_in)
            self._add_logical_channel(logical_request)
        else:
            self._send_error_add_channel_response(
                block.channel_id, status=common.HTTP_STATUS_BAD_REQUEST)

    def _process_flow_control(self, block):
        try:
            self._logical_channels_condition.acquire()
            if not block.channel_id in self._logical_channels:
                return
            channel_data = self._logical_channels[block.channel_id]
            channel_data.request.ws_stream.replenish_send_quota(
                block.send_quota)
        finally:
            self._logical_channels_condition.release()

    def _process_drop_channel(self, block):
        self._logger.debug(
            'DropChannel received for %d: code=%r, reason=%r' %
            (block.channel_id, block.drop_code, block.drop_message))
        try:
            self._logical_channels_condition.acquire()
            if not block.channel_id in self._logical_channels:
                return
            channel_data = self._logical_channels[block.channel_id]
            channel_data.drop_code = _DROP_CODE_ACKNOWLEDGED

            # Close the logical channel
            channel_data.request.connection.set_read_state(
                _LogicalConnection.STATE_TERMINATED)
            channel_data.request.ws_stream.stop_sending()
        finally:
            self._logical_channels_condition.release()

    def _process_control_blocks(self, parser):
        for control_block in parser.read_control_blocks():
            opcode = control_block.opcode
            self._logger.debug('control block received, opcode: %d' % opcode)
            if opcode == _MUX_OPCODE_ADD_CHANNEL_REQUEST:
                self._process_add_channel_request(control_block)
            elif opcode == _MUX_OPCODE_ADD_CHANNEL_RESPONSE:
                raise PhysicalConnectionError(
                    _DROP_CODE_INVALID_MUX_CONTROL_BLOCK,
                    'Received AddChannelResponse')
            elif opcode == _MUX_OPCODE_FLOW_CONTROL:
                self._process_flow_control(control_block)
            elif opcode == _MUX_OPCODE_DROP_CHANNEL:
                self._process_drop_channel(control_block)
            elif opcode == _MUX_OPCODE_NEW_CHANNEL_SLOT:
                raise PhysicalConnectionError(
                    _DROP_CODE_INVALID_MUX_CONTROL_BLOCK,
                    'Received NewChannelSlot')
            else:
                raise MuxUnexpectedException(
                    'Unexpected opcode %r' % opcode)

    def _process_logical_frame(self, channel_id, parser):
        self._logger.debug('Received a frame. channel id=%d' % channel_id)
        try:
            self._logical_channels_condition.acquire()
            if not channel_id in self._logical_channels:
                # We must ignore the message for an inactive channel.
                return
            channel_data = self._logical_channels[channel_id]
            fin, rsv1, rsv2, rsv3, opcode, payload = parser.read_inner_frame()
            consuming_byte = len(payload)
            if opcode != common.OPCODE_CONTINUATION:
                consuming_byte += 1
            if not channel_data.request.ws_stream.consume_receive_quota(
                consuming_byte):
                # The client violates quota. Close logical channel.
                raise LogicalChannelError(
                    channel_id, _DROP_CODE_SEND_QUOTA_VIOLATION)
            header = create_header(opcode, len(payload), fin, rsv1, rsv2, rsv3,
                                   mask=False)
            frame_data = header + payload
            channel_data.request.connection.append_frame_data(frame_data)
        finally:
            self._logical_channels_condition.release()

    def dispatch_message(self, message):
        """Dispatches message. The reader thread calls this method.

        Args:
            message: a message that contains encapsulated frame.
        Raises:
            PhysicalConnectionError: if the message contains physical
                connection level errors.
            LogicalChannelError: if the message contains logical channel
                level errors.
        """

        parser = _MuxFramePayloadParser(message)
        try:
            channel_id = parser.read_channel_id()
        except ValueError, e:
            raise PhysicalConnectionError(_DROP_CODE_CHANNEL_ID_TRUNCATED)
        if channel_id == _CONTROL_CHANNEL_ID:
            self._process_control_blocks(parser)
        else:
            self._process_logical_frame(channel_id, parser)

    def notify_worker_done(self, channel_id):
        """Called when a worker has finished.

        Args:
            channel_id: channel id corresponded with the worker.
        """

        self._logger.debug('Worker for channel id %d terminated' % channel_id)
        try:
            self._logical_channels_condition.acquire()
            if not channel_id in self._logical_channels:
                raise MuxUnexpectedException(
                    'Channel id %d not found' % channel_id)
            channel_data = self._logical_channels.pop(channel_id)
        finally:
            self._worker_done_notify_received = True
            self._logical_channels_condition.notify()
            self._logical_channels_condition.release()

        if not channel_data.request.server_terminated:
            self._send_drop_channel(
                channel_id, code=channel_data.drop_code,
                message=channel_data.drop_message)

    def notify_reader_done(self):
        """This method is called by the reader thread when the reader has
        finished.
        """

        self._logger.debug(
            'Termiating all logical connections waiting for incoming data '
            '...')
        self._logical_channels_condition.acquire()
        for channel_data in self._logical_channels.values():
            try:
                channel_data.request.connection.set_read_state(
                    _LogicalConnection.STATE_TERMINATED)
            except Exception:
                self._logger.debug(traceback.format_exc())
        self._logical_channels_condition.release()

    def notify_writer_done(self):
        """This method is called by the writer thread when the writer has
        finished.
        """

        self._logger.debug(
            'Termiating all logical connections waiting for write '
            'completion ...')
        self._logical_channels_condition.acquire()
        for channel_data in self._logical_channels.values():
            try:
                channel_data.request.connection.on_writer_done()
            except Exception:
                self._logger.debug(traceback.format_exc())
        self._logical_channels_condition.release()

    def fail_physical_connection(self, code, message):
        """Fail the physical connection.

        Args:
            code: drop reason code.
            message: drop message.
        """

        self._logger.debug('Failing the physical connection...')
        self._send_drop_channel(_CONTROL_CHANNEL_ID, code, message)
        self._writer.stop(common.STATUS_INTERNAL_ENDPOINT_ERROR)

    def fail_logical_channel(self, channel_id, code, message):
        """Fail a logical channel.

        Args:
            channel_id: channel id.
            code: drop reason code.
            message: drop message.
        """

        self._logger.debug('Failing logical channel %d...' % channel_id)
        try:
            self._logical_channels_condition.acquire()
            if channel_id in self._logical_channels:
                channel_data = self._logical_channels[channel_id]
                # Close the logical channel. notify_worker_done() will be
                # called later and it will send DropChannel.
                channel_data.drop_code = code
                channel_data.drop_message = message

                channel_data.request.connection.set_read_state(
                    _LogicalConnection.STATE_TERMINATED)
                channel_data.request.ws_stream.stop_sending()
            else:
                self._send_drop_channel(channel_id, code, message)
        finally:
            self._logical_channels_condition.release()


def use_mux(request):
    return hasattr(request, 'mux_processor') and (
        request.mux_processor.is_active())


def start(request, dispatcher):
    mux_handler = _MuxHandler(request, dispatcher)
    mux_handler.start()

    mux_handler.add_channel_slots(_INITIAL_NUMBER_OF_CHANNEL_SLOTS,
                                  _INITIAL_QUOTA_FOR_CLIENT)

    mux_handler.wait_until_done()


# vi:sts=4 sw=4 et
