跳過導覽

5.x 用戶指南

您知道此頁面是根據 Github Wiki 頁面 自動產生的嗎?您可以在 這裡 自行改善此文件!

第三方翻譯

前言

問題

現今我們使用一般用途的應用程式或函式庫來進行彼此之間的溝通。例如,我們經常使用 HTTP 客戶端函式庫來從網路伺服器中擷取資訊,並透過網路服務來呼叫遠端程式呼叫。

但是,一般用途的通訊協定或其實現方式有時無法擴充到很好。就好像我們不會使用一般用途的 HTTP 伺服器來交換大型檔案、電子郵件訊息,以及近乎即時的訊息(例如財務資訊和多人遊戲資料)。所需要的是一種高度最佳化的通訊協定實現方式,這類實現方式專門用於特定用途。例如,您可能想實作一個針對 AJAX 聊天應用程式、媒體串流或大型檔案傳輸而最佳化的 HTTP 伺服器。您甚至可能想設計和實作一種全新的通訊協定,讓其精確符合您的需要。

另一個無法避免的情況是,當您必須處理一個舊系統遺留專有協定以確保相容性時。這種情況中重要的是我們能夠多快實作該協定,同時又不犧牲所產生應用程式的穩定性與效能。

解決方案

Netty 這個專案旨在提供非同步事件導向網路應用程式架構與工具,以快速開發可維護的高效能·高擴充性協定伺服器與客戶端。

換言之,Netty 是 NIO 客戶端伺服器架構,可快速輕鬆開發網路應用程式,例如協定伺服器與客戶端。它大幅簡化與簡化網路程式設計,例如 TCP 與 UDP Socket 伺服器開發。

「快速與輕鬆」並非意味著產出的應用程式會因為可維護性或效能問題而受苦。Netty 在設計上經過仔細斟酌,具有實作許多協定的經驗,例如 FTP、SMTP、HTTP,以及各種位元組與文字為基礎的過往協定。結果是,Netty 成功找到一種方法,在不妥協的情況下達成開發容易性、效能、穩定性與彈性。

某些使用者可能已經找到其他自稱具有相同優點的網路應用程式架構,而您可能會想問 Netty 跟它們有什麼不同。答案在於建構的哲學。Netty 在設計上從第一天起就能讓您在 API 與實作方面有最舒服的體驗。它不是有形的事物,但您將會體會到在閱讀本指南與試用 Netty 時,這個哲學會讓您的生活更輕鬆。

入門

本章將使用簡潔範例繞行 Netty 的核心結構,讓您能夠快速入門。在本章結束時,您將能夠在 Netty 之上撰寫一個客戶端與一個伺服器。

如果您偏好由上而下的學習方法,那麼您可能會想從第 2 章「架構概觀」開始,然後再回到這裡。

入門前

執行本章介紹範例的最低需求只有兩個:Netty 最新版本,以及 JDK 1.6 以上版本。Netty 最新版本可以在 專案下載頁面取得。如欲下載正確版本的 JDK,請參閱您偏好的 JDK 廠商網站。

在您閱讀的過程中,您可能會對本章節中介紹的類別有更多疑問。如果您想更深入了解,請參考 API 參考。為方便起見,本文中的所有類別名稱均已連結至線上 API 參考。另外,請隨時聯繫 Netty 專案社群,讓我們知道是否有任何不正確的資訊、文法和拼寫錯誤,以及您是否有任何改善文件的好點子。

編寫 DISCARD 伺服器

世界上最簡單的通訊協定不是「Hello, World!」而是DISCARD。這是一種會捨棄所有接收到的資料而且不回應的協定。

若要實作 DISCARD 協定,您唯一需要做的就是忽略所有接收到的資料。讓我們直接從處理器實作開始,它會處理 Netty 所產生的 I/O 事件。

package io.netty.example.discard;

import io.netty.buffer.ByteBuf;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelHandlerAdapter;

/**
 * Handles a server-side channel.
 */
