@@ -8,6 +8,8 @@ import argparse
import errno
import ipaddress
import logging
+import multiprocessing
+import struct
import sys
import time
@@ -58,6 +60,53 @@ class ovs_dp_msg(genlmsg):
fields = genlmsg.fields + (("dpifindex", "I"),)
+class OvsPacket(GenericNetlinkSocket):
+ OVS_PACKET_CMD_MISS = 1 # Flow table miss
+ OVS_PACKET_CMD_ACTION = 2 # USERSPACE action
+ OVS_PACKET_CMD_EXECUTE = 3 # Apply actions to packet
+
+ class ovs_packet_msg(ovs_dp_msg):
+ nla_map = (
+ ('OVS_PACKET_ATTR_UNSPEC', 'none'),
+ ('OVS_PACKET_ATTR_PACKET', 'array(uint8)'),
+ ('OVS_PACKET_ATTR_KEY', 'nested'),
+ ('OVS_PACKET_ATTR_ACTIONS', 'nested'),
+ ('OVS_PACKET_ATTR_USERDATA', 'nested'),
+ ('OVS_PACKET_ATTR_EGRESS_TUN_KEY', 'nested'),
+ ('OVS_PACKET_ATTR_UNUSED1', 'none'),
+ ('OVS_PACKET_ATTR_UNUSED2', 'none'),
+ ('OVS_PACKET_ATTR_PROBE', 'none'),
+ ('OVS_PACKET_ATTR_MRU', 'uint16'),
+ ('OVS_PACKET_ATTR_LEN', 'uint32'),
+ ('OVS_PACKET_ATTR_HASH', 'uint64'),
+ )
+
+ def __init__(self):
+ GenericNetlinkSocket.__init__(self)
+ print("Binding to packet family")
+ self.bind(OVS_PACKET_FAMILY, OvsPacket.ovs_packet_msg)
+ print("Port", self.epid)
+
+ def upcall_handler(self, up=None):
+ print("listening on upcall packet handler:", self.epid)
+ while True:
+ try:
+ msgs = self.get()
+ for msg in msgs:
+ if not up:
+ continue
+ if msg["cmd"] == OvsPacket.OVS_PACKET_CMD_MISS:
+ up.miss(msg)
+ elif msg["cmd"] == OvsPacket.OVS_PACKET_CMD_ACTION:
+ up.action(msg)
+ elif msg["cmd"] == OvsPacket.OVS_PACKET_CMD_EXECUTE:
+ up.execute(msg)
+ else:
+ print("Unkonwn cmd: %d" % msg["cmd"])
+ except NetlinkError as ne:
+ raise ne
+
+
class OvsDatapath(GenericNetlinkSocket):
OVS_DP_F_VPORT_PIDS = 1 << 1
@@ -122,7 +171,7 @@ class OvsDatapath(GenericNetlinkSocket):
return reply
- def create(self, dpname, shouldUpcall=False, versionStr=None):
+ def create(self, dpname, shouldUpcall=False, versionStr=None, p=OvsPacket()):
msg = OvsDatapath.dp_cmd_msg()
msg["cmd"] = OVS_DP_CMD_NEW
if versionStr is None:
@@ -139,9 +188,19 @@ class OvsDatapath(GenericNetlinkSocket):
else:
dpfeatures = OvsDatapath.OVS_DP_F_VPORT_PIDS
- msg["attrs"].append(["OVS_DP_ATTR_USER_FEATURES", dpfeatures])
if not shouldUpcall:
msg["attrs"].append(["OVS_DP_ATTR_UPCALL_PID", [0]])
+ else:
+ if versionStr is None or versionStr.find(":") == -1:
+ dpfeatures |= OvsDatapath.OVS_DP_F_DISPATCH_UPCALL_PER_CPU
+ dpfeatures &= ~OvsDatapath.OVS_DP_F_VPORT_PIDS
+
+ nproc = multiprocessing.cpu_count()
+ procarray = []
+ for i in range(1, nproc):
+ procarray += [int(p.epid)]
+ msg["attrs"].append(["OVS_DP_ATTR_UPCALL_PID", procarray])
+ msg["attrs"].append(["OVS_DP_ATTR_USER_FEATURES", dpfeatures])
try:
reply = self.nlm_request(
@@ -238,9 +297,10 @@ class OvsVport(GenericNetlinkSocket):
return OvsVport.OVS_VPORT_TYPE_GENEVE
raise ValueError("Unknown vport type: '%s'" % vport_type)
- def __init__(self):
+ def __init__(self, packet=OvsPacket()):
GenericNetlinkSocket.__init__(self)
self.bind(OVS_VPORT_FAMILY, OvsVport.ovs_vport_msg)
+ self.upcall_packet = packet
def info(self, vport_name, dpifindex=0, portno=None):
msg = OvsVport.ovs_vport_msg()
@@ -278,7 +338,36 @@ class OvsVport(GenericNetlinkSocket):
msg["attrs"].append(["OVS_VPORT_ATTR_TYPE", port_type])
msg["attrs"].append(["OVS_VPORT_ATTR_NAME", vport_ifname])
- msg["attrs"].append(["OVS_VPORT_ATTR_UPCALL_PID", [self.pid]])
+ msg["attrs"].append(["OVS_VPORT_ATTR_UPCALL_PID",
+ [self.upcall_packet.epid]])
+
+ try:
+ reply = self.nlm_request(
+ msg, msg_type=self.prid, msg_flags=NLM_F_REQUEST | NLM_F_ACK
+ )
+ reply = reply[0]
+ except NetlinkError as ne:
+ if ne.code == errno.EEXIST:
+ reply = None
+ else:
+ raise ne
+ return reply
+
+ def reset_upcall(self, dpindex, vport_ifname, p=None):
+ msg = OvsVport.ovs_vport_msg()
+
+ msg["cmd"] = OVS_VPORT_CMD_SET
+ msg["version"] = OVS_DATAPATH_VERSION
+ msg["reserved"] = 0
+ msg["dpifindex"] = dpindex
+ msg["attrs"].append(["OVS_VPORT_ATTR_NAME", vport_ifname])
+
+ if p == None:
+ p = self.upcall_packet
+ else:
+ self.upcall_packet = p
+
+ msg["attrs"].append(["OVS_VPORT_ATTR_UPCALL_PID", [p.epid]])
try:
reply = self.nlm_request(
@@ -310,6 +399,9 @@ class OvsVport(GenericNetlinkSocket):
raise ne
return reply
+ def upcall_handler(self, handler=None):
+ self.upcall_packet.upcall_handler(handler)
+
def macstr(mac):
outstr = ":".join(["%02X" % i for i in mac])
@@ -1064,6 +1156,26 @@ class OvsFlow(GenericNetlinkSocket):
raise ne
return rep
+ def miss(self, packetmsg):
+ seq = packetmsg["header"]["sequence_number"]
+ keystr = "(none)"
+ key_field = packetmsg.get_attr("OVS_PACKET_ATTR_KEY")
+ if key_field is not None:
+ keymsg = OvsFlow.ovs_flow_msg.nestedflow(data=key_field)
+ keymsg.decode()
+ keystr = keymsg.dpstr(None, True)
+
+ pktdata = packetmsg.get_attr("OVS_PACKET_ATTR_PACKET")
+ pktpres = "yes" if pktdata is not None else "no"
+
+ print("MISS upcall[%d/%s]: %s" % (seq, pktpres, keystr), flush = True)
+
+ def execute(self, packetmsg):
+ print("userspace execute command")
+
+ def action(self, packetmsg):
+ print("userspace action command")
+
def print_ovsdp_full(dp_lookup_rep, ifindex, ndb=NDB(), vpl=OvsVport()):
dp_name = dp_lookup_rep.get_attr("OVS_DP_ATTR_NAME")
@@ -1141,6 +1253,12 @@ def main(argv):
addifcmd = subparsers.add_parser("add-if")
addifcmd.add_argument("dpname", help="Datapath Name")
addifcmd.add_argument("addif", help="Interface name for adding")
+ addifcmd.add_argument(
+ "-u",
+ "--upcall",
+ action="store_true",
+ help="Leave open a reader for upcalls",
+ )
addifcmd.add_argument(
"-t",
"--ptype",
@@ -1162,8 +1280,9 @@ def main(argv):
if args.verbose > 1:
logging.basicConfig(level=logging.DEBUG)
+ ovspk = OvsPacket()
ovsdp = OvsDatapath()
- ovsvp = OvsVport()
+ ovsvp = OvsVport(ovspk)
ovsflow = OvsFlow()
ndb = NDB()
@@ -1186,11 +1305,13 @@ def main(argv):
msg += ":'%s'" % args.showdp
print(msg)
elif hasattr(args, "adddp"):
- rep = ovsdp.create(args.adddp, args.upcall, args.versioning)
+ rep = ovsdp.create(args.adddp, args.upcall, args.versioning, ovspk)
if rep is None:
print("DP '%s' already exists" % args.adddp)
else:
print("DP '%s' added" % args.adddp)
+ if args.upcall:
+ ovspk.upcall_handler(ovsflow)
elif hasattr(args, "deldp"):
ovsdp.destroy(args.deldp)
elif hasattr(args, "addif"):
@@ -1198,13 +1319,18 @@ def main(argv):
if rep is None:
print("DP '%s' not found." % args.dpname)
return 1
- rep = ovsvp.attach(rep["dpifindex"], args.addif, args.ptype)
+ dpindex = rep["dpifindex"]
+ rep = ovsvp.attach(dpindex, args.addif, args.ptype)
msg = "vport '%s'" % args.addif
if rep and rep["error"] == 0:
msg += " added."
else:
msg += " failed to add."
print(msg)
+ if args.upcall:
+ if rep is None:
+ rep = ovsvp.reset_upcall(dpindex, args.addif, ovspk)
+ ovsvp.upcall_handler(ovsflow)
elif hasattr(args, "delif"):
rep = ovsdp.info(args.dpname, 0)
if rep is None: