July 14, 2012

Asynchronous IO (Part 1)

Java 7 ထွက်ပြီး၍ ယခုဆိုလျှင် Update 5 ပင် ရောက်ရှိလာပြီ ဖြစ်၏။ Update 4 မှစ၍ Mac OS X အား Support လုပ်လာပြီး၊ G1GC ကိုလည်း တရားဝင် Support လုပ်လာပါသည်။ ထို့အပြင် JRockit ၏ ဖန်ရှင် အချို့သည်လည်း HotSpot VM ဆီသို့ ဖြည့်စွက်လာခဲ့ပါသည်။ ထို့အပြင် Bug အတော်များများကိုလည်း Fix လုပ်လာပြီး Performance ပိုင်းဆိုင်ရာ တွင်လည်း ပြုပြင်ပြောင်းလည်းမှု့များကို ပြုလုပ်လာခဲ့ကြပါသည်။

အထူးသဖြင့် Java 7 အား http://java.com မှတဆင့် ဖြန့်ဝေလာခြင်းကိုကြည့်ခြင်း အားဖြင့် Java 7 သည် Business Application များအတွက် အဆင်သင့်ဖြစ်ပြီဖြစ်ကြောင်း သိရှိနိုင်ပါသည်။ Java 6 ၏ EOL ဖြစ်သော ၂၀၁၂ခု ၁၁လပိုင်းသည်လည်း သိပ်ပြီး မဝေးတော့ပြီဖြစ်သောကြောင့် Java 7 အား ပြောင်းရွှေ့အသုံးပြုမည့် ပရိုဂျက်များအား မကြာမကြာ တွေ့မြင်ရမည်ဖြစ်ပါသည်။

ကျွှန်တော်တို့ ဆက်လက်၍ Nio2 ၏ Asynchronous IO နှင့် ပတ်သက်၍ ဆက်လက်လေ့လာသွားပါမည်။


ယခင် အသုံးပြုခဲ့သော IO

ပုံမှန်အားဖြင့် Input Output လုပ်ဆောင်ချက်များအား အသုံးပြုရာတွင်၊ လုပ်ဆောင်ချက်များ ပြီးဆုံးသည် အထိ Control အား အသုံးပြုနိုင်မည် မဟုတ်ပေ။ ဥပမာအားဖြင့် InputStream ဖြင့် stream.read(bytes) ဟု အသုံးပြုပါက အားလုံး ဖတ်ပြီးသည့်တိုင်အောင် read ဆီသို့ ပြန်ရောက်လာခြင်း မရှိပေ (Exception များမှအပ)။ ဤသို့ လုပ်ဆောင်ချက်များအား ရပ်တန့်သွားစေခြင်းအား Block ဖြစ်ခြင်းဟု Java တွင် ခေါ်ဆိုလေ့ရှိ၏။

အကယ်၍ Input Output Speed သည် အလွန်မြန်နေပါက အကြောင်းမဟုတ်။ သို့ရာတွင် CPU ၏ speed နှင့် စာလျှင် နှိုင်းယှဉ်၍မရအောင် နှေးကွေးလွန်းလှပေသည်။ ထို့ကြောင့် အလွန်လေးလံသော File များအား Input Output လုပ်ပါက စက်တစ်ခုလုံး လေးလံသွားစေနိုင်ပါသည်။

အထူးသဖြင့် Web Server များကဲ့သို့ Input Output အတော်များများကို ပြုလုပ်လေ့ရှိ သောကြောင့် Input Output ကြောင့် ဖြစ်ပွားစေသော Block သည် Web Server ၏ Performance အား လွန်စွာ အကျိုးသက်ရောက် စေနိုင်ပါသည်။

ပုံမှန်အားဖြင့် ဤကဲ့သို့သော အခြေအနေမျိုးတွင် Block ဖြစ်နေစဉ် အခြားသော လုပ်ဆောင်ချက်များအား ပြုလုပ်နိုင်ရန် စီမံ၍ အထက်ပါ အခက်အခဲများအား ဖြေရှင်းလေ့ရှိပါသည်။ ဥပမာအားဖြင့် Thread Pool များအား အသုံးပြု၍ Parallel Processing လုပ်ခြင်းအားဖြင့် Block ကြောင့် ဖြစ်တတ်သော စွမ်းရည် ကျဆင်းမှု့ကို ကာကွယ်စေခဲ့ပါသည်။ နမှုနာ ကုဒ်များအား လေ့လာကြည့်ပါမည်။