public class DiscardServerHandler extends ChannelHandlerAdapter { // (1)

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) { // (2)
        // Discard the received data silently.
        ((ByteBuf) msg).release(); // (3)
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // (4)
        // Close the connection when an exception is raised.
        cause.printStackTrace();
        ctx.close();
    }
}
  1. DiscardServerHandler會延伸ChannelHandlerAdapter,而這是ChannelHandler的實作。ChannelHandler提供了各種您可以覆寫的事件處理手法。就目前來說,只需延伸ChannelHandlerAdapter,而不必自己實作處理器介面。
  2. 我們在此覆寫channelRead()事件處理手法。這個手法會在從用戶端收到新資料時,用接收到的訊息呼叫。在此範例中,接收到的訊息類型是ByteBuf
  3. 若要實作DISCARD協定,處理器必須忽略接收到的訊息。ByteBuf是一個參照計數物件,必須透過release()手法明確地釋出。請記住,釋出傳遞至處理器的任何參照計數物件,是處理器的責任。通常,channelRead()處理手法會像以下範例一樣實作
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    try {
        // Do something with msg
    } finally {
        ReferenceCountUtil.release(msg);
    }
}
  1. 當 Netty 因為 I/O 錯誤而引發例外情況,或是處理器實作在處理事件時引發例外情況時,exceptionCaught()事件處理手法會使用Throwable呼叫。在多數情況下,應該在此記錄捕獲的例外並關閉其相關頻道,雖然此方法的實作會依據您處理例外情況的方式而不同。例如,您可能想在關閉連線前,傳送含有錯誤碼的回應訊息。

到目前為止都很好。我們已經實作了DISCARD伺服器的前半部分。現在剩下的工作是撰寫main()手法,它會使用DiscardServerHandler啟動伺服器。

package io.netty.example.discard;
    
import io.netty.bootstrap.ServerBootstrap;

import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
    
/**
 * Discards any incoming data.
 */
public class DiscardServer {
    
    private int port;
    
    public DiscardServer(int port) {
        this.port = port;
    }
    
    public void run() throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1)
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap(); // (2)
            b.group(bossGroup, workerGroup)
             .channel(NioServerSocketChannel.class) // (3)
             .childHandler(new ChannelInitializer<SocketChannel>() { // (4)
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {
                     ch.pipeline().addLast(new DiscardServerHandler());
                 }
             })
             .option(ChannelOption.SO_BACKLOG, 128)          // (5)
             .childOption(ChannelOption.SO_KEEPALIVE, true); // (6)
    
            // Bind and start to accept incoming connections.
            ChannelFuture f = b.bind(port).sync(); // (7)
    
            // Wait until the server socket is closed.
            // In this example, this does not happen, but you can do that to gracefully
            // shut down your server.
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }
    
    public static void main(String[] args) throws Exception {
        int port;
        if (args.length > 0) {
            port = Integer.parseInt(args[0]);
        } else {
            port = 8080;
        }
        new DiscardServer(port).run();
    }
}
  1. NioEventLoopGroup 是一個多執行緒事件迴圈,用於處理 I/O 操作。Netty 提供各種 EventLoopGroup 實作,以支援不同類型的傳輸。我們在此範例中實作伺服端應用程式,因此會使用兩個 NioEventLoopGroup。第一個(通常稱為「boss」)接受傳入連線。第二個(通常稱為「worker」)在 boss 接受連線並將已接受的連線註冊到 worker 後,就會處理已接受連線的流量。使用的執行緒數目以及如何將它們對應到已建立的 Channel,取決於 EventLoopGroup 實作,甚至可能可以透過建構函式進行設定。
  2. ServerBootstrap 是一個用於設定伺服端的協助類別。您可以直接使用 Channel 設定伺服端。不過,請注意這是一個繁瑣的流程,而且在大部分情況下您都不需要這麼做。
  3. 在此,我們指定使用 NioServerSocketChannel 類別,此類別用於建立新的 Channel,以接受傳入的連線。
  4. 在此指定的處理程式將始終由新接受的 Channel 進行評估。ChannelInitializer 是一個特殊處理程式,其目的在於協助使用者設定新的 Channel。您很可能會想透過新增一些處理程式(例如 DiscardServerHandler)到 ChannelPipeline,來實作您的網路應用程式。隨著應用程式變得複雜,您很有可能會將更多處理程式新增到管線中,並最終將這個匿名類別萃取到頂層類別中。
  5. 您也可以設定特定於 Channel 實作的參數。我們正在撰寫一個 TCP/IP 伺服端,因此我們可以設定 Socket 選項,例如 tcpNoDelaykeepAlive。請參閱 ChannelOption 的 API 文件和特定的 ChannelConfig 實作,以瞭解所支援的 ChannelOption 大綱。
  6. 您注意到 option()childOption() 了嗎? option() 是用於接受即將進入連線的 NioServerSocketChannelchildOption() 則用於父代 ServerChannel 所接受的 Channel,在這個案例中是 NioServerSocketChannel
  7. 我們現在準備好了。剩下的就是綁定到連接埠並啟動伺服器。在此,我們綁定到機器中所有網路界面卡 (NIC) 的 8080 連接埠。您現在可以呼叫 bind() 方法,呼叫次數不限 (使用不同的綁定位址)。

