跳過導覽

4.x 的使用者指南

您是否知道此頁面是自動從 Github Wiki 頁面 產生?您可以在 這裡 自行改進!

前言

問題

現今我們使用一般用途應用程式或函式庫彼此溝通。例如,我們經常使用 HTTP 客戶端函式庫從網路伺服器擷取資訊,並透過網路服務呼叫遠端程序。不過,一般用途協定或其實現有時無法擴充得很容易。這就像我們不會使用一般用途的 HTTP 伺服器交換大型檔案、電子郵件訊息,以及幾乎即時的訊息,如財務資訊和多人遊戲資料。需要一個專門用於特殊用途的高度最佳化協定實現。例如,您可能想要實作一個針對基於 AJAX 的聊天應用程式、媒體串流或大型檔案傳輸而最佳化的 HTTP 伺服器。您甚至可以設計和實作一個完全依據您的需要量身打造的全新協定。另一個不可避免的情況是,當您處理舊有私有協定時,為了確保與舊系統的互通性。這種情況下重要的是,我們如何在不犧牲結果應用程式的穩定性和效能的情況下,儘快實作該協定。

解決方案

Netty 專案致力於提供非同步事件驅動網路應用程式架構和工具,以便快速開發可維護的高效能和高擴充性通訊協定伺服器和用戶端。

換句話說,Netty 是 NIO 用戶端伺服器架構,可輕鬆快速開發網路應用程式,例如通訊協定伺服器和用戶端。它大幅簡化和精簡網路程式設計,例如 TCP 和 UDP Socket 伺服器開發。

「快速且簡易」並不表示所產生的應用程式會造成可維護性或效能問題。Netty 已謹慎考量 FTP、SMTP、HTTP 和各種二進位和基於文字的舊版通訊協定的實作經驗,並進行仔細設計。因此,Netty 已成功找出方法,可同時達成易於開發、效能、穩定性和彈性,且不須妥協。

部分使用者可能已經找到其他聲稱具備相同優點的網路應用程式架構,而你可能會想詢問 Netty 與他們有何不同。答案在於它的建構理念。Netty 的設計宗旨是在 API 和實作方面,從第一天起便能提供最舒適的體驗。這並非有形之物,但隨著你閱讀此指南並運用 Netty,你會體會到這項理念將讓你的工作變得輕鬆許多。

入門

本章巡禮 Netty 的核心建構,並提供簡易範例協助你快速入門。在本章結束前,你就能自行在 Netty 上撰寫用戶端和伺服器。

如果你偏好由上而下的學習方法,不妨從 第 2 章,架構概觀 開始,再回到這裡。

入門前

要在本章執行範例,只需符合兩個最低需求:最新版本的 Netty 和 JDK 1.6 或更高版本。最新版本的 Netty 可在 專案下載頁面 取得。如需下載正確版本的 JDK,請參閱你偏好的 JDK 廠商網站。

在閱讀過程中,你可能會對本章介紹的類別有更多疑問。如果你想深入了解,請參閱 API 參考。為了你的方便,本文中的所有類別名稱都連結到線上 API 參考。此外,如果你發現任何不正確的資訊、文法錯誤或拼寫錯誤,或是有好的點子可協助改善文件,請不要猶豫 聯絡 Netty 專案社群 並提供建議。

撰寫放棄伺服器

全世界最簡化的通訊協定不是「Hello, World!」而是 放棄。這是一種不回應而放棄所有接收資料的通訊協定。

要實作 DISCARD 通訊協定,您唯一需要做的事就是忽略所有接收到的資料。讓我們從 Netty 所產生的 I/O 事件處理程式實作開始。

package io.netty.example.discard;

import io.netty.buffer.ByteBuf;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

/**
 * Handles a server-side channel.
 */
