I. I/O Models
- Blocking I/O
- Non-blocking I/O
- I/O multiplexing
- Signal-driven I/O
- Asynchronous I/O
- Linux I/O flow

2. Comparison of the I/O models

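Advantages of NIO
- Event-driven model: a single thread handles many tasks, which avoids one thread per connection
- Non-blocking: I/O reads and writes no longer block; when nothing is ready they return 0 immediately
- Channel-based transfer, which is more efficient than stream-based transfer
- Higher-level I/O functions, including zero copy
- I/O multiplexing greatly improves the scalability and practicality of Java network applications
Disadvantages of NIO
- Hard to program
- Full of pitfalls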
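The "returns 0 instead of blocking" behaviour can be observed with the JDK alone. The following is a minimal sketch, assuming a java.nio Pipe as a stand-in for a socket; the class and method names are made up for illustration.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.charset.StandardCharsets;

class NonBlockingReadDemo {

    /** Reads once before and once after data is written; returns both read results. */
    static int[] readBeforeAndAfterWrite(String payload) {
        try {
            Pipe pipe = Pipe.open();
            try (Pipe.SourceChannel source = pipe.source();
                 Pipe.SinkChannel sink = pipe.sink()) {
                source.configureBlocking(false);
                ByteBuffer buf = ByteBuffer.allocate(64);

                // Nothing has been written yet: a non-blocking read returns 0 at once.
                int before = source.read(buf);

                sink.write(ByteBuffer.wrap(payload.getBytes(StandardCharsets.UTF_8)));

                // Data is available now, so the same call returns the number of bytes read.
                int after = source.read(buf);
                return new int[] {before, after};
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        int[] result = readBeforeAndAfterWrite("hello");
        System.out.println("before write: " + result[0] + ", after write: " + result[1]);
    }
}
```

In a blocking channel the first read would have parked the thread; here the caller is free to do other work and come back later, which is what makes a single-threaded event loop possible.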
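3. TCP sticky-packet and split-packet problem
Cause: it arises on the TCP sending path. TCP is a byte stream that does not preserve message boundaries, so several messages can arrive merged in one read, or one message can arrive split across several reads.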
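One common remedy is to prefix every message with its length so the receiver can find the boundaries again. The sketch below uses plain java.nio and a hypothetical LengthPrefixFramer class with a 4-byte big-endian length header (an assumption of this example, not something the notes specify). Netty ships ready-made decoders for the same purpose, such as LengthFieldBasedFrameDecoder.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

class LengthPrefixFramer {
    // Bytes received so far that do not yet form a complete message (in read mode).
    private ByteBuffer pending = ByteBuffer.allocate(0);

    /** Encodes one message as a 4-byte length followed by the UTF-8 payload. */
    static byte[] encode(String message) {
        byte[] body = message.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(4 + body.length).putInt(body.length).put(body).array();
    }

    /** Feeds one arbitrary chunk of the stream; returns the messages completed by it. */
    List<String> feed(byte[] chunk) {
        // Append the new chunk to whatever was left over from earlier calls.
        ByteBuffer merged = ByteBuffer.allocate(pending.remaining() + chunk.length);
        merged.put(pending).put(chunk).flip();

        List<String> messages = new ArrayList<>();
        while (merged.remaining() >= 4) {
            merged.mark();
            int length = merged.getInt();
            if (merged.remaining() < length) {
                merged.reset(); // split packet: the body is incomplete, wait for more
                break;
            }
            byte[] body = new byte[length];
            merged.get(body);
            messages.add(new String(body, StandardCharsets.UTF_8));
        }
        pending = merged.slice(); // keep the incomplete tail for the next call
        return messages;
    }

    public static void main(String[] args) {
        LengthPrefixFramer framer = new LengthPrefixFramer();
        byte[] first = encode("hello");
        // Feed a fragment first (split packet), then the remainder.
        System.out.println(framer.feed(java.util.Arrays.copyOfRange(first, 0, 6)));
        System.out.println(framer.feed(java.util.Arrays.copyOfRange(first, 6, first.length)));
    }
}
```

The sketch does not validate the length field; real code should reject negative or oversized lengths before allocating.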


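4. Zero copy in Netty
Traditional copy path
- Data is read from disk into the kernel read buffer
- Data is copied from the kernel buffer to the user buffer
- Data is copied from the user buffer to the kernel socket buffer
- Data is copied from the kernel socket buffer to the buffer of the network interface (hardware)
Zero copy
- transferTo is called, and the DMA engine copies the data from the file into the kernel read buffer
- DMA then copies the data from the kernel read buffer to the network interface buffer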
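The transferTo call itself looks like this in plain Java. This is a sketch under stated assumptions: the class and method names are hypothetical, and a second FileChannel stands in for the SocketChannel so that the example runs without a network. Whether the kernel actually skips the user-space copy depends on the operating system and the target channel.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

class TransferToDemo {

    /** Sends the whole file to target and returns the number of bytes transferred. */
    static long sendFile(Path file, WritableByteChannel target) throws IOException {
        try (FileChannel in = FileChannel.open(file, StandardOpenOption.READ)) {
            long position = 0;
            long size = in.size();
            // transferTo may move fewer bytes than requested, so loop until done.
            // (Assumes a blocking target; a non-blocking one could return 0 repeatedly.)
            while (position < size) {
                position += in.transferTo(position, size - position, target);
            }
            return position;
        }
    }

    /** Pushes content through sendFile via two temp files; returns what arrived. */
    static String roundTrip(String content) {
        try {
            Path src = Files.createTempFile("zero-copy-src", ".txt");
            Path dst = Files.createTempFile("zero-copy-dst", ".txt");
            try {
                Files.write(src, content.getBytes(StandardCharsets.UTF_8));
                // The destination FileChannel stands in for a SocketChannel here.
                try (FileChannel out = FileChannel.open(dst, StandardOpenOption.WRITE)) {
                    sendFile(src, out);
                }
                return new String(Files.readAllBytes(dst), StandardCharsets.UTF_8);
            } finally {
                Files.deleteIfExists(src);
                Files.deleteIfExists(dst);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("hello zero copy"));
    }
}
```

Note that the application never allocates a buffer for the file contents; it only hands the kernel a source, a range, and a destination.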
Zero copy in Netty shows up in three places:
1. ByteBuf
Netty mainly uses ByteBuf to send and receive messages. These buffers use off-heap memory (direct memory) for socket reads and writes, so the data does not have to be copied between the JVM heap and native memory first.
2. Composite Buffers
With a traditional ByteBuffer, combining the data of two buffers means first allocating a new array of size size1 + size2 and then copying both arrays into it. Netty's CompositeByteBuf avoids this: it does not physically merge the buffers but keeps references to them, so no data is copied.
3. Use of FileChannel.transferTo
Netty uses the transferTo method of FileChannel, which relies on the operating system to implement zero copy.
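The composite-buffer idea from point 2 can be illustrated without Netty. The class below is a hypothetical toy, not Netty's CompositeByteBuf: it only shows how a logical buffer can be built from references to its parts instead of from a copy.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustration only: a read-only logical view over several byte arrays. */
class CompositeView {
    private final List<byte[]> parts = new ArrayList<>();
    private int length;

    /** Adds a component by reference; no bytes are copied. */
    CompositeView add(byte[] part) {
        parts.add(part);
        length += part.length;
        return this;
    }

    int length() {
        return length;
    }

    /** Reads the byte at a logical index by locating the component that holds it. */
    byte get(int index) {
        if (index < 0 || index >= length) {
            throw new IndexOutOfBoundsException("index: " + index);
        }
        for (byte[] part : parts) {
            if (index < part.length) {
                return part[index];
            }
            index -= part.length;
        }
        throw new AssertionError("unreachable");
    }

    public static void main(String[] args) {
        byte[] header = {1, 2};
        byte[] body = {3, 4, 5};
        CompositeView view = new CompositeView().add(header).add(body);
        body[0] = 9; // the view sees the change because it shares the array
        System.out.println(view.length() + " bytes, index 2 = " + view.get(2));
    }
}
```

The trade-off is that random access has to walk the component list, which is the price paid for skipping the merge copy.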
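II. Netty Components
- Channel -- corresponds to the channel in NIO
- EventLoop -- corresponds to the while loop in NIO
- ChannelHandler and ChannelPipeline -- correspond to the user logic in NIO that implements handleRead and handleWrite
- ByteBuf -- corresponds to ByteBuffer in NIO
- Bootstrap and ServerBootstrap -- correspond to creating, configuring, and starting the Selector, ServerSocketChannel, and so on in NIO
Reactor thread model
The Reactor thread model comes in three forms:
1. Single-threaded model:

2. Multi-threaded model:

3. Multiple-Reactor model:

Netty supports all three.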
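To make the single-threaded form concrete, here is a minimal sketch of the raw NIO loop that Netty's EventLoop wraps: one Selector, several registered channels, and one thread that waits for readiness and dispatches. It assumes java.nio Pipes in place of sockets so it runs anywhere; the class name and the "channel-N" attachments are made up for this example.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

class SingleThreadReactorDemo {

    /** Registers count pipes with one Selector and reads them all on the calling thread. */
    static List<String> run(int count) {
        List<String> received = new ArrayList<>();
        List<Pipe> pipes = new ArrayList<>();
        try (Selector selector = Selector.open()) {
            for (int i = 0; i < count; i++) {
                Pipe pipe = Pipe.open();
                pipes.add(pipe);
                pipe.source().configureBlocking(false);
                // The attachment plays the role of a per-channel handler.
                pipe.source().register(selector, SelectionKey.OP_READ, "channel-" + i);
                pipe.sink().write(ByteBuffer.wrap(("msg-" + i).getBytes(StandardCharsets.UTF_8)));
            }
            ByteBuffer buf = ByteBuffer.allocate(64);
            // The event loop: block until some channel is ready, then dispatch.
            while (received.size() < count) {
                selector.select();
                Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                while (it.hasNext()) {
                    SelectionKey key = it.next();
                    it.remove();
                    if (key.isReadable()) {
                        buf.clear();
                        int n = ((Pipe.SourceChannel) key.channel()).read(buf);
                        if (n > 0) {
                            String text = new String(buf.array(), 0, n, StandardCharsets.UTF_8);
                            received.add(key.attachment() + ":" + text);
                        }
                    }
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } finally {
            for (Pipe pipe : pipes) {
                try {
                    pipe.source().close();
                    pipe.sink().close();
                } catch (IOException ignored) {
                    // best-effort cleanup
                }
            }
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run(3));
    }
}
```

The multi-threaded and multiple-Reactor forms keep this loop but hand the work after the read to a thread pool, or split accepting and reading across separate selectors.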
III. A Simple Example
1. Add the Maven dependency to the pom
<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.42.Final</version>
</dependency>
2. Server side
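// TimeServer.java
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public class TimeServer {
    public void bind(int port) throws InterruptedException {
        // One group accepts connections, the other handles I/O on accepted channels
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    .childHandler(new ChildChannelHandler());
            // Bind the port and wait synchronously until the bind succeeds
            ChannelFuture future = b.bind(port).sync();
            // Wait until the server's listening channel is closed
            future.channel().closeFuture().sync();
        } finally {
            // Shut down gracefully and release the thread-pool resources
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        new TimeServer().bind(8888);
    }

    private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
        @Override
        protected void initChannel(SocketChannel socketChannel) throws Exception {
            socketChannel.pipeline().addLast(new TimeServerHandler());
        }
    }
}

// TimeServerHandler.java
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat;
import java.util.Date;

public class TimeServerHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf buf = (ByteBuf) msg;
        try {
            // Copy the request bytes out of the buffer and print them
            byte[] req = new byte[buf.readableBytes()];
            buf.readBytes(req);
            String body = new String(req, StandardCharsets.UTF_8);
            System.out.println(body);
        } finally {
            // ChannelInboundHandlerAdapter does not release inbound messages automatically
            buf.release();
        }
        SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        String currentTime = simpleDateFormat.format(new Date());
        ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes(StandardCharsets.UTF_8));
        // write() only queues the response; it is sent in channelReadComplete
        ctx.write(resp);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }
}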
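To try the server out, any TCP client will do. The following is a small sketch using a plain blocking java.net.Socket rather than Netty; the class name and the "QUERY TIME" request text are assumptions of this example (the handler above replies with the current time whatever the request body is).

```java
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

class TimeClientSketch {

    /** Sends one request and returns whatever arrives in the first read of the reply. */
    static String query(String host, int port, String request) {
        try (Socket socket = new Socket(host, port)) {
            OutputStream out = socket.getOutputStream();
            out.write(request.getBytes(StandardCharsets.UTF_8));
            out.flush();

            byte[] buf = new byte[256];
            // Blocks until the server answers or closes the connection.
            int n = socket.getInputStream().read(buf);
            return n > 0 ? new String(buf, 0, n, StandardCharsets.UTF_8) : "";
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // Assumes the TimeServer above is already listening on localhost:8888.
        System.out.println(query("127.0.0.1", 8888, "QUERY TIME"));
    }
}
```

A single read is enough here only because the reply is a short timestamp; a longer reply would need framing, as discussed in the sticky-packet section.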
Hands-on: a danmu (bullet-comment) system
java:
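import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public class WebSocketDanmuServer {
    private int port;

    public WebSocketDanmuServer(int port) {
        this.port = port;
    }

    public void run() {
        // One thread accepts connections; eight threads handle channel I/O
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workGroup = new NioEventLoopGroup(8);
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new WebsocketDanmuServerInitializer())
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .childOption(ChannelOption.SO_KEEPALIVE, true);
            System.out.println("Danmu system starting on port " + port);
            ChannelFuture future = b.bind(port).sync();
            future.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
            // Restore the interrupt flag for callers further up the stack
            Thread.currentThread().interrupt();
        } finally {
            workGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) {
        new WebSocketDanmuServer(8080).run();
    }
}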
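import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpRequestDecoder;
import io.netty.handler.codec.http.HttpResponseEncoder;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.stream.ChunkedWriteHandler;

public class WebsocketDanmuServerInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        ChannelPipeline pipeline = socketChannel.pipeline();
        // HTTP codec, aggregation into full requests, and chunked file writes
        pipeline.addLast("http-decodec", new HttpRequestDecoder());
        pipeline.addLast("http-aggregator", new HttpObjectAggregator(65536));
        pipeline.addLast("http-encodec", new HttpResponseEncoder());
        pipeline.addLast("http-chunked", new ChunkedWriteHandler());
        /* Alternative setup using the combined codec:
        pipeline.addLast(new HttpServerCodec());
        pipeline.addLast(new HttpObjectAggregator(64*1024));
        pipeline.addLast(new ChunkedWriteHandler());
        */
        // Serves the page, then hands "/ws" requests on to the WebSocket handlers
        pipeline.addLast("http-request", new HttpRequestHandler("/ws"));
        pipeline.addLast("WebSocket-protocol", new WebSocketServerProtocolHandler("/ws"));
        pipeline.addLast("WebSocket-request", new TextWebSocketFrameHandler());
    }
}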
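import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.DefaultFileRegion;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.DefaultHttpResponse;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.LastHttpContent;
import io.netty.handler.ssl.SslHandler;
import io.netty.handler.stream.ChunkedNioFile;
import java.io.File;
import java.io.RandomAccessFile;
import java.net.URISyntaxException;
import java.net.URL;

public class HttpRequestHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
    private final String wsUri;
    private static final File INDEX;

    static {
        // Locate WebsocketDanMu.html relative to the code source location
        URL location = HttpRequestHandler.class.getProtectionDomain().getCodeSource().getLocation();
        try {
            String path = location.toURI() + "WebsocketDanMu.html";
            path = !path.contains("file:") ? path : path.substring(5);
            INDEX = new File(path);
        } catch (URISyntaxException e) {
            throw new IllegalStateException("Unable to locate WebsocketDanMu.html", e);
        }
    }

    public HttpRequestHandler(String wsUri) {
        this.wsUri = wsUri;
    }

    private static void send100Continue(ChannelHandlerContext ctx) {
        FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.CONTINUE);
        ctx.writeAndFlush(response);
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception {
        if (wsUri.equalsIgnoreCase(request.getUri())) {
            // WebSocket request: retain it and pass it on to the next handler
            ctx.fireChannelRead(request.retain());
        } else {
            if (HttpHeaders.is100ContinueExpected(request)) {
                send100Continue(ctx);
            }
            // Open the page to serve
            RandomAccessFile file = new RandomAccessFile(INDEX, "r");
            HttpResponse response = new DefaultHttpResponse(request.getProtocolVersion(), HttpResponseStatus.OK);
            response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "text/html; charset=UTF-8");
            boolean keepAlive = HttpHeaders.isKeepAlive(request);
            if (keepAlive) {
                // Keep-alive clients need the length to know where the body ends
                response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, file.length());
                response.headers().set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.KEEP_ALIVE);
            }
            // Write the response head
            ctx.write(response);
            // Without TLS the file can be sent by zero copy; with TLS it is sent in chunks
            if (ctx.pipeline().get(SslHandler.class) == null) {
                ctx.write(new DefaultFileRegion(file.getChannel(), 0, file.length()));
            } else {
                ctx.write(new ChunkedNioFile(file.getChannel()));
            }
            // Write the end-of-content marker and flush everything to the client
            ChannelFuture future = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
            if (!keepAlive) {
                // Close the connection once the write has completed
                future.addListener(ChannelFutureListener.CLOSE);
            }
            // The file is not closed here: the write may still be in progress, and
            // Netty closes the FileChannel when the region or chunked input is released.
        }
    }
}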
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.util.concurrent.GlobalEventExecutor;

public class TextWebSocketFrameHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
    public static ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
        // Relay the message to every client; the sender gets a prefixed copy
        Channel incoming = ctx.channel();
        for (Channel channel : channels) {
            if (channel != incoming) {
                channel.writeAndFlush(new TextWebSocketFrame(msg.text()));
            } else {
                channel.writeAndFlush(new TextWebSocketFrame("I sent: " + msg.text()));
            }
        }
    }

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        Channel incoming = ctx.channel();
        // Broadcast a message to multiple Channels
        channels.writeAndFlush(new TextWebSocketFrame("[SERVER] - " + incoming.remoteAddress() + " joined"));
        channels.add(incoming);
        System.out.println("Client:" + incoming.remoteAddress() + " joined");
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        Channel incoming = ctx.channel();
        // Broadcast a message to multiple Channels
        channels.writeAndFlush(new TextWebSocketFrame("[SERVER] - " + incoming.remoteAddress() + " left"));
        System.err.println("Client:" + incoming.remoteAddress() + " left");
        // A closed Channel is automatically removed from ChannelGroup,
        // so there is no need to do "channels.remove(ctx.channel());"
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        Channel incoming = ctx.channel();
        System.out.println("Client:" + incoming.remoteAddress() + " online");
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        Channel incoming = ctx.channel();
        System.err.println("Client:" + incoming.remoteAddress() + " offline");
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        Channel incoming = ctx.channel();
        System.err.println("Client:" + incoming.remoteAddress() + " exception");
        // Close the connection when an exception occurs
        cause.printStackTrace();
        ctx.close();
    }
}
html:
<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd">
<html>
<head>
<meta name="Keywords" content="danmu">
<meta http-equiv="Content-Type" content="text/html; charset=UTF-8">
<title>Danmu site</title>
<style type="text/css">
body {
background: url(http://ot0ak6eri.bkt.clouddn.com/01.jpg) no-repeat top center;
font-size: 12px;
font-family: "Microsoft YaHei";
}
* {
margin: 0;
padding: 0;
}
/* screen start*/
.screen {
width: 300px;
height: 100px;
background: #669900;
}
.dm {
width: 100%;
height: 100%;
position: absolute;
top: 0;
left: 0;
display: none;
}
.dm .d_screen .d_del {
width: 38px;
height: 38px;
background: #600;
display: block;
text-align: center;
line-height: 38px;
text-decoration: none;
font-size: 20px;
color: #fff;
border-radius: 19px;
border: 1px solid #fff;
z-index: 2;
position: absolute;
right: 20px;
top: 20px;
outline: none;
}
.dm .d_screen .d_del:hover {
background: #F00;
}
.dm .d_screen .d_mask {
width: 100%;
height: 100%;
background: #000;
position: absolute;
top: 0;
left: 0;
opacity: 0.6;
filter: alpha(opacity = 60);
z-index: 1;
}
.dm .d_screen .d_show {
position: relative;
z-index: 2;
}
.dm .d_screen .d_show div {
font-size: 26px;
line-height: 36px;
font-weight: 500;
position: absolute;
top: 76px;
left: 10px;
color: #fff;
}
/*end screen*/
/*send start*/
.send {
width: 100%;
height: 76px;
position: absolute;
bottom: 0;
left: 0;
border: 1px solid red;
}
.send .s_filter {
width: 100%;
height: 76px;
background: #000;
position: absolute;
bottom: 0;
left: 0;
opacity: 0.6;
filter: alpha(opacity = 60);
}
.send .s_con {
width: 100%;
height: 76px;
position: absolute;
top: 0;
left: 0;
z-index: 2;
text-align: center;
line-height: 76px;
}
.send .s_con .s_text {
width: 800px;
height: 36px;
border: 0;
border-radius: 6px 0 0 6px;
outline: none;
}
.send .s_con .s_submit {
width: 100px;
height: 36px;
border-radius: 0 6px 6px 0;
outline: none;
font-size: 14px;
color: #fff;
background: #65c33d;
font-family: "Microsoft YaHei";
cursor: pointer;
border: 1px solid #5bba32;
}
.send .s_con .s_submit:hover {
background: #3eaf0e;
}
/*end send*/
</style>
</head>
<body>
<a href="#" id="startDm">Open danmu</a>
<!-- dm start -->
<div class="dm">
<!-- d_screen start -->
<div class="d_screen">
<a href="#" class="d_del">X</a>
<div class="d_mask"></div>
<div class="d_show">
</div>
</div>
<!-- end d_screen -->
<!-- send start -->
<div class="send">
<div class="s_filter"></div>
<div class="s_con">
<input type="text" class="s_text" /> <input type="button"
value="Post comment" class="s_submit" id="btn"/>
</div>
</div>
<!-- end send -->
</div>
<!-- end dm-->
<script type="text/JavaScript"
src="http://ajax.aspnetcdn.com/ajax/jQuery/jquery-1.8.0.js"></script>
<script type="text/javascript" >
String.prototype.endWith=function(str){
if(str==null||str==""||this.length==0||str.length>this.length)
return false;
return this.substring(this.length-str.length)==str;
}
String.prototype.startWith=function(str){
if(str==null||str==""||this.length==0||str.length>this.length)
return false;
return this.substr(0,str.length)==str;
}
</script>
<!--<script type="text/javascript" src="websocket.js"></script>-->
<script type="text/javascript">
$(function() {
$("#startDm,.d_del").click(function() {
$("#startDm,.dm").toggle(1000);
//init_screen();
});
$("#btn").click(function(){
send();
});
$(".s_text").keydown(function(e) {
var code = e.which;
if (code == 13)// When Enter is pressed, send the text to the danmu screen
{
send();
}
});
});
function launch()
{
var _height = $(window).height();
var _left = $(window).width() - $("#"+index).width();
var time=10000;
if(index%2==0)
time=20000;
_top+=80;
if(_top>_height-100)
_top=80;
$("#"+index).css({
left:_left,
top:_top,
color:getRandomColor()
});
$("#"+index).animate({
left:"-"+_left+"px"},
time,
function(){});
index++;
}
/* // Initialize the danmu screen
function init_screen() {
var _top = 0;
var _height = $(window).height();
$(".d_show").find("div").show().each(function() {
var _left = $(window).width() - $(this).width();
var time=10000;
if($(this).index()%2==0)
time=20000;
_top+=80;
if(_top>_height-100)
_top=80;
$(this).css({
left:_left,
top:_top,
color:getRandomColor()
});
$(this).animate({
left:"-"+_left+"px"},
time,
function(){});
});
} */
// Pick a random color value
function getRandomColor() {
return '#' + (function(h) {
return new Array(7 - h.length).join("0") + h
})((Math.random() * 0x1000000 << 0).toString(16))
}
</script>
<script type="text/javascript">
var websocket=null;
var _top=80;
var index=0;
var host=window.location.host;
// Check whether the browser supports WebSocket
if ('WebSocket' in window) {
    websocket = new WebSocket("ws://" + host + "/ws");
} else {
    alert("Not Support WebSocket!");
}
// Called when a connection error occurs
websocket.onerror = function() {
    setMessageInnerHTML("error");
};
// Called when the connection has been established
websocket.onopen = function(event) {
    setMessageInnerHTML("open");
};
// Called when a message arrives from the server
websocket.onmessage = function(event) {
    setMessageInnerHTML(event.data);
};
// Called when the connection is closed
websocket.onclose = function() {
    setMessageInnerHTML("close");
};
// Close the WebSocket when the window is about to close; closing the window
// while the connection is still open makes the server throw an exception.
window.onbeforeunload = function() {
    websocket.close();
};
// Show a message on the page
function setMessageInnerHTML(innerHTML) {
    var imgurl;
    if (innerHTML.startWith("~background,")) {
        // Command message: change the background image
        var cmd = innerHTML;
        imgurl = cmd.split(",")[1];
        document.body.style.background = "url(" + imgurl + ")";
    } else {
        $(".d_show").append("<div id='" + index + "'>" + innerHTML + "</div>");
        // Animate only when a danmu element was actually added
        launch();
    }
}
// Send a message
function send() {
    //var message = document.getElementById('text').value;
    var message = $(".s_text").val();
    $(".s_text").val("");
    websocket.send(message);
}
</script>
</body>
</html>