恭喜!您剛剛在 Netty 上面完成了您的第一個伺服器。

檢視收到的資料

現在我們已經撰寫好第一個伺服器,我們需要測試看看它是否真的有效運作。最簡單的測試方式是使用 telnet 指令。例如,您可以在命令列輸入 telnet localhost 8080,然後輸入一些文字。

不過,我們能說伺服器運作良好嗎?由於這是一個捨棄伺服器,我們無法真正確定。您不會收到任何回應。為了證明它真的運作良好,我們來修改伺服器以列印所收到的內容。

我們已經知道 channelRead() 方法會在收到資料時呼叫。我們在 DiscardServerHandlerchannelRead() 方法中加入一些程式碼

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    ByteBuf in = (ByteBuf) msg;
    try {
        while (in.isReadable()) { // (1)
            System.out.print((char) in.readByte());
            System.out.flush();
        }
    } finally {
        ReferenceCountUtil.release(msg); // (2)
    }
}
  1. 這個非有效率的迴圈實際上可以簡化為:System.out.println(in.toString(io.netty.util.CharsetUtil.US_ASCII))
  2. 或者,您可以在這裡執行 in.release()

如果您再次執行 telnet 指令,您將會看到伺服器列印已收到的內容。

捨棄伺服器的完整原始碼位於發行版的 io.netty.example.discard 套件中。

撰寫迴音伺服器

到目前為止,我們都在消費資料,而沒有任何回應。不過,伺服器的通常作法是回應請求。讓我們學習透過實作 ECHO 協定(任何收到的資料都會送回)來撰寫一個回應訊息給客戶端。

與我們在前一節實作的捨棄伺服器唯一的不同,在於它將收到的資料送回去,而不是將收到的資料列印到主控台。因此,再次修改 channelRead() 方法就已經足夠了

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ctx.write(msg); // (1)
        ctx.flush(); // (2)
    }
  1. 一個 ChannelHandlerContext 物件提供多種操作,能讓您觸發各種 I/O 事件和操作。在此,我們呼叫 write(Object) 來逐字逐句地寫入接收到的訊息。請注意,與在 DISCARD 範例中不同,我們沒有釋放接收到的訊息。這是因為當將訊息寫入線路時,Netty 會自動釋放訊息。
  2. ctx.write(Object) 並不會將訊息寫入線路。訊息會先內部暫存,然後再由 ctx.flush() 將訊息傳送到線路上。或者,您可以呼叫 ctx.writeAndFlush(msg) 以簡化操作。

如果您再次執行 telnet 指令,您會看到伺服器傳回您傳送的所有內容。

此回聲伺服器的完整原始程式碼位於發行版的 io.netty.example.echo 套件中。

撰寫時間伺服器

