July 18, 2012

Asynchronous IO (Part 2)

ကောင်မလေးများ ရေထဲတွင် ညီတူစွာ ရေထဲတွင် ကပြနိုင်သော Synchro ဟုခေါ်သော ရေးကူနည်းကို ကြီးဘူးပါက ချက်ချင်းကို မျက်စေ့ထဲ ပြေးမြင်ပါလိမ့်မည်။ Synchronous ဆိုသည်မှာ အချိန်တူစွာ အလုပ်လုပ်ခြင်းကို ဆိုလိုပါသည်။ Asynchronous ဆိုသည်မှာ Synchronous ၏ ပြောင်းပြန် ဖြစ်ပြီး အလုပ်တစ်ခုနှင့် တစ်ခု စောင့်စရာမလိုပဲ သီးခြားစီ အလုပ်လုပ်နိုင်ခြင်းကို Asynchronous ဟုခေါ်ဆိုပါသည်။

Asynchronous IO သည် အမည်အတိုင်း Input Output လုပ်နေစဉ် ကြံ့ကြာတတ်သော အချိန်အား စောင့်စရာမလိုပဲ အခြားသော လုပ်ဆောင်ချက်များအား ဆက်လုပ်စေနိုင်သော API ဖြစ်ပါသည်။ Asynchronous IO အား java.nio.channels.AsynchronousChannel အင်တာဖေစ် မှတဆင့် အသုံးပြုနိုင်ပါသည်။ လက်ရှိ အချိန်တွင် Socket Connection နှင့် File IO များတွင် အသုံးပြုနိုင်ပါသည်။ ပံ့ပိုးထားသော Class များမှာ java.nio.channels.AsynchronousSocketChannel နှင့် java.nio.channels.AsynchronousFileChannel တို့ ဖြစ်ကြပါသည်။

အဆိုပါ ကလပ်စ်များတွင် read နှင့် write လုပ်ဆောင်ချက်များအား လုပ်ဆောင်စေရာတွင် ပြီးဆုံးအောင် စောင့်စရာမလိုပဲ ဆက်လက်၍ အခြားသော လုပ်ဆောင်ချက်များအား လုပ်ဆောင်စေနိုင်ပါသည်။ လုပ်ဆောင်ချက်များ ပြီးဆုံးသည်ကို သိရှိနိုင်ရန်မှာ နည်းလမ်းနှစ်ခု ရှိပါသည်။ ပဋ္ဌမနည်းမှာ Concurrency Utilities ၏ java.util.concurrent.Future အား အသုံးပြုသောနည်း ဖြစ်ပြီး၊ နောက်တစ်နည်းမှာ ပြီးဆုံးသော Event အား လက်ခံရယူသောနည်း ဖြစ်ပါသည်။


Future Interface အား အသုံးပြု၍ Asynchronous IO


Future Interface ဖြင့် Asynchronous IO ၏ လုပ်ဆောင်ချက်များ၏ ရလဒ်များအား ရရှိနိုင်ခြင်း၊ လုပ်ဆောင်ချက်များ၏ အနေအထားအား ကြည့်ရှုနိုင်ခြင်း၊ လုပ်ဆောင်ချက်များအား ရပ်တန့်စေနိုင်ခြင်း အစရှိသည်တို့ကို လုပ်ဆောင်စေနိုင်ပါသည်။ ပြီးခဲ့သော ဘလောဂ်ဖြင့် ရေးသားခဲ့သော Echo ပြန်လုပ်ပေးသည့် ဆာဗာ အပလီကေးရှင်းအား Future Interface နှင့် Asynchronous IO အား အသုံးပြု၍ ပြုပြင် ရေးသားကြည့်ပါဦးမည်။

SimpleFutureEchoServer .java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

public class SimpleFutureEchoServer {
    private static final int PORT = 5000;
    
    public SimpleFutureEchoServer() throws IOException {
        AsynchronousServerSocketChannel serverChannel 
            = AsynchronousServerSocketChannel.open();
        serverChannel.bind(new InetSocketAddress(PORT));
        
        for (;;) {
            // open asynchronous socket channel
            Future<AsynchronousSocketChannel> future 
                = serverChannel.accept();

            try {
                // get Asynchronous socket channel
                AsynchronousSocketChannel channel = future.get();
                System.out.println("Connect to: " 
                                   + ((InetSocketAddress)channel
                                        .getRemoteAddress()).getHostName());
                // IO Processing
                startEcho(channel);
            } catch (InterruptedException | ExecutionException ex) {}
        }
    }