public class DiscardServerHandler extends ChannelInboundHandlerAdapter { // (1)

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) { // (2)
        // Discard the received data silently.
        ((ByteBuf) msg).release(); // (3)
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // (4)
        // Close the connection when an exception is raised.
        cause.printStackTrace();
        ctx.close();
    }
}
  1. DiscardServerHandler 延伸 ChannelInboundHandlerAdapter,它是 ChannelInboundHandler 的實作。 ChannelInboundHandler 提供各種事件處理程式方法,您可以覆寫這些方法。目前來說,只要延伸 ChannelInboundHandlerAdapter 就夠了,無須自己實作處理程式介面。
  2. 我們在此覆寫 channelRead() 事件處理程式方法。這個方法會隨著接收到的訊息函式呼叫,每當從客戶端收到新資料時,就會發生這種情況。在此範例中,收到訊息的類型是 ByteBuf
  3. 為實作 DISCARD 通訊協定,處理程式必須忽略收到訊息。 ByteBuf 是參考計數物件,必須透過 release() 方法明確釋放。請記住,釋放傳遞至處理程式的任何參考計數物件是處理程式的責任。通常, channelRead() 處理程式方法會實作如下
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    try {
        // Do something with msg
    } finally {
        ReferenceCountUtil.release(msg);
    }
}
  1. 當 Netty 發生 I/O 錯誤,或是處理程式實作在處理事件時擲回例外情況時,會以 Throwable 函式呼叫 exceptionCaught() 事件處理程式方法。在多數情況下,應在此處記錄捕獲的例外情況,並關閉其關聯的通道,不過此方法的實作方式可能會有所不同,具體取決於您想要如何處理例外情況。例如,您可能想要在關閉連線之前,傳送含有錯誤碼的回應訊息。

到目前為止都很好。我們已經實作了 DISCARD 伺服器的上半部分。剩下的工作就是撰寫 main() 方法,並使用 DiscardServerHandler 啟動伺服器。

package io.netty.example.discard;
    
import io.netty.bootstrap.ServerBootstrap;

import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
    
/**
 * Discards any incoming data.
 */
public class DiscardServer {
    
    private int port;
    
    public DiscardServer(int port) {
        this.port = port;
    }
    
    public void run() throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1)
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap(); // (2)
            b.group(bossGroup, workerGroup)
             .channel(NioServerSocketChannel.class) // (3)
             .childHandler(new ChannelInitializer<SocketChannel>() { // (4)
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {
                     ch.pipeline().addLast(new DiscardServerHandler());
                 }
             })
             .option(ChannelOption.SO_BACKLOG, 128)          // (5)
             .childOption(ChannelOption.SO_KEEPALIVE, true); // (6)
    
            // Bind and start to accept incoming connections.
            ChannelFuture f = b.bind(port).sync(); // (7)
    
            // Wait until the server socket is closed.
            // In this example, this does not happen, but you can do that to gracefully
            // shut down your server.
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }
    
    public static void main(String[] args) throws Exception {
        int port = 8080;
        if (args.length > 0) {
            port = Integer.parseInt(args[0]);
        }

        new DiscardServer(port).run();
    }
}
  1. NioEventLoopGroup 是一個會處理 I/O 作業的多執行緒事件迴圈。Netty 提供了各種 EventLoopGroup 實作,適用於各種傳輸類型。我們在此範例中實作伺服器端應用程式,因此會使用兩個 NioEventLoopGroup。第一個稱為「boss」的迴圈,負責接受傳入的連線。第二個稱為「工作者」的迴圈,則負責在 boss 接受連線後傳送和接收已接受連線的相關流量,並將已接受連線註冊至工作者。所使用的執行緒數量,以及對已建立的 Channel 的對應方式,會因應 EventLoopGroup 實作而定,甚至也可以透過建構函式進行設定。
  2. ServerBootstrap 是一個會設定伺服器的輔助類別。您可以直接使用 Channel 來設定伺服器。不過請注意,這是一個繁瑣的程序,而且大多數時候您並不需要這麼做。
  3. 在此,我們指定要使用 NioServerSocketChannel 類別,可透過此類別建立新的 Channel,以接受傳入的連線。
  4. 在此指定的處理常式會永遠由新接受的 Channel 評估。這個 ChannelInitializer 是一個特定處理常式,其目的在於協助使用者設定新的 Channel。您最大可能會想要透過新增一些處理常式(例如 DiscardServerHandler)至 ChannelPipeline,來實作您的網路應用程式。隨著應用程式的複雜度提升,您可能會新增更多處理常式至這條串列,並最終將這個匿名類別摘取出一個頂層類別。
  5. 您也可以設定那些專屬於 Channel 實作的參數。我們正在撰寫 TCP/IP 伺服器,因此我們有權設定 tcpNoDelaykeepAlive 等套接字選項。請參閱 ChannelOption 和特定 ChannelConfig 實作的應用程式檔,以取得對受支援的 ChannelOption 概觀。
  6. 您注意到 option()childOption() 了嗎?option() 用於接受傳入連線的 NioServerSocketChannelchildOption() 用於父 ServerChannel 接受的 Channel,在本例中為 NioSocketChannel
  7. 我們現在準備開始了。剩下的就是繫結到埠並啟動伺服器。在此,我們繫結到機器中所有網路介面卡 (NIC) 的埠 8080。現在,您可以按自己的需要 (使用不同的繫結位址) 呼叫 bind() 方法多次。