本節將實作 TIME 協定。它與先前的範例不同,在於它會傳送一個訊息,其中包含一個 32 位元整數,且不會接收任何要求,並在傳送訊息後關閉連線。在本範例中,您將了解如何建構和傳送訊息,以及如何在完成後關閉連線。

由於我們將忽略所有接收到的資料,但在建立連線後立即傳送訊息,因此我們這次無法使用 channelRead() 方法。相反地,我們應該覆寫 channelActive() 方法。以下是實作方式

package io.netty.example.time;

public class TimeServerHandler extends ChannelHandlerAdapter {

    @Override
    public void channelActive(final ChannelHandlerContext ctx) { // (1)
        final ByteBuf time = ctx.alloc().buffer(4); // (2)
        time.writeInt((int) (System.currentTimeMillis() / 1000L + 2208988800L));
        
        final ChannelFuture f = ctx.writeAndFlush(time); // (3)
        f.addListener(new ChannelFutureListener() {

            @Override
            public void operationComplete(ChannelFuture future) {
                assert f == future;
                ctx.close();
            }
        }); // (4)
    }
    
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}
  1. 如前所述,將在建立連線並準備產生流量時呼叫 channelActive() 方法。我們在這個方法中,寫入一個 32 位元整數,代表目前的時刻。

  2. 若要傳送一則新訊息,我們需要分配一個包含訊息的新緩衝區。我們將寫入一個 32 位元整數,因此我們需要一個 ByteBuf,其容量至少為 4 個位元組。透過 ChannelHandlerContext.alloc() 取得目前的 ByteBufAllocator,並分配一個新緩衝區。

  3. 和往常一樣,我們寫入建構的訊息。

    但是,翻轉在哪裡?我們不是習慣在 NIO 中傳送訊息前呼叫 java.nio.ByteBuffer.flip() 嗎?ByteBuf 沒有這種方法,因為它有兩個指標;一個用於讀取操作,另一個用於寫入操作。將內容寫入 ByteBuf 時寫入指標會增加,而讀取指標不會改變。讀取指標和寫入指標分別代表訊息的起始和結束位置。

    相反地,NIO buffer 沒有提供乾淨的方式來找出訊息內容的起點和終點,在不呼叫 flip 方法的情況下。當你忘記 flip buffer 時,你會遇到麻煩,因為沒有或不正確的資料會被傳送出去。這樣的錯誤不會發生在 Netty,因為我們有不同的指標對應到不同的操作類型。你會發現這讓你的生活簡單多很多,當你習慣它之後—沒有翻轉的生活!

    另一個可能會注意到的點是 ChannelHandlerContext.write()(和 writeAndFlush())方法會回傳一個 ChannelFutureChannelFuture 表示一個尚未發生的 I/O 操作。這表示任何被要求的操作都尚未執行,因為在 Netty 中,所有操作都是非同步的。例如,下列程式碼可能會在訊息傳送之前就關閉連線

    Channel ch = ...;
    ch.writeAndFlush(message);
    ch.close();

    因此,你需要在 ChannelFuture 完成後,呼叫 close() 方法。這個 ChannelFuture 是由 write() 方法回傳,並且會在寫入操作完成時通知它的監聽器。請注意,close() 也可能不會立即關閉連線,而且它會回傳一個 ChannelFuture

  4. 那麼,如何在寫入要求完成時收到通知呢?這就像將一個 ChannelFutureListener 加入到回傳的 ChannelFuture 一樣簡單。在這裡,我們建立一個新的匿名 ChannelFutureListener,當操作完成時關閉 Channel

    或者,你可以使用預先定義的監聽器來簡化程式碼

    f.addListener(ChannelFutureListener.CLOSE);

要測試我們的時間伺服器是否如同預期運作,你可以使用 UNIX rdate 命令

$ rdate -o <port> -p <host>

其中 <port> 是你指定在 main() 方法中的埠號,而 <host> 通常是 localhost

撰寫一個時間用戶端

