cachepc-linux

Fork of AMDESE/linux with modifications for the CachePC side-channel attack
git clone https://git.sinitax.com/sinitax/cachepc-linux

member.c (16804B)


// SPDX-License-Identifier: GPL-2.0-only
/******************************************************************************
*******************************************************************************
**
**  Copyright (C) 2005-2011 Red Hat, Inc.  All rights reserved.
**
**
*******************************************************************************
******************************************************************************/

#include "dlm_internal.h"
#include "lockspace.h"
#include "member.h"
#include "recoverd.h"
#include "recover.h"
#include "rcom.h"
#include "config.h"
#include "midcomms.h"
#include "lowcomms.h"

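/* The low 16 bits of h_version carry the minor protocol version; a sender
   supports slot assignment when that minor version is at least
   DLM_HEADER_SLOTS. */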
int dlm_slots_version(struct dlm_header *h)
{
	if ((le32_to_cpu(h->h_version) & 0x0000FFFF) < DLM_HEADER_SLOTS)
		return 0;
	return 1;
}

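/* record the slot number and generation a member reported for itself in its
   rcom_config payload; a no-op when the sender predates the slots extension */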
void dlm_slot_save(struct dlm_ls *ls, struct dlm_rcom *rc,
		   struct dlm_member *memb)
{
	struct rcom_config *rf = (struct rcom_config *)rc->rc_buf;

	if (!dlm_slots_version(&rc->rc_header))
		return;

	memb->slot = le16_to_cpu(rf->rf_our_slot);
	memb->generation = le32_to_cpu(rf->rf_generation);
}

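/* pack our ls_slots array into the rcom buffer immediately after the
   rcom_config struct; unused (nodeid 0) offsets are skipped */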
void dlm_slots_copy_out(struct dlm_ls *ls, struct dlm_rcom *rc)
{
	struct dlm_slot *slot;
	struct rcom_slot *ro;
	int i;

	ro = (struct rcom_slot *)(rc->rc_buf + sizeof(struct rcom_config));

	/* ls_slots array is sparse, but not rcom_slots */

	for (i = 0; i < ls->ls_slots_size; i++) {
		slot = &ls->ls_slots[i];
		if (!slot->nodeid)
			continue;
		ro->ro_nodeid = cpu_to_le32(slot->nodeid);
		ro->ro_slot = cpu_to_le16(slot->slot);
		ro++;
	}
}

#define SLOT_DEBUG_LINE 128

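/* log one line summarizing a slot table, either from a native dlm_slot array
   or from a received rcom_slot array, as " slot:nodeid" pairs; note that in
   the ro0 case the ro_* fields still hold little-endian wire values (they are
   converted at use elsewhere), so the logged numbers assume a little-endian
   host */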
static void log_slots(struct dlm_ls *ls, uint32_t gen, int num_slots,
		      struct rcom_slot *ro0, struct dlm_slot *array,
		      int array_size)
{
	char line[SLOT_DEBUG_LINE];
	int len = SLOT_DEBUG_LINE - 1;
	int pos = 0;
	int ret, i;

	memset(line, 0, sizeof(line));

	if (array) {
		for (i = 0; i < array_size; i++) {
			if (!array[i].nodeid)
				continue;

			ret = snprintf(line + pos, len - pos, " %d:%d",
				       array[i].slot, array[i].nodeid);
			if (ret >= len - pos)
				break;
			pos += ret;
		}
	} else if (ro0) {
		for (i = 0; i < num_slots; i++) {
			ret = snprintf(line + pos, len - pos, " %d:%d",
				       ro0[i].ro_slot, ro0[i].ro_nodeid);
			if (ret >= len - pos)
				break;
			pos += ret;
		}
	}

	log_rinfo(ls, "generation %u slots %d%s", gen, num_slots, line);
}

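/* apply the slot table received during recovery: adopt the new generation and
   record each member's slot from the rcom_slot array; returns -1 if the
   sender does not support slots, if our own slot unexpectedly changed, or if
   a member is missing from the table */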
int dlm_slots_copy_in(struct dlm_ls *ls)
{
	struct dlm_member *memb;
	struct dlm_rcom *rc = ls->ls_recover_buf;
	struct rcom_config *rf = (struct rcom_config *)rc->rc_buf;
	struct rcom_slot *ro0, *ro;
	int our_nodeid = dlm_our_nodeid();
	int i, num_slots;
	uint32_t gen;

	if (!dlm_slots_version(&rc->rc_header))
		return -1;

	gen = le32_to_cpu(rf->rf_generation);
	if (gen <= ls->ls_generation) {
		log_error(ls, "dlm_slots_copy_in gen %u old %u",
			  gen, ls->ls_generation);
	}
	ls->ls_generation = gen;

	num_slots = le16_to_cpu(rf->rf_num_slots);
	if (!num_slots)
		return -1;

	ro0 = (struct rcom_slot *)(rc->rc_buf + sizeof(struct rcom_config));

	log_slots(ls, gen, num_slots, ro0, NULL, 0);

	list_for_each_entry(memb, &ls->ls_nodes, list) {
		for (i = 0, ro = ro0; i < num_slots; i++, ro++) {
			if (le32_to_cpu(ro->ro_nodeid) != memb->nodeid)
				continue;
			memb->slot = le16_to_cpu(ro->ro_slot);
			memb->slot_prev = memb->slot;
			break;
		}

		if (memb->nodeid == our_nodeid) {
			if (ls->ls_slot && ls->ls_slot != memb->slot) {
				log_error(ls, "dlm_slots_copy_in our slot "
					  "changed %d %d", ls->ls_slot,
					  memb->slot);
				return -1;
			}

			if (!ls->ls_slot)
				ls->ls_slot = memb->slot;
		}

		if (!memb->slot) {
			log_error(ls, "dlm_slots_copy_in nodeid %d no slot",
				   memb->nodeid);
			return -1;
		}
	}

	return 0;
}

/* for any nodes that do not support slots, we will not have set memb->slot
   in wait_status_all(), so memb->slot will remain -1, and we will not
   assign slots or set ls_num_slots here */

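/* assign a slot to every member that lacks one, reusing each node's previous
   slot where known; slots are small integers starting at 1, and offset i of
   the returned array holds the node (if any) occupying slot i+1; the new
   generation is one greater than the largest generation any member reported */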
int dlm_slots_assign(struct dlm_ls *ls, int *num_slots, int *slots_size,
		     struct dlm_slot **slots_out, uint32_t *gen_out)
{
	struct dlm_member *memb;
	struct dlm_slot *array;
	int our_nodeid = dlm_our_nodeid();
	int array_size, max_slots, i;
	int need = 0;
	int max = 0;
	int num = 0;
	uint32_t gen = 0;

	/* our own memb struct will have slot -1 gen 0 */

	list_for_each_entry(memb, &ls->ls_nodes, list) {
		if (memb->nodeid == our_nodeid) {
			memb->slot = ls->ls_slot;
			memb->generation = ls->ls_generation;
			break;
		}
	}

	list_for_each_entry(memb, &ls->ls_nodes, list) {
		if (memb->generation > gen)
			gen = memb->generation;

		/* node doesn't support slots */

		if (memb->slot == -1)
			return -1;

		/* node needs a slot assigned */

		if (!memb->slot)
			need++;

		/* node has a slot assigned */

		num++;

		if (!max || max < memb->slot)
			max = memb->slot;

		/* sanity check, once slot is assigned it shouldn't change */

		if (memb->slot_prev && memb->slot && memb->slot_prev != memb->slot) {
			log_error(ls, "nodeid %d slot changed %d %d",
				  memb->nodeid, memb->slot_prev, memb->slot);
			return -1;
		}
		memb->slot_prev = memb->slot;
	}

	array_size = max + need;
	array = kcalloc(array_size, sizeof(*array), GFP_NOFS);
	if (!array)
		return -ENOMEM;

	num = 0;

	/* fill in slots (offsets) that are used */

	list_for_each_entry(memb, &ls->ls_nodes, list) {
		if (!memb->slot)
			continue;

		if (memb->slot > array_size) {
			log_error(ls, "invalid slot number %d", memb->slot);
			kfree(array);
			return -1;
		}

		array[memb->slot - 1].nodeid = memb->nodeid;
		array[memb->slot - 1].slot = memb->slot;
		num++;
	}

	/* assign new slots from unused offsets */

	list_for_each_entry(memb, &ls->ls_nodes, list) {
		if (memb->slot)
			continue;

		for (i = 0; i < array_size; i++) {
			if (array[i].nodeid)
				continue;

			memb->slot = i + 1;
			memb->slot_prev = memb->slot;
			array[i].nodeid = memb->nodeid;
			array[i].slot = memb->slot;
			num++;

			if (!ls->ls_slot && memb->nodeid == our_nodeid)
				ls->ls_slot = memb->slot;
			break;
		}

		if (!memb->slot) {
			log_error(ls, "no free slot found");
			kfree(array);
			return -1;
		}
	}

	gen++;

	log_slots(ls, gen, num, NULL, array, array_size);

	max_slots = (DLM_MAX_APP_BUFSIZE - sizeof(struct dlm_rcom) -
		     sizeof(struct rcom_config)) / sizeof(struct rcom_slot);

	if (num > max_slots) {
		log_error(ls, "num_slots %d exceeds max_slots %d",
			  num, max_slots);
		kfree(array);
		return -1;
	}

	*gen_out = gen;
	*slots_out = array;
	*slots_size = array_size;
	*num_slots = num;
	return 0;
}

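/* insert a new member so that ls_nodes stays sorted by nodeid, ascending */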
static void add_ordered_member(struct dlm_ls *ls, struct dlm_member *new)
{
	struct dlm_member *memb = NULL;
	struct list_head *tmp;
	struct list_head *newlist = &new->list;
	struct list_head *head = &ls->ls_nodes;

	list_for_each(tmp, head) {
		memb = list_entry(tmp, struct dlm_member, list);
		if (new->nodeid < memb->nodeid)
			break;
	}

	if (!memb)
		list_add_tail(newlist, head);
	else {
		/* insert before tmp; equivalent to the previous open-coded
		   pointer updates (resolves the old FIXME) */
		list_add_tail(newlist, tmp);
	}
}

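/* allocate a member struct for a configured node, establish the lowcomms
   connection to it, register it with midcomms, and insert it into the sorted
   ls_nodes list */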
static int dlm_add_member(struct dlm_ls *ls, struct dlm_config_node *node)
{
	struct dlm_member *memb;
	int error;

	memb = kzalloc(sizeof(*memb), GFP_NOFS);
	if (!memb)
		return -ENOMEM;

	error = dlm_lowcomms_connect_node(node->nodeid);
	if (error < 0) {
		kfree(memb);
		return error;
	}

	memb->nodeid = node->nodeid;
	memb->weight = node->weight;
	memb->comm_seq = node->comm_seq;
	dlm_midcomms_add_member(node->nodeid);
	add_ordered_member(ls, memb);
	ls->ls_num_nodes++;
	return 0;
}

static struct dlm_member *find_memb(struct list_head *head, int nodeid)
{
	struct dlm_member *memb;

	list_for_each_entry(memb, head, list) {
		if (memb->nodeid == nodeid)
			return memb;
	}
	return NULL;
}

int dlm_is_member(struct dlm_ls *ls, int nodeid)
{
	if (find_memb(&ls->ls_nodes, nodeid))
		return 1;
	return 0;
}

int dlm_is_removed(struct dlm_ls *ls, int nodeid)
{
	if (find_memb(&ls->ls_nodes_gone, nodeid))
		return 1;
	return 0;
}

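/* free every member on a list, invoking the optional callback (e.g. midcomms
   removal below) after each node is unlinked */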
static void clear_memb_list(struct list_head *head,
			    void (*after_del)(int nodeid))
{
	struct dlm_member *memb;

	while (!list_empty(head)) {
		memb = list_entry(head->next, struct dlm_member, list);
		list_del(&memb->list);
		if (after_del)
			after_del(memb->nodeid);
		kfree(memb);
	}
}

static void clear_members_cb(int nodeid)
{
	dlm_midcomms_remove_member(nodeid);
}

void dlm_clear_members(struct dlm_ls *ls)
{
	clear_memb_list(&ls->ls_nodes, clear_members_cb);
	ls->ls_num_nodes = 0;
}

void dlm_clear_members_gone(struct dlm_ls *ls)
{
	clear_memb_list(&ls->ls_nodes_gone, NULL);
}

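/* rebuild ls_node_array, an array of nodeids in which each member appears
   once per unit of weight (everyone gets weight 1 when all configured weights
   are zero), so that a uniformly distributed index selects nodes in
   proportion to their weights; on allocation failure the array is left NULL */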
static void make_member_array(struct dlm_ls *ls)
{
	struct dlm_member *memb;
	int i, w, x = 0, total = 0, all_zero = 0, *array;

	kfree(ls->ls_node_array);
	ls->ls_node_array = NULL;

	list_for_each_entry(memb, &ls->ls_nodes, list) {
		if (memb->weight)
			total += memb->weight;
	}

	/* all nodes revert to weight of 1 if all have weight 0 */

	if (!total) {
		total = ls->ls_num_nodes;
		all_zero = 1;
	}

	ls->ls_total_weight = total;
	array = kmalloc_array(total, sizeof(*array), GFP_NOFS);
	if (!array)
		return;

	list_for_each_entry(memb, &ls->ls_nodes, list) {
		if (!all_zero && !memb->weight)
			continue;

		if (all_zero)
			w = 1;
		else
			w = memb->weight;

		DLM_ASSERT(x < total, printk("total %d x %d\n", total, x););

		for (i = 0; i < w; i++)
			array[x++] = memb->nodeid;
	}

	ls->ls_node_array = array;
}

/* send a status request to all members just to establish comms connections */

static int ping_members(struct dlm_ls *ls)
{
	struct dlm_member *memb;
	int error = 0;

	list_for_each_entry(memb, &ls->ls_nodes, list) {
		if (dlm_recovery_stopped(ls)) {
			error = -EINTR;
			break;
		}
		error = dlm_rcom_status(ls, memb->nodeid, 0);
		if (error)
			break;
	}
	if (error)
		log_rinfo(ls, "ping_members aborted %d last nodeid %d",
			  error, ls->ls_recover_nodeid);
	return error;
}

static void dlm_lsop_recover_prep(struct dlm_ls *ls)
{
	if (!ls->ls_ops || !ls->ls_ops->recover_prep)
		return;
	ls->ls_ops->recover_prep(ls->ls_ops_arg);
}

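/* notify the lockspace user, via its recover_slot callback, that a member
   appears to have failed; members whose comms sequence is unchanged
   presumably left cleanly via dlm_release_lockspace and are skipped (see the
   comment in the body) */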
static void dlm_lsop_recover_slot(struct dlm_ls *ls, struct dlm_member *memb)
{
	struct dlm_slot slot;
	uint32_t seq;
	int error;

	if (!ls->ls_ops || !ls->ls_ops->recover_slot)
		return;

	/* if there is no comms connection with this node
	   or the present comms connection is newer
	   than the one when this member was added, then
	   we consider the node to have failed (versus
	   being removed due to dlm_release_lockspace) */

	error = dlm_comm_seq(memb->nodeid, &seq);

	if (!error && seq == memb->comm_seq)
		return;

	slot.nodeid = memb->nodeid;
	slot.slot = memb->slot;

	ls->ls_ops->recover_slot(ls->ls_ops_arg, &slot);
}

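/* hand the lockspace user a snapshot of the completed membership: a temporary
   array of nodeid/slot pairs plus our own slot and the new generation */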
void dlm_lsop_recover_done(struct dlm_ls *ls)
{
	struct dlm_member *memb;
	struct dlm_slot *slots;
	int i, num;

	if (!ls->ls_ops || !ls->ls_ops->recover_done)
		return;

	num = ls->ls_num_nodes;
	slots = kcalloc(num, sizeof(*slots), GFP_KERNEL);
	if (!slots)
		return;

	i = 0;
	list_for_each_entry(memb, &ls->ls_nodes, list) {
		if (i == num) {
			log_error(ls, "dlm_lsop_recover_done bad num %d", num);
			goto out;
		}
		slots[i].nodeid = memb->nodeid;
		slots[i].slot = memb->slot;
		i++;
	}

	ls->ls_ops->recover_done(ls->ls_ops_arg, slots, num,
				 ls->ls_slot, ls->ls_generation);
 out:
	kfree(slots);
}

static struct dlm_config_node *find_config_node(struct dlm_recover *rv,
						int nodeid)
{
	int i;

	for (i = 0; i < rv->nodes_count; i++) {
		if (rv->nodes[i].nodeid == nodeid)
			return &rv->nodes[i];
	}
	return NULL;
}

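/* reconcile ls_nodes against the new configuration from userspace: count
   departed members as negative changes (*neg_out), move them to
   ls_nodes_gone, add new members, recompute ls_low_nodeid and the weighted
   node array, then ping everyone to establish connections */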
int dlm_recover_members(struct dlm_ls *ls, struct dlm_recover *rv, int *neg_out)
{
	struct dlm_member *memb, *safe;
	struct dlm_config_node *node;
	int i, error, neg = 0, low = -1;

	/* previously removed members that we've not finished removing need to
	   count as a negative change so the "neg" recovery steps will happen */

	list_for_each_entry(memb, &ls->ls_nodes_gone, list) {
		log_rinfo(ls, "prev removed member %d", memb->nodeid);
		neg++;
	}

	/* move departed members from ls_nodes to ls_nodes_gone */

	list_for_each_entry_safe(memb, safe, &ls->ls_nodes, list) {
		node = find_config_node(rv, memb->nodeid);
		if (node && !node->new)
			continue;

		if (!node) {
			log_rinfo(ls, "remove member %d", memb->nodeid);
		} else {
			/* removed and re-added */
			log_rinfo(ls, "remove member %d comm_seq %u %u",
				  memb->nodeid, memb->comm_seq, node->comm_seq);
		}

		neg++;
		list_move(&memb->list, &ls->ls_nodes_gone);
		dlm_midcomms_remove_member(memb->nodeid);
		ls->ls_num_nodes--;
		dlm_lsop_recover_slot(ls, memb);
	}

	/* add new members to ls_nodes */

	for (i = 0; i < rv->nodes_count; i++) {
		node = &rv->nodes[i];
		if (dlm_is_member(ls, node->nodeid))
			continue;
		dlm_add_member(ls, node);
		log_rinfo(ls, "add member %d", node->nodeid);
	}

	list_for_each_entry(memb, &ls->ls_nodes, list) {
		if (low == -1 || memb->nodeid < low)
			low = memb->nodeid;
	}
	ls->ls_low_nodeid = low;

	make_member_array(ls);
	*neg_out = neg;

	error = ping_members(ls);
	/* error -EINTR means that a new recovery action was triggered.
	 * We ignore this recovery action and let the new one run, as it
	 * might have a new member configuration.
	 */
	if (error == -EINTR)
		error = 0;

	/* new_lockspace() may be waiting to know if the config
	 * is good or bad
	 */
	ls->ls_members_result = error;
	complete(&ls->ls_members_done);

	log_rinfo(ls, "dlm_recover_members %d nodes", ls->ls_num_nodes);
	return error;
}

/* Userspace guarantees that dlm_ls_stop() has completed on all nodes before
   dlm_ls_start() is called on any of them to start the new recovery. */

int dlm_ls_stop(struct dlm_ls *ls)
{
	int new;

	/*
	 * Prevent dlm_recv from being in the middle of something when we do
	 * the stop.  This includes ensuring dlm_recv isn't processing a
	 * recovery message (rcom), while dlm_recoverd is aborting and
	 * resetting things from an in-progress recovery.  i.e. we want
	 * dlm_recoverd to abort its recovery without worrying about dlm_recv
	 * processing an rcom at the same time.  Stopping dlm_recv also makes
	 * it easy for dlm_receive_message() to check locking stopped and add a
	 * message to the requestqueue without races.
	 */

	down_write(&ls->ls_recv_active);

	/*
	 * Abort any recovery that's in progress (see RECOVER_STOP,
	 * dlm_recovery_stopped()) and tell any other threads running in the
	 * dlm to quit any processing (see RUNNING, dlm_locking_stopped()).
	 */

	spin_lock(&ls->ls_recover_lock);
	set_bit(LSFL_RECOVER_STOP, &ls->ls_flags);
	new = test_and_clear_bit(LSFL_RUNNING, &ls->ls_flags);
	ls->ls_recover_seq++;
	spin_unlock(&ls->ls_recover_lock);

	/*
	 * Let dlm_recv run again; from now on any normal messages will be
	 * saved on the requestqueue for later.
	 */

	up_write(&ls->ls_recv_active);

	/*
	 * This in_recovery lock does two things:
	 * 1) Keeps this function from returning until all threads are out
	 *    of locking routines and locking is truly stopped.
	 * 2) Keeps any new requests from being processed until it's unlocked
	 *    when recovery is complete.
	 */

	if (new) {
		set_bit(LSFL_RECOVER_DOWN, &ls->ls_flags);
		wake_up_process(ls->ls_recoverd_task);
		wait_event(ls->ls_recover_lock_wait,
			   test_bit(LSFL_RECOVER_LOCK, &ls->ls_flags));
	}

	/*
	 * The recoverd suspend/resume makes sure that dlm_recoverd (if
	 * running) has noticed RECOVER_STOP above and quit processing the
	 * previous recovery.
	 */

	dlm_recoverd_suspend(ls);

	spin_lock(&ls->ls_recover_lock);
	kfree(ls->ls_slots);
	ls->ls_slots = NULL;
	ls->ls_num_slots = 0;
	ls->ls_slots_size = 0;
	ls->ls_recover_status = 0;
	spin_unlock(&ls->ls_recover_lock);

	dlm_recoverd_resume(ls);

	if (!ls->ls_recover_begin)
		ls->ls_recover_begin = jiffies;

	dlm_lsop_recover_prep(ls);
	return 0;
}

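/* begin a new recovery with the node configuration read from userspace: the
   config is stashed in ls_recover_args for dlm_recoverd, and a previously
   stashed, unused config is discarded with an error message */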
int dlm_ls_start(struct dlm_ls *ls)
{
	struct dlm_recover *rv, *rv_old;
	struct dlm_config_node *nodes = NULL;
	int error, count;

	rv = kzalloc(sizeof(*rv), GFP_NOFS);
	if (!rv)
		return -ENOMEM;

	error = dlm_config_nodes(ls->ls_name, &nodes, &count);
	if (error < 0)
		goto fail_rv;

	spin_lock(&ls->ls_recover_lock);

	/* the lockspace needs to be stopped before it can be started */

	if (!dlm_locking_stopped(ls)) {
		spin_unlock(&ls->ls_recover_lock);
		log_error(ls, "start ignored: lockspace running");
		error = -EINVAL;
		goto fail;
	}

	rv->nodes = nodes;
	rv->nodes_count = count;
	rv->seq = ++ls->ls_recover_seq;
	rv_old = ls->ls_recover_args;
	ls->ls_recover_args = rv;
	spin_unlock(&ls->ls_recover_lock);

	if (rv_old) {
		log_error(ls, "unused recovery %llx %d",
			  (unsigned long long)rv_old->seq, rv_old->nodes_count);
		kfree(rv_old->nodes);
		kfree(rv_old);
	}

	set_bit(LSFL_RECOVER_WORK, &ls->ls_flags);
	wake_up_process(ls->ls_recoverd_task);
	return 0;

 fail:
	kfree(nodes);
 fail_rv:
	kfree(rv);
	return error;
}