SimpleEchoServer.java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SimpleEchoServer {
    private static final int PORT = 5000;
    private ExecutorService service;
    
    public SimpleEchoServer() throws IOException {
        // new thread pool
        service = Executors.newCachedThreadPool();

        // server socket
        ServerSocketChannel serverChannel = ServerSocketChannel.open();
        serverChannel.bind(new InetSocketAddress(PORT));

        for (;;) {
            // wait
            SocketChannel channel = serverChannel.accept();
            System.out.println("Connect to: " 
                               + channel.socket()
                                   .getInetAddress().getHostName());

            // access from client
            startEcho(channel);
        }
    }

    public void startEcho(final SocketChannel channel) {
        // I/O processing
        Runnable runnable = new Runnable() {
            public void run() {
                ByteBuffer buffer = ByteBuffer.allocate(1024);
                    
                try {
                    for (;;) {
                        // Input
                        buffer.clear();
                        if (channel.read(buffer) < 0) {
                            break;
                        }
                        
                        // Output
                        buffer.flip();
                        channel.write(buffer);
                    }
                } catch (IOException ex) {
                    ex.printStackTrace();
                } finally {
                    try {
                        channel.close();
                    } catch(IOException ex) {}
                }
            }
        };

        // execute by thread pool
        service.execute(runnable);
    }

    public static void main(String[] args) {
        try {
            new SimpleEchoServer();
        } catch (IOException ex) {
            ex.printStackTrace();
        }
    }

}
အထက်ပါ ကုဒ်များ၏ စာကြောင်း ၁၅တွင် Concurrency API ၏ Excecutors အား အသုံးပြု၍ Thread Pool အား အသုံးပြုနိုင်သော ExcecutorService ၏ Instance အား ခေါ်ယူပါသည်။ တဖန်စာကြောင်း ၂၁၏ for loop ဖြင့် ဆာဗာအားစောင့်နေစေပြီး client မှ request လာသောအခါ စာကြောင်း၂၉ဖြင့် startEcho လုပ်ဆောင်ချက်ကို ခေါ်ယူလုပ်ဆောင်စေပါသည်။

setEcho လုပ်ဆောင်ချက်အတွင်းတွင် စာကြောင်း ၃၅မှ ၅၉အထိ Multi Thread Processing များတွင် အသုံးပြုနိုင်သော Runable Interface အား ဖြည့်စွက်ရေးသားပြီး runable Instance အား new လုပ်ပါသည်။ အတွင်းပိုင်းမှာမူ စာကြောင်း ၄၃တွင် channel အား အသုံးပြု၍ ဖတ်ယူထားသည်များကို စာကြောင်း ၄၅တွင် ပြန်ရေးနေရုံသာ ဖြစ်ပါသည်။

စာကြောင်း ၆၂တွင် အထက်ပါ runnable Instance အား Thread Pool မှ Thread တစ်ခု ဖြင့် အလုပ်လုပ်စေခြင်း သာဖြစ်၏။ ဤကဲ့သို့ Multi Thread Processing နည်းလမ်းအား အသုံးပြုခြင်း အားဖြင့် Input Output ကြောင့် Block ဖြစ်နေစဉ်ကာလ ဖြင့်လင့်ကစား ဆာဗာသည် အခြားသော လုပ်ငန်း ဆောင်တာများကို ပြုလုပ်နိုင်မည် ဖြစ်ပါသည်။

သို့ရာတွင်အကယ်၍ Request အရေအတွက်သည် အလွန်များပြားလာပါက အသုံးပြုနိုင်သော Thread များ အလွန်နည်းပါး လာပါလိမ့်မည်။ Request အရေအတွက်သည် CPU Core အရေအတွက်ထက် ပိုများလာပါက Thread များအား ပြောင်းလည်းခြင်း လုပ်ငန်း (Context Switch) ကို ဖြစ်ပေါ်စေပါသည်။ Context Switch သည် လွန်စွာ လေးလံသော အလုပ်ဖြစ်ပါသဖြင့် တတ်နိုင်သလောက် ရှောင်ရှား စေလိုပါသည်။ သို့ရာတွင် Request က များလာလေ Context Switch ဖြစ်စေရန် ရာနှုန်းမှာ မြင့်မားလာလေ ဖြစ်ပါလိမ့်မည်။

Server Performance အား မကျဆင်းစေရန် အတွက် တတ်နိုင်သ၍ Thread အရေအတွက်ကို လျှော့ချနိုင်စေရန် Block ဖြစ်သည့် အချိန်ကို လျှော့ချရန် လိုအပ်လာပါသည်။ ဤနေရာတွင် အသုံးပြုသည်မှာ nio ၏ Non Blocking I/O ပင် ဖြစ်သည်။


Non Blocking I/O

Non Blocking I/O သည် I/O လုပ်ငန်းပြီးဆုံး၍ ပြန်လည်အသုံးပြုနိုင်လာနိုင်သော Channel များအား စုစည်း၍ အသုံးပြုနိုင်ပါသည်။ Non Blocking I/O တွင် အဓိက နေရာတွင် အလုပ်လုပ်နေသူမှာ java.nio.channels.Selector ပင် ဖြစ်၏။