DISCARDECHO 伺服器不同,我們需要一個 TIME 協定的用戶端,因為人類無法將 32 位元的二進制資料轉譯成日曆上的日期。在本節中,我們討論如何確定伺服器正確運作,以及如何用 Netty 撰寫一個用戶端。

在 Netty 中,伺服器和用戶端最大的且唯一的差異,就是使用了不同的 BootstrapChannel 實作。請看以下的程式碼

package io.netty.example.time;

public class TimeClient {
    public static void main(String[] args) throws Exception {
        String host = args[0];
        int port = Integer.parseInt(args[1]);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        
        try {
            Bootstrap b = new Bootstrap(); // (1)
            b.group(workerGroup); // (2)
            b.channel(NioSocketChannel.class); // (3)
            b.option(ChannelOption.SO_KEEPALIVE, true); // (4)
            b.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new TimeClientHandler());
                }
            });
            
            // Start the client.
            ChannelFuture f = b.connect(host, port).sync(); // (5)

            // Wait until the connection is closed.
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
        }
    }
}
  1. Bootstrap 類似於 ServerBootstrap,不同之處在於它適用於非伺服器頻道,例如用戶端或非連線頻道。
  2. 如果您只指定一個 EventLoopGroup,它將同時作為 boss 群組和工作群組使用。不過,boss 工作者不用於用戶端。
  3. 會使用 NioSocketChannel (而非 NioServerSocketChannel) 來建立用戶端 Channel
  4. 請注意,與使用 ServerBootstrap 時不同,我們在此不使用 childOption(),原因是用戶端 SocketChannel 沒有父項。
  5. 我們應該呼叫 connect() 方法而非 bind() 方法。

如您所見,這與伺服器端程式碼沒有太大的不同。那麼 ChannelHandler 實作呢?它應該從伺服器收到 32 位元整數,並將其轉譯成人類可讀的格式,然後印出轉譯後的時間,並關閉連線。

package io.netty.example.time;

import java.util.Date;

public class TimeClientHandler extends ChannelHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ByteBuf m = (ByteBuf) msg; // (1)
        try {
            long currentTimeMillis = (m.readUnsignedInt() - 2208988800L) * 1000L;
            System.out.println(new Date(currentTimeMillis));
            ctx.close();
        } finally {
            m.release();
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}
  1. 在 TCP/IP 中,Netty 會將從對等方傳送的資料讀取到 [ByteBuf] 中。

這看起來非常簡單,而且與伺服器端範例沒有什麼不同。但是,這個處理常式有時會拒絕工作,並擲出 IndexOutOfBoundsException。我們將在下一個章節討論為什麼會發生這種情況。

處理基於串流的傳輸

Socket 緩衝器的一個小缺點

在基於串流的傳輸(例如 TCP/IP)中,接收到的資料會儲存在 socket 接收緩衝器中。不幸的是,基於串流的傳輸緩衝器不是封包佇列,而是位元組佇列。這表示,即使您以兩個獨立封包傳送兩則訊息,作業系統也不會將它們視為兩則訊息,而只會將它們視為一堆位元組。因此,無法保證您讀取的內容與遠端對等方所寫入的內容完全相同。例如,假設作業系統的 TCP/IP 堆疊已收到三個封包。

Three packets received as they were sent

由於基於串流的通訊協定的這項一般屬性,因此應用程式中以以下片段化的形式讀取它們的機率很高。

Three packets split and merged into four buffers

因此,接收方(無論是伺服器端或用戶端)都應該將接收到的資料還原成一個或多個有意義的畫面,以便應用程式邏輯輕鬆了解。以上述範例而言,接收到的資料應編成類似以下的畫面。

Four buffers defragged into three

第一個解決方案

接下來我們回頭檢視 TIME 範例程式碼。我們在此遇到同樣的問題。32 位元整數的資料量很小,不太可能會發生分段的情形。但問題在於它可能會分段,且隨著流量增加,分段的可能性也會提高。

最簡單的解決方法就是建立一個內部累積緩衝區並等到所有 4 個位元組都收到再放入內部緩衝區。以下修改後的 TimeClientHandler 實作修復了此問題