恭喜!您剛剛在 Netty 上面完成了您的第一個伺服器。

檢視接收的資料

現在我們已經撰寫了我們的第一個伺服器,我們需要測試它是否真的有效。測試它的最簡單的方法是使用 _telnet_ 指令。例如,您可以在命令列中輸入 telnet localhost 8080 並輸入某個字元。

但是,我們能說伺服器運作正常嗎?我們其實並不知道,因為它是一個放棄伺服器。您根本不會收到任何回應。為了證明它真的有效,讓我們修改伺服器以印出它所接收到的內容。

我們已經知道每當收到資料時,就會呼叫 channelRead() 方法。讓我們在 DiscardServerHandlerchannelRead() 方法中放入一些代碼

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    ByteBuf in = (ByteBuf) msg;
    try {
        while (in.isReadable()) { // (1)
            System.out.print((char) in.readByte());
            System.out.flush();
        }
    } finally {
        ReferenceCountUtil.release(msg); // (2)
    }
}
  1. 這個效率低下的迴圈實際上可以簡化為:System.out.println(in.toString(io.netty.util.CharsetUtil.US_ASCII))
  2. 或者,您可以在這裡執行 in.release()

如果您再次執行 _telnet_ 指令,您會看到伺服器會印出它已接收到的內容。

放棄伺服器的完整原始程式碼位於分發套件的 io.netty.example.discard 套件中。

撰寫回音伺服器

到目前為止,我們一直在消耗資料,卻完全沒有回應。然而,伺服器通常應該對要求做出回應。讓我們透過實作 ECHO 通訊協定來學習如何撰寫回應訊息給客戶端,在該通訊協定中,任何接收到的資料都會傳送回客戶端。

與我們在先前各節中實作的放棄伺服器唯一的不同之處在於,它會將接收到的資料傳送回客戶端,而不是將接收到的資料列印到主控台上。因此,再次修改 channelRead() 方法就足夠了

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ctx.write(msg); // (1)
        ctx.flush(); // (2)
    }
  1. 啟動 ChannelHandlerContext 物件提供各種作業,讓你觸發 I/O 事件與操作。此處,我們呼叫 write(Object) 來逐字寫入接收到的訊息。請注意,與在 DISCARD 範例不同的是,這裡沒有釋放收到的訊息。這是因為 Netty 在寫入後會自動釋放該訊息。
  2. ctx.write(Object) 並不會寫入訊息。而是會在內部緩衝,再由 ctx.flush() 寫入。或者,你可以為簡潔起見呼叫 ctx.writeAndFlush(msg)

如果你再次執行 telnet 指令,你會看到伺服器傳回你傳給它的內容。

回音伺服器的完整原始碼位於發行套件的 io.netty.example.echo 套件中。

寫入時間伺服器

