From f1f4449fc4aa42b2546dee6ba91d1c38499f29ad Mon Sep 17 00:00:00 2001 From: Avi Saranga Date: Tue, 24 Jul 2018 16:01:29 -0700 Subject: [PATCH] Plugin threading changes to fix instancing issues --- .../objects/TelmateFrameGrabberImpl.cpp | 31 +- .../objects/TelmateFrameGrabberImpl.hpp | 5 +- .../objects/TelmateFrameGrabberOpenCVImpl.cpp | 328 ++++++++++-------- .../objects/TelmateFrameGrabberOpenCVImpl.hpp | 82 +++-- .../interface/telmateframegrabber.kmd.json | 2 +- 5 files changed, 244 insertions(+), 204 deletions(-) diff --git a/module/src/server/implementation/objects/TelmateFrameGrabberImpl.cpp b/module/src/server/implementation/objects/TelmateFrameGrabberImpl.cpp index 6bd8779..e561162 100644 --- a/module/src/server/implementation/objects/TelmateFrameGrabberImpl.cpp +++ b/module/src/server/implementation/objects/TelmateFrameGrabberImpl.cpp @@ -19,57 +19,44 @@ namespace module namespace telmateframegrabber { -TelmateFrameGrabberImpl::TelmateFrameGrabberImpl (const boost::property_tree::ptree &config, - std::shared_ptr mediaPipeline) : - OpenCVFilterImpl (config, std::dynamic_pointer_cast (mediaPipeline) ) +TelmateFrameGrabberImpl::TelmateFrameGrabberImpl (const boost::property_tree::ptree &config, std::shared_ptr mediaPipeline) : OpenCVFilterImpl (config, std::dynamic_pointer_cast (mediaPipeline) ) { - - GST_DEBUG("TelmateFrameGrabberImpl::TelmateFrameGrabberImpl() called"); - - pTelmateFrameGrabberOpenCVImpl = new TelmateFrameGrabberOpenCVImpl(); - } -int TelmateFrameGrabberImpl::cleanup() +int TelmateFrameGrabberImpl::cleanup () { - pTelmateFrameGrabberOpenCVImpl->cleanup(); - delete pTelmateFrameGrabberOpenCVImpl; - pTelmateFrameGrabberOpenCVImpl = NULL; - return 1; + return TelmateFrameGrabberOpenCVImpl::cleanup (); } int TelmateFrameGrabberImpl::getSnapInterval () { - return pTelmateFrameGrabberOpenCVImpl->snapInterval; + return TelmateFrameGrabberOpenCVImpl::getSnapInterval (); } void TelmateFrameGrabberImpl::setSnapInterval (int snapInterval) { - pTelmateFrameGrabberOpenCVImpl->snapInterval = snapInterval; - return; + return TelmateFrameGrabberOpenCVImpl::setSnapInterval(snapInterval); } std::string TelmateFrameGrabberImpl::getStoragePath () { - return pTelmateFrameGrabberOpenCVImpl->storagePath; + return TelmateFrameGrabberOpenCVImpl::getStoragePath (); } void TelmateFrameGrabberImpl::setStoragePath (const std::string &path) { - pTelmateFrameGrabberOpenCVImpl->storagePath = path; + return TelmateFrameGrabberOpenCVImpl::setStoragePath (path); } void TelmateFrameGrabberImpl::setWebRtcEpName (const std::string &epName) { - pTelmateFrameGrabberOpenCVImpl->epName = epName; - return; + return TelmateFrameGrabberOpenCVImpl::setWebRtcEpName (epName); } void TelmateFrameGrabberImpl::setOutputFormat (int outputFormat) { - pTelmateFrameGrabberOpenCVImpl->outputFormat = outputFormat; - return; + return TelmateFrameGrabberOpenCVImpl::setOutputFormat (outputFormat); } MediaObjectImpl * diff --git a/module/src/server/implementation/objects/TelmateFrameGrabberImpl.hpp b/module/src/server/implementation/objects/TelmateFrameGrabberImpl.hpp index 4b0fbe7..42c18e3 100644 --- a/module/src/server/implementation/objects/TelmateFrameGrabberImpl.hpp +++ b/module/src/server/implementation/objects/TelmateFrameGrabberImpl.hpp @@ -46,13 +46,13 @@ class TelmateFrameGrabberImpl : public OpenCVFilterImpl, public virtual TelmateF virtual ~TelmateFrameGrabberImpl () {}; + int cleanup (); int getSnapInterval (); void setSnapInterval (int snapInterval); std::string getStoragePath (); void setStoragePath (const std::string &path); void setWebRtcEpName (const std::string &epName); void setOutputFormat (int outputFormat); - int cleanup(); /* Next methods are automatically implemented by code generator */ virtual bool connect (const std::string &eventType, std::shared_ptr handler); @@ -64,9 +64,6 @@ class TelmateFrameGrabberImpl : public OpenCVFilterImpl, public virtual TelmateF private: - /*GstElement *telmateframegrabberopencvimpl{};*/ - TelmateFrameGrabberOpenCVImpl *pTelmateFrameGrabberOpenCVImpl; - class StaticConstructor { public: diff --git a/module/src/server/implementation/objects/TelmateFrameGrabberOpenCVImpl.cpp b/module/src/server/implementation/objects/TelmateFrameGrabberOpenCVImpl.cpp index a4c0279..59d5742 100644 --- a/module/src/server/implementation/objects/TelmateFrameGrabberOpenCVImpl.cpp +++ b/module/src/server/implementation/objects/TelmateFrameGrabberOpenCVImpl.cpp @@ -1,91 +1,94 @@ /* Autogenerated with kurento-module-creator */ #include "TelmateFrameGrabberOpenCVImpl.hpp" -#include #include -#include -#include - -#define GST_CAT_DEFAULT kurento_telmate_frame_grabber_opencv_impl +#define GST_CAT_DEFAULT telmate_frame_grabber_opencv_impl GST_DEBUG_CATEGORY_STATIC(GST_CAT_DEFAULT); -#define GST_DEFAULT_NAME "KurentoTelmateFrameGrabberOpenCVImpl" - -namespace kurento { +#define GST_DEFAULT_NAME "TelmateFrameGrabberOpenCVImpl" -TelmateFrameGrabberOpenCVImpl::TelmateFrameGrabberOpenCVImpl() { - GST_DEBUG_CATEGORY_INIT(GST_CAT_DEFAULT, GST_DEFAULT_NAME, 0, - GST_DEFAULT_NAME); +namespace kurento +{ +namespace module +{ +namespace telmateframegrabber +{ +TelmateFrameGrabberOpenCVImpl::TelmateFrameGrabberOpenCVImpl () +{ - this->thrLoop = true; - this->snapInterval = 1000; - this->epName = "EP_NAME_UNINITIALIZED"; - this->storagePath = "/tmp/"; - this->prevStoragePath = this->storagePath; - this->framesCounter = 0; - this->outputFormat = FGFMT_JPEG; - this->lastQueueTimeStamp = 0; - this->queueLength = 0; - this->frameQueue = new avis_blocking_queue; + GST_DEBUG_CATEGORY_INIT(GST_CAT_DEFAULT, GST_DEFAULT_NAME, 0, + GST_DEFAULT_NAME); - this->thr = new boost::thread(boost::bind( - &TelmateFrameGrabberOpenCVImpl::queueHandler, this)); - this->thr->detach(); + this->thrLoop = true; + this->snapInterval = 1000; + this->epName = "EP_NAME_UNINITIALIZED"; + this->storagePath = "/tmp/"; + this->framesCounter = 0; + this->outputFormat = FGFMT_JPEG; + this->lastQueueTimeStamp = 0; + this->queueLength = 0; + this->frameQueue = new avis_blocking_queue; - GST_INFO("Constructor was called for %s", this->epName.c_str()); -} + this->thr = new boost::thread(boost::bind( + &TelmateFrameGrabberOpenCVImpl::queueHandler, this)); + this->thr->detach(); + GST_INFO("Constructor was called for %s", this->epName.c_str()); -TelmateFrameGrabberOpenCVImpl::~TelmateFrameGrabberOpenCVImpl() { - GST_INFO("Destructor was called for %s", this->epName.c_str()); } -void TelmateFrameGrabberOpenCVImpl::cleanup() { +TelmateFrameGrabberOpenCVImpl::~TelmateFrameGrabberOpenCVImpl() { - VideoFrame *ptrVf; + VideoFrame *ptrVf; + this->thrLoop = false; + boost::this_thread::sleep_for(boost::chrono::milliseconds(250)); - while(!this->frameQueue->empty()) { - this->frameQueue->pop(ptrVf); // blocks - --this->queueLength; + + while(this->queueLength > 0) { + this->frameQueue->pop(ptrVf); // blocks + --this->queueLength; + if(ptrVf != NULL) { delete ptrVf; ptrVf = NULL; - } + } - this->thrLoop = false; - boost::this_thread::sleep_for(boost::chrono::milliseconds(250)); /* Give the processing thread some time to exit() */ - GST_INFO("Called release() for %s :: Dequeue completed.", this->epName.c_str()); + delete this->frameQueue; + this->frameQueue = NULL; + + GST_INFO("Destructor was called for %s", this->epName.c_str()); - delete this->frameQueue; - this->frameQueue = NULL; - return; } +int TelmateFrameGrabberOpenCVImpl::cleanup() { + /* for now do nothing, we may need to trigger our destructor from this */ + return 1; +} /* - * This function will be called with each new frame. mat variable - * contains the current frame. You should insert your image processing code - * here. Any changes in mat, will be sent through the Media Pipeline. - */ +* This function will be called with each new frame. mat variable +* contains the current frame. You should insert your image processing code +* here. Any changes in mat, will be sent through the Media Pipeline. +*/ void TelmateFrameGrabberOpenCVImpl::process(cv::Mat &mat) { - if ((this->getCurrentTimestampLong() - this->lastQueueTimeStamp) >= this->snapInterval) { - - if(this->thrLoop) { // do not push into the queue if the destructor was called. - this->lastQueueTimeStamp = this->getCurrentTimestampLong(); - VideoFrame *ptrVf = new VideoFrame(); - ptrVf->mat = mat.clone(); - ptrVf->ts = std::to_string(this->lastQueueTimeStamp); - - this->frameQueue->push(ptrVf); - ++this->queueLength; - ++this->framesCounter; - } - } + if ((this->getCurrentTimestampLong() - this->lastQueueTimeStamp) >= this->snapInterval) { + + if(this->thrLoop) { // do not push into the queue if the destructor was called. + this->lastQueueTimeStamp = this->getCurrentTimestampLong(); + VideoFrame *ptrVf = new VideoFrame(); + ptrVf->mat = mat.clone(); + ptrVf->ts = std::to_string(this->lastQueueTimeStamp); + + this->frameQueue->push(ptrVf); + ++this->queueLength; + ++this->framesCounter; + } + } } /* * This function is executed inside the queueHandler thread as a main() function. @@ -95,100 +98,145 @@ void TelmateFrameGrabberOpenCVImpl::process(cv::Mat &mat) { * to ensure the cpu isn't exhausted while the queue is empty. */ void TelmateFrameGrabberOpenCVImpl::queueHandler() { - VideoFrame *ptrVf; - cv::Mat image; - std::vector params; - std::string image_extension; - - while (this->thrLoop) { - - this->frameQueue->pop(ptrVf); // blocks - params.clear(); // clear the vector since the last iteration. - this->lastQueueTimeStamp = this->getCurrentTimestampLong(); - --this->queueLength; - - switch (this->outputFormat) { - case FGFMT_JPEG: - /* Set jpeg params */ - params.push_back(CV_IMWRITE_JPEG_QUALITY); - params.push_back(FG_JPEG_QUALITY); - image_extension = ".jpeg"; - break; - case FGFMT_PNG: - /* Set PNG parameters, compression etc. */ - params.push_back(CV_IMWRITE_PNG_COMPRESSION); - params.push_back(FG_PNG_QUALITY); - image_extension = ".png"; - break; - default: - /* Defaults to jpeg */ - params.push_back(CV_IMWRITE_JPEG_QUALITY); - params.push_back(FG_JPEG_QUALITY); - image_extension = ".jpeg"; - break; - } - - std::string filename = - std::to_string((long) this->framesCounter) + "_" + ptrVf->ts + image_extension; - - - if(this->storagePath != this->prevStoragePath) { - this->storagePathSubdir.clear(); - GST_INFO("Storage path has changed. old storage path: %s. new storage path: %s",this->prevStoragePath.c_str(), this->storagePathSubdir.c_str()); - - } - - if (this->storagePathSubdir.empty()) { - - this->prevStoragePath = this->storagePath; - this->storagePathSubdir = this->storagePath + "/frames_" + this->getCurrentTimestampString(); - boost::filesystem::path dir(this->storagePathSubdir.c_str()); - GST_INFO("going to create a directory in %s", this->storagePathSubdir.c_str()); - if (!boost::filesystem::create_directories(dir)) { - GST_ERROR("%s create_directories() failed for: %s", this->epName.c_str(), - this->storagePathSubdir.c_str()); - } - - } - - std::string fullpath = this->storagePathSubdir + "/" + filename; - - try { - cv::imwrite(fullpath.c_str(), ptrVf->mat, params); - } - catch (...) { - GST_ERROR("::queueHandler() imgwrite() failed."); - throw KurentoException(NOT_IMPLEMENTED, - "TelmateFrameGrabberOpenCVImpl::queueHandler() imgwrite() failed. \n"); - } - - ptrVf->mat.release(); // release internal memory allocations - - delete ptrVf; - ptrVf = NULL; + VideoFrame *ptrVf; + cv::Mat image; + std::vector params; + std::string image_extension; + + try { + + while (this->thrLoop) { + + this->frameQueue->pop(ptrVf); // blocks + params.clear(); // clear the vector since the last iteration. + this->lastQueueTimeStamp = this->getCurrentTimestampLong(); + --this->queueLength; + + switch (this->outputFormat) { + case FGFMT_JPEG: + /* Set jpeg params */ + params.push_back(CV_IMWRITE_JPEG_QUALITY); + params.push_back(FG_JPEG_QUALITY); + image_extension = ".jpeg"; + break; + case FGFMT_PNG: + /* Set PNG parameters, compression etc. */ + params.push_back(CV_IMWRITE_PNG_COMPRESSION); + params.push_back(FG_PNG_QUALITY); + image_extension = ".png"; + break; + default: + /* Defaults to jpeg */ + params.push_back(CV_IMWRITE_JPEG_QUALITY); + params.push_back(FG_JPEG_QUALITY); + image_extension = ".jpeg"; + break; } + std::string filename = + std::to_string((long) this->framesCounter) + "_" + ptrVf->ts + image_extension; + if (this->storagePathSubdir.empty()) { + + this->storagePathSubdir = this->storagePath + "/frames_" + this->getCurrentTimestampString(); + boost::filesystem::path dir(this->storagePathSubdir.c_str()); + GST_INFO("going to create a directory in %s", this->storagePathSubdir.c_str()); + if (!boost::filesystem::create_directories(dir)) { + GST_ERROR("%s create_directories() failed for: %s", this->epName.c_str(), + this->storagePathSubdir.c_str()); + } + + } + + std::string fullpath = this->storagePathSubdir + "/" + filename; + + try { + cv::imwrite(fullpath.c_str(), ptrVf->mat, params); + } + catch (...) { + GST_ERROR("::queueHandler() imgwrite() failed."); + throw KurentoException(NOT_IMPLEMENTED, + "TelmateFrameGrabberOpenCVImpl::queueHandler() imgwrite() failed. \n"); + } + + ptrVf->mat.release(); // release internal memory allocations + + delete ptrVf; + ptrVf = NULL; + //GST_INFO("STILL PROCCESSING."); + + } + + } catch(boost::thread_interrupted interrupt) { + + delete ptrVf; + ptrVf = NULL; + ptrVf->mat.release(); + + } + + GST_ERROR("queueHandler() of %s Thread exiting...",this->epName.c_str()); + } std::string TelmateFrameGrabberOpenCVImpl::getCurrentTimestampString() { - struct timeval tp; - long int ms; - std::stringstream sstr_ts; - - gettimeofday(&tp, NULL); - ms = tp.tv_sec * 1000 + tp.tv_usec / 1000; - sstr_ts << ms; - return sstr_ts.str(); + struct timeval tp; + long int ms; + std::stringstream sstr_ts; + + gettimeofday(&tp, NULL); + ms = tp.tv_sec * 1000 + tp.tv_usec / 1000; + sstr_ts << ms; + return sstr_ts.str(); } long TelmateFrameGrabberOpenCVImpl::getCurrentTimestampLong() { - struct timeval tp; + struct timeval tp; + + gettimeofday(&tp, NULL); + return (tp.tv_sec * 1000 + tp.tv_usec / 1000); +} - gettimeofday(&tp, NULL); - return (tp.tv_sec * 1000 + tp.tv_usec / 1000); +int TelmateFrameGrabberOpenCVImpl::getSnapInterval () +{ + return this->snapInterval; +} + +void TelmateFrameGrabberOpenCVImpl::setSnapInterval (int snapInterval) +{ + this->snapInterval = snapInterval; + GST_INFO("Snapshot interval was set to: %d", this->snapInterval); + return; +} + +std::string TelmateFrameGrabberOpenCVImpl::getStoragePath () +{ + return this->storagePath; +} + +void TelmateFrameGrabberOpenCVImpl::setStoragePath (const std::string &path) +{ + this->storagePath = path; + GST_INFO("Storage Path was set to: %s", this->storagePath.c_str()); + return; +} + +void TelmateFrameGrabberOpenCVImpl::setWebRtcEpName (const std::string &epName) +{ + this->epName = epName; + GST_INFO("Endpoint name was set to: %s", this->epName.c_str()); + return; +} + +void TelmateFrameGrabberOpenCVImpl::setOutputFormat (int outputFormat) +{ + this->outputFormat = outputFormat; + GST_INFO("Snapshot output format was set to: %d", this->outputFormat); + return; } -} // namespace kurento +} /* telmateframegrabber */ +} /* module */ +} /* kurento */ diff --git a/module/src/server/implementation/objects/TelmateFrameGrabberOpenCVImpl.hpp b/module/src/server/implementation/objects/TelmateFrameGrabberOpenCVImpl.hpp index 11b84ef..d892c54 100644 --- a/module/src/server/implementation/objects/TelmateFrameGrabberOpenCVImpl.hpp +++ b/module/src/server/implementation/objects/TelmateFrameGrabberOpenCVImpl.hpp @@ -5,6 +5,7 @@ #define NDEBUG 1 + #include #include #include @@ -30,6 +31,13 @@ #include + +#include +#include +#include +#include + + #define FGFMT_JPEG 0x0 #define FGFMT_PNG 0x1 @@ -39,59 +47,59 @@ #define MAX_IDLE_QUEUE_TIME_NS 30000 #define QUEUE_BASE_ELEMENT_ALLOC 1000 -//using namespace moodycamel; - - - - -namespace kurento { -class TelmateFrameGrabberOpenCVImpl : public virtual OpenCVProcess { - public: - TelmateFrameGrabberOpenCVImpl(); - virtual ~TelmateFrameGrabberOpenCVImpl(); - virtual void process(cv::Mat &mat); +namespace kurento +{ +namespace module +{ +namespace telmateframegrabber +{ - void release(); - void cleanup(); +class TelmateFrameGrabberOpenCVImpl : public virtual OpenCVProcess +{ - boost::atomic framesCounter; - int snapInterval; - std::string storagePath; - std::string prevStoragePath; - std::string epName; - int outputFormat; // 0x0=JPEG 0x1=PNG +public: + TelmateFrameGrabberOpenCVImpl (); - protected: + virtual ~TelmateFrameGrabberOpenCVImpl (); - TelmateFrameGrabberOpenCVImpl* getFrameGrabberPtr() { - return this; - } + virtual void process (cv::Mat &mat); - private: + int cleanup (); + int getSnapInterval (); + void setSnapInterval (int snapInterval); + std::string getStoragePath (); + void setStoragePath (const std::string &path); + void setWebRtcEpName (const std::string &epName); + void setOutputFormat (int outputFormat); - boost::asio::io_service ioService; - boost::thread_group tp; +private: + boost::atomic framesCounter; + int snapInterval; + std::string storagePath; + std::string epName; + int outputFormat; - avis_blocking_queue *frameQueue; - boost::thread* thr; - boost::atomic thrLoop; + avis_blocking_queue *frameQueue; + boost::thread* thr; + boost::atomic thrLoop; - boost::atomic lastQueueTimeStamp; - boost::atomic queueLength; - std::string storagePathSubdir; + boost::atomic lastQueueTimeStamp; + boost::atomic queueLength; + std::string storagePathSubdir; - void queueHandler(); - std::string getCurrentTimestampString(); - int64 getCurrentTimestampLong(); + void queueHandler(); + std::string getCurrentTimestampString(); + int64 getCurrentTimestampLong(); - boost::mutex workerThreadMutex; }; -} // namespace kurento +} /* telmateframegrabber */ +} /* module */ +} /* kurento */ #endif /* __TELMATE_FRAME_GRABBER_OPENCV_IMPL_HPP__ */ diff --git a/module/src/server/interface/telmateframegrabber.kmd.json b/module/src/server/interface/telmateframegrabber.kmd.json index 6508c2d..20d1604 100644 --- a/module/src/server/interface/telmateframegrabber.kmd.json +++ b/module/src/server/interface/telmateframegrabber.kmd.json @@ -1,5 +1,5 @@ { "name": "telmateframegrabber", - "version": "0.9.0", + "version": "0.9.1", "kurentoVersion": "^6.7.1" }