Selector အော့ဘဂျက်မှ အသိပေးလာသော Channel သည် အသုံးပြုရန် အသင့်ဖြစ်နေပါသဖြင့် Blocking ဖြစ်နေသော အချိန်ကို အတိုဆုံးဖြစ်အောင် ထိမ်းပေးနိုင်ပါသည်။ တဖန် Non Blocking IO တွင် Channel များအား Selector တစ်ခုဖြင့် ထိမ်းချုပ်နိုင်ပါသဖြင့် အသုံးပြုရန်လိုအပ်သော Thread အရေအတွက်ကို အနည်းဆုံးဖြစ်အောင် စီမံနိုင်ပါသည်။

ပြီးခဲ့သော နမှုနာရှိ ဆာဗာအား Non Blocking IO ကို အသုံးပြု၍ ပြုပြင်ရေးသားကြည့်ပါမည်။

NonBlockingEchoServer.java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.channels.spi.SelectorProvider;
import java.util.Iterator;

public class NonBlockingEchoServer {
 private static final int PORT = 5000;

 // Selector
 private Selector selector;

 public NonBlockingEchoServer() throws IOException {

  selector = SelectorProvider.provider().openSelector();
  ServerSocketChannel serverChannel = ServerSocketChannel.open();

  // Setting Non-blocking
  serverChannel.configureBlocking(false);
  serverChannel.bind(new InetSocketAddress(PORT));

  // registration of client request
  serverChannel.register(selector, SelectionKey.OP_ACCEPT);

  while (selector.select() > 0) {

   Iterator<?> keyIterator = selector.selectedKeys().iterator();

   while (keyIterator.hasNext()) {
    // get ready key
    SelectionKey key = (SelectionKey) keyIterator.next();
    keyIterator.remove();

    if (key.isAcceptable()) {
     ServerSocketChannel channel = (ServerSocketChannel) key
       .channel();
     accept(channel);
    } else {
     SocketChannel channel = (SocketChannel) key.channel();

     // ready to read
     if (key.isReadable()) {
      read(key, channel);
     }

     // ready to write
     if (key.isValid() && key.isWritable()) {
      echo(key, channel);
     }
    }
   }
  }
 }

 // connection
 private void accept(ServerSocketChannel serverChannel) throws IOException {
  SocketChannel channel = serverChannel.accept();
  channel.configureBlocking(false);

  ByteBuffer buffer = ByteBuffer.allocate(1024);

  // Registration
  channel.register(selector, SelectionKey.OP_READ, buffer);
  System.out.println("Connect to: "
    + channel.socket().getInetAddress().getHostName());
 }

 // Input
 private void read(SelectionKey key, SocketChannel channel)
   throws IOException {
  ByteBuffer buffer = (ByteBuffer) key.attachment();

  // Input
  buffer.clear();
  if (channel.read(buffer) < 0) {
   try {
    channel.close();
   } catch (IOException ex) {
   }
  } else {
   // Registration
   channel.register(selector, SelectionKey.OP_WRITE, buffer);
  }
 }

 // Output
 private void echo(SelectionKey key, SocketChannel channel)
   throws IOException {
  // get buffer
  ByteBuffer buffer = (ByteBuffer) key.attachment();

  // output
  buffer.flip();
  channel.write(buffer);

  // Registration
  channel.register(selector, SelectionKey.OP_READ, buffer);
 }

 public static void main(String[] args) {
  try {
   new NonBlockingEchoServer();
  } catch (IOException ex) {
   ex.printStackTrace();
  }
 }
}
စာကြောင်း ၂၉ ကိုကြည့်ပါ။ Selector#select ဖြင့် Input Output က အဆင်သင့်ဖြစ်သည်အထိ Block လုပ်ထားပါသည်။ Selector ကလပ်စ်တွင် အသုံးပြုနိုင်သည့် Operation အမျိုးအစားများမှာ OP_ACCEPT, OP_CONNECT, OP_READ နှင့် OP_WRITE ဟူ၍ ၄မျိုးရှိပါသည်။

Selector တွင် အသုံးပြုမည့် Operation များအား ကြိုတင် Registration လုပ်ထားရန် လိုအပ်ပါသည်။ အထက်ပါ Operation အမျိုးအစား လေးမျိုးတွင် ထူးခြားသည်မှာ OP_WRITE ပင် ဖြစ်၏။ အကြောင်းမှာ OP_WRITE သည် Registration လုပ်ပြီးသည်နှင့် အဆင်သင့်ဖြစ်နေပြီ ဖြစ်သောကြောင့် ဖြစ်၏။ ထို့ကြောင့် OP_WRITE အား ကြိုတင်၍ Registration လုပ်ထားပါက select လုပ်ဆောင်ချက်ကို ခေါ်ယူအသုံးပြုနိုင်ပါသဖြင့် အတော်များများ Block မဖြစ်ပဲ ချက်ချင်း အသုံးပြုနိုင်မည် ဖြစ်သည်။

