Hadoop Replication策略

论坛 期权论坛 脚本     
匿名技术用户   2020-12-31 00:59   11   0

来源于源码阅读笔记。

前提:

机器故障是常态

文件不能丢失

需要对文件进行冗余的拷贝备份

思路:

不足拷贝数的:及时复制

超过拷贝数的:删除多余的

无效的:直接删除

几个常驻内存的队列

NeededReplications

需要进行replicate blocks

PendingReplications

正在进行replicate blocks

ExcessReplicateMap

超过Replicator 数的 blocks

RecentInvalidateSets

当前状态是失效的blocks

UnderReplicatedBlocks

NeededReplications所属的类

保存所有当前需要 Replicate block 信息

每个 Block 都有不同的 Replicate 优先级

0为最高

4表示不需要进行 Replica

优先级队列
0 只有一个 Replica block
1 当前 Replica*3< 期望 Replica 数的 block
2 其他
3 所有 Replica 都在一个 Rack block

PendingReplications
正在等待 DataNode 进行 Replicate blocks
pendingReplicationMonitor线程对其进行监视
监视超时仍未 Replicate 完成的 block
超时设置为 dfs.replication.pending.timeout.sec

PendingReplicationMonitor
NameNode 收到 blockReceive 的信息,将对应等待 replica block 移除,表示 replicate 成功

当发现超时的 block ,将其加入 timeoutItems 队列
ReplicationMonitor
独立的线程执行主要的 Replicate 工作
间隔: dfs.replication.interval 默认 3
computedDatanodeWork
– computeDatanodeWork
– processPendingReplications

ComputeDatanodework
执行 block replication invalidateion
具体的操作将于下次 heartbeat 时被通知到相对应的 datanode
Safemode时不执行
几个参数:
– blockToProcess:一次工作最多能 replicate block 个数
heartbeatSize * REPLICATION_WORK_MULTIPLIE_PRE_ITERATION(默认为 2 ,即活着 dn 的两倍)
– nodesToProcess:一次工作最多进行 invalidate dn 个数
heartbeatSize*INVALIDATE_WORK_PCT_PRE_ITERATION (默认为 32% ,即 1/3 dn
– workFound:如果没有需要 Replicate block ,则执行 invalidation

( Heartbeat.size()实际是当前收到的所有 heartbeat 的数目,即活着的 dn 的个数 )

执行步骤
(1)获取一个 srcNode 即发起 replicate datanode
(2)排除已经在 pending 并且个数足够的 replica
(3)选取一个 TargetNode 即需要将 replica 传输至 的 datanode
(4)更新 srcNode NameNode 中的信息,加入 replicatedblocks 对象与 targetNode
(5)更新 targetNode curApproxBlocksScheduled 信息
(6)最后将此 block needed 队列移除,加入 pending 队列

( TSP问题,实际是按照树的深度之和,计算两个 dn 距离,利用两次循环(选择排序)得出 pipeline )

获取srcNode 的算法
期望:获取一个正处于 DECOMMISION_INPROGRESS 状态的 datanode
原因:最不忙(没有写的traffic
不使用已经 decommissioned datanode
如果都不满足,则随机选择一个为达到 replication limit datanode

computeInvalidateWork流程
处理 recentInvalidateSets 队列中已经失效的 block
recentInvalidateSets: TreeSet<DN,list<block>>
共执行 nodesToProcess 次循环
每次循环,取出头一个 DN 对应的 blocklist
从中取出不超过 blockInvalidateLimit block
blockInvalidateLimit = max(100, 20 * heartbeatinterval / 1000)
剩余的继续放回队列中
将选出的 block 更新进对应的 datanode

ProcessPendingReplications
处理超时的 replica
循环 timeoutItems 中的对象,将其重新放回 needed 队列

DataNode heartbeat 后的工作

生成 replicate command (DNA_TRANSFER)

maxReplicationStreams – xmitsInProgress
(dfs.max-repl-streams,2) (并发的 xceriver 个数, dn threadGroup.activeCount)

生成 invalidate command (DNA_INVALIDATE)

blockInvalidateLimit
max(100, 20 * heartbeatInterval / 1000 )
heartbeat频率的 20 倍,即一次最多 20


NeededReplications更新

(每隔(dfs.namenode.decommission.interval,30) * 1000 间隔检测一次)
1.NameNode启动, leave safemode
2.Decommission Manager线程,检测处于 Decommission 状态的 datanode(1)
3.File complete
4.checkLease Manager

RecentInvalidate更新
1.Excess Replica
2.setReplica 变小
3.blockReport通知
4.删除文件
5.DiskError

ExcessReplicateMap
保存超过 Replica 数的 block
每当某 datanode 加入一个新的 block ,选择另外一个 datanode(1) ,并将其加入
recentInvalidate队列,等待删除
更新时刻:
– setRep 变小
– addStroedBlock

算法:
传入的参数是nonExcess list
1.从所有的 datanodes 中,生成一个 map<Rack,list<dn>>;
2. map 中分为两个集合 priSet (多于一个 dn Rack ), remains (仅有一个的)
3.先保证满足 delHint( 只有 blockReport 给出 )
4. priSet 中选剩下空间最小的
5. remain 中选剩下空间最小的


分享到 :
0 人收藏
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

积分:7942463
帖子:1588486
精华:0
期权论坛 期权论坛
发布
内容

下载期权论坛手机APP