[ad_1]
The question is from 2015; some things have changed since then.
CNetMessage
is a transport protocol agnostic message container. It contains the received message data (DataStream
), time of message receipt, payload size, and other information. It is used to decompose messages from the network. See the following code:
bool CNode::ReceiveMsgBytes(Span<const uint8_t> msg_bytes, bool& complete)
{
complete = false;
const auto time = GetTime<std::chrono::microseconds>();
LOCK(cs_vRecv);
m_last_recv = std::chrono::duration_cast<std::chrono::seconds>(time);
nRecvBytes += msg_bytes.size();
while (msg_bytes.size() > 0) {
// absorb network data
if (!m_transport->ReceivedBytes(msg_bytes)) {
// Serious transport problem, disconnect from the peer.
return false;
}
if (m_transport->ReceivedMessageComplete()) {
// decompose a transport agnostic CNetMessage from the deserializer
bool reject_message{false};
CNetMessage msg = m_transport->GetReceivedMessage(time, reject_message);
if (reject_message) {
// Message deserialization failed. Drop the message but don't disconnect the peer.
// store the size of the corrupt message
mapRecvBytesPerMsgType.at(NET_MESSAGE_TYPE_OTHER) += msg.m_raw_message_size;
continue;
}
// Store received bytes per message type.
// To prevent a memory DOS, only allow known message types.
auto i = mapRecvBytesPerMsgType.find(msg.m_type);
if (i == mapRecvBytesPerMsgType.end()) {
i = mapRecvBytesPerMsgType.find(NET_MESSAGE_TYPE_OTHER);
}
assert(i != mapRecvBytesPerMsgType.end());
i->second += msg.m_raw_message_size;
// push the message to the process queue,
vRecvMsg.push_back(std::move(msg));
complete = true;
}
}
return true;
}
After receiving the message, we push it to vRecvMsg
and then put it in a queue to be processed. In ProcessMessage
, we get the message data (DataStream
) and process it according to the message type.
bool PeerManagerImpl::ProcessMessages(CNode* pfrom, std::atomic<bool>& interruptMsgProc)
{
AssertLockHeld(g_msgproc_mutex);
PeerRef peer = GetPeerRef(pfrom->GetId());
if (peer == nullptr) return false;
{
LOCK(peer->m_getdata_requests_mutex);
if (!peer->m_getdata_requests.empty()) {
ProcessGetData(*pfrom, *peer, interruptMsgProc);
}
}
const bool processed_orphan = ProcessOrphanTx(*peer);
if (pfrom->fDisconnect)
return false;
if (processed_orphan) return true;
// this maintains the order of responses
// and prevents m_getdata_requests to grow unbounded
{
LOCK(peer->m_getdata_requests_mutex);
if (!peer->m_getdata_requests.empty()) return true;
}
// Don't bother if send buffer is too full to respond anyway
if (pfrom->fPauseSend) return false;
auto poll_result{pfrom->PollMessage()};
if (!poll_result) {
// No message to process
return false;
}
CNetMessage& msg{poll_result->first};
bool fMoreWork = poll_result->second;
TRACE6(net, inbound_message,
pfrom->GetId(),
pfrom->m_addr_name.c_str(),
pfrom->ConnectionTypeAsString().c_str(),
msg.m_type.c_str(),
msg.m_recv.size(),
msg.m_recv.data()
);
if (m_opts.capture_messages) {
CaptureMessage(pfrom->addr, msg.m_type, MakeUCharSpan(msg.m_recv), /*is_incoming=*/true);
}
try {
ProcessMessage(*pfrom, msg.m_type, msg.m_recv, msg.m_time, interruptMsgProc);
if (interruptMsgProc) return false;
{
LOCK(peer->m_getdata_requests_mutex);
if (!peer->m_getdata_requests.empty()) fMoreWork = true;
}
// Does this peer has an orphan ready to reconsider?
// (Note: we may have provided a parent for an orphan provided
// by another peer that was already processed; in that case,
// the extra work may not be noticed, possibly resulting in an
// unnecessary 100ms delay)
if (m_orphanage.HaveTxToReconsider(peer->m_id)) fMoreWork = true;
} catch (const std::exception& e) {
LogPrint(BCLog::NET, "%s(%s, %u bytes): Exception '%s' (%s) caught\n", __func__, SanitizeString(msg.m_type), msg.m_message_size, e.what(), typeid(e).name());
} catch (...) {
LogPrint(BCLog::NET, "%s(%s, %u bytes): Unknown exception caught\n", __func__, SanitizeString(msg.m_type), msg.m_message_size);
}
return fMoreWork;
}
In the case of vSendMsg
, this is a vector of CSerializedNetMsg
. CSerializedNetMsg
structure is simple. It holds the message data (a vector of unsigned char
) and its type. As its name indicates, it represents the serialized message. You can notice in the codebase that NetMsg::Make
is commonly used to construct it. This function accepts a string parameter representing the message type and any other parameter except that will be used to compose the msg data.
namespace NetMsg {
template <typename... Args>
CSerializedNetMsg Make(std::string msg_type, Args&&... args)
{
CSerializedNetMsg msg;
msg.m_type = std::move(msg_type);
VectorWriter{msg.data, 0, std::forward<Args>(args)...};
return msg;
}
} // namespace NetMsg
Now, what I understand about the difference between CSerializedNetMsg
and CNetMessage
is:
CSerializedNetMsg
seems lighter.CNetMessage
has additional members (m_time
,m_message_size
,m_raw_message_size
)- Only one
CNetMessage
object for the same message will exist.CSerializedNetMsg
has a specific method for copies.
Although they seem similar, they have specific characteristics for their purposes. For example, it is important to deal with received messages with CNetMessage
, among other reasons, to know exactly the size of the data we receive to track the process queue size. Copy
from CSerializedNetMsg
can be useful when sending the same message for more than one node.
m_connman.ForEachNode([this, pindex, &lazy_ser, &hashBlock](CNode* pnode) EXCLUSIVE_LOCKS_REQUIRED(::cs_main) {
AssertLockHeld(::cs_main);
if (pnode->GetCommonVersion() < INVALID_CB_NO_BAN_VERSION || pnode->fDisconnect)
return;
ProcessBlockAvailability(pnode->GetId());
CNodeState &state = *State(pnode->GetId());
// If the peer has, or we announced to them the previous block already,
// but we don't think they have this one, go ahead and announce it
if (state.m_requested_hb_cmpctblocks && !PeerHasHeader(&state, pindex) && PeerHasHeader(&state, pindex->pprev)) {
LogPrint(BCLog::NET, "%s sending header-and-ids %s to peer=%d\n", "PeerManager::NewPoWValidBlock",
hashBlock.ToString(), pnode->GetId());
const CSerializedNetMsg& ser_cmpctblock{lazy_ser.get()};
PushMessage(*pnode, ser_cmpctblock.Copy());
state.pindexBestHeaderSent = pindex;
}
});
About vRecvGetData
, it was removed in #19911. Now we have m_getdata_requests
as a member of Peer
, when a node receives a GETDATA
message, it stores the INVs into m_getdata_requests
. It is used to control what a node requested of us (e.g., a transaction).
[ad_2]
Source link