ထို့ကြောင့် Output လုပ်ရန်လိုအပ်သည့်အခါမှသာ OP_WRITE အား Registration လုပ်ရန် လိုအပ်ပါသည်။ စာကြောင်း ၄၅မှ ၅၃အား ကြည့်ပါ။ read လုပ်ပြီး write ကို လုပ်ဆောင်စေပါသည်။ ထို့ကြောင့် read လုပ်ဆောင်ချက်၏ စာကြောင်း ၇၉မှ ၈၇တွင် if(challel.read(buffer) < 0) ဟု ရေးစရာမကျန်တော့ပါက channel အား close လုပ်ပြီး၊ ရေးစရာကျန်မှသာ OP_WRITE အား registration လုပ်ပါသည်။

Selector#select လုပ်ဆောင်ချက် အပြီး စာကြောင်း ၃၁ တွင် java.nio.channels.SelectionKey ၏ အော့ဘဂျက်၏ Set အား ခေါ်ယူပါသည်။ ပြီးပါက SelectionKey အော့ဘဂျက် ၏ Operation အမျိုးအစား အပေါ်မှုတည်၍ လုပ်ဆောင်ချက်များအား ခွဲခြား လုပ်ဆောင်စေပါသည်။

Client ဆီမှ Access လုပ်လာပါက စာကြောင်း ၄၁တွင် accept လုပ်ဆောင်ချက်ကို ခေါ်ယူပြီး၊ အတွင်းပိုင်း စာကြောင်း ၆၇တွင် OP_READ အနေဖြင့် SocketChennel အား Registration လုပ်ပါသည်။ တဖန် OP_READ Channel က အသင့်ဖြစ်ပါက စာကြောင်း ၄၇ဖြင့် read လုပ်ဆောင်ချက်ကို ခေါ်ယူပြီး အတွင်းတွင် စာကြောင်း ၇၉ဖြင့် channel မှ buffer အတွင်းသို့ data များအား read လုပ်ပါမည်။ ပြီးပါက channel အား OP_WRITE အနေဖြင့် Registration လုပ်ပါမည်။

တဖန် Registration လုပ်ထားသော Channel က Write လုပ်ရန် အစဉ်သင့်ဖြစ်သောအခါ စာကြောင်း ၅၂ဖြင့် echo လုပ်ဆောင်ချက်အား ခေါ်ယူပြီး၊ အတွင်းပိုင်းတွင် စာကြောင်း ၉၈ဖြင့် Buffer အတွင်းရှိ အချက်အလက်များအား write လုပ်ပါမည်။ ပြီးပါက စာကြောင်း ၁၀၁ဖြင့် OP_READ အနေဖြင့် Registration လုပ်မည် ဖြစ်သည်။ ဤနည်းအားဖြင့် Access လုပ်လာသော Socket Channel အတွင်းရှိ အချက်အလက်များအား မကုန်မချင်း ဖတ်လိုက် ရေးလိုက် လုပ်နေမည် ဖြစ်သည်။ Socket အတွင်းရှိ အချက်အလက်များ ကုန်ပါက စာကြောင်း ၈၁ဖြင့် channel အား close လုပ်မည် ဖြစ်ပါသည်။

ဤကဲ့သို့ Selector အား အသုံးပြုခြင်းအားဖြင့် Thread တစ်ခုတည်းဖြင့် Channel အများစုအား Block ဖြစ်ခြင်း အချိန်ကို အနည်းဆုံးဖြစ်အောင် ထိမ်းချုပ်ရင်း အလုပ်လုပ်စေနိုင်ပါသည်။

သို့ရာတွင် select, read နှင့် write လုပ်ဆောင်ချက်များအား လုပ်ဆောင်နေစဉ်တွင် Block ဖြစ်နေသည်မှာ ငြင်း၍မရပေ။ ဤကဲ့သို့သော အားနည်းချက်များအား ပြုပြင်ရန်အတွက် NIO2 တွင် Asynchronous IO အား တီထွင် ရေးသားခဲ့ပါသည်။ Asynchronous IO နှင့် ပတ်သက်ပြီး နောက်ဘလောဂ်များဖြင့် ဆက်လက် ဖော်ပြသွားပါဦးမည်။



မှတ်ချက်


Japan Java User Group ၏ အဖွဲ့ဝင် Mr Yuuichi Sakuraba ရေးသားသော New IO ဖြင့် Asynchronous IO အား လေ့လာရင်း ပြန်လည်တင်ပြပါသည်။

http://itpro.nikkeibp.co.jp/article/COLUMN/20110927/369451/?ST=develop&P=1

လေးစားစွာဖြင့်။
မင်းလွင်

No comments:

Post a Comment