基于 Java NIO 的聊天室 Demo

实现的界面

客户端

张飞用户

刘备用户

关羽用户

聊天室的具体功能如下: 

1: 群聊

2: 显示在线用户列表 (客户端输入 1 查看)

3: 用户登录时输入用户名, 群聊消息前会显示发言人的用户名

4: 用户离线, 通知其他在线的用户

聊天室代码:

链接:https://pan.baidu.com/s/1GNHRLAa3ij5Wb0tyT8jYMw
提取码:dd0x

参考: http://ifeve.com/selectors/

代码如下:

服务端代码

 
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
 
/**
 * Single-threaded chat server driven by one {@link Selector} over non-blocking channels.
 *
 * <p>Protocol as implemented below (one message per read, no framing):
 * <ul>
 *   <li>a message containing {@link #USER_NAME_TAG} logs the sender in under the remaining text
 *       and announces the login to every other connected client;</li>
 *   <li>the message {@code "1"} is answered with the list of logged-in user names;</li>
 *   <li>anything else is broadcast to every other connected client, prefixed with the
 *       sender's user name when the sender has logged in.</li>
 * </ul>
 *
 * <p>All state is touched only from the thread that runs {@link #start()}.
 * Text is encoded and decoded with the platform default charset, so client and server
 * must run with the same default charset.
 */
public class chatServer {
    /** Marker a client embeds in its first message to announce its user name. */
    private static final String USER_NAME_TAG = "$%%^&*()!@#$^%#@*()*";

    private final int port;
    private Selector selector;

    /** Scratch buffer for a single inbound read (at most 1024 bytes per message). */
    private final ByteBuffer readBuffer = ByteBuffer.allocate(1024);

    /** Logged-in users, keyed by the channel they logged in on. */
    private final HashMap<SocketChannel, String> users = new HashMap<>();

    /**
     * @param port TCP port the server binds to when {@link #start()} is called
     */
    public chatServer(int port) {
        this.port = port;
    }

    public static void main(String[] args) {
        new chatServer(8081).start();
    }

    /**
     * Binds the listening socket and runs the select loop on the calling thread.
     *
     * <p>Returns only if the listening socket cannot be set up or the selector itself fails;
     * a failure on one client connection disconnects that client and the loop continues.
     */
    public void start() {
        ServerSocketChannel ssc = null;
        try {
            ssc = ServerSocketChannel.open();
            ssc.configureBlocking(false);                     // required for use with a selector
            ssc.socket().bind(new InetSocketAddress(port));
            selector = Selector.open();
            ssc.register(selector, SelectionKey.OP_ACCEPT);
            System.out.println("ChatServer started ......");
        } catch (IOException | RuntimeException e) {
            // Without a bound socket and a selector there is nothing to loop on.
            e.printStackTrace();
            closeQuietly(selector);
            closeQuietly(ssc);
            return;
        }

        while (selector.isOpen()) {
            try {
                if (selector.select() == 0) {
                    continue;
                }
            } catch (IOException e) {
                // The selector is unusable; retrying would only spin.
                e.printStackTrace();
                break;
            }

            Iterator<SelectionKey> selectionKeys = selector.selectedKeys().iterator();
            while (selectionKeys.hasNext()) {
                SelectionKey key = selectionKeys.next();
                selectionKeys.remove();
                try {
                    dispatch(key);
                } catch (IOException | RuntimeException e) {
                    // One misbehaving connection must not take the other clients down.
                    e.printStackTrace();
                    if (key.channel() instanceof SocketChannel) {
                        disconnect(key);
                    }
                }
            }
        }

        shutdown();
    }

    /** Routes a ready key to {@link #accept} or {@link #read}. */
    private void dispatch(SelectionKey key) throws IOException {
        if (!key.isValid()) {
            return;
        }
        if (key.isAcceptable()) {
            accept(key);
        } else if (key.isReadable()) {
            read(key);
        }
    }

    /** Accepts a pending connection and registers it for reads. */
    private void accept(SelectionKey key) throws IOException {
        ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
        SocketChannel clientChannel = ssc.accept();
        if (clientChannel == null) {
            // Non-blocking accept returns null when no connection is pending any more.
            return;
        }
        clientChannel.configureBlocking(false);
        clientChannel.register(selector, SelectionKey.OP_READ);
        System.out.println("a new client connected " + clientChannel.getRemoteAddress());
    }

