From 3c89e425c48f214b8bd3b0cd5c9f31593105c32b Mon Sep 17 00:00:00 2001 From: Filippo Mutta Date: Tue, 16 Jul 2024 16:38:27 +0200 Subject: [PATCH 1/3] Fixed shdmem bugs and handled fd streaming --- examples/xlink_client_local.cpp | 37 +++++++++-------- examples/xlink_server_local.cpp | 36 ++++++++-------- src/pc/PlatformData.c | 45 -------------------- src/pc/protocols/local_memshd.cpp | 32 +++++++++------ src/pc/protocols/tcpip_memshd.cpp | 2 - src/shared/XLinkData.c | 47 +++++++++++++++++++-- src/shared/XLinkDispatcher.c | 1 + src/shared/XLinkDispatcherImpl.c | 68 +++++++++++++++++++++++++++---- 8 files changed, 161 insertions(+), 107 deletions(-) diff --git a/examples/xlink_client_local.cpp b/examples/xlink_client_local.cpp index 604cf18..eb0f0e2 100644 --- a/examples/xlink_client_local.cpp +++ b/examples/xlink_client_local.cpp @@ -40,25 +40,25 @@ int main(int argc, const char** argv){ streamPacketDesc_t *packet; - auto s = XLinkOpenStream(0, "test", 1024); + auto s = XLinkOpenStream(0, "test", 1024 * 1024); assert(s != INVALID_STREAM_ID); // Read the data packet containing the FD auto r = XLinkReadData(s, &packet); assert(r == X_LINK_SUCCESS); + void *sharedMemAddr; long receivedFd = packet->fd; if (receivedFd < 0) { printf("Not a valid FD, data streamed through message\n"); - return 1; - } - - // Map the shared memory - void *sharedMemAddr = - mmap(NULL, MAXIMUM_SHM_SIZE, PROT_READ, MAP_SHARED, receivedFd, 0); - if (sharedMemAddr == MAP_FAILED) { + sharedMemAddr = packet->data; + } else { + // Map the shared memory + sharedMemAddr = mmap(NULL, MAXIMUM_SHM_SIZE, PROT_READ, MAP_SHARED, receivedFd, 0); + if (sharedMemAddr == MAP_FAILED) { perror("mmap"); return 1; + } } // Read and print the message from shared memory @@ -69,20 +69,20 @@ int main(int argc, const char** argv){ assert(w == X_LINK_SUCCESS); const char *shmName = SHARED_MEMORY_NAME; - long shmFd = shm_open(shmName, O_CREAT | O_RDWR, 0666); + long shmFd = memfd_create(shmName, 0); if (shmFd < 0) { - perror("shm_open"); - return 1; + perror("shm_open"); + return 1; } ftruncate(shmFd, MAXIMUM_SHM_SIZE); void *addr = mmap(NULL, MAXIMUM_SHM_SIZE, PROT_READ | PROT_WRITE, MAP_SHARED, shmFd, 0); if (addr == MAP_FAILED) { - perror("mmap"); - close(shmFd); - shm_unlink(shmName); - return 1; + perror("mmap"); + close(shmFd); + shm_unlink(shmName); + return 1; } // Write a message to the shared memory @@ -94,12 +94,13 @@ int main(int argc, const char** argv){ assert(w == X_LINK_SUCCESS); r = XLinkReadData(s, &packet); - assert(w == X_LINK_SUCCESS); + assert(r == X_LINK_SUCCESS); printf("Message from Process A: %s\n", (char *)(packet->data)); - - munmap(sharedMemAddr, MAXIMUM_SHM_SIZE); + if (receivedFd >= 0) { + munmap(sharedMemAddr, MAXIMUM_SHM_SIZE); + } munmap(addr, MAXIMUM_SHM_SIZE); close(shmFd); diff --git a/examples/xlink_server_local.cpp b/examples/xlink_server_local.cpp index 074da73..e2b4d64 100644 --- a/examples/xlink_server_local.cpp +++ b/examples/xlink_server_local.cpp @@ -37,24 +37,24 @@ int main(int argc, const char** argv){ return 1; } - auto s = XLinkOpenStream(0, "test", 1024); + auto s = XLinkOpenStream(0, "test", 1024 * 1024); assert(s != INVALID_STREAM_ID); const char *shmName = SHARED_MEMORY_NAME; - long shmFd = shm_open(shmName, O_CREAT | O_RDWR, 0666); + long shmFd = memfd_create(shmName, 0); if (shmFd < 0) { - perror("shm_open"); - return 1; + perror("shm_open"); + return 1; } ftruncate(shmFd, MAXIMUM_SHM_SIZE); void *addr = mmap(NULL, MAXIMUM_SHM_SIZE, PROT_READ | PROT_WRITE, MAP_SHARED, shmFd, 0); if (addr == MAP_FAILED) { - perror("mmap"); - close(shmFd); - shm_unlink(shmName); - return 1; + perror("mmap"); + close(shmFd); + shm_unlink(shmName); + return 1; } // Write a message to the shared memory @@ -67,7 +67,7 @@ int main(int argc, const char** argv){ streamPacketDesc_t *packet; auto r = XLinkReadData(s, &packet); - assert(w == X_LINK_SUCCESS); + assert(r == X_LINK_SUCCESS); printf("Message from Process B: %s\n", (char *)(packet->data)); @@ -75,18 +75,18 @@ int main(int argc, const char** argv){ r = XLinkReadData(s, &packet); assert(r == X_LINK_SUCCESS); + void *sharedMemAddr; long receivedFd = packet->fd; if (receivedFd < 0) { printf("Not a valid FD, data streamed through message\n"); - return 1; - } - - // Map the shared memory - void *sharedMemAddr = - mmap(NULL, MAXIMUM_SHM_SIZE, PROT_READ, MAP_SHARED, receivedFd, 0); - if (sharedMemAddr == MAP_FAILED) { + sharedMemAddr = packet->data; + } else { + // Map the shared memory + sharedMemAddr = mmap(NULL, MAXIMUM_SHM_SIZE, PROT_READ, MAP_SHARED, receivedFd, 0); + if (sharedMemAddr == MAP_FAILED) { perror("mmap"); return 1; + } } // Read and print the message from shared memory @@ -96,7 +96,9 @@ int main(int argc, const char** argv){ w = XLinkWriteData(s, (uint8_t*)normalMessage, strlen(normalMessage) + 1); assert(w == X_LINK_SUCCESS); - munmap(sharedMemAddr, MAXIMUM_SHM_SIZE); + if (receivedFd >= 0) { + munmap(sharedMemAddr, MAXIMUM_SHM_SIZE); + } munmap(addr, MAXIMUM_SHM_SIZE); close(shmFd); diff --git a/src/pc/PlatformData.c b/src/pc/PlatformData.c index 10918c4..c56cc89 100644 --- a/src/pc/PlatformData.c +++ b/src/pc/PlatformData.c @@ -117,51 +117,6 @@ int XLinkPlatformWriteFd(xLinkDeviceHandle_t *deviceHandle, const long fd, void #if defined(__unix__) case X_LINK_LOCAL_SHDMEM: return shdmemPlatformWriteFd(deviceHandle->xLinkFD, fd, data2, size2); - - case X_LINK_USB_VSC: - case X_LINK_USB_CDC: - case X_LINK_PCIE: - case X_LINK_TCP_IP: - { - if (fd <= 0) { - return X_LINK_ERROR; - } - - // Determine file size through fstat - struct stat fileStats; - fstat(fd, &fileStats); - int size = fileStats.st_size; - - // mmap the fine in memory - void *addr = mmap(NULL, 4096, PROT_READ, MAP_SHARED, fd, 0); - if (addr == MAP_FAILED) { - mvLog(MVLOG_ERROR, "Failed to mmap file to stream it over\n"); - return X_LINK_ERROR; - } - - // Use the respective write function to copy and send the message - int result = X_LINK_ERROR; - switch(deviceHandle->protocol) { - case X_LINK_USB_VSC: - case X_LINK_USB_CDC: - result = usbPlatformWrite(deviceHandle->xLinkFD, addr, size); - break; - case X_LINK_PCIE: - result = pciePlatformWrite(deviceHandle->xLinkFD, addr, size); - break; - case X_LINK_TCP_IP: - result = tcpipPlatformWrite(deviceHandle->xLinkFD, addr, size); - break; - default: - result = X_LINK_PLATFORM_INVALID_PARAMETERS; - break; - } - - // Unmap file - munmap(addr, size); - - return result; - } #endif case X_LINK_TCP_IP_OR_LOCAL_SHDMEM: mvLog(MVLOG_ERROR, "Failed to write FD with TCP_IP_OR_LOCAL_SHDMEM\n"); diff --git a/src/pc/protocols/local_memshd.cpp b/src/pc/protocols/local_memshd.cpp index d7b4161..74811f9 100644 --- a/src/pc/protocols/local_memshd.cpp +++ b/src/pc/protocols/local_memshd.cpp @@ -18,6 +18,7 @@ #include #include #include +#include int shdmem_initialize() { mvLog(MVLOG_DEBUG, "Shared memory initialized\n"); @@ -116,10 +117,12 @@ int shdmemPlatformRead(void *desc, void *data, int size, long *fd) { struct msghdr msg = {}; struct iovec iov; - iov.iov_base = data; - iov.iov_len = size; - msg.msg_iov = &iov; - msg.msg_iovlen = 1; + if (data != NULL && size > 0) { + iov.iov_base = data; + iov.iov_len = size; + msg.msg_iov = &iov; + msg.msg_iovlen = 1; + } char ancillaryElementBuffer[CMSG_SPACE(sizeof(long))]; msg.msg_control = ancillaryElementBuffer; @@ -127,7 +130,7 @@ int shdmemPlatformRead(void *desc, void *data, int size, long *fd) { int bytes; if(bytes = recvmsg(socketFd, &msg, MSG_WAITALL) < 0) { - mvLog(MVLOG_ERROR, "Failed to recieve message"); + mvLog(MVLOG_ERROR, "Failed to recieve message: %s", strerror(errno)); return X_LINK_ERROR; } @@ -141,7 +144,7 @@ int shdmemPlatformRead(void *desc, void *data, int size, long *fd) { *fd = recvFd; } - return bytes; + return 0; } int shdmemPlatformWrite(void *desc, void *data, int size) { @@ -160,11 +163,11 @@ int shdmemPlatformWrite(void *desc, void *data, int size) { int bytes; if(bytes = sendmsg(socketFd, &msg, 0) < 0) { - mvLog(MVLOG_ERROR, "Failed to send message\n"); + mvLog(MVLOG_ERROR, "Failed to send message: %s\n", strerror(errno)); return X_LINK_ERROR; } - return bytes; + return 0; } int shdmemPlatformWriteFd(void *desc, const long fd, void *data2, int size2) { @@ -176,19 +179,22 @@ int shdmemPlatformWriteFd(void *desc, const long fd, void *data2, int size2) { struct msghdr msg = {}; struct iovec iov; - char buf[1] = {0}; // Buffer for single byte of data to send + long buf[1] = {0}; // Buffer for single byte of data to send if (data2 != NULL && size2 > 0) { iov.iov_base = data2; iov.iov_len = size2; - } else { + } else if (fd >= 0) { iov.iov_base = buf; iov.iov_len = sizeof(buf); + } else { + return 0; } + msg.msg_iov = &iov; msg.msg_iovlen = 1; + char ancillaryElementBuffer[CMSG_SPACE(sizeof(long))]; if (fd >= 0) { - char ancillaryElementBuffer[CMSG_SPACE(sizeof(long))]; msg.msg_control = ancillaryElementBuffer; msg.msg_controllen = sizeof(ancillaryElementBuffer); @@ -202,11 +208,11 @@ int shdmemPlatformWriteFd(void *desc, const long fd, void *data2, int size2) { int bytes; if(bytes = sendmsg(socketFd, &msg, 0) < 0) { - mvLog(MVLOG_ERROR, "Failed to send message"); + mvLog(MVLOG_ERROR, "Failed to send message: %s", strerror(errno)); return X_LINK_ERROR; } - return bytes; + return 0; } int shdmemSetProtocol(XLinkProtocol_t *protocol, const char* devPathRead, const char* devPathWrite) { diff --git a/src/pc/protocols/tcpip_memshd.cpp b/src/pc/protocols/tcpip_memshd.cpp index db2be93..b31a6c7 100644 --- a/src/pc/protocols/tcpip_memshd.cpp +++ b/src/pc/protocols/tcpip_memshd.cpp @@ -118,12 +118,10 @@ int tcpipOrLocalShdmemPlatformServer(XLinkProtocol_t *protocol, const char *devP int tcpipOrLocalShdmemPlatformConnect(XLinkProtocol_t *protocol, const char *devPathRead, const char *devPathWrite, void **fd) { if(shdmemPlatformConnect(SHDMEM_DEFAULT_SOCKET, SHDMEM_DEFAULT_SOCKET, fd) == X_LINK_SUCCESS) { - mvLog(MVLOG_ERROR, "Failed to connect with SHDMEM"); return shdmemSetProtocol(protocol, devPathRead, devPathWrite); } if (tcpipPlatformConnect(devPathRead, devPathWrite, fd) == X_LINK_SUCCESS) { - mvLog(MVLOG_ERROR, "Failed to connect with TCP/IP"); *protocol = X_LINK_TCP_IP; return X_LINK_SUCCESS; } diff --git a/src/shared/XLinkData.c b/src/shared/XLinkData.c index 4c5a858..bf3a885 100644 --- a/src/shared/XLinkData.c +++ b/src/shared/XLinkData.c @@ -25,6 +25,10 @@ #include "XLinkLog.h" #include "XLinkStringUtils.h" +#ifdef __unix__ +#include +#endif + // ------------------------------------ // Helpers declaration. Begin. // ------------------------------------ @@ -158,14 +162,32 @@ XLinkError_t XLinkWriteFd_(streamId_t streamId, const long fd, XLinkTimespec* ou event.data2 = (void*)NULL; event.data2Size = -1; + int size = sizeof(long); +#if defined(__unix__) + if (event.deviceHandle.protocol != X_LINK_LOCAL_SHDMEM && + event.header.type == XLINK_WRITE_FD_REQ) { + + if (fd >= 0) { + // Determine file size through fstat + struct stat fileStats; + fstat(fd, &fileStats); + size = fileStats.st_size; + + if (size > 0) { + event.header.size = size; + } + } + } +#endif + XLINK_RET_IF(addEventWithPerf_(&event, &opTime, XLINK_NO_RW_TIMEOUT, outTSend)); if( glHandler->profEnable) { - glHandler->profilingData.totalWriteBytes += sizeof(long); + glHandler->profilingData.totalWriteBytes += size; glHandler->profilingData.totalWriteTime += opTime; } - link->profilingData.totalWriteBytes += sizeof(long); - link->profilingData.totalWriteTime += sizeof(long); + link->profilingData.totalWriteBytes += size; + link->profilingData.totalWriteTime += size; return X_LINK_SUCCESS; } @@ -182,9 +204,28 @@ XLinkError_t XLinkWriteFdData(streamId_t streamId, const long fd, const uint8_t* int totalSize = dataSize; xLinkEvent_t event = {0}; XLINK_INIT_EVENT(event, streamId, XLINK_WRITE_FD_REQ, totalSize, (void*)fd, link->deviceHandle); + event.data2 = (void*)dataBuffer; event.data2Size = dataSize; +#if defined(__unix__) + if (event.deviceHandle.protocol != X_LINK_LOCAL_SHDMEM && + event.header.type == XLINK_WRITE_FD_REQ) { + + if (fd >= 0) { + // Determine file size through fstat + struct stat fileStats; + fstat(fd, &fileStats); + int size = fileStats.st_size; + + if (size > 0) { + event.header.size += size; + totalSize += size; + } + } + } +#endif + XLINK_RET_IF(addEventWithPerf(&event, &opTime, XLINK_NO_RW_TIMEOUT)); if( glHandler->profEnable) { diff --git a/src/shared/XLinkDispatcher.c b/src/shared/XLinkDispatcher.c index 01aec98..95e066f 100644 --- a/src/shared/XLinkDispatcher.c +++ b/src/shared/XLinkDispatcher.c @@ -366,6 +366,7 @@ xLinkEvent_t* DispatcherAddEvent_(xLinkEventOrigin_t origin, xLinkEvent_t *event if(curr->resetXLink) { return NULL; } + mvLog(MVLOG_DEBUG, "Receiving event %s %d\n", TypeToStr(event->header.type), origin); int rc; while(((rc = XLink_sem_wait(&curr->addEventSem)) == -1) && errno == EINTR) diff --git a/src/shared/XLinkDispatcherImpl.c b/src/shared/XLinkDispatcherImpl.c index 3696942..c1c39cc 100644 --- a/src/shared/XLinkDispatcherImpl.c +++ b/src/shared/XLinkDispatcherImpl.c @@ -20,6 +20,10 @@ #include "XLinkLog.h" #include "XLinkStringUtils.h" +#ifdef __unix__ +#include +#endif + // ------------------------------------ // Helpers declaration. Begin. // ------------------------------------ @@ -148,15 +152,50 @@ int writeEventMultipart(xLinkDeviceHandle_t* deviceHandle, void* data, int total int writeFdEventMultipart(xLinkDeviceHandle_t* deviceHandle, long fd, int totalSize, void* data2, int data2Size) { - // Regular, single-part case - if(data2 == NULL || data2Size <= 0) { - return XLinkPlatformWriteFd(deviceHandle, fd, NULL, -1); + void *mmapAddr = NULL; + int mmapSize = totalSize - data2Size; + +#ifdef __unix__ + // mmap the fine in memory + if(deviceHandle->protocol != X_LINK_LOCAL_SHDMEM) { + mmapAddr = mmap(NULL, mmapSize, PROT_READ, MAP_SHARED, fd, 0); + if (mmapAddr == MAP_FAILED) { + mvLog(MVLOG_ERROR, "Failed to mmap file to stream it over\n"); + return X_LINK_ERROR; + } + + if(data2 == NULL || data2Size <= 0) { + return XLinkPlatformWrite(deviceHandle, mmapAddr, mmapSize); + } + + fd = -1; + } else { + if(data2 == NULL || data2Size <= 0) { + return XLinkPlatformWriteFd(deviceHandle, fd, NULL, -1); + } } +#endif // Multipart case int errorCode = 0; - void *dataToWrite[] = {data2, NULL}; - int sizeToWrite[] = {data2Size, 0}; + void *dataToWrite[3]; + int sizeToWrite[3]; + + if(deviceHandle->protocol != X_LINK_LOCAL_SHDMEM) { + dataToWrite[0] = mmapAddr; + sizeToWrite[0] = mmapSize; + dataToWrite[1] = data2; + sizeToWrite[1] = data2Size; + dataToWrite[2] = NULL; + sizeToWrite[2] = 0; + } else { + dataToWrite[0] = data2; + sizeToWrite[0] = data2Size; + dataToWrite[1] = NULL; + sizeToWrite[1] = 0; + dataToWrite[2] = NULL; + sizeToWrite[2] = 0; + } int writtenByteCount = 0, toWrite = 0, rc = 0; @@ -202,8 +241,12 @@ int writeFdEventMultipart(xLinkDeviceHandle_t* deviceHandle, long fd, int totalS ? pktlen : (totalSizeToWrite - writtenByteCount); - rc = XLinkPlatformWriteFd(deviceHandle, fd, &((char *)currentPacket)[writtenByteCount - byteCountRelativeOffset + previousSplitWriteSize], toWrite); - fd = -1; + if(deviceHandle->protocol != X_LINK_LOCAL_SHDMEM || fd == -1) { + rc = XLinkPlatformWrite(deviceHandle, &((char *)currentPacket)[writtenByteCount - byteCountRelativeOffset + previousSplitWriteSize], toWrite); + } else { + rc = XLinkPlatformWriteFd(deviceHandle, fd, &((char *)currentPacket)[writtenByteCount - byteCountRelativeOffset + previousSplitWriteSize], toWrite); + fd = -1; + } if (rc < 0) { @@ -227,8 +270,14 @@ int writeFdEventMultipart(xLinkDeviceHandle_t* deviceHandle, long fd, int totalS } toWrite = remainingToWriteCurrent + remainingToWriteNext; if(toWrite > xlinkPacketSizeMultiply) ASSERT_XLINK(0); - rc = XLinkPlatformWriteFd(deviceHandle, fd, swapSpace, toWrite); - fd = -1; + + if(deviceHandle->protocol != X_LINK_LOCAL_SHDMEM || fd == -1) { + rc = XLinkPlatformWrite(deviceHandle, swapSpace, toWrite); + } else { + rc = XLinkPlatformWriteFd(deviceHandle, fd, swapSpace, toWrite); + fd = -1; + } + if (rc < 0) { errorCode = rc; @@ -246,6 +295,7 @@ int writeFdEventMultipart(xLinkDeviceHandle_t* deviceHandle, long fd, int totalS } function_epilogue: + if (mmapAddr != NULL) munmap(mmapAddr, mmapSize); if (errorCode) return errorCode; return writtenByteCount; } From 9aab22206b3bb930e76b02a01fd9cf15545e1ec0 Mon Sep 17 00:00:00 2001 From: Filippo Mutta Date: Tue, 16 Jul 2024 16:42:53 +0200 Subject: [PATCH 2/3] Added conditional compilation. --- src/shared/XLinkDispatcherImpl.c | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/shared/XLinkDispatcherImpl.c b/src/shared/XLinkDispatcherImpl.c index c1c39cc..e0d8428 100644 --- a/src/shared/XLinkDispatcherImpl.c +++ b/src/shared/XLinkDispatcherImpl.c @@ -295,7 +295,9 @@ int writeFdEventMultipart(xLinkDeviceHandle_t* deviceHandle, long fd, int totalS } function_epilogue: +#ifdef __unix__ if (mmapAddr != NULL) munmap(mmapAddr, mmapSize); +#endif if (errorCode) return errorCode; return writtenByteCount; } From 188bce950f17f0e8c6f285e84bf9bac06038353a Mon Sep 17 00:00:00 2001 From: Filippo Mutta Date: Tue, 16 Jul 2024 16:57:16 +0200 Subject: [PATCH 3/3] Conditional compilation for escluding some examples on MacOS --- examples/CMakeLists.txt | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt index e1691ab..65b690e 100644 --- a/examples/CMakeLists.txt +++ b/examples/CMakeLists.txt @@ -49,8 +49,8 @@ add_example(xlink_server2 xlink_server2.cpp) add_example(device_connect_reset device_connect_reset.cpp) # Local shared memory example -if (UNIX) +if (UNIX AND !APPLE) add_example(xlink_server_local xlink_server_local.cpp) add_example(xlink_client_local xlink_client_local.cpp) -endif (UNIX) +endif (UNIX AND !APPLE)