package io.netty.example.time;

import java.util.Date;

public class TimeClientHandler extends ChannelHandlerAdapter {
    private ByteBuf buf;
    
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) {
        buf = ctx.alloc().buffer(4); // (1)
    }
    
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) {
        buf.release(); // (1)
        buf = null;
    }
    
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ByteBuf m = (ByteBuf) msg;
        buf.writeBytes(m); // (2)
        m.release();
        
        if (buf.readableBytes() >= 4) { // (3)
            long currentTimeMillis = (buf.readUnsignedInt() - 2208988800L) * 1000L;
            System.out.println(new Date(currentTimeMillis));
            ctx.close();
        }
    }
    
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}
  1. 一個 ChannelHandler 有兩個生命週期的監聽方法:handlerAdded()handlerRemoved()。您可以執行隨意的 (de) 初始化任務,只要不會長時間封鎖。
  2. 首先,所有接收到的資料都應累積到 buf 中。
  3. 然後處理常式必須檢查 buf 是否有足夠資料,在此範例中為 4 個位元組,並執行實際的工作邏輯。否則,資料更多時 Netty 會再次呼叫 channelRead() 方法,最後將累積所有 4 個位元組。

第二個解決方案

儘管第一個解決方案解決了 TIME 範例程式碼的問題,但修改後的處理常式看起來並不完美。想像一個由多個欄位組成、例如可變長度欄位的更複雜的協定。您的 ChannelHandler 實作將很快地變得難以維護。

您可能已經注意到,您可以將多個 ChannelHandler 加入 ChannelPipeline,因此可以將一個巨型的 ChannelHandler 拆成多個模組化的,以減少應用程式的複雜性。例如,您可以將 TimeClientHandler 拆成兩個處理常式

  • 處理分段問題的 TimeDecoder,以及
  • 最初版本的 TimeClientHandler

幸運的是,Netty 提供了一個可延伸的類別協助您撰寫第一個迴圈

package io.netty.example.time;

public class TimeDecoder extends ByteToMessageDecoder { // (1)
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { // (2)
        if (in.readableBytes() < 4) {
            return; // (3)
        }
        
        out.add(in.readBytes(4)); // (4)
    }
}
  1. ByteToMessageDecoder 是一種 ChannelHandler 實作,它可以輕鬆處理分段問題。
  2. decode() 會在每次收到新資料時,使用內部維護的累積緩衝區呼叫 ByteToMessageDecoder 實作。
  3. decode() 可以決定不將任何項目加到 out,其中 out 在累積緩衝區中沒有足夠資料。當收到更多資料時,ByteToMessageDecoder 將再次呼叫 decode()
  4. 如果 decode() 將物件新增至 out,表示解碼器已成功解碼訊息。 ByteToMessageDecoder 將捨棄累積緩衝區已讀取的部分。請記住,您不需要解碼多則訊息。 ByteToMessageDecoder 會持續呼叫 decode() 方法,直到無法新增任何內容至 out 為止。

現在我們有另一個要插入至 ChannelPipeline 的處理常式,我們應該修改 TimeClient 中的 ChannelInitializer 實作

b.handler(new ChannelInitializer<SocketChannel>() {
    @Override
    public void initChannel(SocketChannel ch) throws Exception {
        ch.pipeline().addLast(new TimeDecoder(), new TimeClientHandler());
    }
});

如果您是冒險心十足的人,您可能想嘗試 ReplayingDecoder,它會進一步簡化解碼器。您需要參考 API 說明文件才能取得進一步的資訊。

public class TimeDecoder extends ReplayingDecoder<Void> {
    @Override
    protected void decode(
            ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
        out.add(in.readBytes(4));
    }
}

此外,Netty 提供內建的解碼器,讓您能夠非常輕鬆地實作大部分協定,而且有助於避免最後實作出巨型的難以維護的處理常式。請參閱下列套件以取得更詳細的範例

以 POJO 取代 ByteBuf