    private void startEcho(AsynchronousSocketChannel channel) {
        ByteBuffer buffer = ByteBuffer.allocate(1024);

        try {
            for (;;) {
                // Input
                buffer.clear();
                Future<Integer> future = channel.read(buffer);
            
                // get Input Result
                if (future.get() < 0) {
                    try {
                        channel.close();
                    } catch (IOException ex) {}
                }
                
                buffer.flip();
                // output
                channel.write(buffer).get();
            }
        } catch (InterruptedException
                 |ExecutionException ex) {}
    }

    public static void main(String[] args) {
        try {
            new SimpleFutureEchoServer();
        } catch (IOException ex) {
            ex.printStackTrace();
        }
    }
}
စာကြောင်း ၁၃မှ ၁၅ အထိတွင် AsynchronousServerSocketChannel ၏ အော့ဘဂျက်အား open လုပ်ယူပြီး၊ port နံပါတ် 5000 ဖြင့် bind လုပ်ပါသည်။ ပြီးပါက for ဝါကျဖြင့် client ဆီမှ access လုပ်လာမည်ကို စောင့်နေပြီး အလုပ်လုပ်စေပါသည်။

စာကြောင်း ၁၉နှင့် ၂၀အား ကြည့်ပါ။ AsynchronousServerSocketChannel#accept လုပ်ဆောင်ချက်အား အသုံးပြု၍ AsynchronousSocketChannel အား အသုံးပြုနိုင်သော Future အင်တာဖေစ်၏ အော့ဘဂျက်အား ရယူပါသည်။ ဤနေရာတွင် ထူးခြားသည်မှာ ServerSocketChannel#accept ၏ ရလဒ်သည် SocketChennel ဖြစ်သော်လည်း ဤနေရာတွင်မှု Future အင်တာဖေစ် ဖြစ်သည်ဆိုသည့် အချက်ဖြစ်၏။

တဖန် Asynchronous Process ၏ ရလဒ်အားရရှိရန်မှာ Future#get အား အသုံးပြုနိုင်ပြီး၊ get လုပ်ဆောင်ချက်တွင် Timeout ဖြစ်ရန် အချိန်ကို ပါရာမီတာအဖြစ်လည်း သတ်မှတ် အသုံးပြုနိုင်ပါသည်။ ပြီးပါက စာကြောင်း ၂၉တွင် startEcho လုပ်ဆောင်ချက်ဖြင့် အထက်တွင် ရရှိထားသော AsynchronousSocketChannel ၏ အော့ဘဂျက်အား အသုံးပြု၍ Input Output အား လုပ်ဆောင်စေပါသည်။

startEcho လုပ်ဆောင်ချက် အတွင်းတွင် Input Output လုပ်ဆောင်ချက်များအား လုပ်ဆောင်စေပါသည်။ စာကြောင်း ၄၁အား ကြည့်ပါ။ AsynchronousSocketChannel#read လုပ်ဆောင်ချက်၏ရလဒ်သည် Future အင်တာဖေစ်ဖြစ်ပြီး၊ Future#get ဖြင့် Asynchronous လုပ်ဆောင်ချက်၏ ရလဒ်အား ရရှိနိုင်ပါသည်။ တဖန် စာကြောင်း ၅၂ဖြင့် Output ကို လုပ်ဆောင်စေပြီး၊ get ဖြင့် ရလဒ်အား စောင့်ယူပါသည်။ ဤကဲ့သို့ channel.write(buffer).get() ဟု တိုက်ရိုက် ရေးသားရခြင်းမှာ၊ Asynchronous ဖြင့် အလုပ်လုပ်နေစဉ် buffer အတွင်းရှိ အချက်အလက်များအား ပြီးအောင် မရေးရသေးခင်၊ for ဝါကျ၏ ထိပ်သို့ ပြန်ရောက်သွားပြီး buffer အား clear မလုပ်မိစေရန် ဖြစ်ပါသည်။

