sch_plug.c (6551B)
1// SPDX-License-Identifier: GPL-2.0-or-later 2/* 3 * sch_plug.c Queue traffic until an explicit release command 4 * 5 * There are two ways to use this qdisc: 6 * 1. A simple "instantaneous" plug/unplug operation, by issuing an alternating 7 * sequence of TCQ_PLUG_BUFFER & TCQ_PLUG_RELEASE_INDEFINITE commands. 8 * 9 * 2. For network output buffering (a.k.a output commit) functionality. 10 * Output commit property is commonly used by applications using checkpoint 11 * based fault-tolerance to ensure that the checkpoint from which a system 12 * is being restored is consistent w.r.t outside world. 13 * 14 * Consider for e.g. Remus - a Virtual Machine checkpointing system, 15 * wherein a VM is checkpointed, say every 50ms. The checkpoint is replicated 16 * asynchronously to the backup host, while the VM continues executing the 17 * next epoch speculatively. 18 * 19 * The following is a typical sequence of output buffer operations: 20 * 1.At epoch i, start_buffer(i) 21 * 2. At end of epoch i (i.e. after 50ms): 22 * 2.1 Stop VM and take checkpoint(i). 23 * 2.2 start_buffer(i+1) and Resume VM 24 * 3. While speculatively executing epoch(i+1), asynchronously replicate 25 * checkpoint(i) to backup host. 26 * 4. When checkpoint_ack(i) is received from backup, release_buffer(i) 27 * Thus, this Qdisc would receive the following sequence of commands: 28 * TCQ_PLUG_BUFFER (epoch i) 29 * .. TCQ_PLUG_BUFFER (epoch i+1) 30 * ....TCQ_PLUG_RELEASE_ONE (epoch i) 31 * ......TCQ_PLUG_BUFFER (epoch i+2) 32 * ........ 33 */ 34 35#include <linux/module.h> 36#include <linux/types.h> 37#include <linux/kernel.h> 38#include <linux/errno.h> 39#include <linux/netdevice.h> 40#include <linux/skbuff.h> 41#include <net/pkt_sched.h> 42 43/* 44 * State of the queue, when used for network output buffering: 45 * 46 * plug(i+1) plug(i) head 47 * ------------------+--------------------+----------------> 48 * | | 49 * | | 50 * pkts_current_epoch| pkts_last_epoch |pkts_to_release 51 * ----------------->|<--------+--------->|+---------------> 52 * v v 53 * 54 */ 55 56struct plug_sched_data { 57 /* If true, the dequeue function releases all packets 58 * from head to end of the queue. The queue turns into 59 * a pass-through queue for newly arriving packets. 60 */ 61 bool unplug_indefinite; 62 63 bool throttled; 64 65 /* Queue Limit in bytes */ 66 u32 limit; 67 68 /* Number of packets (output) from the current speculatively 69 * executing epoch. 70 */ 71 u32 pkts_current_epoch; 72 73 /* Number of packets corresponding to the recently finished 74 * epoch. These will be released when we receive a 75 * TCQ_PLUG_RELEASE_ONE command. This command is typically 76 * issued after committing a checkpoint at the target. 77 */ 78 u32 pkts_last_epoch; 79 80 /* 81 * Number of packets from the head of the queue, that can 82 * be released (committed checkpoint). 83 */ 84 u32 pkts_to_release; 85}; 86 87static int plug_enqueue(struct sk_buff *skb, struct Qdisc *sch, 88 struct sk_buff **to_free) 89{ 90 struct plug_sched_data *q = qdisc_priv(sch); 91 92 if (likely(sch->qstats.backlog + skb->len <= q->limit)) { 93 if (!q->unplug_indefinite) 94 q->pkts_current_epoch++; 95 return qdisc_enqueue_tail(skb, sch); 96 } 97 98 return qdisc_drop(skb, sch, to_free); 99} 100 101static struct sk_buff *plug_dequeue(struct Qdisc *sch) 102{ 103 struct plug_sched_data *q = qdisc_priv(sch); 104 105 if (q->throttled) 106 return NULL; 107 108 if (!q->unplug_indefinite) { 109 if (!q->pkts_to_release) { 110 /* No more packets to dequeue. Block the queue 111 * and wait for the next release command. 112 */ 113 q->throttled = true; 114 return NULL; 115 } 116 q->pkts_to_release--; 117 } 118 119 return qdisc_dequeue_head(sch); 120} 121 122static int plug_init(struct Qdisc *sch, struct nlattr *opt, 123 struct netlink_ext_ack *extack) 124{ 125 struct plug_sched_data *q = qdisc_priv(sch); 126 127 q->pkts_current_epoch = 0; 128 q->pkts_last_epoch = 0; 129 q->pkts_to_release = 0; 130 q->unplug_indefinite = false; 131 132 if (opt == NULL) { 133 q->limit = qdisc_dev(sch)->tx_queue_len 134 * psched_mtu(qdisc_dev(sch)); 135 } else { 136 struct tc_plug_qopt *ctl = nla_data(opt); 137 138 if (nla_len(opt) < sizeof(*ctl)) 139 return -EINVAL; 140 141 q->limit = ctl->limit; 142 } 143 144 q->throttled = true; 145 return 0; 146} 147 148/* Receives 4 types of messages: 149 * TCQ_PLUG_BUFFER: Inset a plug into the queue and 150 * buffer any incoming packets 151 * TCQ_PLUG_RELEASE_ONE: Dequeue packets from queue head 152 * to beginning of the next plug. 153 * TCQ_PLUG_RELEASE_INDEFINITE: Dequeue all packets from queue. 154 * Stop buffering packets until the next TCQ_PLUG_BUFFER 155 * command is received (just act as a pass-thru queue). 156 * TCQ_PLUG_LIMIT: Increase/decrease queue size 157 */ 158static int plug_change(struct Qdisc *sch, struct nlattr *opt, 159 struct netlink_ext_ack *extack) 160{ 161 struct plug_sched_data *q = qdisc_priv(sch); 162 struct tc_plug_qopt *msg; 163 164 if (opt == NULL) 165 return -EINVAL; 166 167 msg = nla_data(opt); 168 if (nla_len(opt) < sizeof(*msg)) 169 return -EINVAL; 170 171 switch (msg->action) { 172 case TCQ_PLUG_BUFFER: 173 /* Save size of the current buffer */ 174 q->pkts_last_epoch = q->pkts_current_epoch; 175 q->pkts_current_epoch = 0; 176 if (q->unplug_indefinite) 177 q->throttled = true; 178 q->unplug_indefinite = false; 179 break; 180 case TCQ_PLUG_RELEASE_ONE: 181 /* Add packets from the last complete buffer to the 182 * packets to be released set. 183 */ 184 q->pkts_to_release += q->pkts_last_epoch; 185 q->pkts_last_epoch = 0; 186 q->throttled = false; 187 netif_schedule_queue(sch->dev_queue); 188 break; 189 case TCQ_PLUG_RELEASE_INDEFINITE: 190 q->unplug_indefinite = true; 191 q->pkts_to_release = 0; 192 q->pkts_last_epoch = 0; 193 q->pkts_current_epoch = 0; 194 q->throttled = false; 195 netif_schedule_queue(sch->dev_queue); 196 break; 197 case TCQ_PLUG_LIMIT: 198 /* Limit is supplied in bytes */ 199 q->limit = msg->limit; 200 break; 201 default: 202 return -EINVAL; 203 } 204 205 return 0; 206} 207 208static struct Qdisc_ops plug_qdisc_ops __read_mostly = { 209 .id = "plug", 210 .priv_size = sizeof(struct plug_sched_data), 211 .enqueue = plug_enqueue, 212 .dequeue = plug_dequeue, 213 .peek = qdisc_peek_head, 214 .init = plug_init, 215 .change = plug_change, 216 .reset = qdisc_reset_queue, 217 .owner = THIS_MODULE, 218}; 219 220static int __init plug_module_init(void) 221{ 222 return register_qdisc(&plug_qdisc_ops); 223} 224 225static void __exit plug_module_exit(void) 226{ 227 unregister_qdisc(&plug_qdisc_ops); 228} 229module_init(plug_module_init) 230module_exit(plug_module_exit) 231MODULE_LICENSE("GPL");