Asynchronous IO သည် အမည်အတိုင်း Input Output လုပ်နေစဉ် ကြံ့ကြာတတ်သော အချိန်အား စောင့်စရာမလိုပဲ အခြားသော လုပ်ဆောင်ချက်များအား ဆက်လုပ်စေနိုင်သော API ဖြစ်ပါသည်။ Asynchronous IO အား java.nio.channels.AsynchronousChannel အင်တာဖေစ် မှတဆင့် အသုံးပြုနိုင်ပါသည်။ လက်ရှိ အချိန်တွင် Socket Connection နှင့် File IO များတွင် အသုံးပြုနိုင်ပါသည်။ ပံ့ပိုးထားသော Class များမှာ java.nio.channels.AsynchronousSocketChannel နှင့် java.nio.channels.AsynchronousFileChannel တို့ ဖြစ်ကြပါသည်။
အဆိုပါ ကလပ်စ်များတွင် read နှင့် write လုပ်ဆောင်ချက်များအား လုပ်ဆောင်စေရာတွင် ပြီးဆုံးအောင် စောင့်စရာမလိုပဲ ဆက်လက်၍ အခြားသော လုပ်ဆောင်ချက်များအား လုပ်ဆောင်စေနိုင်ပါသည်။ လုပ်ဆောင်ချက်များ ပြီးဆုံးသည်ကို သိရှိနိုင်ရန်မှာ နည်းလမ်းနှစ်ခု ရှိပါသည်။ ပဋ္ဌမနည်းမှာ Concurrency Utilities ၏ java.util.concurrent.Future အား အသုံးပြုသောနည်း ဖြစ်ပြီး၊ နောက်တစ်နည်းမှာ ပြီးဆုံးသော Event အား လက်ခံရယူသောနည်း ဖြစ်ပါသည်။
Future Interface အား အသုံးပြု၍ Asynchronous IO
Future Interface ဖြင့် Asynchronous IO ၏ လုပ်ဆောင်ချက်များ၏ ရလဒ်များအား ရရှိနိုင်ခြင်း၊ လုပ်ဆောင်ချက်များ၏ အနေအထားအား ကြည့်ရှုနိုင်ခြင်း၊ လုပ်ဆောင်ချက်များအား ရပ်တန့်စေနိုင်ခြင်း အစရှိသည်တို့ကို လုပ်ဆောင်စေနိုင်ပါသည်။ ပြီးခဲ့သော ဘလောဂ်ဖြင့် ရေးသားခဲ့သော Echo ပြန်လုပ်ပေးသည့် ဆာဗာ အပလီကေးရှင်းအား Future Interface နှင့် Asynchronous IO အား အသုံးပြု၍ ပြုပြင် ရေးသားကြည့်ပါဦးမည်။
SimpleFutureEchoServer .java
import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousServerSocketChannel; import java.nio.channels.AsynchronousSocketChannel; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; public class SimpleFutureEchoServer { private static final int PORT = 5000; public SimpleFutureEchoServer() throws IOException { AsynchronousServerSocketChannel serverChannel = AsynchronousServerSocketChannel.open(); serverChannel.bind(new InetSocketAddress(PORT)); for (;;) { // open asynchronous socket channel Future<AsynchronousSocketChannel> future = serverChannel.accept(); try { // get Asynchronous socket channel AsynchronousSocketChannel channel = future.get(); System.out.println("Connect to: " + ((InetSocketAddress)channel .getRemoteAddress()).getHostName()); // IO Processing startEcho(channel); } catch (InterruptedException | ExecutionException ex) {} } } private void startEcho(AsynchronousSocketChannel channel) { ByteBuffer buffer = ByteBuffer.allocate(1024); try { for (;;) { // Input buffer.clear(); Future<Integer> future = channel.read(buffer); // get Input Result if (future.get() < 0) { try { channel.close(); } catch (IOException ex) {} } buffer.flip(); // output channel.write(buffer).get(); } } catch (InterruptedException |ExecutionException ex) {} } public static void main(String[] args) { try { new SimpleFutureEchoServer(); } catch (IOException ex) { ex.printStackTrace(); } } }
စာကြောင်း ၁၉နှင့် ၂၀အား ကြည့်ပါ။ AsynchronousServerSocketChannel#accept လုပ်ဆောင်ချက်အား အသုံးပြု၍ AsynchronousSocketChannel အား အသုံးပြုနိုင်သော Future အင်တာဖေစ်၏ အော့ဘဂျက်အား ရယူပါသည်။ ဤနေရာတွင် ထူးခြားသည်မှာ ServerSocketChannel#accept ၏ ရလဒ်သည် SocketChennel ဖြစ်သော်လည်း ဤနေရာတွင်မှု Future အင်တာဖေစ် ဖြစ်သည်ဆိုသည့် အချက်ဖြစ်၏။
တဖန် Asynchronous Process ၏ ရလဒ်အားရရှိရန်မှာ Future#get အား အသုံးပြုနိုင်ပြီး၊ get လုပ်ဆောင်ချက်တွင် Timeout ဖြစ်ရန် အချိန်ကို ပါရာမီတာအဖြစ်လည်း သတ်မှတ် အသုံးပြုနိုင်ပါသည်။ ပြီးပါက စာကြောင်း ၂၉တွင် startEcho လုပ်ဆောင်ချက်ဖြင့် အထက်တွင် ရရှိထားသော AsynchronousSocketChannel ၏ အော့ဘဂျက်အား အသုံးပြု၍ Input Output အား လုပ်ဆောင်စေပါသည်။
startEcho လုပ်ဆောင်ချက် အတွင်းတွင် Input Output လုပ်ဆောင်ချက်များအား လုပ်ဆောင်စေပါသည်။ စာကြောင်း ၄၁အား ကြည့်ပါ။ AsynchronousSocketChannel#read လုပ်ဆောင်ချက်၏ရလဒ်သည် Future အင်တာဖေစ်ဖြစ်ပြီး၊ Future#get ဖြင့် Asynchronous လုပ်ဆောင်ချက်၏ ရလဒ်အား ရရှိနိုင်ပါသည်။ တဖန် စာကြောင်း ၅၂ဖြင့် Output ကို လုပ်ဆောင်စေပြီး၊ get ဖြင့် ရလဒ်အား စောင့်ယူပါသည်။ ဤကဲ့သို့ channel.write(buffer).get() ဟု တိုက်ရိုက် ရေးသားရခြင်းမှာ၊ Asynchronous ဖြင့် အလုပ်လုပ်နေစဉ် buffer အတွင်းရှိ အချက်အလက်များအား ပြီးအောင် မရေးရသေးခင်၊ for ဝါကျ၏ ထိပ်သို့ ပြန်ရောက်သွားပြီး buffer အား clear မလုပ်မိစေရန် ဖြစ်ပါသည်။
ဤနည်းအားဖြင့် Asynchronous IO တွင် Future အင်တာဖေစ်အား အသုံးပြု၍ Input output များအား အသုံးပြုနိုင်ကြောင်း တွေ့ရှိရမည် ဖြစ်သည်။ သို့ရာတွင် သေသေချာချာကြည့်မည်ဆိုပါက Thread တစ်ခုတည်းကို သာအသုံးပြုနေသည်ကို တွေ့ရပါမည်။ ထို့ကြောင့် Access များလာပါက အလားတူ Performance ကျဆင်းသွားမည်မှာ မလွဲပေ။ ဒါကြောင့်ဆိုပြီး Thread Pool ကို အသုံးပြုပြန်ရင်လည်း Asynchronous IO ကို အသုံးပြုသည်မှာ အဓိပ္ပါယ်ရှိမည် မဟုတ်။
ကျွှန်တော်တို့ ဆက်လက်၍ Work Thread နှင့်Main Thread ကို ခွဲရေးပြီး၊ Thread နှစ်ခုကြားတွင် Queue ဖြင့် အချက်အလက်များအား လက်ကမ်းပေးသည့် ပုံစံကို ဆက်လေ့လာ ကြည့်ပါဦးမည်။
Work Thread အား အသုံးပြုခြင်း
FutureEchoServer.java
import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousServerSocketChannel; import java.nio.channels.AsynchronousSocketChannel; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; public class FutureEchoServer { private static final int PORT = 5000; private LinkedBlockingQueue<AsyncInfo> queue; static class AsyncInfo { private Future<Integer> future; private AsynchronousSocketChannel channel; private ByteBuffer buffer; public AsyncInfo(Future<Integer> future, AsynchronousSocketChannel channel, ByteBuffer buffer) { this.future = future; this.channel = channel; this.buffer = buffer; } } public FutureEchoServer() throws IOException { AsynchronousServerSocketChannel serverChannel = AsynchronousServerSocketChannel.open(); serverChannel.bind(new InetSocketAddress(PORT)); for (;;) { // open asynchronous socket channel Future<AsynchronousSocketChannel> future = serverChannel.accept(); try { // get Asynchronous socket channel AsynchronousSocketChannel channel = future.get(); System.out.println("Connect to: " + ((InetSocketAddress)channel .getRemoteAddress()).getHostName()); // Watching Queue startWatchQueue(); // IO Processing startEcho(channel); } catch (InterruptedException | ExecutionException ex) {} } } private void startEcho(AsynchronousSocketChannel channel) { ByteBuffer buffer = ByteBuffer.allocate(1024); buffer.clear(); // Asynchronous Input Future<Integer> future = channel.read(buffer); // Queue Processing queue.offer(new AsyncInfo(future, channel, buffer)); } private void startWatchQueue() { // queue for receiving data queue = new LinkedBlockingQueue<>(); // work thread Runnable runnable = new Runnable() { public void run() { try { for (;;) { // get asynchronous info from queue AsyncInfo info = (AsyncInfo) queue.take(); Future<Integer> future = info.future; // get result // Timeout = 100ms int n = 0; try { n = future.get(100, TimeUnit.MILLISECONDS); } catch(TimeoutException ex) {} if (future.isDone()) { // The process is done if (n < 0) { try { info.channel.close(); } catch (IOException ex) {} continue; } AsynchronousSocketChannel channel = info.channel; ByteBuffer buffer = info.buffer; // Output buffer.flip(); channel.write(buffer).get(); // Asynchronous Input buffer.clear(); Future<Integer> nextFuture = channel.read(buffer); // Next input to queue queue.offer(new AsyncInfo(nextFuture, channel, buffer)); } else { // If process is still working queue.offer(info); } } } catch (InterruptedException | ExecutionException ex) { } } }; // Work Thread Start ExecutorService service = Executors.newSingleThreadExecutor(); service.execute(runnable); } public static void main(String[] args) { try { new FutureEchoServer(); } catch (IOException ex) { ex.printStackTrace(); } } }
ဦးစွာ စာကြောင်း ၁၈မှ ၃၀ အထိ Queue ဖြင့် အချက်အလက်များအား လက်ဆင့်ကမ်းရန် အတွက် ကလပ်စ် တစ်ခုကို ရေးသားပါသည်။ Work Thread နှင့် Main Thread အကြားတွင် အချက်အလက်များအား လက်ဆင့်ပေးရန် အတွက်ဖြစ်ပါသည်။ Main Thread မှ အချက်အလက်များအား Queue တွင် ထည့်ထားပါက၊ အခြားသော Work Thread မှ တဆင့် ထို Queue ထဲမှ အချက်အလက်များအား ထုတ်ယူ အသုံးပြုနိုင်မည် ဖြစ်ပါသည်။ Future Object, AsynchronousSocketChannel နှင့် ByteBuffer Object တို့ကို ပိုင်ဆိုင်ပါသည်။
Asynchronous Socket Channel ကို ခေါ်ယူသည့် နေရာအထိကတော့ အထက်ပါ နမှုနာနဲ့ အတူတူပဲ ဖြစ်ပါတယ်။ Socket Channel ကို ခေါ်ယူပြီးသည့်အခါတွင် Work Thread ကို အလုပ်လုပ်စေရန် အတွက် စာကြောင်း ၄၉တွင် startWatchQueue လုပ်ဆောင်ချက်ကို ခေါ်ယူပါသည်။ ပြီးပါက Input လုပ်ဆောင်ချက်ကို လုပ်ဆောင်စေမည့် startEcho လုပ်ဆောင်ချက်ကို ခေါ်ယူပါသည်။
startEcho လုပ်ဆောင်ချက်အတွင်းတွင် Input လုပ်ငန်းများကို လုပ်ဆောင်မည့် future Object အား ရယူကာ Queue တွင် Registration ပြုလုပ်ပါသည်။ တကယ်တန်း Input Output လုပ်ငန်းများအား လုပ်ဆောင်မည်မှာ အထက်ပါ startWatchQueue တွင် စောင့်ကြည့်နေစေသော Work Thread အတွင်းတွင် ဖြစ်ပါသည်။
startWatchQueue တွင် စာကြောင်း ၇၇ဖြင့် အခြားသော Thread တစ်ခု အနေဖြင့် အလုပ်လုပ်စေနိုင်သော Runnable အင်တာဖေစ်၏ အော့ဘဂျက်အား သတ်မှတ်ရေးသားကာ၊ စာကြောင်း ၁၂၅တွင် Single Thread အနေဖြင့် အလုပ်လုပ်စေပါသည်။ ဤနည်းအားဖြင့် Runnable အော့ဘဂျက်သည် Work Thread အဖြစ်အလုပ်လုပ်ကာ Queue အား စောင့်ကြည့်ပါသည်။
Runnable Object အတွင်းမှာမူ စာကြောင်း ၇၇ ဖြင့် Queue အတွင်းမှ အသုံးပြုလိုသည့် info အား ထုတ်ယူကာ၊ စာကြောင်း ၇၈ဖြင့် info အတွင်းမှ input ကို လုပ်ဆောင်နိုင်သော future အား ရယူပါသည်။ စာကြောင်း ၈၇ ဖြင့် future သည် အလုပ်ပြီးပြီလားဆိုသည်ကို စစ်ဆေးပြီး၊ အလုပ်ပြီးပြီဆိုပါက စာကြောင်း ၈၉ဖြင့် ဖတ်စရာကျန်မကျန် စစ်ဆေးပြီး၊ မကျန်တော့ပါက Asynchronous Socket Channel အား close လုပ်မည် ဖြစ်ပါသည်။ ကျန်သေးပါက Buffer အတွင်းရှိ အချက်အလက်များအား Output လုပ်ပြီး၊ future Object အား အသစ်ခေါ်ယူကာ စာကြောင်း ၁၁၀ ဖြင့် Queue အတွင်းသို့ ပြန်ထည့်ပေးလိုက်ပါသည်။ ဤနည်းအားဖြင့် Input အချက်အလက်များ မကုန်မချင်း အကြိမ်ကြိမ် အလုပ်လုပ်စေပါသည်။
နောက်ဘလောဂ်ဖြင့် CompletionHandler ကို အသုံးပြုကာ AsynchronousSocketChannel အား အသုံးပြုရေးသားပုံကို ဆက်လက်ဖော်ပြပါဦးမည်။
ကိုးကား
http://itpro.nikkeibp.co.jp/article/COLUMN/20110927/369451/?ST=develop&P=4
လေးစားစွာဖြင့်
မင်းလွင်
No comments:
Post a Comment