ဤနည်းအားဖြင့် Asynchronous IO တွင် Future အင်တာဖေစ်အား အသုံးပြု၍ Input output များအား အသုံးပြုနိုင်ကြောင်း တွေ့ရှိရမည် ဖြစ်သည်။ သို့ရာတွင် သေသေချာချာကြည့်မည်ဆိုပါက Thread တစ်ခုတည်းကို သာအသုံးပြုနေသည်ကို တွေ့ရပါမည်။ ထို့ကြောင့် Access များလာပါက အလားတူ Performance ကျဆင်းသွားမည်မှာ မလွဲပေ။ ဒါကြောင့်ဆိုပြီး Thread Pool ကို အသုံးပြုပြန်ရင်လည်း Asynchronous IO ကို အသုံးပြုသည်မှာ အဓိပ္ပါယ်ရှိမည် မဟုတ်။

ကျွှန်တော်တို့ ဆက်လက်၍ Work Thread နှင့်Main Thread ကို ခွဲရေးပြီး၊ Thread နှစ်ခုကြားတွင် Queue ဖြင့် အချက်အလက်များအား လက်ကမ်းပေးသည့် ပုံစံကို ဆက်လေ့လာ ကြည့်ပါဦးမည်။


Work Thread အား အသုံးပြုခြင်း


FutureEchoServer.java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class FutureEchoServer {
    private static final int PORT = 5000;
    private LinkedBlockingQueue<AsyncInfo> queue;
    
    static class AsyncInfo {
        private Future<Integer> future;
        private AsynchronousSocketChannel channel;
        private ByteBuffer buffer;
 
        public AsyncInfo(Future<Integer> future,
                         AsynchronousSocketChannel channel,
                         ByteBuffer buffer) {
            this.future = future;
            this.channel = channel;
            this.buffer = buffer;
        }
    }
    
    public FutureEchoServer() throws IOException {
        AsynchronousServerSocketChannel serverChannel 
            = AsynchronousServerSocketChannel.open();
        serverChannel.bind(new InetSocketAddress(PORT));
        
        for (;;) {
            // open asynchronous socket channel
            Future<AsynchronousSocketChannel> future 
                = serverChannel.accept();

            try {
                // get Asynchronous socket channel
                AsynchronousSocketChannel channel = future.get();
                System.out.println("Connect to: " 
                                   + ((InetSocketAddress)channel
                                        .getRemoteAddress()).getHostName());
                // Watching Queue
                startWatchQueue();
                // IO Processing
                startEcho(channel);
            } catch (InterruptedException | ExecutionException ex) {}
        }
    }

    private void startEcho(AsynchronousSocketChannel channel) {
      
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        buffer.clear();
        // Asynchronous Input
        Future<Integer> future = channel.read(buffer);
 
        // Queue Processing
        queue.offer(new AsyncInfo(future, channel, buffer));
    }
    
    private void startWatchQueue() {
        // queue for receiving data
        queue = new LinkedBlockingQueue<>();
 
        // work thread
        Runnable runnable = new Runnable() {
            public void run() {
                try {
                    for (;;) {
                        // get asynchronous info from queue
                        AsyncInfo info = (AsyncInfo) queue.take();
                        Future<Integer> future = info.future;
                    
                        // get result
                        // Timeout = 100ms 
                        int n = 0;
                        try {
                            n = future.get(100, TimeUnit.MILLISECONDS);
                        } catch(TimeoutException ex) {}
                    
                        if (future.isDone()) {
                            // The process is done
                            if (n < 0) {
                                try {
                                    info.channel.close();
                                } catch (IOException ex) {}
                                continue;
                            }
 
                            AsynchronousSocketChannel channel 
                                = info.channel;
                            ByteBuffer buffer = info.buffer;
 
                            // Output
                            buffer.flip();
                            channel.write(buffer).get();
                            
                            // Asynchronous Input
                            buffer.clear();
                            Future<Integer> nextFuture 
                                = channel.read(buffer);
                            
                            // Next input to queue
                            queue.offer(new AsyncInfo(nextFuture,
                                                      channel,
                                                      buffer));
                        } else {
                            // If process is still working
                            queue.offer(info);
                        }
                    }
                } catch (InterruptedException
                         | ExecutionException ex) {
                }
            }
        };
 
        // Work Thread Start
        ExecutorService service = Executors.newSingleThreadExecutor();
        service.execute(runnable);
    }
    
    public static void main(String[] args) {
        try {
            new FutureEchoServer();
        } catch (IOException ex) {
            ex.printStackTrace();
        }
    }
}