本節要實作的通訊協定是 TIME 通訊協定。該協定不同於前述範例,它在未接收到任何要求時就傳送包含 32 位元整數的訊息,且在傳送訊息後就關閉連線。在本範例中,你會學習如何建構並傳送訊息,以及如何在完成時關閉連線。

我們打算略過接收到的資料,但在建立連線時就立即傳送訊息,因此這次我們無法使用 channelRead() 方法。相反地,我們應該覆寫 channelActive() 方法。以下為實作方式

package io.netty.example.time;

public class TimeServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelActive(final ChannelHandlerContext ctx) { // (1)
        final ByteBuf time = ctx.alloc().buffer(4); // (2)
        time.writeInt((int) (System.currentTimeMillis() / 1000L + 2208988800L));
        
        final ChannelFuture f = ctx.writeAndFlush(time); // (3)
        f.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) {
                assert f == future;
                ctx.close();
            }
        }); // (4)
    }
    
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}
  1. 經說明後得知,當連線建立並準備產生流量時,channelActive() 方法將會被呼叫。讓我們在此方法中寫入代表目前時間的 32 位元整數。

  2. 若要傳送新訊息,我們需要分配新的緩衝區來儲存訊息。我們要寫入 32 位元整數,因此我們需要建立一個 ByteBuf,且其容量至少為 4 位元組。透過 ChannelHandlerContext.alloc() 取得目前的 ByteBufAllocator,並分配一個新的緩衝區。

  3. 與平常一樣,我們撰寫建構的訊息。

    但是,翻轉在哪裡?我們以前在 NIO 傳送訊息之前不是都呼叫過 java.nio.ByteBuffer.flip() 嗎?ByteBuf 沒有這種方法,因為它有兩個指標:一個用於讀取作業,另一個用於寫入作業。寫入器索引會在寫入項目至 ByteBuf 時遞增,而讀取器索引不會變動。讀取器索引和寫入器索引表示訊息的開始和結束位置。

    相比之下,NIO 報緩衝區中並未提供一種簡潔的方法來找出訊息內容在不呼叫 flip 方法的情況下開始和結束的地方。如果您忘記翻轉緩衝區,那麼您將會陷入麻煩,這是因為將不會發送資料或將發送錯誤資料。Netty 中不會發生這樣一個錯誤,這是因為我們有不同的指標用於不同的操作類型。您將會發現,當您習慣這一點後,您的生活將會輕鬆許多 -- 一種不用翻轉的生活!

    要注意的另一點是 ChannelHandlerContext.write() (和 writeAndFlush())方法會傳回一個 ChannelFuture。一個 ChannelFuture 表示了一個尚未發生的 I/O 操作。這表示,任何請求的操作可能尚未被執行,因為在 Netty 中,所有操作都是非同步的。例如,下列程式碼可能會在傳送訊息之前關閉連線

    Channel ch = ...;
    ch.writeAndFlush(message);
    ch.close();

    因此,您需要在由 write() 方法傳回的 ChannelFuture 完成後呼叫 close() 方法,並在其寫入操作完成時通知其監聽器。請注意,close() 可能也無法立即關閉連線,並傳回一個 ChannelFuture

  4. 那麼,我們如何才能在寫入請求完成時收到通知呢?這就像將一個 ChannelFutureListener 新增到傳回的 ChannelFuture 一樣簡單。在此,我們建立一個新的匿名 ChannelFutureListener,該監聽器會在操作完成時關閉 Channel

    或者,您可以使用預先定義的監聽器簡化程式碼

    f.addListener(ChannelFutureListener.CLOSE);

若要測試我們的時間伺服器是否按預期運作,您可以使用 UNIX rdate 指令

$ rdate -o <port> -p <host>

其中,<port> 是您在 main() 方法中指定的埠號,而 <host> 通常是 localhost

寫入時間客戶端

不同於 DISCARDECHO 伺服器,我們需要一個客戶端來執行 TIME 通訊協定,因為人類無法將 32 位元二進位資料翻譯成日曆上的日期。在此部分中,我們將討論如何確保伺服器正常運作,以及學習如何使用 Netty 寫入客戶端。

Netty 中伺服器和客戶端之間最大也唯一的不同是,它們使用了不同的 BootstrapChannel 實作。請參閱下列程式碼

package io.netty.example.time;

public class TimeClient {
    public static void main(String[] args) throws Exception {
        String host = args[0];
        int port = Integer.parseInt(args[1]);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        
        try {
            Bootstrap b = new Bootstrap(); // (1)
            b.group(workerGroup); // (2)
            b.channel(NioSocketChannel.class); // (3)
            b.option(ChannelOption.SO_KEEPALIVE, true); // (4)
            b.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new TimeClientHandler());
                }
            });
            
            // Start the client.
            ChannelFuture f = b.connect(host, port).sync(); // (5)

            // Wait until the connection is closed.
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
        }
    }
}
  1. Bootstrap 類似於 ServerBootstrap,但這適用於非伺服器通道,例如:客戶端或無連線通道。
  2. 如果您只指定一個 EventLoopGroup,則它將用作 boss 群組和工作群組。雖然 boss 工作群組並非用於客戶端。
  3. 取代 NioServerSocketChannel,可以使用 NioSocketChannel 來建立客戶端 Channel
  4. 請注意,我們在此不使用 childOption(),不同於我們在 ServerBootstrap 中使用的,因為客戶端 SocketChannel 沒有父代。
  5. 我們應該呼叫 connect() 方法,而不是 bind() 方法。

如您所見,這與伺服器端程式碼並沒有什麼不同。那麼 ChannelHandler 實作呢?它應該從伺服器接收 32 位元整數,將其轉換為人類可讀的格式,列印轉換後的時間,然後關閉連線。

package io.netty.example.time;

import java.util.Date;

public class TimeClientHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ByteBuf m = (ByteBuf) msg; // (1)
        try {
            long currentTimeMillis = (m.readUnsignedInt() - 2208988800L) * 1000L;
            System.out.println(new Date(currentTimeMillis));
            ctx.close();
        } finally {
            m.release();
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}
  1. 在 TCP/IP 中,Netty 會將從協力廠商傳送來的資料讀取為 ByteBuf

這看起來非常簡單,而且與伺服器端範例沒有什麼不同。但是,這個處理程式有時會拒絕運作,並引發 IndexOutOfBoundsException。我們在下一節會討論為什麼會發生這種情況。

處理串流傳輸

Socket 緩衝區的一小部分問題

在串流傳輸(例如 TCP/IP)中,接收的資料會儲存在 socket 接收緩衝區。不幸的是,串流傳輸的緩衝區並非封包佇列,而是位元組佇列。這意味著,就算您傳送兩個訊息作為兩個獨立的封包,但作業系統不會將它們當作兩個訊息看待,而會視為一堆位元組。因此,無法保證您所讀取的內容完全就是遠端協力廠商所寫的內容。例如,假設作業系統的 TCP/IP 堆疊接收了三個封包

Three packets received as they were sent

由於串流傳輸協定的這個一般屬性,因此在您的應用程式中以下列的片段形式讀取它們的機率很高

Three packets split and merged into four buffers

因此,無論是伺服器端或客戶端,接收端都應將收到的資料去片段化成一個或多個有意義的框架,以便應用程式邏輯能輕鬆理解它們。就上述範例而言,應如下列所示對接收到的資料進行分框

Four buffers defragged into three

第一個解決方案

現在讓我們回到 TIME 範例程式碼。我們在這裡遇到相同的問題。32 位元整數是極少量的資料,而且不太可能會經常分段。但是,問題在於它可能分段,而且隨著流量增加,分段的可能性也會增加。

簡單的解決方案是建立一個內部累加緩衝區,並等到所有 4 個位元組都收到內部緩衝區中。以下是已修正問題之修改過後的 TimeClientHandler 實作

package io.netty.example.time;

import java.util.Date;

public class TimeClientHandler extends ChannelInboundHandlerAdapter {
    private ByteBuf buf;
    
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) {
        buf = ctx.alloc().buffer(4); // (1)
    }
    
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) {
        buf.release(); // (1)
        buf = null;
    }
    
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ByteBuf m = (ByteBuf) msg;
        buf.writeBytes(m); // (2)
        m.release();
        
        if (buf.readableBytes() >= 4) { // (3)
            long currentTimeMillis = (buf.readUnsignedInt() - 2208988800L) * 1000L;
            System.out.println(new Date(currentTimeMillis));
            ctx.close();
        }
    }
    
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}
  1. ChannelHandler 有兩個生命週期 listener 方法:handlerAdded()handlerRemoved()。只要不要同時區段太長時間,就可以執行任意 (de) 初始化工作。
  2. 首先,所有收到的資料都應累加到 buf 中。
  3. 再來,handler 必須確認 buf 是否有足夠資料,此範例中為 4 個位元組,並進行實際的業務邏輯。否則,當更多資料到達時,Netty 將再次呼叫 channelRead() 方法,並進而累加所有 4 個位元組。

第二個解決方案

儘管第一個解決方案已解決 TIME 範例程式碼的問題,修改後的 handler 看起來並不那麼乾淨。想像一個更複雜的協定,其中包含多個欄位,例如可變長度欄位。您的 ChannelInboundHandler 實作很快將無法維護。

如同您可能已注意到的,您可以將一個以上的 ChannelHandler 加入 ChannelPipeline,因此,您可以將一個單體形的 ChannelHandler 分割成多個模組化的 handler,以降低應用程式的複雜度。例如,您可以將 TimeClientHandler 分割成兩個 handler

  • 處理分段問題的 TimeDecoder,以及
  • TimeClientHandler 的初始簡單版本。

很幸運地,Netty 提供一個可延伸類別,可協助您直接撰寫第一段程式碼

package io.netty.example.time;

public class TimeDecoder extends ByteToMessageDecoder { // (1)
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { // (2)
        if (in.readableBytes() < 4) {
            return; // (3)
        }
        
        out.add(in.readBytes(4)); // (4)
    }
}
  1. ByteToMessageDecoderChannelInboundHandler 的一個實作,它能輕鬆處理分段問題。
  2. ByteToMessageDecoder 在接收新的資料時,會呼叫 decode() 方法,並使用內部維護的累加緩衝區。
  3. 當累加緩衝區中沒有足夠資料時,decode() 可以決定不要將任何資料加入 out當有更多資料接收時,ByteToMessageDecoder 將會再次呼叫 decode()
  4. 如果 decode() 將物件加入 out,表示解碼器已成功解碼訊息。 ByteToMessageDecoder 會捨棄累積緩衝區已讀取的部分。請記住,您不需要解碼多則訊息。 ByteToMessageDecoder 會一直呼叫 decode() 方法,直到沒有內容加入 out

現在我們有一個處理常式可以插入 ChannelPipeline 中,我們應該修改 TimeClientChannelInitializer 的實作

b.handler(new ChannelInitializer<SocketChannel>() {
    @Override
    public void initChannel(SocketChannel ch) throws Exception {
        ch.pipeline().addLast(new TimeDecoder(), new TimeClientHandler());
    }
});

如果您是冒險者,您可能會想嘗試 ReplayingDecoder,它能進一步簡化解碼器。但是,您需要參考 API 參考文件來取得更多資訊。

public class TimeDecoder extends ReplayingDecoder<Void> {
    @Override
    protected void decode(
            ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
        out.add(in.readBytes(4));
    }
}

此外,Netty 提供開箱即用的解碼器,讓您可以很輕鬆地實作大部分通訊協定,並幫助您避免最後得到無法維護的龐大處理常式實作。請參閱以下套件以取得更詳細的範例

使用 POJO 取代 ByteBuf

到目前為止,我們檢視的所有範例都將 ByteBuf 作為通訊協定訊息的主要資料結構。在本節中,我們將改進 TIME 通訊協定用戶端和伺服器範例,來使用 POJO 取代 ByteBuf

