完成上述步驟後,DistributedFileSystem將返回壹個FSDataInputStream(支持文件查找),客戶端可以從FSDataInputStream中讀取數據。FSDataInputStream包裝了壹個DFSInputSteam類來處理namenode和datanode的I/O操作。
然後,客戶端執行read()方法(步驟3),DFSInputStream(已經存儲了要讀取的文件的前幾個塊的位置信息)連接到第壹個datanode(即最近的datanode)以獲取數據。通過重復調用read()方法(步驟4和5),文件中的數據被傳輸到客戶端。當讀取到塊的結尾時,DFSInputStream將關閉指向該塊的流,轉而查找下壹個塊的位置信息,然後重復調用read()方法繼續對該塊進行流讀取。這些過程對用戶是透明的,在用戶看來,這是對整個文件的不間斷流式讀取。
讀取真實文件後,客戶端調用FSDataInputSteam中的close()方法來關閉文件輸入流(步驟6)。
如果DFSInputStream在讀取塊時檢測到錯誤,dfsinputstream將連接到下壹個datanode以獲取該塊的其他備份,同時,它將記錄之前檢測到的損壞的datanode以避免將來重復讀取該datanode。DFSInputSteam還將檢查從datanode讀取的數據的校驗和,如果發現有數據損壞,它將向namenode報告損壞的塊,並重新讀取其他datanode上的其他塊備份。
這種設計模式的壹個優點是文件讀取遍布在這個集群的datanode上,namenode只提供文件塊的位置信息,需要的帶寬非常少,從而有效避免了單點瓶頸問題,擴大了集群的規模。
Hadoop中的網絡拓撲
如何測量Hadoop集群中兩個節點之間的距離?要知道,在高速處理數據時,數據處理速度的唯壹限制因素是數據在不同節點之間的傳輸速度:這是由可怕的帶寬不足造成的。所以我們以帶寬為標準來衡量兩個節點之間的距離。
但是計算兩個節點之間的帶寬比較復雜,需要在靜態集群中進行測量,但是Hadoop集群壹般會隨著數據處理的規模而動態變化(並且兩個節點之間直接連接的連接數是節點數的平方)。因此Hadoop使用壹種簡單的方法來測量距離。它將集群中的網絡表示為樹形結構,兩個節點之間的距離是它們與祖先之間距離的總和。樹通常根據數據中心、機架和數據節點的結構來組織。計算節點上的本地計算速度最快,跨數據中心的計算速度最慢(跨數據中心的Hadoop集群現在很少使用,通常在壹個數據中心完成)。
如果計算節點n1位於數據中心c1的機架r1上,則可以表示為/C1/R1,以下是不同條件下兩個節點之間的距離:
距離(/d1/r1/n1,/d 1/r 1/n 1)= 0(同壹節點上的進程)
距離(/d1/r1/n1,/d 1/r 1/N2)= 2(同壹機架上的不同節點)
距離(/d1/r1/n1,/d 1/R2/n3)= 4(同壹數據中心不同機架上的節點)
距離(/d1/r1/n1,/D2/R3/n4)= 6(不同數據中心中的節點)
如下圖所示:
Hadoop
寫文件
現在我們來看看Hadoop中的文件寫入機制。通過文件寫入機制,我們可以更好地理解Hadoop中的壹致性模型。
Hadoop
上圖向我們展示了壹個創建新文件並將數據寫入其中的示例。
首先,客戶端通過DistributedFileSystem上的create()方法指示要創建的文件的文件名(步驟1),然後DistributedFileSystem通過RPC調用向NameNode申請創建壹個新文件(步驟2,該文件尚未分配相應的塊)。Namenode檢查是否存在同名文件,以及用戶是否具有創建該文件的相應權限。如果檢查通過,namenode將為該文件創建壹個新記錄;否則,文件創建將失敗,客戶端將獲得IOException異常。DistributedFileSystem返回壹個FSDataOutputStream供客戶端寫入數據。與FSDataInputStream類似,FSDataOutputStream封裝了壹個DFSOutputStream來處理namenode和datanode之間的通信。
當客戶端開始寫入數據時(步驟3),DFSOutputStream將寫入的數據分成數據包,並將它們放入中間隊列-數據隊列。DataStreamer從數據隊列中獲取數據,並向namenode申請壹個新塊來存儲它所獲得的數據。Namenode選擇壹系列合適的datanodes(數量由文件中副本的數量決定)來形成管道。這裏,我們假設副本是3,因此管道中有三個datanodes。DataSteamer將數據寫入管道流中的第壹個datanode(步驟4),第壹個datanode將接收到的數據傳輸到第二個datanode(步驟4),依此類推。
同時,DFSOutputStream還維護另壹個中間隊列——ack隊列。ACK隊列中的數據包不會被移出ACK隊列,直到它們被管道中的所有datanode確認(步驟5)。
如果在寫入數據時典當了datanode,將執行以下對用戶透明的步驟:
1)當管道線路關閉時,確認隊列中的所有數據將被移動到數據隊列的頭部並重新傳輸,這可以確保管道線路中典當數據節點下遊的數據節點不會因為典當數據節點而丟失數據包。
2)在仍然正常運行的datanode上的當前塊上做壹個標記,這樣當被丟棄的datanode重新啟動時,namenode將知道datanode上的哪個塊是崩潰後留下的部分損壞的塊,以便可以將其刪除。
3)從流水線中刪除當掉的datanode,未完成塊的其他數據繼續寫入仍在正常運行的其他兩個datanode中。namenode知道該數據塊仍處於復制不足狀態(即備份數量不足),然後他將安排壹個新的副本來達到所需的備份數量。後續的塊寫入方法與之前的正常時間相同。
管道中的多個datanode可能會失敗(盡管這並不常見),但只要創建了dfs.replication.min(默認為1)副本,我們就認為創建成功了。剩余的副本將在未來異步創建,以達到指定的副本數量。
當客戶端完成數據寫入時,它將調用close()方法(步驟6)。該操作將把所有剩余的包刷新到管道中,等待這些包被成功確認,然後通知namenode文件已成功寫入(步驟7)。此時,namenode知道文件由哪些塊組成(因為DataStreamer請求namenode分配新塊,namenode當然會知道它為給定文件分配了哪個塊),它將等待創建最小數量的副本,然後成功返回。
副本是如何分發的?
Hadoop在創建新文件時如何選擇塊的位置?壹般來說,應考慮以下因素:帶寬(包括寫入帶寬和讀取帶寬)和數據安全性。如果我們將所有三個備份放在壹個datanode上,它可以避免寫入帶寬的消耗,但它幾乎無法提供數據冗余帶來的安全性,因為如果這個datanode崩潰,該文件的所有數據都將丟失。另壹個極端是,如果三個冗余備份都放在不同的機架上,即使在數據中心,盡管數據將是安全的,但寫入數據將消耗大量帶寬。Hadoop 0.17.0為我們提供了默認的副本分配策略(在Hadoop 1之後。x,副本策略允許可插拔,也就是說,您可以制定自己的副本分配策略)。副本的默認分配策略是將第壹個備份放在與客戶端相同的datanode上(如果客戶端在群集外運行,則隨機選擇壹個datanode來存儲第壹個副本),將第二個副本放在與第壹個副本機架不同的隨機datanode上,將第三個副本放在與第二個副本機架相同的隨機datanode上。如果副本數量大於三個,後續副本將隨機存儲在集群中,Hadoop將盡量避免在同壹機架中存儲太多副本。選擇副本位置後,管線的網絡拓撲如下:
Hadoop
壹般來說,上面的默認副本分配策略為我們提供了良好的可用性(將塊放在兩個機架上,這樣更安全)、寫帶寬優化(寫數據只需跨越壹個機架)和讀帶寬優化(您可以從兩個機架中選擇距離較近的壹個進行讀取)。
壹致性模型
HDFS在某些地方可能不符合POSIX的性能(是的,妳沒有看錯,POSIX不僅適用於linux/unix,Hadoop使用POSIX設計來讀取文件系統文件流),因此它可能看起來與妳預期的不同,所以要小心。
創建文件後,可以在命名空間中看到該文件:
路徑p =新路徑(“p“);
fs . create(p);
assertThat(fs . exists(p),is(true));
但是,寫入該文件的任何數據都不能保證可見。即使刷新寫入的數據,該文件的長度仍可能為零:
路徑p =新路徑(“p“);
OutputStream out = fs . create(p);
out.write(“內容”。getBytes(“UTF-8“);
out . flush();
assert that(fs . getfilestatus(p)。getLen(),is(0L));
這是因為,在Hadoop中,該文件中的內容只有在完整的數據塊寫入文件後才可見(即這些數據將寫入硬盤),因此當前正在寫入的數據塊中的內容始終不可見。
Hadoop提供了壹種強制將緩沖區中的內容刷新到datanode的方法,該方法是FSDataOutputStream的sync()方法。調用sync()方法後,Hadoop確保所有已寫入的數據都已刷新到管道中的datanode中,並且對所有讀者都可見:
路徑p =新路徑(“p“);
FSDataOutputStream out = fs . create(p);
out.write(“內容”。getBytes(“UTF-8“);
out . flush();
out . sync();
assert that(fs . getfilestatus(p)。getLen()是(((長)“內容”。length())));
這個方法類似於POSIX中的fsync系統調用(它將給定文件描述符中的所有緩沖數據刷新到磁盤)。例如,使用java API寫壹個本地文件,我們可以確保在調用flush()和synchronization後可以看到寫了什麽:
file output stream out = new file output stream(local file);
out.write(“內容”。getBytes(“UTF-8“);
out . flush();//刷新到操作系統
out.getFD()。sync();//同步到磁盤(getFD()返回流對應的文件描述符)
assertThat(local file . length)是((long)“內容”。length())));
在HDFS中關閉流會隱式調用sync()方法:
路徑p =新路徑(“p“);
OutputStream out = fs . create(p);
out.write(“內容”。getBytes(“UTF-8“);
out . close();
assert that(fs . getfilestatus(p)。getLen()是(((長)“內容”。length())));
由於Hadoop中壹致性模型的限制,如果我們不調用sync()方法,我們很可能會丟失壹大塊數據。這是不可接受的,因此我們應該使用sync()方法來確保數據已經寫入磁盤。但是頻繁調用sync()方法並不好,因為這會導致大量的額外開銷。我們可以在寫入壹定量的數據後調用sync()方法。至於具體的數據大小,這取決於您的應用程序。數據越大越好,不會影響應用程序的性能。