最近在读memcached的源码,这里贴一个assoc.c文件中所实现的哈希表的功能。
该文件实现了内部的一个hash算法。实现的功能有hash查找,插入,删除。
线程结构是一个内部的维护线程。该线程在变量expanding=true的时候启动,也就是在insert触发expand事件时进行。
因为启动了额外的线程进行扩容的工作,所以并不耽误对于hashtable的查找。(只是查找的算法比之前更复杂了。)
static void *assoc_maintenance_thread(void *arg) {
// do_run_maintenance_thread一直为true
while (do_run_maintenance_thread) {
int ii = 0;
/* Lock the cache, and bulk move multiple buckets to the new
* hash table. */
// 锁住全局的锁,防止hash表内部结构或者item结构等被改动。
// 除了这个线程,这个锁在thread.c被使用过,那里是在申请item,slab的时候锁的。
pthread_mutex_lock(&cache_lock);
// expanding是一个标志量,表示是否需要扩容。
// 在insert时如果判断哈希表密度过大,会设置为true,并唤醒该线程。
for (ii = 0; ii < hash_bulk_move && expanding; ++ii) {
item *it, *next;
int bucket;
// 这里将一个bucket(也即是hashtable数组的一个链表元素)移动到新的hashtable中。
for (it = old_hashtable[expand_bucket]; NULL != it; it = next) {
next = it->h_next;
bucket = hash(ITEM_key(it), it->nkey, 0) & hashmask(hashpower);
it->h_next = primary_hashtable[bucket];
primary_hashtable[bucket] = it;
}
old_hashtable[expand_bucket] = NULL;
// expand_bucket表示的是已经有多少个bucket被移动到了新的primary_hashtable中。
expand_bucket++;
if (expand_bucket == hashsize(hashpower - 1)) {
expanding = false;
free(old_hashtable);
if (settings.verbose > 1)
fprintf(stderr, "Hash table expansion done\n");
}
}
if (!expanding) {
/* We are done expanding.. just wait for next invocation */
pthread_cond_wait(&maintenance_cond, &cache_lock);
}
pthread_mutex_unlock(&cache_lock);
}
return NULL;
}
程序中用到的两个常用方法:
// 计算2的x次方。
#define hashsize(n) ((ub4)1<<(n))
// 计算mask码。
#define hashmask(n) (hashsize(n)-1)
hashmask()方法是获得一个二进制的mask码。简单如:0000 0000 1111 1111。此时所表示的hash_power为8。(程序初始时的hash_power为16,此处为举例方便)
hash()方法计算出一个key的hash_key。
hash_key & hashmask()的结果就是该key需要存放的数组下标位置,比如:
0110 0100 1001 0011 & 0000 0000 1111 1111 = 0000 0000 1001 0011
assoc_find,_hashitem_before都是查找的函数。
assoc_find查找指定的item,_hashitem_before查找该item之前的节点的地址。
[1] -> a
[2] -> b
[3] -> c
[4] -> d -> e
[5] -> f
.
.
.
在上面的例子中,assoc_find("a", 1)找到的会是a,_hashitem_before("e", 1)返回的会是&d,_hashitem_before("f", 1)返回的是0,因为f没有前置节点了。
查找的基本思路如下:
判断expanding值是否为真,如果是,表示现在正处在hash扩容阶段,那么全局便会有old_hashtable和primary_hashtable两个哈希表,查找也将针对两个hash表进行。
随后要定位该key是在old_hashtable中还是primary_hashtable中。
定位hashtable的方法很简单,判断hash值和expand_bucket之间的大小差距。expand_bucket变量表示旧的bucket中已经有多少个被移动到新的bucket了,该值在expanding的过程中,每移动一个bucket就会加1。
if (expanding &&
(oldbucket = (hv & hashmask(hashpower - 1))) >= expand_bucket)
{
it = old_hashtable[oldbucket];
} else {
it = primary_hashtable[hv & hashmask(hashpower)];
}
old_hashtable:
[0]
[1]
[2]
[3]
[4]
[5] -> e
[6] ->
[7] -> f
primary_hashtable:
[1] -> a
[2] ->
[3] ->
[4] -> d
在上例中,当前的expand_bucket值为5,也就是0-4的数组位置对应的item都已经移动到了primary_hashtable中,而5之后的位置还在维护中。
如果查找5以及5以上的,就在old中,否则在primary中。
insert操作会触发expand事件,触发的条件是hash表中2/3的内容都被填满了。
if (! expanding && hash_items > (hashsize(hashpower) * 3) / 2) {
assoc_expand();
}
而insert操作和maintaince_thread之间是无锁的,这怎么看都有可能出问题。
insert操作和find操作的思想是一样的,
首先判断位置是在old_hashtable还是在primary_hashtable中。
如果在old_hashtable中,那么随后的操作就是对old_hashtable该位置链表节点的更新。
但是此时maintance_thread也有可能正在更新old_hashtable的这个位置。冲突发生了。
int assoc_insert(item *it) {
uint32_t hv;
unsigned int oldbucket;
assert(assoc_find(ITEM_key(it), it->nkey) == 0); /* shouldn't have duplicately named things defined */
hv = hash(ITEM_key(it), it->nkey, 0);
if (expanding &&
(oldbucket = (hv & hashmask(hashpower - 1))) >= expand_bucket)
{
// 此处是寻找和插入的逻辑,虽然涉及到可能会出现同步错误的操作只有两步,
// 但还是有危险的。
// 可能这边正在插入着,另一边的maintance线程就将这个处理了一半bucket移动到
// primay_hashtable中了。
it->h_next = old_hashtable[oldbucket];
old_hashtable[oldbucket] = it;
} else {
it->h_next = primary_hashtable[hv & hashmask(hashpower)];
primary_hashtable[hv & hashmask(hashpower)] = it;
}
hash_items++;
if (! expanding && hash_items > (hashsize(hashpower) * 3) / 2) {
assoc_expand();
}
MEMCACHED_ASSOC_INSERT(ITEM_key(it), it->nkey, hash_items);
return 1;
}
|