在您的 ChannelHandler 中使用 POJO 有個顯而易見的優點:將從 ByteBuf 中萃取資料的程式碼從處理常式中分離出來,會讓您的處理常式更易維護和重複使用。在 TIME 用戶端和伺服器範例中,我們只讀取一個 32 位元的整數,因此直接使用 ByteBuf 沒有什麼重大問題。不過,當您實作真實世界的通訊協定時,一定會發現有必要進行分離。

首先,讓我們定義一個稱為 UnixTime 的新類型。

package io.netty.example.time;

import java.util.Date;

public class UnixTime {

    private final long value;
    
    public UnixTime() {
        this(System.currentTimeMillis() / 1000L + 2208988800L);
    }
    
    public UnixTime(long value) {
        this.value = value;
    }
        
    public long value() {
        return value;
    }
        
    @Override
    public String toString() {
        return new Date((value() - 2208988800L) * 1000L).toString();
    }
}

我們現在可以修改 TimeDecoder,讓它產生 UnixTime,而不是 ByteBuf

@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
    if (in.readableBytes() < 4) {
        return;
    }

    out.add(new UnixTime(in.readUnsignedInt()));
}

有了更新後的解碼器,TimeClientHandler 就不再使用 ByteBuf

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    UnixTime m = (UnixTime) msg;
    System.out.println(m);
    ctx.close();
}

更簡單、更優雅,對吧?可以在伺服器端套用相同的技巧。這次,讓我們先更新 TimeServerHandler

@Override
public void channelActive(ChannelHandlerContext ctx) {
    ChannelFuture f = ctx.writeAndFlush(new UnixTime());
    f.addListener(ChannelFutureListener.CLOSE);
}

現在,唯一遺失的部分是編碼器,它是 ChannelOutboundHandler 的實作,會把 UnixTime 翻譯回 ByteBuf。它比撰寫解碼器簡單得多,因為在編碼訊息時,無需處理封包分段與組裝。

package io.netty.example.time;

public class TimeEncoder extends ChannelOutboundHandlerAdapter {
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
        UnixTime m = (UnixTime) msg;
        ByteBuf encoded = ctx.alloc().buffer(4);
        encoded.writeInt((int)m.value());
        ctx.write(encoded, promise); // (1)
    }
}
  1. 在這單一行中,有一些頗為重要的內容。

    首先,我們原原本本地傳遞原始 ChannelPromise,以便 Netty 在編碼資料實際寫入線路時,將標示為成功或失敗。

    其次,我們未呼叫 ctx.flush()。有一個獨立的處理常式 void flush(ChannelHandlerContext ctx) 專門用於覆寫 flush() 作業。

若要進一步簡化,您可以運用 MessageToByteEncoder

public class TimeEncoder extends MessageToByteEncoder<UnixTime> {
    @Override
    protected void encode(ChannelHandlerContext ctx, UnixTime msg, ByteBuf out) {
        out.writeInt((int)msg.value());
    }
}

剩下的最後一項工作是將 TimeEncoder 插入伺服器端的 ChannelPipeline 中,置於 TimeServerHandler 前面,這是一個極微不足道的練習。

關閉您的應用程式

關閉 Netty 應用程式通常和關閉透過 shutdownGracefully() 建立的所有 EventLoopGroup 一樣簡單。它會傳回一個 FutureEventLoopGroup 完全終止後,以及屬於此群組的所有 Channel 關閉後,向您發出通知。

摘要

在本章中,我們快速瀏覽了 Netty,並展示了如何在 Netty 之上撰寫一個完全運作的網路應用程式。

後續各章節中,將有更詳細的 Netty 資訊。我們也鼓勵您檢閱 io.netty.example 套件中的 Netty 範例。

請另行注意,社群 隨時樂於回答您的問題,並提供構想,以協助您、並根據您的意見反應,持續改善 Netty 及其文件。

最近於 2024 年 7 月 19 日擷取