`

Netty源码学习-CompositeChannelBuffer

阅读更多
CompositeChannelBuffer体现了Netty的“Transparent Zero Copy”

查看API(http://docs.jboss.org/netty/3.2/api/org/jboss/netty/buffer/package-summary.html#package_description
可以看到,所谓“Transparent Zero Copy”是通过 ChannelBuffers的wrappedBuffer方法来实现的:
 ChannelBuffer message = ChannelBuffers.wrappedBuffer(header, body);
 ChannelBuffer messageWithFooter = ChannelBuffers.wrappedBuffer(message, footer);

这样的好处是:
1.性能更好。因为避免了memory copy
2.返回的是一个ChannelBuffer,这样就统一了接口,保持了良好的封装和抽象

ChannelBuffers.wrappedBuffer的实现,要依靠CompositeChannelBuffer

先通过一个实例来说明CompositeChannelBuffer的机制

例如有以下两个ChannelBuffer要合并:
bufferA,容量为4,数据为“ABCD”:
+---+---+---+---+ 					
| 0  | 1  | 2  |  3 |
------------------
| A  | B  | C  |  D |
+---+---+---+---+ 

bufferB,容量为3,数据为“EFG”:
+---+---+---+					
| 0  | 1  | 2  |
--------------
| E  | F  | G  |
+---+---+---+

合并后,“看起来”是下面这样,但实际上并不会开辟新的内存空间:
bufferC,容量为7,数据为“ABCDEFG”
+---+---+---+---+---+---+---+				
| 0  | 1  | 2  |  3 | 4  | 5  | 6  |
--------------------------------
| A  | B  | C  |  D | E  | F  | G  |
+---+---+---+---+---+---+---+


CompositeChannelBuffer是怎么实现“Zero Copy”呢?

首先,用“ChannelBuffer[] components”保存对bufferA和bufferB的引用
注意这是保存引用,而不是复制数据,也就是
components[0] = bufferA
components[1] = bufferB


然后,用“int[] indices”记录bufferA和bufferB的长度:
indices[i] = indices[ i - 1] + component[i -1].length,也就是
indices[0] = 0;
indices[1] = 0 + 4 = 4;
indices[2] = 4 + 3 = 7;


最后看看是如何访问指定位置的数据
例如
ChannelBuffer bufferC = ChannelBuffers.wrappedBuffer(bufferA, bufferB);
那么bufferC.getByte(5)应该取得bufferB里面的“F”
过程是怎样呢?
首先要确定数据是在bufferA还是bufferB,此时,indices就派上用场了
因为 indices[1] < 5 < indices[2]
因此,数据在component[1],也就是bufferB里面
然后要把下标映射到bufferB的下标
因为bufferA长度为4(indices[1]),所以
bufferC.getBytes(5) = bufferB.getBytes(5 - 4) = "F"

说到底,还是指针的运算、下标的映射

另外,还用“int lastAccessedComponentId”缓存了上一个访问的component
比如,访问bufferB的“F”之后,接下来很有可能会访问“G”,因此lastAccessedComponentId=1
(表示上一次访问了bufferB)
加快了查询速度

CompositeChannelBuffer的关键源码:
public class CompositeChannelBuffer extends AbstractChannelBuffer {

    private ChannelBuffer[] components;
    private int[] indices;
    private int lastAccessedComponentId;
	
	public CompositeChannelBuffer(ByteOrder endianness, List<ChannelBuffer> buffers, boolean gathering) {
        setComponents(buffers);
    }
	
	private void setComponents(List<ChannelBuffer> newComponents) {
        // Clear the cache.
        lastAccessedComponentId = 0;
        components = new ChannelBuffer[newComponents.size()];
        for (int i = 0; i < components.length; i ++) {
            ChannelBuffer c = newComponents.get(i);
			
			//这两个assert表明了,只接受“满”的buffer
            assert c.readerIndex() == 0;
            assert c.writerIndex() == c.capacity();

            components[i] = c;	//保存引用
        }

        // Build the component lookup table.
        indices = new int[components.length + 1];
        indices[0] = 0;
        for (int i = 1; i <= components.length; i ++) {
            indices[i] = indices[i - 1] + components[i - 1].capacity();
        }

        // Reset the indexes.
        setIndex(0, capacity());	//设置合并后的readerIndex=0, writerIndex = 7(以文章开头的为例)
    }

	
	public byte getByte(int index) {
        int componentId = componentId(index);
		
		//计算得到真实的下标:index - indices[componentId]
        return components[componentId].getByte(index - indices[componentId]);
    }
	
	private int componentId(int index) {
	
		//先在上次访问的那一个buffer开始查找,先往右(后)查找,找不到再往前
		//例如,以文章开头的为例,访问“F”后,再访问“A”,往后查不到,那就需要往前
        int lastComponentId = lastAccessedComponentId;
        if (index >= indices[lastComponentId]) {
            if (index < indices[lastComponentId + 1]) {
                return lastComponentId;
            }

            // Search right
            for (int i = lastComponentId + 1; i < components.length; i ++) {
                if (index < indices[i + 1]) {
                    lastAccessedComponentId = i;
                    return i;
                }
            }
        } else {
            // Search left
            for (int i = lastComponentId - 1; i >= 0; i --) {
                if (index >= indices[i]) {
                    lastAccessedComponentId = i;
                    return i;
                }
            }
        }

        throw new IndexOutOfBoundsException("Invalid index: " + index + ", maximum: " + indices.length);
    }
}

0
0
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics