来源于源码阅读笔记。
前提:
机器故障是常态
文件不能丢失
需要对文件进行冗余的拷贝备份
思路:
不足拷贝数的:及时复制
超过拷贝数的:删除多余的
无效的:直接删除
几个常驻内存的队列
NeededReplications
需要进行replicate
的
blocks
PendingReplications
正在进行replicate
的
blocks
ExcessReplicateMap
超过Replicator
数的
blocks
RecentInvalidateSets
当前状态是失效的blocks
UnderReplicatedBlocks
NeededReplications所属的类
保存所有当前需要
Replicate
的
block
信息
每个
Block
都有不同的
Replicate
优先级
0为最高
4表示不需要进行
Replica
优先级队列
0 只有一个
Replica
的
block
1 当前
Replica*3<
期望
Replica
数的
block
2 其他
3 所有
Replica
都在一个
Rack
的
block
PendingReplications
正在等待
DataNode
进行
Replicate
的
blocks
pendingReplicationMonitor线程对其进行监视
监视超时仍未
Replicate
完成的
block
超时设置为
dfs.replication.pending.timeout.sec
PendingReplicationMonitor
当
NameNode
收到
blockReceive
的信息,将对应等待
replica
的
block
移除,表示
replicate
成功
当发现超时的
block
,将其加入
timeoutItems
队列
ReplicationMonitor
独立的线程执行主要的
Replicate
工作
间隔:
dfs.replication.interval
默认
3
秒
computedDatanodeWork
– computeDatanodeWork
– processPendingReplications
ComputeDatanodework
执行
block replication
和
invalidateion
具体的操作将于下次
heartbeat
时被通知到相对应的
datanode
Safemode时不执行
几个参数:
– blockToProcess:一次工作最多能
replicate
的
block
个数
heartbeatSize * REPLICATION_WORK_MULTIPLIE_PRE_ITERATION(默认为
2
,即活着
dn
的两倍)
– nodesToProcess:一次工作最多进行
invalidate
的
dn
个数
heartbeatSize*INVALIDATE_WORK_PCT_PRE_ITERATION (默认为
32%
,即
1/3
的
dn
)
– workFound:如果没有需要
Replicate
的
block
,则执行
invalidation
(
Heartbeat.size()实际是当前收到的所有
heartbeat
的数目,即活着的
dn
的个数
)
执行步骤
(1)获取一个
srcNode
即发起
replicate
的
datanode
(2)排除已经在
pending
并且个数足够的
replica
(3)选取一个
TargetNode
即需要将
replica
传输至 的
datanode
(4)更新
srcNode
在
NameNode
中的信息,加入
replicatedblocks
对象与
targetNode
(5)更新
targetNode
的
curApproxBlocksScheduled
信息
(6)最后将此
block
从
needed
队列移除,加入
pending
队列
(
TSP问题,实际是按照树的深度之和,计算两个
dn
距离,利用两次循环(选择排序)得出
pipeline
)
获取srcNode
的算法
期望:获取一个正处于
DECOMMISION_INPROGRESS
状态的
datanode
原因:最不忙(没有写的traffic
)
不使用已经
decommissioned
的
datanode
如果都不满足,则随机选择一个为达到
replication limit
的
datanode
computeInvalidateWork流程
处理
recentInvalidateSets
队列中已经失效的
block
recentInvalidateSets: TreeSet<DN,list<block>>
共执行
nodesToProcess
次循环
每次循环,取出头一个
DN
对应的
blocklist
从中取出不超过
blockInvalidateLimit
个
block
blockInvalidateLimit = max(100, 20 * heartbeatinterval / 1000)
剩余的继续放回队列中
将选出的
block
更新进对应的
datanode
中
ProcessPendingReplications
处理超时的
replica
循环
timeoutItems
中的对象,将其重新放回
needed
队列
DataNode heartbeat 后的工作
生成
replicate command (DNA_TRANSFER)
maxReplicationStreams – xmitsInProgress 个
(dfs.max-repl-streams,2) (并发的
xceriver
个数,
dn
的
threadGroup.activeCount)
生成
invalidate command (DNA_INVALIDATE)
blockInvalidateLimit个
max(100, 20 * heartbeatInterval / 1000 )
heartbeat频率的
20
倍,即一次最多
20
个
NeededReplications更新
(每隔(dfs.namenode.decommission.interval,30) * 1000
间隔检测一次)
1.NameNode启动,
leave safemode
时
2.Decommission Manager线程,检测处于
Decommission
状态的
datanode(1)
3.File complete
4.checkLease Manager
RecentInvalidate更新
1.Excess Replica
2.setReplica 变小
3.blockReport通知
4.删除文件
5.DiskError
ExcessReplicateMap
保存超过
Replica
数的
block
每当某
datanode
加入一个新的
block
,选择另外一个
datanode(1)
,并将其加入
recentInvalidate队列,等待删除
更新时刻:
– setRep 变小
– addStroedBlock
算法:
传入的参数是nonExcess list
1.从所有的
datanodes
中,生成一个
map<Rack,list<dn>>;
2.从
map
中分为两个集合
priSet
(多于一个
dn
的
Rack
),
remains
(仅有一个的)
3.先保证满足
delHint(
只有
blockReport
给出
)
4.从
priSet
中选剩下空间最小的
5.从
remain
中选剩下空间最小的