mirror of
				https://github.com/RGBCube/serenity
				synced 2025-10-31 07:02:44 +00:00 
			
		
		
		
	Kernel/TCP: Port TCP retransmit queue to ProtectedValue
I had to switch to exclusive locking since ProtectedValue rightly doesn't allow you to mutate protected data with only a shared lock.
This commit is contained in:
		
							parent
							
								
									4c582b57e9
								
							
						
					
					
						commit
						0cb6c3c831
					
				
					 2 changed files with 71 additions and 65 deletions
				
			
		|  | @ -254,10 +254,11 @@ KResult TCPSocket::send_tcp_packet(u16 flags, const UserOrKernelBuffer* payload, | |||
|     m_packets_out++; | ||||
|     m_bytes_out += buffer_size; | ||||
|     if (tcp_packet.has_syn() || payload_size > 0) { | ||||
|         MutexLocker locker(m_not_acked_lock); | ||||
|         m_not_acked.append({ m_sequence_number, move(packet), ipv4_payload_offset, *routing_decision.adapter }); | ||||
|         m_not_acked_size += payload_size; | ||||
|         enqueue_for_retransmit(); | ||||
|         m_unacked_packets.with_exclusive([&](auto& unacked_packets) { | ||||
|             unacked_packets.packets.append({ m_sequence_number, move(packet), ipv4_payload_offset, *routing_decision.adapter }); | ||||
|             unacked_packets.size += payload_size; | ||||
|             enqueue_for_retransmit(); | ||||
|         }); | ||||
|     } else { | ||||
|         routing_decision.adapter->release_packet_buffer(*packet); | ||||
|     } | ||||
|  | @ -273,33 +274,34 @@ void TCPSocket::receive_tcp_packet(const TCPPacket& packet, u16 size) | |||
|         dbgln_if(TCP_SOCKET_DEBUG, "TCPSocket: receive_tcp_packet: {}", ack_number); | ||||
| 
 | ||||
|         int removed = 0; | ||||
|         MutexLocker locker(m_not_acked_lock); | ||||
|         while (!m_not_acked.is_empty()) { | ||||
|             auto& packet = m_not_acked.first(); | ||||
|         m_unacked_packets.with_exclusive([&](auto& unacked_packets) { | ||||
|             while (!unacked_packets.packets.is_empty()) { | ||||
|                 auto& packet = unacked_packets.packets.first(); | ||||
| 
 | ||||
|             dbgln_if(TCP_SOCKET_DEBUG, "TCPSocket: iterate: {}", packet.ack_number); | ||||
|                 dbgln_if(TCP_SOCKET_DEBUG, "TCPSocket: iterate: {}", packet.ack_number); | ||||
| 
 | ||||
|             if (packet.ack_number <= ack_number) { | ||||
|                 auto old_adapter = packet.adapter.strong_ref(); | ||||
|                 if (old_adapter) | ||||
|                     old_adapter->release_packet_buffer(*packet.buffer); | ||||
|                 TCPPacket& tcp_packet = *(TCPPacket*)(packet.buffer->buffer->data() + packet.ipv4_payload_offset); | ||||
|                 auto payload_size = packet.buffer->buffer->data() + packet.buffer->buffer->size() - (u8*)tcp_packet.payload(); | ||||
|                 m_not_acked_size -= payload_size; | ||||
|                 evaluate_block_conditions(); | ||||
|                 m_not_acked.take_first(); | ||||
|                 removed++; | ||||
|             } else { | ||||
|                 break; | ||||
|                 if (packet.ack_number <= ack_number) { | ||||
|                     auto old_adapter = packet.adapter.strong_ref(); | ||||
|                     if (old_adapter) | ||||
|                         old_adapter->release_packet_buffer(*packet.buffer); | ||||
|                     TCPPacket& tcp_packet = *(TCPPacket*)(packet.buffer->buffer->data() + packet.ipv4_payload_offset); | ||||
|                     auto payload_size = packet.buffer->buffer->data() + packet.buffer->buffer->size() - (u8*)tcp_packet.payload(); | ||||
|                     unacked_packets.size -= payload_size; | ||||
|                     evaluate_block_conditions(); | ||||
|                     unacked_packets.packets.take_first(); | ||||
|                     removed++; | ||||
|                 } else { | ||||
|                     break; | ||||
|                 } | ||||
|             } | ||||
|         } | ||||
| 
 | ||||
|         if (m_not_acked.is_empty()) { | ||||
|             m_retransmit_attempts = 0; | ||||
|             dequeue_for_retransmit(); | ||||
|         } | ||||
|             if (unacked_packets.packets.is_empty()) { | ||||
|                 m_retransmit_attempts = 0; | ||||
|                 dequeue_for_retransmit(); | ||||
|             } | ||||
| 
 | ||||
|         dbgln_if(TCP_SOCKET_DEBUG, "TCPSocket: receive_tcp_packet acknowledged {} packets", removed); | ||||
|             dbgln_if(TCP_SOCKET_DEBUG, "TCPSocket: receive_tcp_packet acknowledged {} packets", removed); | ||||
|         }); | ||||
|     } | ||||
| 
 | ||||
|     m_packets_in++; | ||||
|  | @ -560,41 +562,42 @@ void TCPSocket::retransmit_packets() | |||
|     if (routing_decision.is_zero()) | ||||
|         return; | ||||
| 
 | ||||
|     MutexLocker locker(m_not_acked_lock, Mutex::Mode::Shared); | ||||
|     for (auto& packet : m_not_acked) { | ||||
|         packet.tx_counter++; | ||||
|     m_unacked_packets.with_exclusive([&](auto& unacked_packets) { | ||||
|         for (auto& packet : unacked_packets.packets) { | ||||
|             packet.tx_counter++; | ||||
| 
 | ||||
|         if constexpr (TCP_SOCKET_DEBUG) { | ||||
|             auto& tcp_packet = *(const TCPPacket*)(packet.buffer->buffer->data() + packet.ipv4_payload_offset); | ||||
|             dbgln("Sending TCP packet from {}:{} to {}:{} with ({}{}{}{}) seq_no={}, ack_no={}, tx_counter={}", | ||||
|                 local_address(), local_port(), | ||||
|                 peer_address(), peer_port(), | ||||
|                 (tcp_packet.has_syn() ? "SYN " : ""), | ||||
|                 (tcp_packet.has_ack() ? "ACK " : ""), | ||||
|                 (tcp_packet.has_fin() ? "FIN " : ""), | ||||
|                 (tcp_packet.has_rst() ? "RST " : ""), | ||||
|                 tcp_packet.sequence_number(), | ||||
|                 tcp_packet.ack_number(), | ||||
|                 packet.tx_counter); | ||||
|             if constexpr (TCP_SOCKET_DEBUG) { | ||||
|                 auto& tcp_packet = *(const TCPPacket*)(packet.buffer->buffer->data() + packet.ipv4_payload_offset); | ||||
|                 dbgln("Sending TCP packet from {}:{} to {}:{} with ({}{}{}{}) seq_no={}, ack_no={}, tx_counter={}", | ||||
|                     local_address(), local_port(), | ||||
|                     peer_address(), peer_port(), | ||||
|                     (tcp_packet.has_syn() ? "SYN " : ""), | ||||
|                     (tcp_packet.has_ack() ? "ACK " : ""), | ||||
|                     (tcp_packet.has_fin() ? "FIN " : ""), | ||||
|                     (tcp_packet.has_rst() ? "RST " : ""), | ||||
|                     tcp_packet.sequence_number(), | ||||
|                     tcp_packet.ack_number(), | ||||
|                     packet.tx_counter); | ||||
|             } | ||||
| 
 | ||||
|             size_t ipv4_payload_offset = routing_decision.adapter->ipv4_payload_offset(); | ||||
|             if (ipv4_payload_offset != packet.ipv4_payload_offset) { | ||||
|                 // FIXME: Add support for this. This can happen if after a route change
 | ||||
|                 // we ended up on another adapter which doesn't have the same layer 2 type
 | ||||
|                 // like the previous adapter.
 | ||||
|                 VERIFY_NOT_REACHED(); | ||||
|             } | ||||
| 
 | ||||
|             auto packet_buffer = packet.buffer->bytes(); | ||||
| 
 | ||||
|             routing_decision.adapter->fill_in_ipv4_header(*packet.buffer, | ||||
|                 local_address(), routing_decision.next_hop, peer_address(), | ||||
|                 IPv4Protocol::TCP, packet_buffer.size() - ipv4_payload_offset, ttl()); | ||||
|             routing_decision.adapter->send_packet(packet_buffer); | ||||
|             m_packets_out++; | ||||
|             m_bytes_out += packet_buffer.size(); | ||||
|         } | ||||
| 
 | ||||
|         size_t ipv4_payload_offset = routing_decision.adapter->ipv4_payload_offset(); | ||||
|         if (ipv4_payload_offset != packet.ipv4_payload_offset) { | ||||
|             // FIXME: Add support for this. This can happen if after a route change
 | ||||
|             // we ended up on another adapter which doesn't have the same layer 2 type
 | ||||
|             // like the previous adapter.
 | ||||
|             VERIFY_NOT_REACHED(); | ||||
|         } | ||||
| 
 | ||||
|         auto packet_buffer = packet.buffer->bytes(); | ||||
| 
 | ||||
|         routing_decision.adapter->fill_in_ipv4_header(*packet.buffer, | ||||
|             local_address(), routing_decision.next_hop, peer_address(), | ||||
|             IPv4Protocol::TCP, packet_buffer.size() - ipv4_payload_offset, ttl()); | ||||
|         routing_decision.adapter->send_packet(packet_buffer); | ||||
|         m_packets_out++; | ||||
|         m_bytes_out += packet_buffer.size(); | ||||
|     } | ||||
|     }); | ||||
| } | ||||
| 
 | ||||
| bool TCPSocket::can_write(const FileDescription& file_description, size_t size) const | ||||
|  | @ -608,8 +611,8 @@ bool TCPSocket::can_write(const FileDescription& file_description, size_t size) | |||
|     if (!file_description.is_blocking()) | ||||
|         return true; | ||||
| 
 | ||||
|     MutexLocker lock(m_not_acked_lock); | ||||
|     return m_not_acked_size + size <= m_send_window_size; | ||||
|     return m_unacked_packets.with_shared([&](auto& unacked_packets) { | ||||
|         return unacked_packets.size + size <= m_send_window_size; | ||||
|     }); | ||||
| } | ||||
| 
 | ||||
| } | ||||
|  |  | |||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue
	
	 Andreas Kling
						Andreas Kling