cachepc-linux

Fork of AMDESE/linux with modifications for CachePC side-channel attack
git clone https://git.sinitax.com/sinitax/cachepc-linux
Log | Files | Refs | README | LICENSE | sfeed.txt

stack_user.c (29418B)


      1// SPDX-License-Identifier: GPL-2.0-only
      2/*
      3 * stack_user.c
      4 *
      5 * Code which interfaces ocfs2 with fs/dlm and a userspace stack.
      6 *
      7 * Copyright (C) 2007 Oracle.  All rights reserved.
      8 */
      9
     10#include <linux/module.h>
     11#include <linux/fs.h>
     12#include <linux/miscdevice.h>
     13#include <linux/mutex.h>
     14#include <linux/slab.h>
     15#include <linux/reboot.h>
     16#include <linux/sched.h>
     17#include <linux/uaccess.h>
     18
     19#include "stackglue.h"
     20
     21#include <linux/dlm_plock.h>
     22
     23/*
     24 * The control protocol starts with a handshake.  Until the handshake
     25 * is complete, the control device will fail all write(2)s.
     26 *
     27 * The handshake is simple.  First, the client reads until EOF.  Each line
     28 * of output is a supported protocol tag.  All protocol tags are a single
     29 * character followed by a two hex digit version number.  Currently the
     30 * only thing supported is T01, for "Text-based version 0x01".  Next, the
     31 * client writes the version they would like to use, including the newline.
     32 * Thus, the protocol tag is 'T01\n'.  If the version tag written is
     33 * unknown, -EINVAL is returned.  Once the negotiation is complete, the
     34 * client can start sending messages.
     35 *
     36 * The T01 protocol has three messages.  First is the "SETN" message.
     37 * It has the following syntax:
     38 *
     39 *  SETN<space><8-char-hex-nodenum><newline>
     40 *
     41 * This is 14 characters.
     42 *
     43 * The "SETN" message must be the first message following the protocol.
     44 * It tells ocfs2_control the local node number.
     45 *
     46 * Next comes the "SETV" message.  It has the following syntax:
     47 *
     48 *  SETV<space><2-char-hex-major><space><2-char-hex-minor><newline>
     49 *
     50 * This is 11 characters.
     51 *
     52 * The "SETV" message sets the filesystem locking protocol version as
     53 * negotiated by the client.  The client negotiates based on the maximum
     54 * version advertised in /sys/fs/ocfs2/max_locking_protocol.  The major
     55 * number from the "SETV" message must match
     56 * ocfs2_user_plugin.sp_max_proto.pv_major, and the minor number
     57 * must be less than or equal to ...sp_max_proto.pv_minor.
     58 *
     59 * Once this information has been set, mounts will be allowed.  From this
     60 * point on, the "DOWN" message can be sent for node down notification.
     61 * It has the following syntax:
     62 *
     63 *  DOWN<space><32-char-cap-hex-uuid><space><8-char-hex-nodenum><newline>
     64 *
     65 * eg:
     66 *
     67 *  DOWN 632A924FDD844190BDA93C0DF6B94899 00000001\n
     68 *
     69 * This is 47 characters.
     70 */
     71
     72/*
     73 * Whether or not the client has done the handshake.
     74 * For now, we have just one protocol version.
     75 */
     76#define OCFS2_CONTROL_PROTO			"T01\n"
     77#define OCFS2_CONTROL_PROTO_LEN			4
     78
     79/* Handshake states */
     80#define OCFS2_CONTROL_HANDSHAKE_INVALID		(0)
     81#define OCFS2_CONTROL_HANDSHAKE_READ		(1)
     82#define OCFS2_CONTROL_HANDSHAKE_PROTOCOL	(2)
     83#define OCFS2_CONTROL_HANDSHAKE_VALID		(3)
     84
     85/* Messages */
     86#define OCFS2_CONTROL_MESSAGE_OP_LEN		4
     87#define OCFS2_CONTROL_MESSAGE_SETNODE_OP	"SETN"
     88#define OCFS2_CONTROL_MESSAGE_SETNODE_TOTAL_LEN	14
     89#define OCFS2_CONTROL_MESSAGE_SETVERSION_OP	"SETV"
     90#define OCFS2_CONTROL_MESSAGE_SETVERSION_TOTAL_LEN	11
     91#define OCFS2_CONTROL_MESSAGE_DOWN_OP		"DOWN"
     92#define OCFS2_CONTROL_MESSAGE_DOWN_TOTAL_LEN	47
     93#define OCFS2_TEXT_UUID_LEN			32
     94#define OCFS2_CONTROL_MESSAGE_VERNUM_LEN	2
     95#define OCFS2_CONTROL_MESSAGE_NODENUM_LEN	8
     96#define VERSION_LOCK				"version_lock"
     97
     98enum ocfs2_connection_type {
     99	WITH_CONTROLD,
    100	NO_CONTROLD
    101};
    102
    103/*
    104 * ocfs2_live_connection is refcounted because the filesystem and
    105 * miscdevice sides can detach in different order.  Let's just be safe.
    106 */
    107struct ocfs2_live_connection {
    108	struct list_head		oc_list;
    109	struct ocfs2_cluster_connection	*oc_conn;
    110	enum ocfs2_connection_type	oc_type;
    111	atomic_t                        oc_this_node;
    112	int                             oc_our_slot;
    113	struct dlm_lksb                 oc_version_lksb;
    114	char                            oc_lvb[DLM_LVB_LEN];
    115	struct completion               oc_sync_wait;
    116	wait_queue_head_t		oc_wait;
    117};
    118
    119struct ocfs2_control_private {
    120	struct list_head op_list;
    121	int op_state;
    122	int op_this_node;
    123	struct ocfs2_protocol_version op_proto;
    124};
    125
    126/* SETN<space><8-char-hex-nodenum><newline> */
    127struct ocfs2_control_message_setn {
    128	char	tag[OCFS2_CONTROL_MESSAGE_OP_LEN];
    129	char	space;
    130	char	nodestr[OCFS2_CONTROL_MESSAGE_NODENUM_LEN];
    131	char	newline;
    132};
    133
    134/* SETV<space><2-char-hex-major><space><2-char-hex-minor><newline> */
    135struct ocfs2_control_message_setv {
    136	char	tag[OCFS2_CONTROL_MESSAGE_OP_LEN];
    137	char	space1;
    138	char	major[OCFS2_CONTROL_MESSAGE_VERNUM_LEN];
    139	char	space2;
    140	char	minor[OCFS2_CONTROL_MESSAGE_VERNUM_LEN];
    141	char	newline;
    142};
    143
    144/* DOWN<space><32-char-cap-hex-uuid><space><8-char-hex-nodenum><newline> */
    145struct ocfs2_control_message_down {
    146	char	tag[OCFS2_CONTROL_MESSAGE_OP_LEN];
    147	char	space1;
    148	char	uuid[OCFS2_TEXT_UUID_LEN];
    149	char	space2;
    150	char	nodestr[OCFS2_CONTROL_MESSAGE_NODENUM_LEN];
    151	char	newline;
    152};
    153
    154union ocfs2_control_message {
    155	char					tag[OCFS2_CONTROL_MESSAGE_OP_LEN];
    156	struct ocfs2_control_message_setn	u_setn;
    157	struct ocfs2_control_message_setv	u_setv;
    158	struct ocfs2_control_message_down	u_down;
    159};
    160
    161static struct ocfs2_stack_plugin ocfs2_user_plugin;
    162
    163static atomic_t ocfs2_control_opened;
    164static int ocfs2_control_this_node = -1;
    165static struct ocfs2_protocol_version running_proto;
    166
    167static LIST_HEAD(ocfs2_live_connection_list);
    168static LIST_HEAD(ocfs2_control_private_list);
    169static DEFINE_MUTEX(ocfs2_control_lock);
    170
    171static inline void ocfs2_control_set_handshake_state(struct file *file,
    172						     int state)
    173{
    174	struct ocfs2_control_private *p = file->private_data;
    175	p->op_state = state;
    176}
    177
    178static inline int ocfs2_control_get_handshake_state(struct file *file)
    179{
    180	struct ocfs2_control_private *p = file->private_data;
    181	return p->op_state;
    182}
    183
    184static struct ocfs2_live_connection *ocfs2_connection_find(const char *name)
    185{
    186	size_t len = strlen(name);
    187	struct ocfs2_live_connection *c;
    188
    189	BUG_ON(!mutex_is_locked(&ocfs2_control_lock));
    190
    191	list_for_each_entry(c, &ocfs2_live_connection_list, oc_list) {
    192		if ((c->oc_conn->cc_namelen == len) &&
    193		    !strncmp(c->oc_conn->cc_name, name, len))
    194			return c;
    195	}
    196
    197	return NULL;
    198}
    199
    200/*
    201 * ocfs2_live_connection structures are created underneath the ocfs2
    202 * mount path.  Since the VFS prevents multiple calls to
    203 * fill_super(), we can't get dupes here.
    204 */
    205static int ocfs2_live_connection_attach(struct ocfs2_cluster_connection *conn,
    206				     struct ocfs2_live_connection *c)
    207{
    208	int rc = 0;
    209
    210	mutex_lock(&ocfs2_control_lock);
    211	c->oc_conn = conn;
    212
    213	if ((c->oc_type == NO_CONTROLD) || atomic_read(&ocfs2_control_opened))
    214		list_add(&c->oc_list, &ocfs2_live_connection_list);
    215	else {
    216		printk(KERN_ERR
    217		       "ocfs2: Userspace control daemon is not present\n");
    218		rc = -ESRCH;
    219	}
    220
    221	mutex_unlock(&ocfs2_control_lock);
    222	return rc;
    223}
    224
    225/*
    226 * This function disconnects the cluster connection from ocfs2_control.
    227 * Afterwards, userspace can't affect the cluster connection.
    228 */
    229static void ocfs2_live_connection_drop(struct ocfs2_live_connection *c)
    230{
    231	mutex_lock(&ocfs2_control_lock);
    232	list_del_init(&c->oc_list);
    233	c->oc_conn = NULL;
    234	mutex_unlock(&ocfs2_control_lock);
    235
    236	kfree(c);
    237}
    238
    239static int ocfs2_control_cfu(void *target, size_t target_len,
    240			     const char __user *buf, size_t count)
    241{
    242	/* The T01 expects write(2) calls to have exactly one command */
    243	if ((count != target_len) ||
    244	    (count > sizeof(union ocfs2_control_message)))
    245		return -EINVAL;
    246
    247	if (copy_from_user(target, buf, target_len))
    248		return -EFAULT;
    249
    250	return 0;
    251}
    252
    253static ssize_t ocfs2_control_validate_protocol(struct file *file,
    254					       const char __user *buf,
    255					       size_t count)
    256{
    257	ssize_t ret;
    258	char kbuf[OCFS2_CONTROL_PROTO_LEN];
    259
    260	ret = ocfs2_control_cfu(kbuf, OCFS2_CONTROL_PROTO_LEN,
    261				buf, count);
    262	if (ret)
    263		return ret;
    264
    265	if (strncmp(kbuf, OCFS2_CONTROL_PROTO, OCFS2_CONTROL_PROTO_LEN))
    266		return -EINVAL;
    267
    268	ocfs2_control_set_handshake_state(file,
    269					  OCFS2_CONTROL_HANDSHAKE_PROTOCOL);
    270
    271	return count;
    272}
    273
    274static void ocfs2_control_send_down(const char *uuid,
    275				    int nodenum)
    276{
    277	struct ocfs2_live_connection *c;
    278
    279	mutex_lock(&ocfs2_control_lock);
    280
    281	c = ocfs2_connection_find(uuid);
    282	if (c) {
    283		BUG_ON(c->oc_conn == NULL);
    284		c->oc_conn->cc_recovery_handler(nodenum,
    285						c->oc_conn->cc_recovery_data);
    286	}
    287
    288	mutex_unlock(&ocfs2_control_lock);
    289}
    290
    291/*
    292 * Called whenever configuration elements are sent to /dev/ocfs2_control.
    293 * If all configuration elements are present, try to set the global
    294 * values.  If there is a problem, return an error.  Skip any missing
    295 * elements, and only bump ocfs2_control_opened when we have all elements
    296 * and are successful.
    297 */
    298static int ocfs2_control_install_private(struct file *file)
    299{
    300	int rc = 0;
    301	int set_p = 1;
    302	struct ocfs2_control_private *p = file->private_data;
    303
    304	BUG_ON(p->op_state != OCFS2_CONTROL_HANDSHAKE_PROTOCOL);
    305
    306	mutex_lock(&ocfs2_control_lock);
    307
    308	if (p->op_this_node < 0) {
    309		set_p = 0;
    310	} else if ((ocfs2_control_this_node >= 0) &&
    311		   (ocfs2_control_this_node != p->op_this_node)) {
    312		rc = -EINVAL;
    313		goto out_unlock;
    314	}
    315
    316	if (!p->op_proto.pv_major) {
    317		set_p = 0;
    318	} else if (!list_empty(&ocfs2_live_connection_list) &&
    319		   ((running_proto.pv_major != p->op_proto.pv_major) ||
    320		    (running_proto.pv_minor != p->op_proto.pv_minor))) {
    321		rc = -EINVAL;
    322		goto out_unlock;
    323	}
    324
    325	if (set_p) {
    326		ocfs2_control_this_node = p->op_this_node;
    327		running_proto.pv_major = p->op_proto.pv_major;
    328		running_proto.pv_minor = p->op_proto.pv_minor;
    329	}
    330
    331out_unlock:
    332	mutex_unlock(&ocfs2_control_lock);
    333
    334	if (!rc && set_p) {
    335		/* We set the global values successfully */
    336		atomic_inc(&ocfs2_control_opened);
    337		ocfs2_control_set_handshake_state(file,
    338					OCFS2_CONTROL_HANDSHAKE_VALID);
    339	}
    340
    341	return rc;
    342}
    343
    344static int ocfs2_control_get_this_node(void)
    345{
    346	int rc;
    347
    348	mutex_lock(&ocfs2_control_lock);
    349	if (ocfs2_control_this_node < 0)
    350		rc = -EINVAL;
    351	else
    352		rc = ocfs2_control_this_node;
    353	mutex_unlock(&ocfs2_control_lock);
    354
    355	return rc;
    356}
    357
    358static int ocfs2_control_do_setnode_msg(struct file *file,
    359					struct ocfs2_control_message_setn *msg)
    360{
    361	long nodenum;
    362	char *ptr = NULL;
    363	struct ocfs2_control_private *p = file->private_data;
    364
    365	if (ocfs2_control_get_handshake_state(file) !=
    366	    OCFS2_CONTROL_HANDSHAKE_PROTOCOL)
    367		return -EINVAL;
    368
    369	if (strncmp(msg->tag, OCFS2_CONTROL_MESSAGE_SETNODE_OP,
    370		    OCFS2_CONTROL_MESSAGE_OP_LEN))
    371		return -EINVAL;
    372
    373	if ((msg->space != ' ') || (msg->newline != '\n'))
    374		return -EINVAL;
    375	msg->space = msg->newline = '\0';
    376
    377	nodenum = simple_strtol(msg->nodestr, &ptr, 16);
    378	if (!ptr || *ptr)
    379		return -EINVAL;
    380
    381	if ((nodenum == LONG_MIN) || (nodenum == LONG_MAX) ||
    382	    (nodenum > INT_MAX) || (nodenum < 0))
    383		return -ERANGE;
    384	p->op_this_node = nodenum;
    385
    386	return ocfs2_control_install_private(file);
    387}
    388
    389static int ocfs2_control_do_setversion_msg(struct file *file,
    390					   struct ocfs2_control_message_setv *msg)
    391{
    392	long major, minor;
    393	char *ptr = NULL;
    394	struct ocfs2_control_private *p = file->private_data;
    395	struct ocfs2_protocol_version *max =
    396		&ocfs2_user_plugin.sp_max_proto;
    397
    398	if (ocfs2_control_get_handshake_state(file) !=
    399	    OCFS2_CONTROL_HANDSHAKE_PROTOCOL)
    400		return -EINVAL;
    401
    402	if (strncmp(msg->tag, OCFS2_CONTROL_MESSAGE_SETVERSION_OP,
    403		    OCFS2_CONTROL_MESSAGE_OP_LEN))
    404		return -EINVAL;
    405
    406	if ((msg->space1 != ' ') || (msg->space2 != ' ') ||
    407	    (msg->newline != '\n'))
    408		return -EINVAL;
    409	msg->space1 = msg->space2 = msg->newline = '\0';
    410
    411	major = simple_strtol(msg->major, &ptr, 16);
    412	if (!ptr || *ptr)
    413		return -EINVAL;
    414	minor = simple_strtol(msg->minor, &ptr, 16);
    415	if (!ptr || *ptr)
    416		return -EINVAL;
    417
    418	/*
    419	 * The major must be between 1 and 255, inclusive.  The minor
    420	 * must be between 0 and 255, inclusive.  The version passed in
    421	 * must be within the maximum version supported by the filesystem.
    422	 */
    423	if ((major == LONG_MIN) || (major == LONG_MAX) ||
    424	    (major > (u8)-1) || (major < 1))
    425		return -ERANGE;
    426	if ((minor == LONG_MIN) || (minor == LONG_MAX) ||
    427	    (minor > (u8)-1) || (minor < 0))
    428		return -ERANGE;
    429	if ((major != max->pv_major) ||
    430	    (minor > max->pv_minor))
    431		return -EINVAL;
    432
    433	p->op_proto.pv_major = major;
    434	p->op_proto.pv_minor = minor;
    435
    436	return ocfs2_control_install_private(file);
    437}
    438
    439static int ocfs2_control_do_down_msg(struct file *file,
    440				     struct ocfs2_control_message_down *msg)
    441{
    442	long nodenum;
    443	char *p = NULL;
    444
    445	if (ocfs2_control_get_handshake_state(file) !=
    446	    OCFS2_CONTROL_HANDSHAKE_VALID)
    447		return -EINVAL;
    448
    449	if (strncmp(msg->tag, OCFS2_CONTROL_MESSAGE_DOWN_OP,
    450		    OCFS2_CONTROL_MESSAGE_OP_LEN))
    451		return -EINVAL;
    452
    453	if ((msg->space1 != ' ') || (msg->space2 != ' ') ||
    454	    (msg->newline != '\n'))
    455		return -EINVAL;
    456	msg->space1 = msg->space2 = msg->newline = '\0';
    457
    458	nodenum = simple_strtol(msg->nodestr, &p, 16);
    459	if (!p || *p)
    460		return -EINVAL;
    461
    462	if ((nodenum == LONG_MIN) || (nodenum == LONG_MAX) ||
    463	    (nodenum > INT_MAX) || (nodenum < 0))
    464		return -ERANGE;
    465
    466	ocfs2_control_send_down(msg->uuid, nodenum);
    467
    468	return 0;
    469}
    470
    471static ssize_t ocfs2_control_message(struct file *file,
    472				     const char __user *buf,
    473				     size_t count)
    474{
    475	ssize_t ret;
    476	union ocfs2_control_message msg;
    477
    478	/* Try to catch padding issues */
    479	WARN_ON(offsetof(struct ocfs2_control_message_down, uuid) !=
    480		(sizeof(msg.u_down.tag) + sizeof(msg.u_down.space1)));
    481
    482	memset(&msg, 0, sizeof(union ocfs2_control_message));
    483	ret = ocfs2_control_cfu(&msg, count, buf, count);
    484	if (ret)
    485		goto out;
    486
    487	if ((count == OCFS2_CONTROL_MESSAGE_SETNODE_TOTAL_LEN) &&
    488	    !strncmp(msg.tag, OCFS2_CONTROL_MESSAGE_SETNODE_OP,
    489		     OCFS2_CONTROL_MESSAGE_OP_LEN))
    490		ret = ocfs2_control_do_setnode_msg(file, &msg.u_setn);
    491	else if ((count == OCFS2_CONTROL_MESSAGE_SETVERSION_TOTAL_LEN) &&
    492		 !strncmp(msg.tag, OCFS2_CONTROL_MESSAGE_SETVERSION_OP,
    493			  OCFS2_CONTROL_MESSAGE_OP_LEN))
    494		ret = ocfs2_control_do_setversion_msg(file, &msg.u_setv);
    495	else if ((count == OCFS2_CONTROL_MESSAGE_DOWN_TOTAL_LEN) &&
    496		 !strncmp(msg.tag, OCFS2_CONTROL_MESSAGE_DOWN_OP,
    497			  OCFS2_CONTROL_MESSAGE_OP_LEN))
    498		ret = ocfs2_control_do_down_msg(file, &msg.u_down);
    499	else
    500		ret = -EINVAL;
    501
    502out:
    503	return ret ? ret : count;
    504}
    505
    506static ssize_t ocfs2_control_write(struct file *file,
    507				   const char __user *buf,
    508				   size_t count,
    509				   loff_t *ppos)
    510{
    511	ssize_t ret;
    512
    513	switch (ocfs2_control_get_handshake_state(file)) {
    514		case OCFS2_CONTROL_HANDSHAKE_INVALID:
    515			ret = -EINVAL;
    516			break;
    517
    518		case OCFS2_CONTROL_HANDSHAKE_READ:
    519			ret = ocfs2_control_validate_protocol(file, buf,
    520							      count);
    521			break;
    522
    523		case OCFS2_CONTROL_HANDSHAKE_PROTOCOL:
    524		case OCFS2_CONTROL_HANDSHAKE_VALID:
    525			ret = ocfs2_control_message(file, buf, count);
    526			break;
    527
    528		default:
    529			BUG();
    530			ret = -EIO;
    531			break;
    532	}
    533
    534	return ret;
    535}
    536
    537/*
    538 * This is a naive version.  If we ever have a new protocol, we'll expand
    539 * it.  Probably using seq_file.
    540 */
    541static ssize_t ocfs2_control_read(struct file *file,
    542				  char __user *buf,
    543				  size_t count,
    544				  loff_t *ppos)
    545{
    546	ssize_t ret;
    547
    548	ret = simple_read_from_buffer(buf, count, ppos,
    549			OCFS2_CONTROL_PROTO, OCFS2_CONTROL_PROTO_LEN);
    550
    551	/* Have we read the whole protocol list? */
    552	if (ret > 0 && *ppos >= OCFS2_CONTROL_PROTO_LEN)
    553		ocfs2_control_set_handshake_state(file,
    554						  OCFS2_CONTROL_HANDSHAKE_READ);
    555
    556	return ret;
    557}
    558
    559static int ocfs2_control_release(struct inode *inode, struct file *file)
    560{
    561	struct ocfs2_control_private *p = file->private_data;
    562
    563	mutex_lock(&ocfs2_control_lock);
    564
    565	if (ocfs2_control_get_handshake_state(file) !=
    566	    OCFS2_CONTROL_HANDSHAKE_VALID)
    567		goto out;
    568
    569	if (atomic_dec_and_test(&ocfs2_control_opened)) {
    570		if (!list_empty(&ocfs2_live_connection_list)) {
    571			/* XXX: Do bad things! */
    572			printk(KERN_ERR
    573			       "ocfs2: Unexpected release of ocfs2_control!\n"
    574			       "       Loss of cluster connection requires "
    575			       "an emergency restart!\n");
    576			emergency_restart();
    577		}
    578		/*
    579		 * Last valid close clears the node number and resets
    580		 * the locking protocol version
    581		 */
    582		ocfs2_control_this_node = -1;
    583		running_proto.pv_major = 0;
    584		running_proto.pv_minor = 0;
    585	}
    586
    587out:
    588	list_del_init(&p->op_list);
    589	file->private_data = NULL;
    590
    591	mutex_unlock(&ocfs2_control_lock);
    592
    593	kfree(p);
    594
    595	return 0;
    596}
    597
    598static int ocfs2_control_open(struct inode *inode, struct file *file)
    599{
    600	struct ocfs2_control_private *p;
    601
    602	p = kzalloc(sizeof(struct ocfs2_control_private), GFP_KERNEL);
    603	if (!p)
    604		return -ENOMEM;
    605	p->op_this_node = -1;
    606
    607	mutex_lock(&ocfs2_control_lock);
    608	file->private_data = p;
    609	list_add(&p->op_list, &ocfs2_control_private_list);
    610	mutex_unlock(&ocfs2_control_lock);
    611
    612	return 0;
    613}
    614
    615static const struct file_operations ocfs2_control_fops = {
    616	.open    = ocfs2_control_open,
    617	.release = ocfs2_control_release,
    618	.read    = ocfs2_control_read,
    619	.write   = ocfs2_control_write,
    620	.owner   = THIS_MODULE,
    621	.llseek  = default_llseek,
    622};
    623
    624static struct miscdevice ocfs2_control_device = {
    625	.minor		= MISC_DYNAMIC_MINOR,
    626	.name		= "ocfs2_control",
    627	.fops		= &ocfs2_control_fops,
    628};
    629
    630static int ocfs2_control_init(void)
    631{
    632	int rc;
    633
    634	atomic_set(&ocfs2_control_opened, 0);
    635
    636	rc = misc_register(&ocfs2_control_device);
    637	if (rc)
    638		printk(KERN_ERR
    639		       "ocfs2: Unable to register ocfs2_control device "
    640		       "(errno %d)\n",
    641		       -rc);
    642
    643	return rc;
    644}
    645
    646static void ocfs2_control_exit(void)
    647{
    648	misc_deregister(&ocfs2_control_device);
    649}
    650
    651static void fsdlm_lock_ast_wrapper(void *astarg)
    652{
    653	struct ocfs2_dlm_lksb *lksb = astarg;
    654	int status = lksb->lksb_fsdlm.sb_status;
    655
    656	/*
    657	 * For now we're punting on the issue of other non-standard errors
    658	 * where we can't tell if the unlock_ast or lock_ast should be called.
    659	 * The main "other error" that's possible is EINVAL which means the
    660	 * function was called with invalid args, which shouldn't be possible
    661	 * since the caller here is under our control.  Other non-standard
    662	 * errors probably fall into the same category, or otherwise are fatal
    663	 * which means we can't carry on anyway.
    664	 */
    665
    666	if (status == -DLM_EUNLOCK || status == -DLM_ECANCEL)
    667		lksb->lksb_conn->cc_proto->lp_unlock_ast(lksb, 0);
    668	else
    669		lksb->lksb_conn->cc_proto->lp_lock_ast(lksb);
    670}
    671
    672static void fsdlm_blocking_ast_wrapper(void *astarg, int level)
    673{
    674	struct ocfs2_dlm_lksb *lksb = astarg;
    675
    676	lksb->lksb_conn->cc_proto->lp_blocking_ast(lksb, level);
    677}
    678
    679static int user_dlm_lock(struct ocfs2_cluster_connection *conn,
    680			 int mode,
    681			 struct ocfs2_dlm_lksb *lksb,
    682			 u32 flags,
    683			 void *name,
    684			 unsigned int namelen)
    685{
    686	if (!lksb->lksb_fsdlm.sb_lvbptr)
    687		lksb->lksb_fsdlm.sb_lvbptr = (char *)lksb +
    688					     sizeof(struct dlm_lksb);
    689
    690	return dlm_lock(conn->cc_lockspace, mode, &lksb->lksb_fsdlm,
    691			flags|DLM_LKF_NODLCKWT, name, namelen, 0,
    692			fsdlm_lock_ast_wrapper, lksb,
    693			fsdlm_blocking_ast_wrapper);
    694}
    695
    696static int user_dlm_unlock(struct ocfs2_cluster_connection *conn,
    697			   struct ocfs2_dlm_lksb *lksb,
    698			   u32 flags)
    699{
    700	return dlm_unlock(conn->cc_lockspace, lksb->lksb_fsdlm.sb_lkid,
    701			  flags, &lksb->lksb_fsdlm, lksb);
    702}
    703
    704static int user_dlm_lock_status(struct ocfs2_dlm_lksb *lksb)
    705{
    706	return lksb->lksb_fsdlm.sb_status;
    707}
    708
    709static int user_dlm_lvb_valid(struct ocfs2_dlm_lksb *lksb)
    710{
    711	int invalid = lksb->lksb_fsdlm.sb_flags & DLM_SBF_VALNOTVALID;
    712
    713	return !invalid;
    714}
    715
    716static void *user_dlm_lvb(struct ocfs2_dlm_lksb *lksb)
    717{
    718	if (!lksb->lksb_fsdlm.sb_lvbptr)
    719		lksb->lksb_fsdlm.sb_lvbptr = (char *)lksb +
    720					     sizeof(struct dlm_lksb);
    721	return (void *)(lksb->lksb_fsdlm.sb_lvbptr);
    722}
    723
    724static void user_dlm_dump_lksb(struct ocfs2_dlm_lksb *lksb)
    725{
    726}
    727
    728static int user_plock(struct ocfs2_cluster_connection *conn,
    729		      u64 ino,
    730		      struct file *file,
    731		      int cmd,
    732		      struct file_lock *fl)
    733{
    734	/*
    735	 * This more or less just demuxes the plock request into any
    736	 * one of three dlm calls.
    737	 *
    738	 * Internally, fs/dlm will pass these to a misc device, which
    739	 * a userspace daemon will read and write to.
    740	 *
    741	 * For now, cancel requests (which happen internally only),
    742	 * are turned into unlocks. Most of this function taken from
    743	 * gfs2_lock.
    744	 */
    745
    746	if (cmd == F_CANCELLK) {
    747		cmd = F_SETLK;
    748		fl->fl_type = F_UNLCK;
    749	}
    750
    751	if (IS_GETLK(cmd))
    752		return dlm_posix_get(conn->cc_lockspace, ino, file, fl);
    753	else if (fl->fl_type == F_UNLCK)
    754		return dlm_posix_unlock(conn->cc_lockspace, ino, file, fl);
    755	else
    756		return dlm_posix_lock(conn->cc_lockspace, ino, file, cmd, fl);
    757}
    758
    759/*
    760 * Compare a requested locking protocol version against the current one.
    761 *
    762 * If the major numbers are different, they are incompatible.
    763 * If the current minor is greater than the request, they are incompatible.
    764 * If the current minor is less than or equal to the request, they are
    765 * compatible, and the requester should run at the current minor version.
    766 */
    767static int fs_protocol_compare(struct ocfs2_protocol_version *existing,
    768			       struct ocfs2_protocol_version *request)
    769{
    770	if (existing->pv_major != request->pv_major)
    771		return 1;
    772
    773	if (existing->pv_minor > request->pv_minor)
    774		return 1;
    775
    776	if (existing->pv_minor < request->pv_minor)
    777		request->pv_minor = existing->pv_minor;
    778
    779	return 0;
    780}
    781
    782static void lvb_to_version(char *lvb, struct ocfs2_protocol_version *ver)
    783{
    784	struct ocfs2_protocol_version *pv =
    785		(struct ocfs2_protocol_version *)lvb;
    786	/*
    787	 * ocfs2_protocol_version has two u8 variables, so we don't
    788	 * need any endian conversion.
    789	 */
    790	ver->pv_major = pv->pv_major;
    791	ver->pv_minor = pv->pv_minor;
    792}
    793
    794static void version_to_lvb(struct ocfs2_protocol_version *ver, char *lvb)
    795{
    796	struct ocfs2_protocol_version *pv =
    797		(struct ocfs2_protocol_version *)lvb;
    798	/*
    799	 * ocfs2_protocol_version has two u8 variables, so we don't
    800	 * need any endian conversion.
    801	 */
    802	pv->pv_major = ver->pv_major;
    803	pv->pv_minor = ver->pv_minor;
    804}
    805
    806static void sync_wait_cb(void *arg)
    807{
    808	struct ocfs2_cluster_connection *conn = arg;
    809	struct ocfs2_live_connection *lc = conn->cc_private;
    810	complete(&lc->oc_sync_wait);
    811}
    812
    813static int sync_unlock(struct ocfs2_cluster_connection *conn,
    814		struct dlm_lksb *lksb, char *name)
    815{
    816	int error;
    817	struct ocfs2_live_connection *lc = conn->cc_private;
    818
    819	error = dlm_unlock(conn->cc_lockspace, lksb->sb_lkid, 0, lksb, conn);
    820	if (error) {
    821		printk(KERN_ERR "%s lkid %x error %d\n",
    822				name, lksb->sb_lkid, error);
    823		return error;
    824	}
    825
    826	wait_for_completion(&lc->oc_sync_wait);
    827
    828	if (lksb->sb_status != -DLM_EUNLOCK) {
    829		printk(KERN_ERR "%s lkid %x status %d\n",
    830				name, lksb->sb_lkid, lksb->sb_status);
    831		return -1;
    832	}
    833	return 0;
    834}
    835
    836static int sync_lock(struct ocfs2_cluster_connection *conn,
    837		int mode, uint32_t flags,
    838		struct dlm_lksb *lksb, char *name)
    839{
    840	int error, status;
    841	struct ocfs2_live_connection *lc = conn->cc_private;
    842
    843	error = dlm_lock(conn->cc_lockspace, mode, lksb, flags,
    844			name, strlen(name),
    845			0, sync_wait_cb, conn, NULL);
    846	if (error) {
    847		printk(KERN_ERR "%s lkid %x flags %x mode %d error %d\n",
    848				name, lksb->sb_lkid, flags, mode, error);
    849		return error;
    850	}
    851
    852	wait_for_completion(&lc->oc_sync_wait);
    853
    854	status = lksb->sb_status;
    855
    856	if (status && status != -EAGAIN) {
    857		printk(KERN_ERR "%s lkid %x flags %x mode %d status %d\n",
    858				name, lksb->sb_lkid, flags, mode, status);
    859	}
    860
    861	return status;
    862}
    863
    864
    865static int version_lock(struct ocfs2_cluster_connection *conn, int mode,
    866		int flags)
    867{
    868	struct ocfs2_live_connection *lc = conn->cc_private;
    869	return sync_lock(conn, mode, flags,
    870			&lc->oc_version_lksb, VERSION_LOCK);
    871}
    872
    873static int version_unlock(struct ocfs2_cluster_connection *conn)
    874{
    875	struct ocfs2_live_connection *lc = conn->cc_private;
    876	return sync_unlock(conn, &lc->oc_version_lksb, VERSION_LOCK);
    877}
    878
    879/* get_protocol_version()
    880 *
    881 * To exchange ocfs2 versioning, we use the LVB of the version dlm lock.
    882 * The algorithm is:
    883 * 1. Attempt to take the lock in EX mode (non-blocking).
    884 * 2. If successful (which means it is the first mount), write the
    885 *    version number and downconvert to PR lock.
    886 * 3. If unsuccessful (returns -EAGAIN), read the version from the LVB after
    887 *    taking the PR lock.
    888 */
    889
    890static int get_protocol_version(struct ocfs2_cluster_connection *conn)
    891{
    892	int ret;
    893	struct ocfs2_live_connection *lc = conn->cc_private;
    894	struct ocfs2_protocol_version pv;
    895
    896	running_proto.pv_major =
    897		ocfs2_user_plugin.sp_max_proto.pv_major;
    898	running_proto.pv_minor =
    899		ocfs2_user_plugin.sp_max_proto.pv_minor;
    900
    901	lc->oc_version_lksb.sb_lvbptr = lc->oc_lvb;
    902	ret = version_lock(conn, DLM_LOCK_EX,
    903			DLM_LKF_VALBLK|DLM_LKF_NOQUEUE);
    904	if (!ret) {
    905		conn->cc_version.pv_major = running_proto.pv_major;
    906		conn->cc_version.pv_minor = running_proto.pv_minor;
    907		version_to_lvb(&running_proto, lc->oc_lvb);
    908		version_lock(conn, DLM_LOCK_PR, DLM_LKF_CONVERT|DLM_LKF_VALBLK);
    909	} else if (ret == -EAGAIN) {
    910		ret = version_lock(conn, DLM_LOCK_PR, DLM_LKF_VALBLK);
    911		if (ret)
    912			goto out;
    913		lvb_to_version(lc->oc_lvb, &pv);
    914
    915		if ((pv.pv_major != running_proto.pv_major) ||
    916				(pv.pv_minor > running_proto.pv_minor)) {
    917			ret = -EINVAL;
    918			goto out;
    919		}
    920
    921		conn->cc_version.pv_major = pv.pv_major;
    922		conn->cc_version.pv_minor = pv.pv_minor;
    923	}
    924out:
    925	return ret;
    926}
    927
    928static void user_recover_prep(void *arg)
    929{
    930}
    931
    932static void user_recover_slot(void *arg, struct dlm_slot *slot)
    933{
    934	struct ocfs2_cluster_connection *conn = arg;
    935	printk(KERN_INFO "ocfs2: Node %d/%d down. Initiating recovery.\n",
    936			slot->nodeid, slot->slot);
    937	conn->cc_recovery_handler(slot->nodeid, conn->cc_recovery_data);
    938
    939}
    940
    941static void user_recover_done(void *arg, struct dlm_slot *slots,
    942		int num_slots, int our_slot,
    943		uint32_t generation)
    944{
    945	struct ocfs2_cluster_connection *conn = arg;
    946	struct ocfs2_live_connection *lc = conn->cc_private;
    947	int i;
    948
    949	for (i = 0; i < num_slots; i++)
    950		if (slots[i].slot == our_slot) {
    951			atomic_set(&lc->oc_this_node, slots[i].nodeid);
    952			break;
    953		}
    954
    955	lc->oc_our_slot = our_slot;
    956	wake_up(&lc->oc_wait);
    957}
    958
    959static const struct dlm_lockspace_ops ocfs2_ls_ops = {
    960	.recover_prep = user_recover_prep,
    961	.recover_slot = user_recover_slot,
    962	.recover_done = user_recover_done,
    963};
    964
    965static int user_cluster_disconnect(struct ocfs2_cluster_connection *conn)
    966{
    967	version_unlock(conn);
    968	dlm_release_lockspace(conn->cc_lockspace, 2);
    969	conn->cc_lockspace = NULL;
    970	ocfs2_live_connection_drop(conn->cc_private);
    971	conn->cc_private = NULL;
    972	return 0;
    973}
    974
    975static int user_cluster_connect(struct ocfs2_cluster_connection *conn)
    976{
    977	dlm_lockspace_t *fsdlm;
    978	struct ocfs2_live_connection *lc;
    979	int rc, ops_rv;
    980
    981	BUG_ON(conn == NULL);
    982
    983	lc = kzalloc(sizeof(struct ocfs2_live_connection), GFP_KERNEL);
    984	if (!lc)
    985		return -ENOMEM;
    986
    987	init_waitqueue_head(&lc->oc_wait);
    988	init_completion(&lc->oc_sync_wait);
    989	atomic_set(&lc->oc_this_node, 0);
    990	conn->cc_private = lc;
    991	lc->oc_type = NO_CONTROLD;
    992
    993	rc = dlm_new_lockspace(conn->cc_name, conn->cc_cluster_name,
    994			       DLM_LSFL_FS | DLM_LSFL_NEWEXCL, DLM_LVB_LEN,
    995			       &ocfs2_ls_ops, conn, &ops_rv, &fsdlm);
    996	if (rc) {
    997		if (rc == -EEXIST || rc == -EPROTO)
    998			printk(KERN_ERR "ocfs2: Unable to create the "
    999				"lockspace %s (%d), because a ocfs2-tools "
   1000				"program is running on this file system "
   1001				"with the same name lockspace\n",
   1002				conn->cc_name, rc);
   1003		goto out;
   1004	}
   1005
   1006	if (ops_rv == -EOPNOTSUPP) {
   1007		lc->oc_type = WITH_CONTROLD;
   1008		printk(KERN_NOTICE "ocfs2: You seem to be using an older "
   1009				"version of dlm_controld and/or ocfs2-tools."
   1010				" Please consider upgrading.\n");
   1011	} else if (ops_rv) {
   1012		rc = ops_rv;
   1013		goto out;
   1014	}
   1015	conn->cc_lockspace = fsdlm;
   1016
   1017	rc = ocfs2_live_connection_attach(conn, lc);
   1018	if (rc)
   1019		goto out;
   1020
   1021	if (lc->oc_type == NO_CONTROLD) {
   1022		rc = get_protocol_version(conn);
   1023		if (rc) {
   1024			printk(KERN_ERR "ocfs2: Could not determine"
   1025					" locking version\n");
   1026			user_cluster_disconnect(conn);
   1027			goto out;
   1028		}
   1029		wait_event(lc->oc_wait, (atomic_read(&lc->oc_this_node) > 0));
   1030	}
   1031
   1032	/*
   1033	 * running_proto must have been set before we allowed any mounts
   1034	 * to proceed.
   1035	 */
   1036	if (fs_protocol_compare(&running_proto, &conn->cc_version)) {
   1037		printk(KERN_ERR
   1038		       "Unable to mount with fs locking protocol version "
   1039		       "%u.%u because negotiated protocol is %u.%u\n",
   1040		       conn->cc_version.pv_major, conn->cc_version.pv_minor,
   1041		       running_proto.pv_major, running_proto.pv_minor);
   1042		rc = -EPROTO;
   1043		ocfs2_live_connection_drop(lc);
   1044		lc = NULL;
   1045	}
   1046
   1047out:
   1048	if (rc)
   1049		kfree(lc);
   1050	return rc;
   1051}
   1052
   1053
   1054static int user_cluster_this_node(struct ocfs2_cluster_connection *conn,
   1055				  unsigned int *this_node)
   1056{
   1057	int rc;
   1058	struct ocfs2_live_connection *lc = conn->cc_private;
   1059
   1060	if (lc->oc_type == WITH_CONTROLD)
   1061		rc = ocfs2_control_get_this_node();
   1062	else if (lc->oc_type == NO_CONTROLD)
   1063		rc = atomic_read(&lc->oc_this_node);
   1064	else
   1065		rc = -EINVAL;
   1066
   1067	if (rc < 0)
   1068		return rc;
   1069
   1070	*this_node = rc;
   1071	return 0;
   1072}
   1073
   1074static struct ocfs2_stack_operations ocfs2_user_plugin_ops = {
   1075	.connect	= user_cluster_connect,
   1076	.disconnect	= user_cluster_disconnect,
   1077	.this_node	= user_cluster_this_node,
   1078	.dlm_lock	= user_dlm_lock,
   1079	.dlm_unlock	= user_dlm_unlock,
   1080	.lock_status	= user_dlm_lock_status,
   1081	.lvb_valid	= user_dlm_lvb_valid,
   1082	.lock_lvb	= user_dlm_lvb,
   1083	.plock		= user_plock,
   1084	.dump_lksb	= user_dlm_dump_lksb,
   1085};
   1086
   1087static struct ocfs2_stack_plugin ocfs2_user_plugin = {
   1088	.sp_name	= "user",
   1089	.sp_ops		= &ocfs2_user_plugin_ops,
   1090	.sp_owner	= THIS_MODULE,
   1091};
   1092
   1093
   1094static int __init ocfs2_user_plugin_init(void)
   1095{
   1096	int rc;
   1097
   1098	rc = ocfs2_control_init();
   1099	if (!rc) {
   1100		rc = ocfs2_stack_glue_register(&ocfs2_user_plugin);
   1101		if (rc)
   1102			ocfs2_control_exit();
   1103	}
   1104
   1105	return rc;
   1106}
   1107
   1108static void __exit ocfs2_user_plugin_exit(void)
   1109{
   1110	ocfs2_stack_glue_unregister(&ocfs2_user_plugin);
   1111	ocfs2_control_exit();
   1112}
   1113
   1114MODULE_AUTHOR("Oracle");
   1115MODULE_DESCRIPTION("ocfs2 driver for userspace cluster stacks");
   1116MODULE_LICENSE("GPL");
   1117module_init(ocfs2_user_plugin_init);
   1118module_exit(ocfs2_user_plugin_exit);