Refactor network implementations to be more reusable and less buggy (#2107)

encapsulate network interfaces
This commit is contained in:
Jesse Talavera
2024-08-01 16:02:45 -04:00
committed by GitHub
parent c6bf5d5181
commit 327ce45124
20 changed files with 715 additions and 439 deletions

View File

@ -28,99 +28,31 @@ using namespace melonDS::Platform;
using Platform::Log;
using Platform::LogLevel;
namespace LocalMP
namespace melonDS
{
struct MPStatusData
LocalMP::LocalMP() noexcept :
MPQueueLock(Mutex_Create())
{
u16 ConnectedBitmask; // bitmask of which instances are ready to send/receive packets
u32 PacketWriteOffset;
u32 ReplyWriteOffset;
u16 MPHostinst; // instance ID from which the last CMD frame was sent
u16 MPReplyBitmask; // bitmask of which clients replied in time
};
struct MPPacketHeader
{
u32 Magic;
u32 SenderID;
u32 Type; // 0=regular 1=CMD 2=reply 3=ack
u32 Length;
u64 Timestamp;
};
const u32 kPacketQueueSize = 0x10000;
const u32 kReplyQueueSize = 0x10000;
const u32 kMaxFrameSize = 0x948;
Mutex* MPQueueLock;
MPStatusData MPStatus;
u8 MPPacketQueue[kPacketQueueSize];
u8 MPReplyQueue[kReplyQueueSize];
u32 PacketReadOffset[16];
u32 ReplyReadOffset[16];
int RecvTimeout;
int LastHostID;
Semaphore* SemPool[32];
void SemPoolInit()
{
for (int i = 0; i < 32; i++)
{
SemPool[i] = Semaphore_Create();
}
}
bool SemPost(int num)
{
Semaphore_Post(SemPool[num]);
return true;
}
bool SemWait(int num, int timeout)
{
return Semaphore_TryWait(SemPool[num], timeout);
}
void SemReset(int num)
{
Semaphore_Reset(SemPool[num]);
}
bool Init()
{
MPQueueLock = Mutex_Create();
Mutex_Lock(MPQueueLock);
memset(MPPacketQueue, 0, kPacketQueueSize);
memset(MPReplyQueue, 0, kReplyQueueSize);
memset(&MPStatus, 0, sizeof(MPStatus));
memset(PacketReadOffset, 0, sizeof(PacketReadOffset));
memset(ReplyReadOffset, 0, sizeof(ReplyReadOffset));
Mutex_Unlock(MPQueueLock);
// prepare semaphores
// semaphores 0-15: regular frames; semaphore I is posted when instance I needs to process a new frame
// semaphores 16-31: MP replies; semaphore I is posted when instance I needs to process a new MP reply
SemPoolInit();
LastHostID = -1;
for (int i = 0; i < 32; i++)
{
SemPool[i] = Semaphore_Create();
}
Log(LogLevel::Info, "MP comm init OK\n");
RecvTimeout = 25;
return true;
}
void DeInit()
LocalMP::~LocalMP() noexcept
{
for (int i = 0; i < 32; i++)
{
@ -131,30 +63,25 @@ void DeInit()
Mutex_Free(MPQueueLock);
}
void SetRecvTimeout(int timeout)
{
RecvTimeout = timeout;
}
void Begin(int inst)
void LocalMP::Begin(int inst)
{
Mutex_Lock(MPQueueLock);
PacketReadOffset[inst] = MPStatus.PacketWriteOffset;
ReplyReadOffset[inst] = MPStatus.ReplyWriteOffset;
SemReset(inst);
SemReset(16+inst);
Semaphore_Reset(SemPool[inst]);
Semaphore_Reset(SemPool[16 + inst]);
MPStatus.ConnectedBitmask |= (1 << inst);
Mutex_Unlock(MPQueueLock);
}
void End(int inst)
void LocalMP::End(int inst)
{
Mutex_Lock(MPQueueLock);
MPStatus.ConnectedBitmask &= ~(1 << inst);
Mutex_Unlock(MPQueueLock);
}
void FIFORead(int inst, int fifo, void* buf, int len)
void LocalMP::FIFORead(int inst, int fifo, void* buf, int len) noexcept
{
u8* data;
@ -189,7 +116,7 @@ void FIFORead(int inst, int fifo, void* buf, int len)
else ReplyReadOffset[inst] = offset;
}
void FIFOWrite(int inst, int fifo, void* buf, int len)
void LocalMP::FIFOWrite(int inst, int fifo, void* buf, int len) noexcept
{
u8* data;
@ -224,7 +151,7 @@ void FIFOWrite(int inst, int fifo, void* buf, int len)
else MPStatus.ReplyWriteOffset = offset;
}
int SendPacketGeneric(int inst, u32 type, u8* packet, int len, u64 timestamp)
int LocalMP::SendPacketGeneric(int inst, u32 type, u8* packet, int len, u64 timestamp) noexcept
{
if (len > kMaxFrameSize)
{
@ -258,7 +185,7 @@ int SendPacketGeneric(int inst, u32 type, u8* packet, int len, u64 timestamp)
MPStatus.MPHostinst = inst;
MPStatus.MPReplyBitmask = 0;
ReplyReadOffset[inst] = MPStatus.ReplyWriteOffset;
SemReset(16 + inst);
Semaphore_Reset(SemPool[16 + inst]);
}
else if (type == 2)
{
@ -269,39 +196,39 @@ int SendPacketGeneric(int inst, u32 type, u8* packet, int len, u64 timestamp)
if (type == 2)
{
SemPost(16 + MPStatus.MPHostinst);
Semaphore_Post(SemPool[16 + MPStatus.MPHostinst]);
}
else
{
for (int i = 0; i < 16; i++)
{
if (mask & (1<<i))
SemPost(i);
Semaphore_Post(SemPool[i]);
}
}
return len;
}
int RecvPacketGeneric(int inst, u8* packet, bool block, u64* timestamp)
int LocalMP::RecvPacketGeneric(int inst, u8* packet, bool block, u64* timestamp) noexcept
{
for (;;)
{
if (!SemWait(inst, block ? RecvTimeout : 0))
if (!Semaphore_TryWait(SemPool[inst], block ? RecvTimeout : 0))
{
return 0;
}
Mutex_Lock(MPQueueLock);
MPPacketHeader pktheader;
MPPacketHeader pktheader = {};
FIFORead(inst, 0, &pktheader, sizeof(pktheader));
if (pktheader.Magic != 0x4946494E)
{
Log(LogLevel::Warn, "PACKET FIFO OVERFLOW\n");
PacketReadOffset[inst] = MPStatus.PacketWriteOffset;
SemReset(inst);
Semaphore_Reset(SemPool[inst]);
Mutex_Unlock(MPQueueLock);
return 0;
}
@ -331,33 +258,32 @@ int RecvPacketGeneric(int inst, u8* packet, bool block, u64* timestamp)
}
}
int SendPacket(int inst, u8* packet, int len, u64 timestamp)
int LocalMP::SendPacket(int inst, u8* packet, int len, u64 timestamp)
{
return SendPacketGeneric(inst, 0, packet, len, timestamp);
}
int RecvPacket(int inst, u8* packet, u64* timestamp)
int LocalMP::RecvPacket(int inst, u8* packet, u64* timestamp)
{
return RecvPacketGeneric(inst, packet, false, timestamp);
}
int SendCmd(int inst, u8* packet, int len, u64 timestamp)
int LocalMP::SendCmd(int inst, u8* packet, int len, u64 timestamp)
{
return SendPacketGeneric(inst, 1, packet, len, timestamp);
}
int SendReply(int inst, u8* packet, int len, u64 timestamp, u16 aid)
int LocalMP::SendReply(int inst, u8* packet, int len, u64 timestamp, u16 aid)
{
return SendPacketGeneric(inst, 2 | (aid<<16), packet, len, timestamp);
}
int SendAck(int inst, u8* packet, int len, u64 timestamp)
int LocalMP::SendAck(int inst, u8* packet, int len, u64 timestamp)
{
return SendPacketGeneric(inst, 3, packet, len, timestamp);
}
int RecvHostPacket(int inst, u8* packet, u64* timestamp)
int LocalMP::RecvHostPacket(int inst, u8* packet, u64* timestamp)
{
if (LastHostID != -1)
{
@ -372,7 +298,7 @@ int RecvHostPacket(int inst, u8* packet, u64* timestamp)
return RecvPacketGeneric(inst, packet, true, timestamp);
}
u16 RecvReplies(int inst, u8* packets, u64 timestamp, u16 aidmask)
u16 LocalMP::RecvReplies(int inst, u8* packets, u64 timestamp, u16 aidmask)
{
u16 ret = 0;
u16 myinstmask = (1 << inst);
@ -386,7 +312,7 @@ u16 RecvReplies(int inst, u8* packets, u64 timestamp, u16 aidmask)
for (;;)
{
if (!SemWait(16+inst, RecvTimeout))
if (!Semaphore_TryWait(SemPool[16+inst], RecvTimeout))
{
// no more replies available
return ret;
@ -394,14 +320,14 @@ u16 RecvReplies(int inst, u8* packets, u64 timestamp, u16 aidmask)
Mutex_Lock(MPQueueLock);
MPPacketHeader pktheader;
MPPacketHeader pktheader = {};
FIFORead(inst, 1, &pktheader, sizeof(pktheader));
if (pktheader.Magic != 0x4946494E)
{
Log(LogLevel::Warn, "REPLY FIFO OVERFLOW\n");
ReplyReadOffset[inst] = MPStatus.ReplyWriteOffset;
SemReset(16+inst);
Semaphore_Reset(SemPool[16 + inst]);
Mutex_Unlock(MPQueueLock);
return 0;
}