From e7fd41792fc0ee52a05fcaac87511f118328d147 Mon Sep 17 00:00:00 2001 From: David Teigland Date: Wed, 18 Jan 2006 09:30:29 +0000 Subject: [DLM] The core of the DLM for GFS2/CLVM This is the core of the distributed lock manager which is required to use GFS2 as a cluster filesystem. It is also used by CLVM and can be used as a standalone lock manager independantly of either of these two projects. It implements VAX-style locking modes. Signed-off-by: David Teigland Signed-off-by: Steve Whitehouse --- fs/dlm/dir.c | 423 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 423 insertions(+) create mode 100644 fs/dlm/dir.c (limited to 'fs/dlm/dir.c') diff --git a/fs/dlm/dir.c b/fs/dlm/dir.c new file mode 100644 index 000000000000..0f1dde54bcd2 --- /dev/null +++ b/fs/dlm/dir.c @@ -0,0 +1,423 @@ +/****************************************************************************** +******************************************************************************* +** +** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved. +** Copyright (C) 2004-2005 Red Hat, Inc. All rights reserved. +** +** This copyrighted material is made available to anyone wishing to use, +** modify, copy, or redistribute it subject to the terms and conditions +** of the GNU General Public License v.2. +** +******************************************************************************* +******************************************************************************/ + +#include "dlm_internal.h" +#include "lockspace.h" +#include "member.h" +#include "lowcomms.h" +#include "rcom.h" +#include "config.h" +#include "memory.h" +#include "recover.h" +#include "util.h" +#include "lock.h" +#include "dir.h" + + +static void put_free_de(struct dlm_ls *ls, struct dlm_direntry *de) +{ + spin_lock(&ls->ls_recover_list_lock); + list_add(&de->list, &ls->ls_recover_list); + spin_unlock(&ls->ls_recover_list_lock); +} + +static struct dlm_direntry *get_free_de(struct dlm_ls *ls, int len) +{ + int found = FALSE; + struct dlm_direntry *de; + + spin_lock(&ls->ls_recover_list_lock); + list_for_each_entry(de, &ls->ls_recover_list, list) { + if (de->length == len) { + list_del(&de->list); + de->master_nodeid = 0; + memset(de->name, 0, len); + found = TRUE; + break; + } + } + spin_unlock(&ls->ls_recover_list_lock); + + if (!found) + de = allocate_direntry(ls, len); + return de; +} + +void dlm_clear_free_entries(struct dlm_ls *ls) +{ + struct dlm_direntry *de; + + spin_lock(&ls->ls_recover_list_lock); + while (!list_empty(&ls->ls_recover_list)) { + de = list_entry(ls->ls_recover_list.next, struct dlm_direntry, + list); + list_del(&de->list); + free_direntry(de); + } + spin_unlock(&ls->ls_recover_list_lock); +} + +/* + * We use the upper 16 bits of the hash value to select the directory node. + * Low bits are used for distribution of rsb's among hash buckets on each node. + * + * To give the exact range wanted (0 to num_nodes-1), we apply a modulus of + * num_nodes to the hash value. This value in the desired range is used as an + * offset into the sorted list of nodeid's to give the particular nodeid. + */ + +int dlm_hash2nodeid(struct dlm_ls *ls, uint32_t hash) +{ + struct list_head *tmp; + struct dlm_member *memb = NULL; + uint32_t node, n = 0; + int nodeid; + + if (ls->ls_num_nodes == 1) { + nodeid = dlm_our_nodeid(); + goto out; + } + + if (ls->ls_node_array) { + node = (hash >> 16) % ls->ls_total_weight; + nodeid = ls->ls_node_array[node]; + goto out; + } + + /* make_member_array() failed to kmalloc ls_node_array... */ + + node = (hash >> 16) % ls->ls_num_nodes; + + list_for_each(tmp, &ls->ls_nodes) { + if (n++ != node) + continue; + memb = list_entry(tmp, struct dlm_member, list); + break; + } + + DLM_ASSERT(memb , printk("num_nodes=%u n=%u node=%u\n", + ls->ls_num_nodes, n, node);); + nodeid = memb->nodeid; + out: + return nodeid; +} + +int dlm_dir_nodeid(struct dlm_rsb *r) +{ + return dlm_hash2nodeid(r->res_ls, r->res_hash); +} + +static inline uint32_t dir_hash(struct dlm_ls *ls, char *name, int len) +{ + uint32_t val; + + val = jhash(name, len, 0); + val &= (ls->ls_dirtbl_size - 1); + + return val; +} + +static void add_entry_to_hash(struct dlm_ls *ls, struct dlm_direntry *de) +{ + uint32_t bucket; + + bucket = dir_hash(ls, de->name, de->length); + list_add_tail(&de->list, &ls->ls_dirtbl[bucket].list); +} + +static struct dlm_direntry *search_bucket(struct dlm_ls *ls, char *name, + int namelen, uint32_t bucket) +{ + struct dlm_direntry *de; + + list_for_each_entry(de, &ls->ls_dirtbl[bucket].list, list) { + if (de->length == namelen && !memcmp(name, de->name, namelen)) + goto out; + } + de = NULL; + out: + return de; +} + +void dlm_dir_remove_entry(struct dlm_ls *ls, int nodeid, char *name, int namelen) +{ + struct dlm_direntry *de; + uint32_t bucket; + + bucket = dir_hash(ls, name, namelen); + + write_lock(&ls->ls_dirtbl[bucket].lock); + + de = search_bucket(ls, name, namelen, bucket); + + if (!de) { + log_error(ls, "remove fr %u none", nodeid); + goto out; + } + + if (de->master_nodeid != nodeid) { + log_error(ls, "remove fr %u ID %u", nodeid, de->master_nodeid); + goto out; + } + + list_del(&de->list); + free_direntry(de); + out: + write_unlock(&ls->ls_dirtbl[bucket].lock); +} + +void dlm_dir_clear(struct dlm_ls *ls) +{ + struct list_head *head; + struct dlm_direntry *de; + int i; + + DLM_ASSERT(list_empty(&ls->ls_recover_list), ); + + for (i = 0; i < ls->ls_dirtbl_size; i++) { + write_lock(&ls->ls_dirtbl[i].lock); + head = &ls->ls_dirtbl[i].list; + while (!list_empty(head)) { + de = list_entry(head->next, struct dlm_direntry, list); + list_del(&de->list); + put_free_de(ls, de); + } + write_unlock(&ls->ls_dirtbl[i].lock); + } +} + +int dlm_recover_directory(struct dlm_ls *ls) +{ + struct dlm_member *memb; + struct dlm_direntry *de; + char *b, *last_name = NULL; + int error = -ENOMEM, last_len, count = 0; + uint16_t namelen; + + log_debug(ls, "dlm_recover_directory"); + + if (dlm_no_directory(ls)) + goto out_status; + + dlm_dir_clear(ls); + + last_name = kmalloc(DLM_RESNAME_MAXLEN, GFP_KERNEL); + if (!last_name) + goto out; + + list_for_each_entry(memb, &ls->ls_nodes, list) { + memset(last_name, 0, DLM_RESNAME_MAXLEN); + last_len = 0; + + for (;;) { + error = dlm_recovery_stopped(ls); + if (error) + goto out_free; + + error = dlm_rcom_names(ls, memb->nodeid, + last_name, last_len); + if (error) + goto out_free; + + schedule(); + + /* + * pick namelen/name pairs out of received buffer + */ + + b = ls->ls_recover_buf + sizeof(struct dlm_rcom); + + for (;;) { + memcpy(&namelen, b, sizeof(uint16_t)); + namelen = be16_to_cpu(namelen); + b += sizeof(uint16_t); + + /* namelen of 0xFFFFF marks end of names for + this node; namelen of 0 marks end of the + buffer */ + + if (namelen == 0xFFFF) + goto done; + if (!namelen) + break; + + error = -ENOMEM; + de = get_free_de(ls, namelen); + if (!de) + goto out_free; + + de->master_nodeid = memb->nodeid; + de->length = namelen; + last_len = namelen; + memcpy(de->name, b, namelen); + memcpy(last_name, b, namelen); + b += namelen; + + add_entry_to_hash(ls, de); + count++; + } + } + done: + ; + } + + out_status: + error = 0; + dlm_set_recover_status(ls, DLM_RS_DIR); + log_debug(ls, "dlm_recover_directory %d entries", count); + out_free: + kfree(last_name); + out: + dlm_clear_free_entries(ls); + return error; +} + +static int get_entry(struct dlm_ls *ls, int nodeid, char *name, + int namelen, int *r_nodeid) +{ + struct dlm_direntry *de, *tmp; + uint32_t bucket; + + bucket = dir_hash(ls, name, namelen); + + write_lock(&ls->ls_dirtbl[bucket].lock); + de = search_bucket(ls, name, namelen, bucket); + if (de) { + *r_nodeid = de->master_nodeid; + write_unlock(&ls->ls_dirtbl[bucket].lock); + if (*r_nodeid == nodeid) + return -EEXIST; + return 0; + } + + write_unlock(&ls->ls_dirtbl[bucket].lock); + + de = allocate_direntry(ls, namelen); + if (!de) + return -ENOMEM; + + de->master_nodeid = nodeid; + de->length = namelen; + memcpy(de->name, name, namelen); + + write_lock(&ls->ls_dirtbl[bucket].lock); + tmp = search_bucket(ls, name, namelen, bucket); + if (tmp) { + free_direntry(de); + de = tmp; + } else { + list_add_tail(&de->list, &ls->ls_dirtbl[bucket].list); + } + *r_nodeid = de->master_nodeid; + write_unlock(&ls->ls_dirtbl[bucket].lock); + return 0; +} + +int dlm_dir_lookup(struct dlm_ls *ls, int nodeid, char *name, int namelen, + int *r_nodeid) +{ + return get_entry(ls, nodeid, name, namelen, r_nodeid); +} + +/* Copy the names of master rsb's into the buffer provided. + Only select names whose dir node is the given nodeid. */ + +void dlm_copy_master_names(struct dlm_ls *ls, char *inbuf, int inlen, + char *outbuf, int outlen, int nodeid) +{ + struct list_head *list; + struct dlm_rsb *start_r = NULL, *r = NULL; + int offset = 0, start_namelen, error, dir_nodeid; + char *start_name; + uint16_t be_namelen; + + /* + * Find the rsb where we left off (or start again) + */ + + start_namelen = inlen; + start_name = inbuf; + + if (start_namelen > 1) { + /* + * We could also use a find_rsb_root() function here that + * searched the ls_root_list. + */ + error = dlm_find_rsb(ls, start_name, start_namelen, R_MASTER, + &start_r); + DLM_ASSERT(!error && start_r, + printk("error %d\n", error);); + DLM_ASSERT(!list_empty(&start_r->res_root_list), + dlm_print_rsb(start_r);); + dlm_put_rsb(start_r); + } + + /* + * Send rsb names for rsb's we're master of and whose directory node + * matches the requesting node. + */ + + down_read(&ls->ls_root_sem); + if (start_r) + list = start_r->res_root_list.next; + else + list = ls->ls_root_list.next; + + for (offset = 0; list != &ls->ls_root_list; list = list->next) { + r = list_entry(list, struct dlm_rsb, res_root_list); + if (r->res_nodeid) + continue; + + dir_nodeid = dlm_dir_nodeid(r); + if (dir_nodeid != nodeid) + continue; + + /* + * The block ends when we can't fit the following in the + * remaining buffer space: + * namelen (uint16_t) + + * name (r->res_length) + + * end-of-block record 0x0000 (uint16_t) + */ + + if (offset + sizeof(uint16_t)*2 + r->res_length > outlen) { + /* Write end-of-block record */ + be_namelen = 0; + memcpy(outbuf + offset, &be_namelen, sizeof(uint16_t)); + offset += sizeof(uint16_t); + goto out; + } + + be_namelen = cpu_to_be16(r->res_length); + memcpy(outbuf + offset, &be_namelen, sizeof(uint16_t)); + offset += sizeof(uint16_t); + memcpy(outbuf + offset, r->res_name, r->res_length); + offset += r->res_length; + } + + /* + * If we've reached the end of the list (and there's room) write a + * terminating record. + */ + + if ((list == &ls->ls_root_list) && + (offset + sizeof(uint16_t) <= outlen)) { + be_namelen = 0xFFFF; + memcpy(outbuf + offset, &be_namelen, sizeof(uint16_t)); + offset += sizeof(uint16_t); + } + + out: + up_read(&ls->ls_root_sem); +} + -- cgit v1.2.3-70-g09d2