    /**
     * Reads one message from a client and acts on it (login, user list, or group chat).
     * A failed read or end-of-stream disconnects the client.
     */
    private void read(SelectionKey key) throws IOException {
        SocketChannel socketChannel = (SocketChannel) key.channel();
        readBuffer.clear();
        System.out.println("===============read");

        int numRead;
        try {
            numRead = socketChannel.read(readBuffer);
        } catch (IOException e) {
            // Abrupt disconnect (e.g. connection reset by peer): same handling as end-of-stream.
            numRead = -1;
        }
        if (numRead < 0) {
            disconnect(key);
            return;
        }
        if (numRead == 0) {
            return;
        }

        String message = new String(readBuffer.array(), 0, numRead);
        for (String name : users.values()) {
            System.out.println("在线用户: " + name);
        }

        if (message.contains(USER_NAME_TAG)) {
            // First message of a session: the text around the tag is the user name.
            String userName = message.replace(USER_NAME_TAG, "");
            users.put(socketChannel, userName);
            brodcast(socketChannel, "欢迎: " + userName + " 登录聊天室");
        } else if (message.equals("1")) {
            write(socketChannel, onlineUser());
        } else {
            String userName = users.get(socketChannel);
            String prefix = (userName == null) ? "" : "用户" + userName + "对大家说:";
            brodcast(socketChannel, prefix + message);
        }
    }

    /**
     * Sends {@code content} to one client.
     *
     * <p>The channel is non-blocking, so only a single write is attempted: a client that
     * is not draining its socket must not stall the whole server. Bytes that do not fit
     * into the socket's send buffer are dropped and reported on stdout.
     */
    private void write(SocketChannel channel, String content) throws IOException {
        // Wrap instead of copying into a fixed 1024-byte buffer: a full inbound message
        // plus the speaker prefix can exceed that size.
        ByteBuffer out = ByteBuffer.wrap(content.getBytes());
        channel.write(out);
        if (out.hasRemaining()) {
            System.out.println("dropped " + out.remaining() + " bytes: client is not reading fast enough");
        }
    }

    /** Announces the departure, then cancels the key and closes the channel. */
    private void disconnect(SelectionKey key) {
        try {
            offlineUser(key);
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            key.cancel();
            closeQuietly(key.channel());
        }
    }

    /**
     * Removes the user bound to {@code key}'s channel and tells the remaining clients
     * that the user went offline. Does nothing if that channel never logged in.
     *
     * @param key selection key of the departing client
     * @throws IOException declared for compatibility with existing callers
     */
    public void offlineUser(SelectionKey key) throws IOException {
        SocketChannel socketChannel = (SocketChannel) key.channel();
        String userName = users.remove(socketChannel);
        if (userName != null) {
            brodcast(socketChannel, "用户: " + userName + " 下线了, 拜拜");
        }
    }

    /** Builds (and logs) the list of logged-in user names, one per line. */
    private String onlineUser() {
        StringBuilder onlineUsers = new StringBuilder("在线用户:\n");
        for (String name : users.values()) {
            onlineUsers.append("\t").append(name).append("\n");
        }
        System.out.println(" " + onlineUsers);
        return onlineUsers.toString();
    }

    /**
     * Sends {@code content} to every connected client except {@code except}.
     *
     * <p>A failed write to one recipient is logged and skipped so the remaining recipients
     * still get the message; the broken connection is cleaned up when its next read fails.
     *
     * @param except  channel to skip (normally the sender); may be {@code null}
     * @param content text to send
     * @throws IOException declared for compatibility with existing callers
     */
    public void brodcast(SocketChannel except, String content) throws IOException {
        System.out.println("broadcast write:" + content);
        for (SelectionKey key : selector.keys()) {
            Channel target = key.channel();
            if (!key.isValid() || !(target instanceof SocketChannel) || target == except) {
                continue;
            }
            try {
                write((SocketChannel) target, content);
            } catch (IOException e) {
                System.out.println("broadcast to one client failed: " + e);
            }
        }
    }