迄今為止我們複習過的所有範例都使用 ByteBuf 作為協定訊息的主要資料結構。在本節中,我們將改良 TIME 協定的客戶端和伺服器範例,改用 POJO 而非 ByteBuf

ChannelHandler 中使用 POJO 的優點非常明顯;您的處理常式可以透過將從 ByteBuf 中擷取資訊的程式碼從處理常式中分離出來,進而更易於維護和重複使用。在 TIME 的客戶端和伺服器範例中,我們只讀取一個 32 位元整數,直接使用 ByteBuf 沒有太大的問題。但是,當您實作真實世界的協定時,您會發現有必要進行分離。

首先,讓我們定義一種稱為 UnixTime 的新類型。

package io.netty.example.time;

import java.util.Date;

public class UnixTime {

    private final long value;
    
    public UnixTime() {
        this(System.currentTimeMillis() / 1000L + 2208988800L);
    }
    
    public UnixTime(long value) {
        this.value = value;
    }
        
    public long value() {
        return value;
    }
        
    @Override
    public String toString() {
        return new Date((value() - 2208988800L) * 1000L).toString();
    }
}

現在,我們可以修改 TimeDecoder,以便產生 UnixTime 而不是 ByteBuf

@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
    if (in.readableBytes() < 4) {
        return;
    }

    out.add(new UnixTime(in.readUnsignedInt()));
}

有了更新版的解碼器,TimeClientHandler 就不再使用 ByteBuf

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    UnixTime m = (UnixTime) msg;
    System.out.println(m);
    ctx.close();
}

如此一來是不是簡單優雅多了?這種技術也可以套用在伺服器端,這一次我們先來更新TimeServerHandler

@Override
public void channelActive(ChannelHandlerContext ctx) {
    ChannelFuture f = ctx.writeAndFlush(new UnixTime());
    f.addListener(ChannelFutureListener.CLOSE);
}

現在,僅缺一塊拼圖,一個編碼器,它是一個ChannelHandler 的實作,用來將UnixTime 轉譯回ByteBuf。它比編寫解碼器簡單多了,因為編碼訊息時無須處理封包片段化與組合。

package io.netty.example.time;

public class TimeEncoder extends ChannelHandlerAdapter {
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
        UnixTime m = (UnixTime) msg;
        ByteBuf encoded = ctx.alloc().buffer(4);
        encoded.writeInt((int) m.value());
        ctx.write(encoded, promise); // (1)
    }
}
  1. 在這個處理程序方法中,有幾件重要事項需要注意

    首先,我們將原本的ChannelPromise 原樣傳入,這樣 Netty 編碼資料實際寫入通訊線路時,它就會將其標示為成功或失敗。

    其次,我們沒有呼叫ctx.flush()。有一個獨立的處理程序方法void flush(ChannelHandlerContext ctx),是用來覆寫flush() 操作。

若要進一步簡化,你可以利用MessageToByteEncoder

public class TimeEncoder extends MessageToByteEncoder<UnixTime> {
    @Override
    protected void encode(ChannelHandlerContext ctx, UnixTime msg, ByteBuf out) {
        out.writeInt((int) msg.value());
    }
}

最後一個任務是在伺服器端的ChannelPipeline 中,插入TimeEncoderTimeServerHandler 之前,這是一個很簡單的練習而已。

關閉應用程式

關閉 Netty 應用程式通常只要透過shutdownGracefully() 關閉你建立好的所有EventLoopGroup 即可。它會回傳一個Future,在你完全終止EventLoopGroup,且關閉所有屬於該群組的Channel 後,通知你完成。

摘要

在這一章中,我們快速瀏覽了 Netty,並展示如何建立一個完全運作的網路應用程式在 Netty 之上。

在接下來的章節中,有更詳細的 Netty 資訊。我們也鼓勵你檢閱io.netty.example 套件中的 Netty 範例。

另外請注意,社群一直期待你的問題與想法,以協助你並根據你的回饋改善 Netty 及其文件。

上次於 19-Jul-2024 檢索