Java基础-网络编程BIO&NIO(四)

目录:

  1. Java BIO、NIO
  2. BIO模型及简单Demo
  3. NIO模型及简单Demo
  4. ByteBuffer主要操作方法
  5. Thrift框架NIO简单示意图

1. Java BIO、NIO

BIO:传统的同步阻塞IO,直白简述就是服务器端需要一个线程对应一个客户端,即如果有上万人的客户端连接,那么对服务器资源的消耗是巨大的,甚至直接崩溃。底层的数据读写使用的是InputStream、OutputStream实现,在数据的读写时候必须等待完成之后才能进行下一步工作。

NIO: 同步非阻塞IO,基于Reactor模型来实现。这个就相当于只要使用一个线程来处理客户端连接请求,一个或者多个线程来处理Channel的读写事件,最终再开启一个线程池去处理最终的业务逻辑数据。这里比BIO的核心处就是,一个客户端并不是时时刻刻都有数据需要处理的,而BIO是需要专门一个线程去处理对应的一个客户端连接。

2.BIO模型及简单Demo

BIO简单模型

简单示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
package com.jh.socket.bio;

import org.junit.Test;
import java.io.IOException;
import java.io.InputStream;
import java.net.ServerSocket;
import java.net.Socket;

/**
* @program:
* @description:
* @author: lishijia
* @create: 2019-07-15 17:18
**/
public class SocketServer {

@Test
public void server() throws IOException {
ServerSocket server = new ServerSocket(6379);
System.out.println("the server is start");
// 阻塞等待客户端连接
while(true) {
System.out.println("main thread wait for client to connect");
Socket socket = server.accept();
System.out.println("client connected");
Thread socketWorkThread = new Thread(new Worker(socket));
socketWorkThread.start();
}
}

static class Worker implements Runnable{
private Socket socket;

public Worker(Socket socket) {
this.socket = socket;
}

@Override
public void run() {
InputStream inputStream = null;
try {
inputStream = socket.getInputStream();
// 一般来了连接后都是开一个线程去专门处理这个客户端
System.out.println("socket worker thread wait client to send data");
int len = 0;
byte[] bytes = new byte[1024];
while ((len = inputStream.read(bytes)) != -1) {
System.out.println("server receive data from client。 the data is = " + new String(bytes, 0, len));
}
}catch (IOException e){
}finally {
try {
if(inputStream!=null){
inputStream.close();
}
socket.close();
}catch (IOException e){}
}
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package com.jh.socket.bio;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;

/**
* @program:
* @description:
* @author: lishijia
* @create: 2019-07-15 17:18
**/
public class SocketClient {

public static void main(String[] args) throws IOException {
Socket socket = new Socket();
socket.connect(new InetSocketAddress("127.0.0.1", 6379));
OutputStream outputStream = socket.getOutputStream();
BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
String readData = null;
do {
readData = reader.readLine();
outputStream.write(readData.getBytes());
} while (!"quit".equals(readData));
outputStream.close();
socket.close();
}
}

3.NIO模型及简单Demo

NIO模型:

简单示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
package com.jh.socket.nio;

import org.junit.Test;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.channels.spi.SelectorProvider;
import java.util.Iterator;

/**
* @program:
* @description:
* @author: lishijia
* @create: 2019-07-16 14:17
**/
public class NioServer {

protected final Selector selector = SelectorProvider.provider().openSelector();
private ServerSocketChannel serverSocketChannel = null;

public NioServer() throws IOException {
}

@Test
public void server() throws IOException {
NioServer nioServer = new NioServer();
nioServer.initServer();
nioServer.handleAccpet();
}

public void initServer() throws IOException {
serverSocketChannel = ServerSocketChannel.open();
// 将channel设置为非阻塞
serverSocketChannel.configureBlocking(false);
serverSocketChannel.socket().bind(new InetSocketAddress(6379));

// 注册连接监听事件
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
}

public void handleAccpet() throws IOException {
System.out.println("等待连接");
while (true) {
selector.select();

Iterator<SelectionKey> selectionKeys = selector.selectedKeys().iterator();
while(selectionKeys.hasNext()){
SelectionKey selectionKey = selectionKeys.next();
selectionKeys.remove();
if(selectionKey.isAcceptable()){
SocketChannel sc = serverSocketChannel.accept();
sc.configureBlocking(false);
sc.register(selector, SelectionKey.OP_READ);
System.out.println("client connect");
} else if (selectionKey.isReadable()){
SocketChannel sc = (SocketChannel) selectionKey.channel();
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
sc.read(byteBuffer);
byteBuffer.flip();
System.out.println(new String(byteBuffer.array()));
byteBuffer.clear();

selectionKey.interestOps(SelectionKey.OP_WRITE);
} else if(selectionKey.isWritable()) {
SocketChannel sc = (SocketChannel) selectionKey.channel();
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
byteBuffer.put("hello client".getBytes());
byteBuffer.flip();
int len = 0;
while ( byteBuffer.hasRemaining()){
len = sc.write(byteBuffer);
if(len==0){
break;
}
}
byteBuffer.clear();
if(len != 0 ){
selectionKey.interestOps(SelectionKey.OP_READ);
}
}
}

}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
package com.jh.socket.nio;

import org.junit.Test;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.channels.spi.SelectorProvider;
import java.util.Iterator;
import java.util.Random;

/**
* @program:
* @description:
* @author: lishijia
* @create: 2019-07-16 14:17
**/
public class NioClient {

protected final Selector selector = SelectorProvider.provider().openSelector();
private Random rnd = new Random();

public NioClient() throws IOException {
}

@Test
public void client() throws IOException, InterruptedException {
NioClient nioClient = new NioClient();
nioClient.initCient();
nioClient.handle();
}

public void initCient() throws IOException {
SocketChannel sc = SocketChannel.open();
sc.configureBlocking(false);
int interestSet = SelectionKey.OP_READ | SelectionKey.OP_WRITE;
sc.register(selector, interestSet);
sc.connect(new InetSocketAddress("127.0.0.1",6379));

while(!sc.finishConnect()) {
;
}
System.out.println("finish connect");
}

public void handle() throws IOException, InterruptedException {
while (true) {
selector.select();

Iterator<SelectionKey> keys = selector.selectedKeys().iterator();

while (keys.hasNext()){

SelectionKey key = keys.next();
keys.remove();

SocketChannel sc = (SocketChannel) key.channel();

if(key.isReadable()){
ByteBuffer readByffer = ByteBuffer.allocate(1024);
sc.read(readByffer);
readByffer.flip();

System.out.println(new String(readByffer.array()));

readByffer.clear();
} else if(key.isWritable()){
ByteBuffer writerBuffer = ByteBuffer.allocate(1024);
writerBuffer.put("hello server".getBytes());

writerBuffer.flip();

sc.write(writerBuffer);

writerBuffer.clear();
}

}

Thread.sleep(1000 + rnd.nextInt(1000));

}
}
}

4.ByteBuffer主要操作方法

ByteBuffer对象的三个重要属性 position、limit和capacity。其中capacity表示缓存区的总容量大小,始终保持不变,促使时候position等于0,limit等于capacity。

  • put:向缓冲区放入数据

    调用put方法前,limit应该等于capacity,如果不等几乎可以肯定我们的缓存区操作有误。在put方法中0到position-1的区域表示未有效数据,position到limit区间的则为空闲区域。put方法会从position位置(包含)放入数据,每放一个position加一,当position等于limit的时候,如果还继续put数据则会抛异常BufferUnderflowException。

  • get:从缓存区读取数据

    在get方法中,0到limit这个区间快的数据代表是可读的,即每读取一个字节的数据position则加一,然后即position到limit这个区间快的数据还未读完。如果position等于limit的时候则会抛BufferUnderflowException。如果一个ByteBuffer刚写入了数据,即想马上通过get读取出来,一定要先调用flip方法,即从可写状态转换为可读状态。底层就是更改了position以及对应的limit

  • flip:将写模式转换为读模式

    1
    2
    3
    4
    5
    6
    public final Buffer flip() {
    limit = position;
    position = 0;
    mark = -1;
    return this;
    }
  • clean:清空缓冲区数据,将读模式变为写模式。This method does not actually erase the data in the buffer, but itis named as if it did because it will most often be used in situations
    in which that might as well be the case. (这里解释clear这个方法并没有清楚缓冲区间的内容,仅仅只是复位了光标以及对应的limit)。但是如果往里写了数据后,对应的limit就会发生变化,然而读取的时候是不能超过对应的limit的,所以之前存留的数据是读取不到的,不会存在影响

    1
    2
    3
    4
    5
    6
    public final Buffer clear() {
    position = 0;
    limit = capacity;
    mark = -1;
    return this;
    }
  • compact:保留未读取的数据,将读模式转换为写模式。即将剩余没有读取完的空间移动到起始位置,然后再position设置为没有读取完的数据末尾位置,再重置limit为capacity

    1
    2
    3
    4
    5
    6
    7
    8
    9
    public ByteBuffer compact() {
    System.arraycopy(hb, ix(position()), hb, ix(0), remaining());

    position(remaining());
    limit(capacity());
    discardMark();

    return this;
    }
  • mark:保留当前position的位置放置到mark变量中。相当于做个标记,告诉这个对象我写到了这个位置或者读到了这个位置。可以配合rest实现重读某一块区域的数据

    1
    2
    3
    4
    public final Buffer mark() {
    mark = position;
    return this;
    }
  • rest:将position等于mark,即上次标记的位置。

    1
    2
    3
    4
    5
    6
    7
    public final Buffer reset() {
    int m = mark;
    if (m < 0)
    throw new InvalidMarkException();
    position = m;
    return this;
    }
  • rewind:从头开始重读,不可从写,因为不会清除已写的数据

    1
    2
    3
    4
    5
    public final Buffer rewind() {
    position = 0;
    mark = -1;
    return this;
    }

    以上的api一定要了解清楚之后才能好好使用,即不能同时调用flip方法后马上调用compact方法,调用flip方法后不能马上调用put方法等。为了保证不使用错误,一般我们是创建两个ByteBuffer来控制读写,即一个ByteBuffer读,一个ByteBuffer负责写这样子也不会容易搞混。

4.Thrift框架NIO简单示意图

可以看到Thrift的框架针对NIO这块的使用,通过一个AcceptThread来负责处理客户端的连接请求,然后在轮询的方式获取一个SelectorThread线程来处理用户的读写事件,当然最终的业务处理是在有一个线程池在后续服务的。

分享到 评论