[RFC] net/tipc: reduce tipc_node lock holding time in tipc_rcv

Message ID 20231123024510.2037882-1-xu.xin.sc@gmail.com
State New
Series [RFC] net/tipc: reduce tipc_node lock holding time in tipc_rcv

Commit Message

xu Nov. 23, 2023, 2:45 a.m. UTC
  From: xu xin <xu.xin.sc@gmail.com>

Background
==========
As we know, TIPC does not currently support RPS balancing based on the
destination port of a TIPC skb. The basic reason is the increased contention
on the node lock when packets from the same link are distributed to
different CPUs for processing, as mentioned in [1].

Questions to discuss
====================
Does tipc_link_rcv() really need to hold the tipc_node's read or write lock?
I looked through the code of tipc_link_rcv() but could not find the reason
why it needs the lock. If tipc_link_rcv() does need it, can anyone tell me
the reason, and whether we can reduce its holding time further?

Advantage
=========
If tipc_link_rcv() does not need the lock, then with this patch applied,
enabling RPS based on the destination port (my experimental code) can
increase TIPC throughput by approximately 25% (on a 4-core CPU).

[1] commit 08bfc9cb76e2 ("flow_dissector: add tipc support")

Signed-off-by: xu xin <xu.xin.sc@gmail.com>
---
 net/tipc/node.c | 10 ++++++----
 1 file changed, 6 insertions(+), 4 deletions(-)
  

Comments

Tung Quang Nguyen Nov. 23, 2023, 4:12 a.m. UTC | #1
> net/tipc/node.c | 10 ++++++----
> 1 file changed, 6 insertions(+), 4 deletions(-)
>
>diff --git a/net/tipc/node.c b/net/tipc/node.c
>index 3105abe97bb9..2a036b8a7da3 100644
>--- a/net/tipc/node.c
>+++ b/net/tipc/node.c
>@@ -2154,14 +2154,15 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b)
> 	/* Receive packet directly if conditions permit */
> 	tipc_node_read_lock(n);
> 	if (likely((n->state == SELF_UP_PEER_UP) && (usr != TUNNEL_PROTOCOL))) {
>+		tipc_node_read_unlock(n);
> 		spin_lock_bh(&le->lock);
> 		if (le->link) {
> 			rc = tipc_link_rcv(le->link, skb, &xmitq);
> 			skb = NULL;
> 		}
> 		spin_unlock_bh(&le->lock);
>-	}
>-	tipc_node_read_unlock(n);
>+	} else
>+		tipc_node_read_unlock(n);
>
> 	/* Check/update node state before receiving */
> 	if (unlikely(skb)) {
>@@ -2169,12 +2170,13 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b)
> 			goto out_node_put;
> 		tipc_node_write_lock(n);
> 		if (tipc_node_check_state(n, skb, bearer_id, &xmitq)) {
>+			tipc_node_write_unlock(n);
> 			if (le->link) {
> 				rc = tipc_link_rcv(le->link, skb, &xmitq);
> 				skb = NULL;
> 			}
>-		}
>-		tipc_node_write_unlock(n);
>+		} else
>+			tipc_node_write_unlock(n);
> 	}
>
> 	if (unlikely(rc & TIPC_LINK_UP_EVT))
>--
>2.15.2
>
>
This patch is wrong. le->link and link status must be protected by node lock. See what happens if tipc_node_timeout() is called, and the link goes down:
tipc_node_timeout()
   tipc_node_link_down()
   {
      struct tipc_link *l = le->link;
      ...
      if (delete) {
         kfree(l);
         le->link = NULL;
      }
      ...
   }
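
To make the hazard concrete, here is one possible interleaving with this
patch applied (illustrative sketch only; tipc_node_link_down() never takes
le->lock in mainline, so once the node lock is dropped early nothing
excludes the delete side from the receive side):

CPU 0: tipc_rcv(), patched             CPU 1: tipc_node_link_down(n, id, true)
tipc_node_read_unlock(n);
spin_lock_bh(&le->lock);               tipc_node_write_lock(n); <-- no longer blocked
l = le->link;                          kfree(l);
tipc_link_rcv(l, skb, &xmitq);         le->link = NULL;
   ^-- use-after-free                  tipc_node_write_unlock(n);
spin_unlock_bh(&le->lock);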
  
xu Nov. 23, 2023, 6:22 a.m. UTC | #2
>>diff --git a/net/tipc/node.c b/net/tipc/node.c
>>index 3105abe97bb9..2a036b8a7da3 100644
>>--- a/net/tipc/node.c
>>+++ b/net/tipc/node.c
>>@@ -2154,14 +2154,15 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b)
>> 	/* Receive packet directly if conditions permit */
>> 	tipc_node_read_lock(n);
>> 	if (likely((n->state == SELF_UP_PEER_UP) && (usr != TUNNEL_PROTOCOL))) {
>>+		tipc_node_read_unlock(n);
>> 		spin_lock_bh(&le->lock);
>> 		if (le->link) {
>> 			rc = tipc_link_rcv(le->link, skb, &xmitq);
>> 			skb = NULL;
>> 		}
>> 		spin_unlock_bh(&le->lock);
>>-	}
>>-	tipc_node_read_unlock(n);
>>+	} else
>>+		tipc_node_read_unlock(n);
>>
>> 	/* Check/update node state before receiving */
>> 	if (unlikely(skb)) {
>>@@ -2169,12 +2170,13 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b)
>> 			goto out_node_put;
>> 		tipc_node_write_lock(n);
>> 		if (tipc_node_check_state(n, skb, bearer_id, &xmitq)) {
>>+			tipc_node_write_unlock(n);
>> 			if (le->link) {
>> 				rc = tipc_link_rcv(le->link, skb, &xmitq);
>> 				skb = NULL;
>> 			}
>>-		}
>>-		tipc_node_write_unlock(n);
>>+		} else
>>+			tipc_node_write_unlock(n);
>> 	}
>>
>> 	if (unlikely(rc & TIPC_LINK_UP_EVT))
>>--
>>2.15.2
>>
>>
>This patch is wrong. le->link and link status must be protected by node lock. See what happens if tipc_node_timeout() is called, and the link goes down:
>tipc_node_timeout()
>   tipc_node_link_down()
>   {
>      struct tipc_link *l = le->link;
>      ...
>      if (delete) {
>         kfree(l);
>         le->link = NULL;
>      }
>      ...
>   }

Happy to see your reply. But why? 'delete' is false when called from tipc_node_timeout(). Refer to:
https://elixir.bootlin.com/linux/v6.7-rc2/source/net/tipc/node.c#L844
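
For reference, the call in question (condensed from tipc_node_timeout() in
net/tipc/node.c, v6.7-rc2):

tipc_node_timeout()
{
   ...
   if (rc & TIPC_LINK_DOWN_EVT)
      tipc_node_link_down(n, bearer_id, false); <-- delete is false here
   ...
}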
  
Tung Quang Nguyen Nov. 23, 2023, 6:54 a.m. UTC | #3
>>This patch is wrong. le->link and link status must be protected by node lock. See what happens if tipc_node_timeout() is called, and the link goes down:
>>tipc_node_timeout()
>>   tipc_node_link_down()
>>   {
>>      struct tipc_link *l = le->link;
>>      ...
>>      if (delete) {
>>         kfree(l);
>>         le->link = NULL;
>>      }
>>      ...
>>   }
>
>Happy to see your reply. But why? 'delete' is false when called from tipc_node_timeout(). Refer to:
>https://elixir.bootlin.com/linux/v6.7-rc2/source/net/tipc/node.c#L844
I should have explained it clearly:
1/ link status must be protected.
tipc_node_timeout()
   tipc_node_link_down()
   {
      struct tipc_link *l = le->link;
   
      ...
     __tipc_node_link_down(); <-- link status is referred. 
      ...
     if (delete) {
        kfree(l);
        le->link = NULL;
     }
     ...
   }

__tipc_node_link_down()
{
    ...
   if (!l || tipc_link_is_reset(l)) <-- read link status
    ...
    tipc_link_reset(l); <--- this function will reset all things related to link.
}

2/ le->link must be protected.
bearer_disable()
{
   ...
   tipc_node_delete_links(net, bearer_id); <--- this will delete all links.
   ...
}

tipc_node_delete_links()
{
   ...
  tipc_node_link_down(n, bearer_id, true);
   ...
}
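
Put together, the mainline locking scheme looks roughly like this (sketch
only; note that le->lock is not taken on the reset/delete side, so the node
rwlock is the only thing keeping these two paths apart):

   reset/delete side                     receive side (tipc_rcv)
   tipc_node_write_lock(n);              tipc_node_read_lock(n);
   __tipc_node_link_down();              spin_lock_bh(&le->lock);
      tipc_link_reset(l); <-- l->state   rc = tipc_link_rcv(le->link, ...);
   kfree(l); le->link = NULL;            spin_unlock_bh(&le->lock);
   tipc_node_write_unlock(n);            tipc_node_read_unlock(n);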
  
xu Nov. 23, 2023, 12:34 p.m. UTC | #4
>>>This patch is wrong. le->link and link status must be protected by node lock. See what happens if tipc_node_timeout() is called, and the link goes down:
>>>tipc_node_timeout()
>>>   tipc_node_link_down()
>>>   {
>>>      struct tipc_link *l = le->link;
>>>      ...
>>>      if (delete) {
>>>         kfree(l);
>>>         le->link = NULL;
>>>      }
>>>      ...
>>>   }
>>
>>Happy to see your reply. But why? 'delete' is false when called from tipc_node_timeout(). Refer to:
>>https://elixir.bootlin.com/linux/v6.7-rc2/source/net/tipc/node.c#L844
>I should have explained it clearly:

Thanks, I see. So the root reason for holding the node lock before tipc_link_rcv() is to protect
the link from being reset or deleted while tipc_link_rcv() is running.

For further discussion: to balance incoming packets (same link, different ports) across
multiple CPUs, maybe we can try RCU + a spinlock here.
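
For concreteness, a rough and untested sketch of that direction, assuming
le->link became RCU-managed and struct tipc_link gained an rcu_head (this
would only solve the lifetime of le->link; the l->state accesses discussed
above would still need their own protection):

/* Receive side, replacing the node read lock: */
rcu_read_lock();
l = rcu_dereference(le->link);
if (l) {
	spin_lock_bh(&le->lock); /* serialize concurrent receivers */
	rc = tipc_link_rcv(l, skb, &xmitq);
	spin_unlock_bh(&le->lock);
}
rcu_read_unlock();

/* Delete side, still under the node write lock: */
l = le->link;
RCU_INIT_POINTER(le->link, NULL);
kfree_rcu(l, rcu); /* freed only after all readers have finished */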

>1/ link status must be protected.
>tipc_node_timeout()
>   tipc_node_link_down()
>   {
>      struct tipc_link *l = le->link;
>   
>      ...
>     __tipc_node_link_down(); <-- link status is referred. 
>      ...
>     if (delete) {
>        kfree(l);
>        le->link = NULL;
>     }
>     ...
>   }
>
>__tipc_node_link_down()
>{
>    ...
>   if (!l || tipc_link_is_reset(l)) <-- read link status
>    ...
>    tipc_link_reset(l); <--- this function will reset all things related to link.
>}
>
>2/ le->link must be protected.
>bearer_disable()
>{
>   ...
>   tipc_node_delete_links(net, bearer_id); <--- this will delete all links.
>   ...
>}
>
>tipc_node_delete_links()
>{
>   ...
>  tipc_node_link_down(n, bearer_id, true);
>   ...
>}
  
xu Nov. 24, 2023, 7:31 a.m. UTC | #5
>>Happy to see your reply. But why? 'delete' is false when called from tipc_node_timeout(). Refer to:
>>https://elixir.bootlin.com/linux/v6.7-rc2/source/net/tipc/node.c#L844
>I should have explained it clearly:
>1/ link status must be protected.
>tipc_node_timeout()
>   tipc_node_link_down()
>   {
>      struct tipc_link *l = le->link;
>   
>      ...
>     __tipc_node_link_down(); <-- link status is referred. 
>      ...
>     if (delete) {
>        kfree(l);
>        le->link = NULL;
>     }
>     ...
>   }
>
>__tipc_node_link_down()
>{
>    ...
>   if (!l || tipc_link_is_reset(l)) <-- read link status
>    ...
>    tipc_link_reset(l); <--- this function will reset all things related to link.
>}
>
>2/ le->link must be protected.
>bearer_disable()
>{
>   ...
>   tipc_node_delete_links(net, bearer_id); <--- this will delete all links.
>   ...
>}
>
>tipc_node_delete_links()
>{
>   ...
>  tipc_node_link_down(n, bearer_id, true);
>   ...
>}

Could we please solve the problem mentioned above by adding spin_lock_bh(&le->lock)?

For example:

(BTW, I have tested it; with this change, enabling RPS based on the TIPC port improves overall throughput by about 25%.)

diff --git a/net/tipc/node.c b/net/tipc/node.c
index 3105abe97bb9..470c272d798e 100644
--- a/net/tipc/node.c
+++ b/net/tipc/node.c
@@ -1079,12 +1079,16 @@ static void tipc_node_link_down(struct tipc_node *n, int bearer_id, bool delete)
                __tipc_node_link_down(n, &bearer_id, &xmitq, &maddr);
        } else {
                /* Defuse pending tipc_node_link_up() */
+               spin_lock_bh(&le->lock);
                tipc_link_reset(l);
+               spin_unlock_bh(&le->lock);
                tipc_link_fsm_evt(l, LINK_RESET_EVT);
        }
        if (delete) {
+               spin_lock_bh(&le->lock);
                kfree(l);
                le->link = NULL;
+               spin_unlock_bh(&le->lock);
                n->link_cnt--;
        }
        trace_tipc_node_link_down(n, true, "node link down or deleted!");
@@ -2154,14 +2158,15 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b)
        /* Receive packet directly if conditions permit */
        tipc_node_read_lock(n);
        if (likely((n->state == SELF_UP_PEER_UP) && (usr != TUNNEL_PROTOCOL))) {
+               tipc_node_read_unlock(n);
                spin_lock_bh(&le->lock);
                if (le->link) {
                        rc = tipc_link_rcv(le->link, skb, &xmitq);
                        skb = NULL;
                }
                spin_unlock_bh(&le->lock);
-       }
-       tipc_node_read_unlock(n);
+       } else
+               tipc_node_read_unlock(n);
 
        /* Check/update node state before receiving */
        if (unlikely(skb)) {
@@ -2169,12 +2174,13 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b)
                        goto out_node_put;
                tipc_node_write_lock(n);
                if (tipc_node_check_state(n, skb, bearer_id, &xmitq)) {
+                       tipc_node_write_unlock(n);
                        if (le->link) {
                                rc = tipc_link_rcv(le->link, skb, &xmitq);
                                skb = NULL;
                        }
-               }
-               tipc_node_write_unlock(n);
+               } else
+                       tipc_node_write_unlock(n);
        }
 
        if (unlikely(rc & TIPC_LINK_UP_EVT))
  
Tung Quang Nguyen Nov. 24, 2023, 8:28 a.m. UTC | #6
>Could we please solve the problem mentioned above by adding spin_lock_bh(&le->lock)?
>

No, you cannot do that. As I said before, the link status (including l->state) needs to be protected by the node lock.
What I showed you were just two use cases (link reset/delete). There are more use cases (netlink, the transmit path, etc.) that need proper locks.
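
To give one more example, the transmit path relies on the same node read
lock plus le->lock pairing (condensed from mainline tipc_node_xmit(); error
paths omitted):

tipc_node_xmit()
{
   ...
   tipc_node_read_lock(n);
   ...
   le = &n->links[bearer_id];
   spin_lock_bh(&le->lock);
   rc = tipc_link_xmit(le->link, list, &xmitq);
   spin_unlock_bh(&le->lock);
   tipc_node_read_unlock(n);
   ...
}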

>For example:
>
>(BTW, I have tested it; with this change, enabling RPS based on the TIPC port improves overall throughput by about 25%.)
>
>diff --git a/net/tipc/node.c b/net/tipc/node.c
>index 3105abe97bb9..470c272d798e 100644
>--- a/net/tipc/node.c
>+++ b/net/tipc/node.c
>@@ -1079,12 +1079,16 @@ static void tipc_node_link_down(struct tipc_node *n, int bearer_id, bool delete)
>                __tipc_node_link_down(n, &bearer_id, &xmitq, &maddr);
>        } else {
>                /* Defuse pending tipc_node_link_up() */
>+               spin_lock_bh(&le->lock);
>                tipc_link_reset(l);
>+               spin_unlock_bh(&le->lock);
>                tipc_link_fsm_evt(l, LINK_RESET_EVT);
>        }
>        if (delete) {
>+               spin_lock_bh(&le->lock);
>                kfree(l);
>                le->link = NULL;
>+               spin_unlock_bh(&le->lock);
>                n->link_cnt--;
>        }
>        trace_tipc_node_link_down(n, true, "node link down or deleted!");
>@@ -2154,14 +2158,15 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b)
>        /* Receive packet directly if conditions permit */
>        tipc_node_read_lock(n);
>        if (likely((n->state == SELF_UP_PEER_UP) && (usr != TUNNEL_PROTOCOL))) {
>+               tipc_node_read_unlock(n);
>                spin_lock_bh(&le->lock);
>                if (le->link) {
>                        rc = tipc_link_rcv(le->link, skb, &xmitq);
>                        skb = NULL;
>                }
>                spin_unlock_bh(&le->lock);
>-       }
>-       tipc_node_read_unlock(n);
>+       } else
>+               tipc_node_read_unlock(n);
>
>        /* Check/update node state before receiving */
>        if (unlikely(skb)) {
>@@ -2169,12 +2174,13 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b)
>                        goto out_node_put;
>                tipc_node_write_lock(n);
>                if (tipc_node_check_state(n, skb, bearer_id, &xmitq)) {
>+                       tipc_node_write_unlock(n);
>                        if (le->link) {
>                                rc = tipc_link_rcv(le->link, skb, &xmitq);
>                                skb = NULL;
>                        }
>-               }
>-               tipc_node_write_unlock(n);
>+               } else
>+                       tipc_node_write_unlock(n);
>        }
>
>        if (unlikely(rc & TIPC_LINK_UP_EVT))
  
xu Nov. 24, 2023, 9:49 a.m. UTC | #7
>>Could we please solve the problem mentioned above by adding spin_lock_bh(&le->lock)?
>>
>
>No, you cannot do that. As I said before, the link status (including l->state) needs to be protected by node lock.

Why can't we use le->lock instead of the node's lock to protect it in tipc_link_rcv()?

>What I showed you were just two use cases (link reset/delete). There are more use cases (netlink, the transmit path, etc.) that need proper locks.

The same applies. We can also add spin_lock_bh(&le->lock) to protect the link in the other places
that change the link status, beyond 'reset/delete', because using the node lock to protect the
link in tipc_link_rcv() really wastes CPU performance.

>
>>For example:
>>
>>(BTW, I have tested it; with this change, enabling RPS based on the TIPC port improves overall throughput by about 25%.)
>>
>>diff --git a/net/tipc/node.c b/net/tipc/node.c
>>index 3105abe97bb9..470c272d798e 100644
>>--- a/net/tipc/node.c
>>+++ b/net/tipc/node.c
>>@@ -1079,12 +1079,16 @@ static void tipc_node_link_down(struct tipc_node *n, int bearer_id, bool delete)
>>                __tipc_node_link_down(n, &bearer_id, &xmitq, &maddr);
>>        } else {
>>                /* Defuse pending tipc_node_link_up() */
>>+               spin_lock_bh(&le->lock);
>>                tipc_link_reset(l);
>>+               spin_unlock_bh(&le->lock);
>>                tipc_link_fsm_evt(l, LINK_RESET_EVT);
>>        }
>>        if (delete) {
>>+               spin_lock_bh(&le->lock);
>>                kfree(l);
>>                le->link = NULL;
>>+               spin_unlock_bh(&le->lock);
>>                n->link_cnt--;
>>        }
>>        trace_tipc_node_link_down(n, true, "node link down or deleted!");
>>@@ -2154,14 +2158,15 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b)
>>        /* Receive packet directly if conditions permit */
>>        tipc_node_read_lock(n);
>>        if (likely((n->state == SELF_UP_PEER_UP) && (usr != TUNNEL_PROTOCOL))) {
>>+               tipc_node_read_unlock(n);
>>                spin_lock_bh(&le->lock);
>>                if (le->link) {
>>                        rc = tipc_link_rcv(le->link, skb, &xmitq);
>>                        skb = NULL;
>>                }
>>                spin_unlock_bh(&le->lock);
>>-       }
>>-       tipc_node_read_unlock(n);
>>+       } else
>>+               tipc_node_read_unlock(n);
>>
>>        /* Check/update node state before receiving */
>>        if (unlikely(skb)) {
>>@@ -2169,12 +2174,13 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b)
>>                        goto out_node_put;
>>                tipc_node_write_lock(n);
>>                if (tipc_node_check_state(n, skb, bearer_id, &xmitq)) {
>>+                       tipc_node_write_unlock(n);
>>                        if (le->link) {
>>                                rc = tipc_link_rcv(le->link, skb, &xmitq);
>>                                skb = NULL;
>>                        }
>>-               }
>>-               tipc_node_write_unlock(n);
>>+               } else
>>+                       tipc_node_write_unlock(n);
>>        }
>>
>>        if (unlikely(rc & TIPC_LINK_UP_EVT))
  
Tung Quang Nguyen Nov. 24, 2023, 10:22 a.m. UTC | #8
>Why can't we use le->lock instead of the node's lock to protect it in tipc_link_rcv()?
>
I have already explained:
__tipc_node_link_down()
{
    ...
   if (!l || tipc_link_is_reset(l)) <-- read link status
    ...
}
>>What I showed you were just two use cases (link reset/delete). There are more use cases (netlink, the transmit path, etc.) that need proper locks.
>
>The same. We can also add spin_lock_bh(&le->lock) to protect the link in other places where it changes the link status in addition to
>'reset/delete'. Because using node lock to protect the link in tipc_link_rcv is really wasting CPU performance.
>
If you want to change the current lock policy, you need to submit a complete and correct patch. I will acknowledge this patch if I can see a significant improvement in my test.
  

Patch

diff --git a/net/tipc/node.c b/net/tipc/node.c
index 3105abe97bb9..2a036b8a7da3 100644
--- a/net/tipc/node.c
+++ b/net/tipc/node.c
@@ -2154,14 +2154,15 @@  void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b)
 	/* Receive packet directly if conditions permit */
 	tipc_node_read_lock(n);
 	if (likely((n->state == SELF_UP_PEER_UP) && (usr != TUNNEL_PROTOCOL))) {
+		tipc_node_read_unlock(n);
 		spin_lock_bh(&le->lock);
 		if (le->link) {
 			rc = tipc_link_rcv(le->link, skb, &xmitq);
 			skb = NULL;
 		}
 		spin_unlock_bh(&le->lock);
-	}
-	tipc_node_read_unlock(n);
+	} else
+		tipc_node_read_unlock(n);
 
 	/* Check/update node state before receiving */
 	if (unlikely(skb)) {
@@ -2169,12 +2170,13 @@  void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b)
 			goto out_node_put;
 		tipc_node_write_lock(n);
 		if (tipc_node_check_state(n, skb, bearer_id, &xmitq)) {
+			tipc_node_write_unlock(n);
 			if (le->link) {
 				rc = tipc_link_rcv(le->link, skb, &xmitq);
 				skb = NULL;
 			}
-		}
-		tipc_node_write_unlock(n);
+		} else
+			tipc_node_write_unlock(n);
 	}
 
 	if (unlikely(rc & TIPC_LINK_UP_EVT))