`

Mina 框架源码解析-构建简单通信程序

 
阅读更多

Mina 框架源码解析-启动一个服务端程序

本文描述是使用mina框架构建一个简单通信程序,实现效果是客户端会定时向服务端发送一个数据包,服务端进行接收,并返回一个数据包,可以理解为一个心跳机制。

 

1,先写一个心跳包Bean类

package cn.std.services.server.heartbeat;

import java.io.Serializable;

/***
 * HeartBeat Package,this package will be sent to the HeartBeat Server,
 * and the HeartBeat Server will analyse the package
 * @author root
 * @Date 2012-8-8下午2:20:00
 */
public class HeartBeat implements Serializable{

	/**
	 * 
	 */
	private static final long serialVersionUID = 7967977427115512657L;

	/** from */
	private String from;
	/** to */
	private String to;
	/** isAlive */
	private boolean isAlive;
	/** level */
	private int level;
	

	public HeartBeat(){}
	public HeartBeat(String from, String to, boolean isAlive, int level) {
		this.from = from;
		this.to = to;
		this.isAlive = isAlive;
		this.level = level;
	}
	public String getFrom() {
		return from;
	}
	public void setFrom(String from) {
		this.from = from;
	}
	public String getTo() {
		return to;
	}
	public void setTo(String to) {
		this.to = to;
	}
	public boolean isAlive() {
		return isAlive;
	}
	public void setAlive(boolean isAlive) {
		this.isAlive = isAlive;
	}
	public int getLevel() {
		return level;
	}
	public void setLevel(int level) {
		this.level = level;
	}
	
}

 

2,服务端类

package cn.std.services.server.heartbeat;

import java.net.InetSocketAddress;

import org.apache.mina.core.filterchain.DefaultIoFilterChainBuilder;
import org.apache.mina.core.service.IoAcceptor;
import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.filter.codec.serialization.ObjectSerializationCodecFactory;
import org.apache.mina.transport.socket.nio.NioSocketAcceptor;

import cn.std.services.server.ServerManager;

import com.mine.BigMap;
import com.mine.logging.LogUtil;

public class HeartBeatServer extends Thread {
	private static LogUtil logger = LogUtil.getLogger(HeartBeatServer.class
			.getName());

	private int PORT = 0;

	public HeartBeatServer(int port) {
		this.PORT = port;
	}

	@Override
	public void run() {
		run(PORT);
	}

	private boolean run(int port) {
		boolean flag = false;
		IoAcceptor acceptor = null;
		try {
			// 创建一个非阻塞的server端的Socket
			acceptor = new NioSocketAcceptor();
			// 设置过滤器
			DefaultIoFilterChainBuilder filterChain = acceptor.getFilterChain();
			filterChain.addLast(
					"HeartBeatServer",
					new ProtocolCodecFilter(new ObjectSerializationCodecFactory()));
			// 设置读取数据的缓冲区大小
			acceptor.getSessionConfig().setReadBufferSize(2048);
			// 读写通道100秒内无操作进入空闲状态
			acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 100);
			// 绑定逻辑处理器
			acceptor.setHandler(new HeartBeatServerHandler());
			// 绑定端口
			acceptor.bind(new InetSocketAddress(port));
			flag = true;
			logger.info("HeartBeatServer-{} is Started!!!!",PORT,BigMap.port_used.size());
			BigMap.port_used.put(PORT, acceptor);
		} catch (Exception e) {
			logger.warning("服务端启动异常...." + e.getMessage());
			e.printStackTrace();
		}
		return flag;
	}
	public static void main(String[] args) {
		int port = 6002;
		new HeartBeatServer(port).start();
	}
}

 

3,服务端Handle类

package cn.std.services.server.heartbeat;

import java.net.InetAddress;

import org.apache.mina.core.service.IoHandlerAdapter;
import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.scktxt.SckTxtPack;

import com.mine.logging.LogUtil;
import com.mine.logging.LogUtil2;

public class HeartBeatServerHandler extends IoHandlerAdapter {
	public static LogUtil logger = LogUtil.getLogger(HeartBeatServerHandler.class.getName());
	@Override
	public void sessionCreated(IoSession session) throws Exception {
		logger.debug("服务端与客户端创建连接...");
	}

	@Override
	public void sessionOpened(IoSession session) throws Exception {
		logger.debug("服务端与客户端连接打开...");
	}

	@Override
	public void messageReceived(IoSession session, Object message)
			throws Exception {
		if(message instanceof HeartBeat){
			HeartBeat hb = (HeartBeat)message;
			String from = hb.getFrom();
			boolean isAlive = hb.isAlive();
			int level = hb.getLevel();
			String to = hb.getTo();
			System.out.println(from+"\t"+isAlive+"\t"+Integer.toHexString(level)+"\t"+to);
			//level == 0,关闭client
			if(level==0){
				session.close(true);
			}else{
				//analyse()
				hb.setTo(hb.getFrom());
				hb.setFrom(InetAddress.getLocalHost().toString());
				hb.setLevel(0x1011);
				hb.setAlive(true);
				session.write(hb);
			//	session.close(true);
			}
		}else{
			System.out.println("date error ...");
		}
	}

	@Override
	public void messageSent(IoSession session, Object message) throws Exception {
		session.close(true);
		logger.debug("服务端发送信息成功...");
	}

	@Override
	public void sessionClosed(IoSession session) throws Exception {
	}

	@Override
	public void sessionIdle(IoSession session, IdleStatus status)
			throws Exception {
		logger.debug("服务端进入空闲状态...");
	}

	@Override
	public void exceptionCaught(IoSession session, Throwable cause)
			throws Exception {
		logger.warning("服务端发送异常..."+cause.getMessage());	}
}

 

4,客户端类

package cn.std.services.client.heartbeat;

import java.net.InetAddress;
import java.net.InetSocketAddress;

import org.apache.mina.core.future.ConnectFuture;
import org.apache.mina.core.service.IoConnector;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.filter.codec.serialization.ObjectSerializationCodecFactory;
import org.apache.mina.transport.socket.nio.NioSocketConnector;

import cn.std.services.server.ServerManager;
import cn.std.services.server.heartbeat.HeartBeat;

import com.mine.logging.LogUtil;

public class HeartBeatClient implements Runnable {
	private static LogUtil logger = LogUtil.getLogger(HeartBeatClient.class
			.getName());

	/** Master Host 默认 localhost */
	private  String HOST ;
	/** Master Port 默认 6002 */
	private  int PORT ;
	/** 心跳频率 默认10s */
	private  int HeartBeatMills;

	private boolean isRun = true;
	public HeartBeatClient() {
               HOST = "127.0.0.1";
               PORT = 6002;
               HeartBeatMills = 100000;
               // HOST = ServerManager.props.getProperty("MasterHost","127.0.0.1");
	       // PORT = Integer.parseInt(ServerManager.props.getProperty("HeartBeatServerPort","6002"));
	       // HeartBeatMills = Integer.parseInt(ServerManager.props.getProperty("HeartBeatMills", "10000"));
	}

	public static void main(String[] args) {
	//	List<String> ls = Lists.load(ServerManager.props.getProperty("MasterHost"));
	//	System.out.println(ls.size());
		for (int i = 0; i < 1; i++) {
			new Thread(new HeartBeatClient()).start();
		}
	
	}

	@Override
	public void run() {
		while(isRun){
			sent();
			try {
				Thread.currentThread();
				Thread.sleep(HeartBeatMills);
				logger.info("system will call sent method...");
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
	}
	public void stop(){
		isRun = false;
	}
	public void sent() {
		// 创建一个非阻塞的客户端程序
		IoConnector connector = new NioSocketConnector();
		// 设置链接超时时间
		connector.setConnectTimeoutMillis(30000);
		// 添加过滤器
		connector.getFilterChain().addLast("HeartBeatClient",
				new ProtocolCodecFilter(new ObjectSerializationCodecFactory()));
		// 添加业务逻辑处理器类
		connector.setHandler(new HeartBeatClientHandler());
		IoSession session = null;
		try {
			ConnectFuture future = connector.connect(new InetSocketAddress(
					HOST, PORT));// 创建连接
			future.awaitUninterruptibly();// 等待连接创建完成
			session = future.getSession();// 获得session
			for (int i = 0; i < 1; i++) {
				HeartBeat hb = new HeartBeat(InetAddress.getLocalHost()
						.toString(), HOST, true, 0x0101);
				session.write(hb);
			}
			 //session.close(true);
		} catch (Exception e) {
			logger.warning("客户端链接异常..." + e.getMessage());
		}

		session.getCloseFuture().awaitUninterruptibly();// 等待连接断开
		connector.dispose();
	}

	public String getHOST() {
		return HOST;
	}

	public void setHOST(String hOST) {
		HOST = hOST;
	}

	public int getPORT() {
		return PORT;
	}

	public void setPORT(int pORT) {
		PORT = pORT;
	}

	public int getHeartBeatMills() {
		return HeartBeatMills;
	}

	public void setHeartBeatMills(int heartBeatMills) {
		HeartBeatMills = heartBeatMills;
	}

}

 5,客户端Handle类

package cn.std.services.client.heartbeat;

import org.apache.mina.core.service.IoHandlerAdapter;
import org.apache.mina.core.session.IoSession;

import cn.std.services.server.heartbeat.HeartBeat;

import com.mine.logging.LogUtil;


public class HeartBeatClientHandler extends IoHandlerAdapter {
	private static LogUtil logger = LogUtil.getLogger(HeartBeatClientHandler.class.getName());

	@Override
	public void messageReceived(IoSession session, Object message)
			throws Exception {
		if(message instanceof HeartBeat){
			HeartBeat hb = (HeartBeat)message;
			String from = hb.getFrom();
			boolean isAlive = hb.isAlive();
			int level = hb.getLevel();
			String to = hb.getTo();
			System.out.println(from+"\t"+isAlive+"\t"+Integer.toHexString(level)+"\t"+to);
			
		}else{
			System.out.println("date error ...");
		}
	}

	@Override
	public void exceptionCaught(IoSession session, Throwable cause)
			throws Exception {
		logger.warning("客户端发生异常..."+cause.getLocalizedMessage());
	}
}

先运行HeartBeartServer,然后再运行HeartBeatClient,就可看到效果,注意加入LogUtil,可自己编译LogUtil,也可在下面下载LogUtil的jar包

分享到:
评论

相关推荐

    JAVA上百实例源码以及开源项目源代码

    Applet钢琴模拟程序java源码 2个目标文件,提供基本的音乐编辑功能。编辑音乐软件的朋友,这款实例会对你有所帮助。 Calendar万年历 1个目标文件 EJB 模拟银行ATM流程及操作源代码 6个目标文件,EJB来模拟银行ATM...

    JAVA上百实例源码以及开源项目

     Tcp服务端与客户端的JAVA实例源代码,一个简单的Java TCP服务器端程序,别外还有一个客户端的程序,两者互相配合可以开发出超多的网络程序,这是最基础的部分。 递归遍历矩阵 1个目标文件,简单! 多人聊天室 3...

    java开源包8

    6、支持多种通信框架(Mina/Netty/Grizzly),支持多种序列化/反序列化(Java/Hessian/PB); 7、支持自定义通信协议,可完全替换NFS-RPC自带的协议。 淘宝开放平台JAVA版SDK top4java 设计原则 容易维护扩展(不...

    java开源包1

    6、支持多种通信框架(Mina/Netty/Grizzly),支持多种序列化/反序列化(Java/Hessian/PB); 7、支持自定义通信协议,可完全替换NFS-RPC自带的协议。 淘宝开放平台JAVA版SDK top4java 设计原则 容易维护扩展(不...

    java开源包10

    6、支持多种通信框架(Mina/Netty/Grizzly),支持多种序列化/反序列化(Java/Hessian/PB); 7、支持自定义通信协议,可完全替换NFS-RPC自带的协议。 淘宝开放平台JAVA版SDK top4java 设计原则 容易维护扩展(不...

    java开源包11

    6、支持多种通信框架(Mina/Netty/Grizzly),支持多种序列化/反序列化(Java/Hessian/PB); 7、支持自定义通信协议,可完全替换NFS-RPC自带的协议。 淘宝开放平台JAVA版SDK top4java 设计原则 容易维护扩展(不...

    java开源包2

    6、支持多种通信框架(Mina/Netty/Grizzly),支持多种序列化/反序列化(Java/Hessian/PB); 7、支持自定义通信协议,可完全替换NFS-RPC自带的协议。 淘宝开放平台JAVA版SDK top4java 设计原则 容易维护扩展(不...

    java开源包3

    6、支持多种通信框架(Mina/Netty/Grizzly),支持多种序列化/反序列化(Java/Hessian/PB); 7、支持自定义通信协议,可完全替换NFS-RPC自带的协议。 淘宝开放平台JAVA版SDK top4java 设计原则 容易维护扩展(不...

    java开源包6

    6、支持多种通信框架(Mina/Netty/Grizzly),支持多种序列化/反序列化(Java/Hessian/PB); 7、支持自定义通信协议,可完全替换NFS-RPC自带的协议。 淘宝开放平台JAVA版SDK top4java 设计原则 容易维护扩展(不...

    java开源包5

    6、支持多种通信框架(Mina/Netty/Grizzly),支持多种序列化/反序列化(Java/Hessian/PB); 7、支持自定义通信协议,可完全替换NFS-RPC自带的协议。 淘宝开放平台JAVA版SDK top4java 设计原则 容易维护扩展(不...

    java开源包4

    6、支持多种通信框架(Mina/Netty/Grizzly),支持多种序列化/反序列化(Java/Hessian/PB); 7、支持自定义通信协议,可完全替换NFS-RPC自带的协议。 淘宝开放平台JAVA版SDK top4java 设计原则 容易维护扩展(不...

    java开源包7

    6、支持多种通信框架(Mina/Netty/Grizzly),支持多种序列化/反序列化(Java/Hessian/PB); 7、支持自定义通信协议,可完全替换NFS-RPC自带的协议。 淘宝开放平台JAVA版SDK top4java 设计原则 容易维护扩展(不...

    java开源包9

    6、支持多种通信框架(Mina/Netty/Grizzly),支持多种序列化/反序列化(Java/Hessian/PB); 7、支持自定义通信协议,可完全替换NFS-RPC自带的协议。 淘宝开放平台JAVA版SDK top4java 设计原则 容易维护扩展(不...

    java开源包101

    6、支持多种通信框架(Mina/Netty/Grizzly),支持多种序列化/反序列化(Java/Hessian/PB); 7、支持自定义通信协议,可完全替换NFS-RPC自带的协议。 淘宝开放平台JAVA版SDK top4java 设计原则 容易维护扩展(不...

    Java资源包01

    6、支持多种通信框架(Mina/Netty/Grizzly),支持多种序列化/反序列化(Java/Hessian/PB); 7、支持自定义通信协议,可完全替换NFS-RPC自带的协议。 淘宝开放平台JAVA版SDK top4java 设计原则 容易维护扩展(不...

Global site tag (gtag.js) - Google Analytics