    /** Closes every registered channel and the selector once the select loop has ended. */
    private void shutdown() {
        if (selector.isOpen()) {
            for (SelectionKey key : selector.keys()) {
                closeQuietly(key.channel());
            }
        }
        closeQuietly(selector);
        users.clear();
    }

    /** Closes {@code resource} if non-null, reporting (not propagating) any failure. */
    private static void closeQuietly(java.io.Closeable resource) {
        if (resource == null) {
            return;
        }
        try {
            resource.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

 

客户端代码

  
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.Scanner;
import java.util.Set;
 
/**
 * Console chat client for {@code chatServer}.
 *
 * <p>Two threads share one non-blocking {@link SocketChannel}:
 * <ul>
 *   <li>the thread that runs {@link #listen()} completes the connection, sends the login
 *       name, and then forwards each line typed on standard input to the server;</li>
 *   <li>the daemon {@link Reader} thread prints everything the server sends.</li>
 * </ul>
 * Each thread uses its own {@link Selector}, so neither changes the other's interest set.
 *
 * <p>Text is encoded and decoded with the platform default charset, so client and server
 * must run with the same default charset.
 */
public class chatClient {
    private static final String host = "127.0.0.1";
    private static final int port = 8081;

    /** Marker prepended to the login name so the server can recognise the login message. */
    private static final String USER_NAME_TAG = "$%%^&*()!@#$^%#@*()*";

    private static final Logger LOG = LoggerFactory.getLogger(chatClient.class);

    /** Selector used by the sending thread only (connect / write readiness). */
    private Selector selector;
    private SocketChannel sc;

    /** Scratch buffer used by the {@link Reader} thread only. */
    private final ByteBuffer readBuffer = ByteBuffer.allocate(1024);

    /** Cleared by either thread to make both loops stop. */
    volatile boolean running = true;

    /**
     * Connects to the server and runs the send loop on the calling thread until standard
     * input ends or the connection is lost.
     *
     * @throws IOException declared for compatibility with existing callers
     */
    public chatClient() throws IOException {
        connect(host, port);
        // listen() starts the Reader itself once the connection is established; creating it
        // after listen() would never happen because listen() only returns at shutdown.
        listen();
    }

    public static void main(String[] args) throws IOException {
        System.out.println("===================================================================================");
        System.out.println("输入1: 显示在线用户");
        System.out.println("===================================================================================");
        new chatClient();
    }

    /**
     * Opens the channel and starts a non-blocking connect. On failure the error is printed,
     * the client is marked as stopped, and {@link #listen()} becomes a no-op.
     *
     * @param host server host name or address
     * @param port server TCP port
     */
    public void connect(String host, int port) {
        try {
            sc = SocketChannel.open();
            sc.configureBlocking(false);
            // A non-blocking connect may complete immediately (typically for local peers);
            // in that case no OP_CONNECT event follows and listen() checks isConnected().
            boolean connected = sc.connect(new InetSocketAddress(host, port));
            this.selector = Selector.open();
            sc.register(selector, connected ? 0 : SelectionKey.OP_CONNECT);
        } catch (IOException e) {
            e.printStackTrace();
            running = false;
            shutdown();
        }
    }

    /**
     * Runs the send loop: finishes the connection, logs in, then sends one line of
     * standard input per write-ready event. Returns when input ends, the connection
     * fails, or the {@link Reader} reports that the server went away.
     */
    public void listen() {
        if (selector == null || !selector.isOpen() || sc == null || !sc.isOpen()) {
            return; // connect() failed
        }
        Scanner scanner = new Scanner(System.in);
        try {
            SelectionKey registration = sc.keyFor(selector);
            if (registration != null && sc.isConnected()) {
                onConnected(scanner, registration);
            }

            while (running) {
                if (selector.select() == 0) {
                    continue;
                }
                Iterator<SelectionKey> selectionKeys = selector.selectedKeys().iterator();
                while (running && selectionKeys.hasNext()) {
                    SelectionKey selectionKey = selectionKeys.next();
                    selectionKeys.remove();
                    if (!selectionKey.isValid()) {
                        continue;
                    }

                    if (selectionKey.isConnectable()) {
                        // Throws if the server refused the connection; handled below.
                        if (sc.finishConnect()) {
                            onConnected(scanner, selectionKey);
                        }
                    } else if (selectionKey.isWritable()) {
                        if (!scanner.hasNextLine()) {
                            running = false; // standard input was closed
                        } else {
                            String message = scanner.nextLine();
                            if (running && !message.isEmpty()) {
                                send(message);
                            }
                        }
                    }
                }
            }
        } catch (IOException | CancelledKeyException e) {
            e.printStackTrace();
        } finally {
            running = false;
            shutdown();
        }
    }

    /** Post-connect steps: log in, start the reader thread, switch to write readiness. */
    private void onConnected(Scanner scanner, SelectionKey key) throws IOException {
        System.out.println("server connected...");
        login(scanner);
        new Reader().start();
        key.interestOps(SelectionKey.OP_WRITE);
    }

    /** Prompts for the login name and sends it, tagged, to the server. */
    private void login(Scanner scanner) throws IOException {
        System.out.println("请输入登录名: ");
        if (!scanner.hasNextLine()) {
            throw new IOException("standard input closed before a login name was entered");
        }
        send(USER_NAME_TAG + scanner.nextLine());
    }

    /** Writes the whole message; a non-blocking write may need several attempts. */
    private void send(String message) throws IOException {
        ByteBuffer out = ByteBuffer.wrap(message.getBytes());
        while (out.hasRemaining()) {
            sc.write(out);
        }
    }

    /** Closes the sending selector and the channel, reporting (not propagating) failures. */
    private void shutdown() {
        closeQuietly(selector);
        closeQuietly(sc);
    }

    private static void closeQuietly(java.io.Closeable resource) {
        if (resource == null) {
            return;
        }
        try {
            resource.close();
        } catch (IOException e) {
            LOG.warn("failed to close {}", resource, e);
        }
    }

    /**
     * Daemon thread that prints every message received from the server.
     * It owns a private selector on which the shared channel is registered for reads.
     */
    protected class Reader extends Thread {
        private final Selector readSelector;

        Reader() throws IOException {
            this.setName("Reader");
            this.setDaemon(true);
            readSelector = Selector.open();
            try {
                sc.register(readSelector, SelectionKey.OP_READ);
            } catch (IOException | RuntimeException e) {
                closeQuietly(readSelector);
                throw e;
            }
        }

        @Override
        public void run() {
            try {
                doRunLoop();
            } finally {
                LOG.info("{}: stopping", getName());
                try {
                    readSelector.close();
                } catch (IOException ioe) {
                    LOG.error("{}: couldn't close read selector", getName(), ioe);
                }
            }
        }

        private void doRunLoop() {
            while (running) {
                try {
                    if (readSelector.select() == 0) {
                        continue;
                    }
                    Iterator<SelectionKey> iter = readSelector.selectedKeys().iterator();
                    while (running && iter.hasNext()) {
                        SelectionKey key = iter.next();
                        iter.remove();
                        if (!key.isValid() || !key.isReadable()) {
                            continue;
                        }
                        SocketChannel client = (SocketChannel) key.channel();
                        readBuffer.clear();
                        int num = client.read(readBuffer);
                        if (num < 0) {
                            // End of stream: the server closed the connection.
                            LOG.info("{}: server closed the connection", getName());
                            stopClient();
                        } else if (num > 0) {
                            System.out.println(new String(readBuffer.array(), 0, num));
                        }
                    }
                } catch (IOException | ClosedSelectorException | CancelledKeyException e) {
                    // A broken connection stays "readable" forever; retrying would only spin.
                    LOG.warn("{}: exception in Reader", getName(), e);
                    stopClient();
                }
            }
        }

        /** Stops both loops and unblocks the sending thread if it is waiting in select(). */
        private void stopClient() {
            running = false;
            closeQuietly(sc);
            Selector sendSelector = selector;
            if (sendSelector != null && sendSelector.isOpen()) {
                sendSelector.wakeup();
            }
        }
    }
}

作者: 执着小钟

执着小钟

发表评论