- 浏览: 778235 次
- 性别:
- 来自: 深圳
文章分类
最新评论
-
萨琳娜啊:
Java读源码之Netty深入剖析网盘地址:https://p ...
Netty源码学习-FileRegion -
飞天奔月:
写得有趣 ^_^
那一年你定义了一个接口 -
GoldRoger:
第二个方法很好
java-判断一个自然数是否是某个数的平方。当然不能使用开方运算 -
bylijinnan:
<script>alert("close ...
自己动手实现Java Validation -
paul920531:
39行有个bug:"int j=new Random ...
java-蓄水池抽样-要求从N个元素中随机的抽取k个元素,其中N无法确定
Netty里面采用了NIO-based Reactor Pattern
了解这个模式对学习Netty非常有帮助
参考以下两篇文章:
http://jeewanthad.blogspot.com/2013/02/reactor-pattern-explained-part-1.html
http://gee.cs.oswego.edu/dl/cpjslides/nio.pdf
本文所贴的代码来自第一篇文章,在注释部分加入了我自己的理解
完整代码可以到我的github上下载,仅供参考:
https://github.com/bylijinnan/nettyLearn/tree/master/ljn-netty3-learn/src/main/java/com/ljn/reactor
package com.ljn.reactor; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.Set; /* 单线程的实现 Server端用一个Selector利用一个线程(在main方法里面start)来响应所有请求 1.当ACCEPT事件就绪,Acceptor被选中,执行它的run方法:创建一个Handler(例如为handlerA),并将Handler的interestOps初始为READ 2.当READ事件就绪,handlerA被选中,执行它的run方法:它根据自身的当前状态,来执行读或写操作 因此,每一个Client连接过来,Server就创建一个Handler,但都所有操作都在一个线程里面 Selection Key Channel Handler Interested Operation ------------------------------------------------------------------------ SelectionKey 0 ServerSocketChannel Acceptor Accept SelectionKey 1 SocketChannel 1 Handler 1 Read and Write SelectionKey 2 SocketChannel 2 Handler 2 Read and Write SelectionKey 3 SocketChannel 3 Handler 3 Read and Write 如果采用多个selector,那就是所谓的“Multiple Reactor Threads”,大体思路如下: Selector[] selectors; // also create threads int next = 0; class Acceptor { // ... public synchronized void run() { ... Socket connection = serverSocket.accept(); if (connection != null) new Handler(selectors[next], connection); if (++next == selectors.length) next = 0; } } */ public class Reactor implements Runnable { final Selector selector; final ServerSocketChannel serverSocketChannel; final boolean isWithThreadPool; /*Reactor的主要工作: * 1.给ServerSocketChannel设置一个Acceptor,接收请求 * 2.给每一个一个SocketChannel(代表一个Client)关联一个Handler * 要注意其实Acceptor也是一个Handler(只是与它关联的channel是ServerSocketChannel而不是SocketChannel) */ Reactor(int port, boolean isWithThreadPool) throws IOException { this.isWithThreadPool = isWithThreadPool; selector = Selector.open(); serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.socket().bind(new InetSocketAddress(port)); serverSocketChannel.configureBlocking(false); SelectionKey selectionKey0 = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); selectionKey0.attach(new Acceptor()); } public void run() { System.out.println("Server listening to port: " + serverSocketChannel.socket().getLocalPort()); try { while (!Thread.interrupted()) { int readySelectionKeyCount = selector.select(); if (readySelectionKeyCount == 0) { continue; } Set<SelectionKey> selected = selector.selectedKeys(); Iterator<SelectionKey> it = selected.iterator(); while (it.hasNext()) { dispatch((SelectionKey) (it.next())); } //不会自动remove,因此要手动清;下次事件到来会自动添加 selected.clear(); } } catch (IOException ex) { ex.printStackTrace(); } } //从SelectionKey中取出Handler并执行Handler的run方法,没有创建新线程 void dispatch(SelectionKey k) { Runnable r = (Runnable) (k.attachment()); if (r != null) { r.run(); } } //主要工作是为每一个连接成功后返回的SocketChannel关联一个Handler,详见Handler的构造函数 class Acceptor implements Runnable { public void run() { try { SocketChannel socketChannel = serverSocketChannel.accept(); if (socketChannel != null) { if (isWithThreadPool) new HandlerWithThreadPool(selector, socketChannel); else new Handler(selector, socketChannel); } System.out.println("Connection Accepted by Reactor2"); } catch (IOException ex) { ex.printStackTrace(); } } } public static void main(String[] args) throws IOException{ int port = 9900; boolean withThreadPool = false; Reactor reactor = new Reactor(port, withThreadPool); new Thread(reactor).start(); } }
package com.ljn.reactor; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; /* * 单线程版本的Handler */ public class Handler implements Runnable { final SocketChannel socketChannel; final SelectionKey selectionKey; ByteBuffer input = ByteBuffer.allocate(1024); static final int READING = 0, SENDING = 1; //初始状态 int state = READING; String clientName = ""; //在handler里面设置interestOps,而且这个interestOps是会随着事件的进行而改变的 Handler(Selector selector, SocketChannel c) throws IOException { socketChannel = c; c.configureBlocking(false); selectionKey = socketChannel.register(selector, 0); /* handler作为SellectionKey的attachment。这样,handler就与SelectionKey也就是interestOps对应起来了 反过来说,当interestOps发生、SelectionKey被选中时,就能从SelectionKey中取得handler */ selectionKey.attach(this); selectionKey.interestOps(SelectionKey.OP_READ); selector.wakeup(); } //在Reactor的dispatch方法里面被调用,但是直接的方法调用,没有创建新线程 public void run() { try { if (state == READING) { read(); } else if (state == SENDING) { send(); } } catch (IOException ex) { ex.printStackTrace(); } } void read() throws IOException { int readCount = socketChannel.read(input); if (readCount > 0) { readProcess(readCount); } state = SENDING; // Interested in writing selectionKey.interestOps(SelectionKey.OP_WRITE); } /** * Processing of the read message. This only prints the message to stdOut. * 非IO操作(业务逻辑,实际应用中可能会非常耗时):将Client发过来的信息(clientName)转成字符串形式 * @param readCount */ synchronized void readProcess(int readCount) { StringBuilder sb = new StringBuilder(); input.flip(); //from writing mode to reading mode byte[] subStringBytes = new byte[readCount]; byte[] array = input.array(); System.arraycopy(array, 0, subStringBytes, 0, readCount); // Assuming ASCII (bad assumption but simplifies the example) sb.append(new String(subStringBytes)); input.clear(); clientName = sb.toString().trim(); } void send() throws IOException { System.out.println("Saying hello to " + clientName); ByteBuffer output = ByteBuffer.wrap(("Hello " + clientName + "\n").getBytes()); socketChannel.write(output); selectionKey.interestOps(SelectionKey.OP_READ); state = READING; } }
package com.ljn.reactor; import java.io.IOException; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /* * 多线程版本的Handler * 思路就是把耗时的操作(非IO操作)放到其他线程里面跑, * 使得Handler只专注与Channel之间的IO操作; * Handler快速地从Channel中读或写,可以使Channel及时地、更快地响应其他请求 * 耗时的操作完成后,产生一个事件(改变state),再“通知”(由Handler轮询这个状态是否有改变) * Handler执行Channel的读写操作 */ public class HandlerWithThreadPool extends Handler { static ExecutorService pool = Executors.newFixedThreadPool(2); static final int PROCESSING = 2; public HandlerWithThreadPool(Selector sel, SocketChannel c) throws IOException { super(sel, c); } //Handler从SocketChannel中读到数据后,把“数据的处理”这个工作扔到线程池里面执行 void read() throws IOException { int readCount = socketChannel.read(input); if (readCount > 0) { state = PROCESSING; //execute是非阻塞的,所以要新增一个state(PROCESSING),表示数据在处理当中,Handler还不能执行send操作 pool.execute(new Processer(readCount)); } //We are interested in writing back to the client soon after read processing is done. //这时候虽然设置了OP_WRITE,但下一次本Handler被选中时不会执行send()方法,因为state=PROCESSING //或者可以把这个设置放到Processer里面,等process完成后再设为OP_WRITE selectionKey.interestOps(SelectionKey.OP_WRITE); } //Start processing in a new Processer Thread and Hand off to the reactor thread. synchronized void processAndHandOff(int readCount) { readProcess(readCount); //Read processing done. Now the server is ready to send a message to the client. state = SENDING; } class Processer implements Runnable { int readCount; Processer(int readCount) { this.readCount = readCount; } public void run() { processAndHandOff(readCount); } } }
发表评论
-
TCP的TIME-WAIT
2014-04-23 16:35 1167原文连接:http://vincent.bernat.im/e ... -
《TCPIP详解卷1》学习-拥塞避免
2014-01-15 15:16 159拥塞避免算法、 ... -
Netty源码学习-HTTP-tunnel
2014-01-14 18:19 4252Netty关于HTTP tunnel的说明: http://d ... -
Netty源码学习-FileRegion
2013-12-31 17:17 5572今天看org.jboss.netty.example.http ... -
Netty源码学习-HttpChunkAggregator-HttpRequestEncoder-HttpResponseDecoder
2013-12-27 16:10 4000今天看Netty如何实现一个Http Server org.j ... -
Netty源码学习-ReadTimeoutHandler
2013-12-26 17:53 3769ReadTimeoutHandler的实现思 ... -
Netty学习笔记
2013-12-25 18:39 1457本文是阅读以下两篇文章时: http://seeallhear ... -
Netty源码学习-ChannelHandler
2013-12-25 18:12 1593一般来说,“有状态”的ChannelHandler不应 ... -
Netty源码学习-ServerBootstrap启动及事件处理过程
2013-12-19 20:11 10695Netty是采用了Reactor模式的多线程版本,建议先看下面 ... -
Netty源码学习-ReplayingDecoder
2013-12-13 20:21 4223ReplayingDecoder是FrameDecoder的子 ... -
Netty源码学习-DefaultChannelPipeline2
2013-12-11 15:47 1248Netty3的API http://docs.jboss.or ... -
Netty源码学习-CompositeChannelBuffer
2013-12-06 15:54 2727CompositeChannelBuffer体现了Netty的 ... -
Netty源码学习-DelimiterBasedFrameDecoder
2013-12-05 18:36 9485看DelimiterBasedFrameDecoder的AP ... -
Netty源码学习-ObjectEncoder和ObjectDecoder
2013-12-05 16:06 4952Netty中传递对象的思路很直观: Netty中数据的传递是基 ... -
Netty源码学习-LengthFieldBasedFrameDecoder
2013-12-05 15:20 7240先看看LengthFieldBasedFrameDecoder ... -
Netty源码学习-FrameDecoder
2013-11-28 18:38 3868Netty 3.x的user guide里FrameDecod ... -
Netty源码学习-DefaultChannelPipeline
2013-11-27 17:00 2185package com.ljn.channel; /** ...
相关推荐
1.BIO、NIO 和 AIO 的区别? BIO:一个连接一个线程,客户端有连接请求时服务器端就需要启动一个线程进行处理。线 程开销大。 伪异步 IO:将请求连接放入线程池,一对多,但线程还是很宝贵的资源。 NIO:一个请求一...
reactor多线程,java代码demo,帮助理解reactor模式;由于是测试代码,故不保证一定正确,能正常接入传输数据,目前数据包处理没做,故会出现数据截断
Java编程方法论-响应式篇-Reactor分享视频已完结B站: : 油管: : list PL95Ey4rht7980EH8yr7SLBvj9XSE1ggdyJava编程方法论-响应式篇-Reactor-Netty相关博文: : 视频分享: B站: : 油管: : 6qLh2L75KdM list PL95...
通过Netty基于Java NIO的https代理服务器的实现。 这是一个简单的工作原理: 要通过浏览器对其进行测试,我们将需要设置一个虚拟主机名,如下所示: 127.0.0.1 test.localdomain 这样浏览器就能将SNI发送到我们...
Java编程方法论-JDK篇之NIO分享视频在分享 相关博文: : B站: : Java编程方法论-JDK文章之JUC分享待上传 Java编程方法论-Netty篇在分享 B站: : simviso出品国外顶级开发者分享视频翻译 simviso出品国外顶级高校...
Netty学习实践 源码分析 BIO/NIO/AIO基础 阻塞I/O 非阻塞I/O I/O复用 信号驱动的I/O 异步I/O Java I/O模型 同步阻塞IO 1:1同步阻塞IO通信模型 M:N形式的同步阻塞IO通信模型 非阻塞式IO模型(NIO) NIO+单线程Reactor...
积分管理系统java源码 Play Java 网络 & IO 网络 IO BIO NIO 实操 AIO 操作系统 Java Core Java Concurrency Basics Advanced Concurrency in Java Other ...
Java异步NIO框架Netty实现高性能高并发无标题笔记 1. 背景 1.1. 惊人的性能数据 最近一个圈内朋友通过私信告诉我,通过使用Netty4 + Thrift压缩二进制编解码技术,他们实现了10W TPS(1K的复杂POJO对象)的跨 节点...
35_Java NIO核心类源码解读与分析 36_文件通道用法详解 37_Buffer深入详解 38_NIO堆外内存与零拷贝深入讲解 39_NIO中Scattering与Gathering深度解析 40_Selector源码深入分析 41_NIO网络访问模式分析 42_NIO网络编程...
Reactor单线程模型,指的是所有的IO操作都在同一个NIO线程上面完成,NIO线程的职责如下: 1)作为NIO服务端,接收客户端的TCP连接; 2)作为NIO客户端,向服务端发起TCP连接; 3)读取通信对端的请求或者应答消息;...
6)Netty源码分析 ByteBuf工作原理 Channel, Unsafe ChannelPipline, ChannelHandler EventLoop, EventLoopGroup Future, Promise 7) Netty逻辑架构 8)Netty中的多线程编程 9)Netty与RPC 10)Netty的可靠性 ...
非双层IO(React堆Reactor) 触发 无 选择器(选择性机制) 1.3 IO的操作汇总 IO(BIO)同步双向IO NIO异步非双向IO(使用线程池来进行实现) 1.4 IO的概念 BIO适用于连接数量比较小,而且是固定的架构,程序了解...
第35讲:Java NIO核心类源码解读与分析 第36讲:文件通道用法详解 第37讲:Buffer深入详解 第38讲:NIO堆外内存与零拷贝深入讲解 第39讲:NIO中Scattering与Gathering深度解析 第40讲:Selector源码深入分析 ...
精通并发与netty视频教程(2018)视频教程 netty视频教程 Java视频教程目录: 1_学习的要义 2_Netty宏观理解 3_Netty课程大纲深度解读 4_项目环境搭建与Gradle配置 5_Netty执行流程分析与重要组件介绍 6_Netty回调与...
Java编程方法论-响应式篇-Reactor-Netty 分享视频 在分享 相关博文: 视频分享: B站: 油管: Java编程方法论-JDK 篇 之 NIO 分享视频 在分享 相关博文: B站: 油管: 所有直播分享均在QQ群进行,同时也欢迎大家加...
「喜欢的自提」 可以直接运行的代码,里面是我在工作中用来学和和测试的代码,一般是得到想要的测试结果,就直接用于实战项目中...17、BIO、NIO、AIO、Reactor、netty相关学习总结和测试(客户端/服务端通信测试demo)
: list PL95Ey4rht7980EH8yr7SLBvj9XSE1ggdyJava编程方法论-响应式篇-Reactor-Netty相关博文: : 视频分享: B站: : 油管: : 6qLh2L75KdM list PL95Ey4rht79-ISlb_Yr9ToaEI0K8ARmH6Java编程方法论-JDK篇之NIO分享...
NIO,为Reactor模式,非附加I / O。类似点菜,等待被叫,先接收连接,当有请求时在去处理; AIO,为Procator模式,异步I / O。类似包厢,接收连接,当数据处理好通过替代给程序。 绑定端口最终会调用dobind()...
Netty权威指南PDF 了解Java NIO知识,通过Netty创建后台server,心跳维持,多线程业务处理等