ဦးစွာ စာကြောင်း ၁၈မှ ၃၀ အထိ Queue ဖြင့် အချက်အလက်များအား လက်ဆင့်ကမ်းရန် အတွက် ကလပ်စ် တစ်ခုကို ရေးသားပါသည်။ Work Thread နှင့် Main Thread အကြားတွင် အချက်အလက်များအား လက်ဆင့်ပေးရန် အတွက်ဖြစ်ပါသည်။ Main Thread မှ အချက်အလက်များအား Queue တွင် ထည့်ထားပါက၊ အခြားသော Work Thread မှ တဆင့် ထို Queue ထဲမှ အချက်အလက်များအား ထုတ်ယူ အသုံးပြုနိုင်မည် ဖြစ်ပါသည်။ Future Object, AsynchronousSocketChannel နှင့် ByteBuffer Object တို့ကို ပိုင်ဆိုင်ပါသည်။

Asynchronous Socket Channel ကို ခေါ်ယူသည့် နေရာအထိကတော့ အထက်ပါ နမှုနာနဲ့ အတူတူပဲ ဖြစ်ပါတယ်။ Socket Channel ကို ခေါ်ယူပြီးသည့်အခါတွင် Work Thread ကို အလုပ်လုပ်စေရန် အတွက် စာကြောင်း ၄၉တွင် startWatchQueue လုပ်ဆောင်ချက်ကို ခေါ်ယူပါသည်။ ပြီးပါက Input လုပ်ဆောင်ချက်ကို လုပ်ဆောင်စေမည့် startEcho လုပ်ဆောင်ချက်ကို ခေါ်ယူပါသည်။

startEcho လုပ်ဆောင်ချက်အတွင်းတွင် Input လုပ်ငန်းများကို လုပ်ဆောင်မည့် future Object အား ရယူကာ Queue တွင် Registration ပြုလုပ်ပါသည်။ တကယ်တန်း Input Output လုပ်ငန်းများအား လုပ်ဆောင်မည်မှာ အထက်ပါ startWatchQueue တွင် စောင့်ကြည့်နေစေသော Work Thread အတွင်းတွင် ဖြစ်ပါသည်။

startWatchQueue တွင် စာကြောင်း ၇၇ဖြင့် အခြားသော Thread တစ်ခု အနေဖြင့် အလုပ်လုပ်စေနိုင်သော Runnable အင်တာဖေစ်၏ အော့ဘဂျက်အား သတ်မှတ်ရေးသားကာ၊ စာကြောင်း ၁၂၅တွင် Single Thread အနေဖြင့် အလုပ်လုပ်စေပါသည်။ ဤနည်းအားဖြင့် Runnable အော့ဘဂျက်သည် Work Thread အဖြစ်အလုပ်လုပ်ကာ Queue အား စောင့်ကြည့်ပါသည်။

Runnable Object အတွင်းမှာမူ စာကြောင်း ၇၇ ဖြင့် Queue အတွင်းမှ အသုံးပြုလိုသည့် info အား ထုတ်ယူကာ၊ စာကြောင်း ၇၈ဖြင့် info အတွင်းမှ input ကို လုပ်ဆောင်နိုင်သော future အား ရယူပါသည်။ စာကြောင်း ၈၇ ဖြင့် future သည် အလုပ်ပြီးပြီလားဆိုသည်ကို စစ်ဆေးပြီး၊ အလုပ်ပြီးပြီဆိုပါက စာကြောင်း ၈၉ဖြင့် ဖတ်စရာကျန်မကျန် စစ်ဆေးပြီး၊ မကျန်တော့ပါက Asynchronous Socket Channel အား close လုပ်မည် ဖြစ်ပါသည်။ ကျန်သေးပါက Buffer အတွင်းရှိ အချက်အလက်များအား Output လုပ်ပြီး၊ future Object အား အသစ်ခေါ်ယူကာ စာကြောင်း ၁၁၀ ဖြင့် Queue အတွင်းသို့ ပြန်ထည့်ပေးလိုက်ပါသည်။ ဤနည်းအားဖြင့် Input အချက်အလက်များ မကုန်မချင်း အကြိမ်ကြိမ် အလုပ်လုပ်စေပါသည်။


နောက်ဘလောဂ်ဖြင့် CompletionHandler ကို အသုံးပြုကာ AsynchronousSocketChannel အား အသုံးပြုရေးသားပုံကို ဆက်လက်ဖော်ပြပါဦးမည်။


ကိုးကား

http://itpro.nikkeibp.co.jp/article/COLUMN/20110927/369451/?ST=develop&P=4


လေးစားစွာဖြင့်
မင်းလွင်

No comments:

Post a Comment