`

Java异步socket

 
阅读更多

用异步输入输出流编写Socket进程通信程序
在Merlin中加入了用于实现异步输入输出机制的应用程序接口 包:java.nio(新的输入输出包,定义了很多基本类型缓冲(Buffer)),java.nio.channels(通道及选择器等,用于异步输入 输出),java.nio.charset(字符的编码解码)。通道(Channel)首先在选择器(Selector)中注册自己感兴趣的事件,当相应 的事件发生时,选择器便通过选择键(SelectionKey)通知已注册的通道。然后通道将需要处理的信息,通过缓冲(Buffer)打包,编码/解 码,完成输入输出控制。

通道介绍:
这里主要介绍ServerSocketChannel和 SocketChannel.它们都是可选择的(selectable)通道,分别可以工作在同步和异步两种方式下(注意,这里的可选择不是指可以选择两 种工作方式,而是指可以有选择的注册自己感兴趣的事件)。可以用channel.configureBlocking(Boolean )来设置其工作方式。与以前版本的API相比较,ServerSocketChannel就相当于 ServerSocket(ServerSocketChannel封装了ServerSocket),而SocketChannel就相当于 Socket(SocketChannel封装了Socket)。当通道工作在同步方式时,编程方法与以前的基本相似,这里主要介绍异步工作方式。

所 谓异步输入输出机制,是指在进行输入输出处理时,不必等到输入输出处理完毕才返回。所以异步的同义语是非阻塞(None Blocking)。在服务器端,ServerSocketChannel通过静态函数open()返回一个实例serverChl。然后该通道调用 serverChl.socket().bind()绑定到服务器某端口,并调用register(Selector sel, SelectionKey.OP_ACCEPT)注册OP_ACCEPT事件到一个选择器中(ServerSocketChannel只可以注册 OP_ACCEPT事件)。当有客户请求连接时,选择器就会通知该通道有客户连接请求,就可以进行相应的输入输出控制了;在客户端,clientChl实 例注册自己感兴趣的事件后(可以是OP_CONNECT,OP_READ,OP_WRITE的组合),调用 clientChl.connect(InetSocketAddress )连接服务器然后进行相应处理。注意,这里的连接是异步的,即会立即返回而继续执行后面的代码。

选择器和选择键介绍:
选择器 (Selector)的作用是:将通道感兴趣的事件放入队列中,而不是马上提交给应用程序,等已注册的通道自己来请求处理这些事件。换句话说,就是选择器 将会随时报告已经准备好了的通道,而且是按照先进先出的顺序。那么,选择器是通过什么来报告的呢?选择键(SelectionKey)。选择键的作用就是 表明哪个通道已经做好了准备,准备干什么。你也许马上会想到,那一定是已注册的通道感兴趣的事件。不错,例如对于服务器端serverChl来说,可以调 用key.isAcceptable()来通知serverChl有客户端连接请求。相应的函数还 有:SelectionKey.isReadable(),SelectionKey.isWritable()。一般的,在一个循环中轮询感兴趣的事件 (具体可参照下面的代码)。如果选择器中尚无通道已注册事件发生,调用Selector.select()将阻塞,直到有事件发生为止。另外,可以调用 selectNow()或者select(long timeout)。前者立即返回,没有事件时返回0值;后者等待timeout时间后返回。一个选择器最多可以同时被63个通道一起注册使用。
应用实例:
下面是用异步输入输出机制实现的客户/服务器实例程序――程序清单1(限于篇幅,只给出了服务器端实现,读者可以参照着实现客户端代码):



程序类图




public class NBlockingServer {
int port = 8000;
int BUFFERSIZE = 1024;
Selector selector = null;
ServerSocketChannel serverChannel = null;
HashMap clientChannelMap = null;//用来存放每一个客户连接对应的套接字和通道

public NBlockingServer( int port ) {
this.clientChannelMap = new HashMap();
this.port = port;
}

public void initialize() throws IOException {
//初始化,分别实例化一个选择器,一个服务器端可选择通道
this.selector = Selector.open();
this.serverChannel = ServerSocketChannel.open();
this.serverChannel.configureBlocking(false);
InetAddress localhost = InetAddress.getLocalHost();
InetSocketAddress isa = new InetSocketAddress(localhost, this.port );
this.serverChannel.socket().bind(isa);//将该套接字绑定到服务器某一可用端口
}
//结束时释放资源
public void finalize() throws IOException {
this.serverChannel.close();
this.selector.close();
}
//将读入字节缓冲的信息解码
public String decode( ByteBuffer byteBuffer ) throws
CharacterCodingException {
Charset charset = Charset.forName( "ISO-8859-1" );
CharsetDecoder decoder = charset.newDecoder();
CharBuffer charBuffer = decoder.decode( byteBuffer );
String result = charBuffer.toString();
return result;
}
//监听端口,当通道准备好时进行相应操作
public void portListening() throws IOException, InterruptedException {
//服务器端通道注册OP_ACCEPT事件
SelectionKey acceptKey =this.serverChannel.register( this.selector,
SelectionKey.OP_ACCEPT );
//当有已注册的事件发生时,select()返回值将大于0
while (acceptKey.selector().select() > 0 ) {
System.out.println("event happened");
//取得所有已经准备好的所有选择键
Set readyKeys = this.selector.selectedKeys();
//使用迭代器对选择键进行轮询
Iterator i = readyKeys.iterator();
while (i
else if ( key.isReadable() ) {//如果是通道读准备好事件
System.out.println("Readable");
//取得选择键对应的通道和套接字
SelectableChannel nextReady =
(SelectableChannel) key.channel();
Socket socket = (Socket) key.attachment();
//处理该事件,处理方法已封装在类ClientChInstance中
this.readFromChannel( socket.getChannel(),
(ClientChInstance)
this.clientChannelMap.get( socket ) );
}
else if ( key.isWritable() ) {//如果是通道写准备好事件
System.out.println("writeable");
//取得套接字后处理,方法同上
Socket socket = (Socket) key.attachment();
SocketChannel channel = (SocketChannel)
socket.getChannel();
this.writeToChannel( channel,"This is from server!");
}
}
}
}
//对通道的写操作
public void writeToChannel( SocketChannel channel, String message )
throws IOException {
ByteBuffer buf = ByteBuffer.wrap( message.getBytes() );
int nbytes = channel.write( buf );
}
//对通道的读操作
public void readFromChannel( SocketChannel channel, ClientChInstance clientInstance )
throws IOException, InterruptedException {
ByteBuffer byteBuffer = ByteBuffer.allocate( BUFFERSIZE );
int nbytes = channel.read( byteBuffer );
byteBuffer.flip();
String result = this.decode( byteBuffer );
//当客户端发出”@exit”退出命令时,关闭其通道
if ( result.indexOf( "@exit" ) >= 0 ) {
channel.close();
}
else {
clientInstance.append( result.toString() );
//读入一行完毕,执行相应操作
if ( result.indexOf( "/n" ) >= 0 ){
System.out.println("client input"+result);
clientInstance.execute();
}
}
}
//该类封装了怎样对客户端的通道进行操作,具体实现可以通过重载execute()方法
public class ClientChInstance {
SocketChannel channel;
StringBuffer buffer=new StringBuffer();
public ClientChInstance( SocketChannel channel ) {
this.channel = channel;
}
public void execute() throws IOException {
String message = "This is response after reading from channel!";
writeToChannel( this.channel, message );
buffer = new StringBuffer();
}
//当一行没有结束时,将当前字窜置于缓冲尾
public void append( String values ) {
buffer.append( values );
}
}


//主程序
public static void main( String[] args ) {
NBlockingServer nbServer = new NBlockingServer(8000);
try {
nbServer.initialize();
} catch ( Exception e ) {
e.printStackTrace();
System.exit( -1 );
}
try {
nbServer.portListening();
}
catch ( Exception e ) {
e.printStackTrace();
}
}
}

程序清单1

小结:
从 以上程序段可以看出,服务器端没有引入多余线程就完成了多客户的客户/服务器模式。该程序中使用了回调模式(CALLBACK)。需要注意的是,请不要将 原来的输入输出包与新加入的输入输出包混用,因为出于一些原因的考虑,这两个包并不兼容。即使用通道时请使用缓冲完成输入输出控制。该程序在 Windows2000,J2SE1.4下,用telnet测试成功。

 

 

 

 

 

 

一、关键字:

thread(线程)、thread-safe(线程安全)、intercurrent(并发的)

synchronized(同步的)、asynchronized(异步的)、

volatile(易变的)、atomic(原子的)、share(共享)

二、总结背景:

一次读写共享文件编写,嚯,好家伙,竟然揪出这些零碎而又是一路的知识点。于是乎,Google和翻阅了《Java参考大全》、《Effective Java Second Edition》,特此总结一下供日后工作学习参考。

三、概念:

1、  什么时候必须同步?什么叫同步?如何同步?

       要跨线程维护正确的可见性,只要在几个线程之间共享非 final 变量,就必须使用 synchronized(或 volatile)以确保一个线程可以看见另一个线程做的更改。

为了在线程之间进行可靠的通信,也为了互斥访问,同步是必须的。这归因于java语言规范的内存模型,它规定了:一个线程所做的变化何时以及如何变成对其它线程可见。

因 为多线程将异步行为引进程序,所以在需要同步时,必须有一种方法强制进行。例如:如果2个线程想要通信并且要共享一个复杂的数据结构,如链表,此时需要确 保它们互不冲突,也就是必须阻止B线程在A线程读数据的过程中向链表里面写数据(A获得了锁,B必须等A释放了该锁)。

为了达到这个目 的,java在一个旧的的进程同步模型——监控器(Monitor)的基础上实现了一个巧妙的方案:监控器是一个控制机制,可以认为是一个很小的、只能容 纳一个线程的盒子,一旦一个线程进入监控器,其它的线程必须等待,直到那个线程退出监控为止。通过这种方式,一个监控器可以保证共享资源在同一时刻只可被 一个线程使用。这种方式称之为同步。(一旦一个线程进入一个实例的任何同步方法,别的线程将不能进入该同一实例的其它同步方法,但是该实例的非同步方法仍 然能够被调用)。

错误的理解:同步嘛,就是几个线程可以同时进行访问。

同步和多线程关系:没多线程环境就不需要同步;有多线程环境也不一定需要同步。

锁提供了两种主要特性:互斥(mutual exclusion) 和可见性(visibility)。

互斥即一次只允许一个线程持有某个特定的锁,因此可使用该特性实现对共享数据的协调访问协议,这样,一次就只有一个线程能够使用该共享数据。

可见性要更加复杂一些,它必须确保释放锁之前对共享数据做出的更改对于随后获得该锁的另一个线程是可见的 —— 如果没有同步机制提供的这种可见性保证,线程看到的共享变量可能是修改前的值或不一致的值,这将引发许多严重问题

小 结:为了防止多个线程并发对同一数据的修改,所以需要同步,否则会造成数据不一致(就是所谓的:线程安全。如java集合框架中Hashtable和 Vector是线程安全的。我们的大部分程序都不是线程安全的,因为没有进行同步,而且我们没有必要,因为大部分情况根本没有多线程环境)。



2、  什么叫原子的(原子操作)?

     Java原子操作是指:不会被打断地的操作。(就是做到互斥 和可见性?!)

那难道原子操作就可以真的达到线程安全同步效果了吗?实际上有一些原子操作不一定是线程安全的。

那 么,原子操作在什么情况下不是线程安全的呢?也许是这个原因导致的:java线程允许线程在自己的内存区保存变量的副本。允许线程使用本地的私有拷贝进行 工作而非每次都使用主存的值是为了提高性能(本人愚见:虽然原子操作是线程安全的,可各线程在得到变量(读操作)后,就是各自玩弄自己的副本了,更新操作 (写操作)因未写入主存中,导致其它线程不可见)。

那该如何解决呢?因此需要通过java同步机制。

     在java中,32位或者更少位数的赋值是原子的。在一个32位的硬件平台上,除了double和long型的其它原始类型通常都是使用32位进行表示, 而double和long通常使用64位表示。另外,对象引用使用本机指针实现,通常也是32位的。对这些32位的类型的操作是原子的。

     这些原始类型通常使用32位或者64位表示,这又引入了另一个小小的神话:原始类型的大小是由语言保证的。这是不对的。java语言保证的是原始类型的表 数范围而非JVM中的存储大小。因此,int型总是有相同的表数范围。在一个JVM上可能使用32位实现,而在另一个JVM上可能是64位的。在此再次强 调:在所有平台上被保证的是表数范围,32位以及更小的值的操作是原子的。

    

3、  不要搞混了:同步、异步

举个例子:普通B/S模式(同步)AJAX技术(异步)

同步:提交请求->等待服务器处理->处理完返回 这个期间客户端浏览器不能干任何事

异步:请求通过事件触发->服务器处理(这是浏览器仍然可以作其他事情)->处理完毕

可见,彼“同步”非此“同步”——我们说的java中的那个共享数据同步(synchronized)

一个同步的对象是指行为(动作),一个是同步的对象是指物质(共享数据)。



4、  Java同步机制有4种实现方式:(部分引用网上资源)

①    ThreadLocal ② synchronized( ) ③ wait() 与 notify() ④ volatile

目的:都是为了解决多线程中的对同一变量的访问冲突
ThreadLocal
    ThreadLocal 保证不同线程拥有不同实例,相同线程一定拥有相同的实例,即为每一个使用该变量的线程提供一个该变量值的副本,每一个线程都可以独立改变自己的副本,而不是与其它线程的副本冲突。

优势:提供了线程安全的共享对象

与其它同步机制的区别:同步机制是为了同步多个线程对相同资源的并发访问,是为了多个线程之间进行通信;而 ThreadLocal 是隔离多个线程的数据共享,从根本上就不在多个线程之间共享资源,这样当然不需要多个线程进行同步了。

volatile
     volatile 修饰的成员变量在每次被线程访问时,都强迫从共享内存中重读该成员变量的值。而且,当成员变量发生变化时,强迫线程将变化值回写到共享内存。
    优势:这样在任何时刻,两个不同的线程总是看到某个成员变量的同一个值。
    缘由:Java 语言规范中指出,为了获得最佳速度,允许线程保存共享成员变量的私有拷贝,而且只当线程进入或者离开同步代码块时才与共享成员变量的原始值对比。这样当多 个线程同时与某个对象交互时,就必须要注意到要让线程及时的得到共享成员变量的变化。而 volatile 关键字就是提示 VM :对于这个成员变量不能保存它的私有拷贝,而应直接与共享成员变量交互。
     使用技巧:在两个或者更多的线程访问的成员变量上使用 volatile 。当要访问的变量已在 synchronized 代码块中,或者为常量时,不必使用。
        线程为了提高效率,将某成员变量(如A)拷贝了一份(如B),线程中对A的访问其实访问的是B。只在某些动作时才进行A和B的同步,因此存在A和B不一致 的情况。volatile就是用来避免这种情况的。 volatile告诉jvm,它所修饰的变量不保留拷贝,直接访问主内存中的(读操作多时使用较好;线程间需要通信,本条做不到)

   Volatile 变量具有 synchronized 的可见性特性,但是不具备原子特性。这就是说线程能够自动发现 volatile 变量的最新值。Volatile 变量可用于提供线程安全,但是只能应用于非常有限的一组用例:多个变量之间或者某个变量的当前值与修改后值之间没有约束。

            您只能在有限的一些情形下使用 volatile 变量替代锁。要使 volatile 变量提供理想的线程安全,必须同时满足下面两个条件:

对变量的写操作不依赖于当前值;该变量没有包含在具有其他变量的不变式中。



sleep()  vs wait()
  sleep是线程类(Thread)的方法,导致此线程暂停执行指定时间,把执行机会给其他线程,但是监控状态依然保持,到时后会自动恢复。调用sleep不会释放对象锁。
  wait是Object类的方法,对此对象调用wait方法导致本线程放弃对象锁,进入等待此对象的等待锁定池,只有针对此对象发出notify方法(或notifyAll)后本线程才进入对象锁定池准备获得对象锁进入运行状态。

(如果变量被声明为volatile,在每次访问时都会和主存一致;如果变量在同步方法或者同步块中被访问,当在方法或者块的入口处获得锁以及方法或者块退出时释放锁时变量被同步。)



四、例子:

Demo1:

package test.thread;



/**

*

* @author ydj

*

*/

class SynTest{

   

    //非同步

    static void method(Thread thread){

        System.out.println("begin "+thread.getName());

        try{

        Thread.sleep(2000);

        }catch(Exception ex){

        ex.printStackTrace();

        }

        System.out.println("end "+thread.getName());

    }

   

    //同步方式一:同步方法

    synchronized static void method1(Thread thread){//这个方法是同步的方法,每次只有一个线程可以进来

       System.out.println("begin "+thread.getName());

        try{

        Thread.sleep(2000);

        }catch(Exception ex){

         ex.printStackTrace();

        }

        System.out.println("end "+thread.getName());

       }

   

    //同步方式二:同步代码块

    static void method2(Thread thread){

        synchronized(SynTest.class) {

        System.out.println("begin "+thread.getName());

        try{

              Thread.sleep(2000);

            }catch(Exception ex){

              ex.printStackTrace();

            }

            System.out.println("end "+thread.getName());

        }

    }

   



    //同步方式三:使用同步对象锁

    private static Object _lock1=new Object();

    private static byte _lock2[]={};//据说,此锁更可提高性能。源于:锁的对象越小越好

    static void method3(Thread thread){

        synchronized(_lock1) {

        System.out.println("begin "+thread.getName());

        try{

              Thread.sleep(2000);

            }catch(Exception ex){

              ex.printStackTrace();

            }

            System.out.println("end "+thread.getName());

        }

    }

   

    public static void main(String[] args){

       //启动3个线程,这里用了匿名类

      for(int i=0;i<3;i++){

            new Thread(){

                public void run(){

                  method(this);

                  //method1(this);

                  //method2(this);

                  //method3(this);

                }

            }.start();

        }

    }

}





/**

    执行method()方法结果:

       begin Thread-0

       begin Thread-2

       begin Thread-1

       end Thread-1

       end Thread-0

       end Thread-2

    说明了:在没有同步限制的条件下,多个线程可以同时进入一个对象的方法中操作。

         这样对共享可变数据是不安全的,即常说的:非线程安全(non thread-safe)。

*/



/**

    执行method1()/method2()/method3()方法结果(可能线程进入的顺序不同):

       begin Thread-0

       end Thread-0

       begin Thread-1

       end Thread-1

       begin Thread-2

       end Thread-2

    说明了:在同步限制的条件下,同时只可有一个线程进入一个对象的方法中操作,其它线程必须等待先它的线程退出后才可进入。

          这样可保证共享可变数据是安全的,即常说的:线程安全(thread-safe )。

*/



Demo2:

package test.thread;



import com.util.LogUtil;



/**

*

* @author ydj

*

*/

public class SynTest2 {



    public static void main(String[] args){

       Callme target=new Callme();

       Caller ob1=new Caller(target,"Hello");

       Caller ob2=new Caller(target,"Synchronized");

       Caller ob3=new Caller(target,"World");

    }

}



class Callme{

   

    /**

     * 有和没有synchronized的时候,结果是不一样的

     */

    synchronized void test(){

       LogUtil.log("测试是否是:一旦一个线程进入一个实例的任何同步方法,别的线程将不能进入该同一实例的其它同步方法,但是该实例的非同步方法仍然能够被调用");

    }

   

    void nonsynCall(String msg){

       LogUtil.log("["+msg);

       LogUtil.log("]");

    }

   

    synchronized void synCall(String msg){

       LogUtil.logPrint("["+msg);

       LogUtil.log("]");

    }

}



class Caller implements Runnable{

    String msg;

    Callme target;

    Thread t;

   

    Caller(Callme target,String msg){

       this.target=target;

       this.msg=msg;

       t=new Thread(this);

       t.start();

    }

   

    public void run() {

       // TODO Auto-generated method stub

       //target.nonsynCall(msg);

       target.synCall(msg);

       target.test();

    }

   

}

五、XXXX:

写程序到现在,还没有自己写过需要多线程并发访问的。看看前公司的底层代码,也没怎么发现到什么多线程的知识。也许,应用层很少用到这些东西。下个阶段准备学习学习JDK的并发包。

分享到:
评论
2 楼 宋建勇 2013-04-28  
缺少代码啊
key.isAcceptable()时怎么处理的啊
1 楼 GSD159 2012-12-26  
代码好像不是很完整啊....

相关推荐

Global site tag (gtag.js) - Google Analytics