From 8ec88432bd1b17ad6e5f8a5314224511837d9847 Mon Sep 17 00:00:00 2001 From: Jordan Sherer Date: Wed, 20 Nov 2019 00:11:30 -0500 Subject: [PATCH] Initial reworking of js8 decoder threading and scheduling --- CMakeLists.txt | 2 + Decoder.cpp | 163 ++++++++++++++++++++++++++++++++++++++++++++++ Decoder.h | 71 ++++++++++++++++++++ ProcessThread.cpp | 46 +++++++++++++ ProcessThread.h | 29 +++++++++ js8call.pro | 10 ++- mainwindow.cpp | 119 +++++++++------------------------ mainwindow.h | 7 +- 8 files changed, 356 insertions(+), 91 deletions(-) create mode 100644 Decoder.cpp create mode 100644 Decoder.h create mode 100644 ProcessThread.cpp create mode 100644 ProcessThread.h diff --git a/CMakeLists.txt b/CMakeLists.txt index 2aa5ab9..06f9796 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -253,6 +253,8 @@ set (wsjtx_CXXSRCS WaveFile.cpp AudioDecoder.cpp NotificationAudio.cpp + ProcessThread.cpp + Decoder.cpp ) set (wsjt_CXXSRCS diff --git a/Decoder.cpp b/Decoder.cpp new file mode 100644 index 0000000..891501a --- /dev/null +++ b/Decoder.cpp @@ -0,0 +1,163 @@ +/** + * This file is part of JS8Call. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + * + * (C) 2019 Jordan Sherer - All Rights Reserved + * + **/ + +#include "Decoder.h" + +#include "commons.h" + +#include + + +Decoder::Decoder(QObject *parent): + QObject(parent) +{ +} + +Decoder::~Decoder(){ +} + +// +void Decoder::lock(){ + // NOOP +} + +// +void Decoder::unlock(){ + // NOOP +} + +// +Worker* Decoder::createWorker(){ + auto worker = new Worker(); + worker->moveToThread(&m_thread); + connect(&m_thread, &QThread::finished, worker, &QObject::deleteLater); + connect(this, &Decoder::startWorker, worker, &Worker::start); + connect(this, &Decoder::quitWorker, worker, &Worker::quit); + connect(worker, &Worker::ready, this, &Decoder::ready); + return worker; +} + +// +void Decoder::start(QThread::Priority priority){ + m_thread.start(priority); +} + +// +void Decoder::quit(){ + m_thread.quit(); +} + +// +bool Decoder::wait(){ + return m_thread.wait(); +} + +// +void Decoder::processStart(QString path, QStringList args){ + if(m_worker.isNull()){ + m_worker = createWorker(); + } + + emit startWorker(path, args); +} + +// +void Decoder::processReady(QByteArray t){ + emit ready(t); +} + +// +void Decoder::processQuit(){ + emit quitWorker(); +} + +//////////////////////////////////////// +//////////////// WORKER //////////////// +//////////////////////////////////////// + +// +Worker::~Worker(){ +} + +// +void Worker::setProcess(QProcess *proc, int msecs){ + if(!m_proc.isNull()){ + bool b = m_proc->waitForFinished(msecs); + if(!b) m_proc->close(); + m_proc.reset(); + } + + if(proc){ + m_proc.reset(proc); + } +} + +// +void Worker::start(QString path, QStringList args){ + if(JS8_DEBUG_DECODE) qDebug() << "decoder process starting..."; + + auto proc = new QProcess(this); + + connect(proc, &QProcess::readyReadStandardOutput, + [this, proc](){ + while(proc->canReadLine()){ + emit ready(proc->readLine()); + } + }); + + connect(proc, static_cast (&QProcess::error), + [this, proc] (QProcess::ProcessError e) { + emit error(); + }); + + connect(proc, static_cast (&QProcess::finished), + [this, proc] (int exitCode, QProcess::ExitStatus status) { + emit finished(); + }); + + + auto watcher = new QTimer(this); + + connect(proc, static_cast (&QProcess::finished), + [this, watcher] (int /*exitCode*/, QProcess::ExitStatus /*status*/) { + watcher->stop(); + }); + + connect(watcher, &QTimer::timeout, + [this, proc](){ + if(JS8_DEBUG_DECODE) qDebug() << "decode process" << proc->state() << "can readline?" << proc->canReadLine(); + }); + + watcher->setInterval(500); + watcher->start(); + + + QProcessEnvironment env {QProcessEnvironment::systemEnvironment ()}; + env.insert ("OMP_STACKSIZE", "4M"); + proc->setProcessEnvironment (env); + proc->start(path, args, QIODevice::ReadWrite | QIODevice::Unbuffered); + + setProcess(proc); +} + +// +void Worker::quit(){ + setProcess(nullptr); +} diff --git a/Decoder.h b/Decoder.h new file mode 100644 index 0000000..6dd2756 --- /dev/null +++ b/Decoder.h @@ -0,0 +1,71 @@ +#ifndef DECODER_H +#define DECODER_H + +/** + * (C) 2019 Jordan Sherer - All Rights Reserved + **/ + +#include "ProcessThread.h" + +#include +#include +#include +#include + + +class Worker : public QObject{ + Q_OBJECT +public: + ~Worker(); +public slots: + void start(QString path, QStringList args); + void quit(); + +private: + void setProcess(QProcess *proc, int msecs=1000); + +signals: + void ready(QByteArray t); + void error(); + void finished(); + +private: + QScopedPointer m_proc; +}; + + +class Decoder: public QObject +{ + Q_OBJECT +public: + Decoder(QObject *parent=nullptr); + ~Decoder(); + + void lock(); + void unlock(); + +private: + Worker* createWorker(); + +public slots: + void start(QThread::Priority priority); + void quit(); + bool wait(); + + void processStart(QString path, QStringList args); + void processReady(QByteArray t); + void processQuit(); + +signals: + void startWorker(QString path, QStringList args); + void quitWorker(); + + void ready(QByteArray t); + +private: + QPointer m_worker; + QThread m_thread; +}; + + +#endif // DECODER_H diff --git a/ProcessThread.cpp b/ProcessThread.cpp new file mode 100644 index 0000000..3050c7f --- /dev/null +++ b/ProcessThread.cpp @@ -0,0 +1,46 @@ +/** + * This file is part of JS8Call. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + * + * (C) 2019 Jordan Sherer - All Rights Reserved + * + **/ +#include "ProcessThread.h" + +ProcessThread::ProcessThread(QObject *parent): + QThread(parent) +{ +} + +ProcessThread::~ProcessThread(){ + setProcess(nullptr); +} + +/** + * @brief ProcessThread::setProcess + * @param proc - process to move to this thread and take ownership + */ +void ProcessThread::setProcess(QProcess *proc, int msecs){ + if(!m_proc.isNull()){ + bool b = m_proc->waitForFinished(msecs); + if(!b) m_proc->close(); + m_proc.reset(); + } + + if(proc){ + proc->moveToThread(this); + m_proc.reset(proc); + } +} diff --git a/ProcessThread.h b/ProcessThread.h new file mode 100644 index 0000000..e0c578c --- /dev/null +++ b/ProcessThread.h @@ -0,0 +1,29 @@ +#ifndef PROCESSTHREAD_H +#define PROCESSTHREAD_H + +/** + * (C) 2019 Jordan Sherer - All Rights Reserved + **/ + +#include +#include +#include + + +class ProcessThread : public QThread +{ + Q_OBJECT +public: + ProcessThread(QObject *parent=nullptr); + ~ProcessThread(); + + void setProcess(QProcess *proc, int msecs=1000); + QProcess * process(){ + return m_proc.data(); + } + +protected: + QScopedPointer m_proc; +}; + +#endif // PROCESSTHREAD_H diff --git a/js8call.pro b/js8call.pro index 924b2aa..722e710 100644 --- a/js8call.pro +++ b/js8call.pro @@ -88,7 +88,10 @@ SOURCES += \ CallsignValidator.cpp \ AudioDecoder.cpp \ WaveFile.cpp \ - WaveUtils.cpp + WaveUtils.cpp \ + ProcessThread.cpp \ + DecoderThread.cpp \ + Decoder.cpp HEADERS += qt_helpers.hpp \ pimpl_h.hpp pimpl_impl.hpp \ @@ -127,7 +130,10 @@ HEADERS += qt_helpers.hpp \ NotificationAudio.h \ AudioDecoder.h \ WaveFile.h \ - WaveUtils.h + WaveUtils.h \ + ProcessThread.h \ + DecoderThread.h \ + Decoder.h INCLUDEPATH += qmake_only diff --git a/mainwindow.cpp b/mainwindow.cpp index c8d370a..9cbde48 100644 --- a/mainwindow.cpp +++ b/mainwindow.cpp @@ -419,6 +419,8 @@ MainWindow::MainWindow(QDir const& temp_directory, bool multiple, m_downSampleFactor (downSampleFactor), m_audioThreadPriority (QThread::HighPriority), m_notificationAudioThreadPriority (QThread::LowPriority), + m_decoderThreadPriority (QThread::HighPriority), + m_decoder {this}, m_bandEdited {false}, m_splitMode {false}, m_monitoring {false}, @@ -537,7 +539,9 @@ MainWindow::MainWindow(QDir const& temp_directory, bool multiple, #endif // decoder queue handler - connect(this, &MainWindow::decodedLineReady, this, &MainWindow::processDecodedLine); + //connect (&m_decodeThread, &QThread::finished, m_notification, &QObject::deleteLater); + //connect(this, &MainWindow::decodedLineReady, this, &MainWindow::processDecodedLine); + connect(&m_decoder, &Decoder::ready, this, &MainWindow::processDecodedLine); on_EraseButton_clicked (); @@ -807,6 +811,7 @@ MainWindow::MainWindow(QDir const& temp_directory, bool multiple, m_audioThread.start (m_audioThreadPriority); m_notificationAudioThread.start(m_notificationAudioThreadPriority); + m_decoder.start(m_decoderThreadPriority); #ifdef WIN32 if (!m_multiple) @@ -1561,17 +1566,17 @@ MainWindow::MainWindow(QDir const& temp_directory, bool multiple, } void MainWindow::initDecoderSubprocess(){ + //delete any .quit file that might have been left lying around + //since its presence will cause jt9 to exit a soon as we start it + //and decodes will hang { - //delete any .quit file that might have been left lying around - //since its presence will cause jt9 to exit a soon as we start it - //and decodes will hang - QFile quitFile {m_config.temp_dir ().absoluteFilePath (".quit")}; - while (quitFile.exists ()) + QFile quitFile {m_config.temp_dir ().absoluteFilePath (".quit")}; + while (quitFile.exists ()) { - if (!quitFile.remove ()) + if (!quitFile.remove ()) { - MessageBox::query_message (this, tr ("Error removing \"%1\"").arg (quitFile.fileName ()) - , tr ("Click OK to retry")); + MessageBox::query_message (this, tr ("Error removing \"%1\"").arg (quitFile.fileName ()) + , tr ("Click OK to retry")); } } } @@ -1580,7 +1585,11 @@ void MainWindow::initDecoderSubprocess(){ if(JS8_DEBUG_DECODE) qDebug() << "decoder lock create"; QFile {m_config.temp_dir ().absoluteFilePath (".lock")}.open(QIODevice::ReadWrite); - QStringList js8_args { + // create path + QString path = QDir::toNativeSeparators(m_appDir) + QDir::separator() + "js8"; + + // create args + QStringList args { "-s", QApplication::applicationName () // shared memory key, // includes rig #ifdef NDEBUG @@ -1599,79 +1608,10 @@ void MainWindow::initDecoderSubprocess(){ , "-e", QDir::toNativeSeparators (m_appDir) , "-a", QDir::toNativeSeparators (m_config.writeable_data_dir ().absolutePath ()) , "-t", QDir::toNativeSeparators (m_config.temp_dir ().absolutePath ()) - }; + }; - QProcessEnvironment env {QProcessEnvironment::systemEnvironment ()}; - env.insert ("OMP_STACKSIZE", "4M"); - - if(JS8_DEBUG_DECODE) qDebug() << "decoder subprocess starting..."; - -#if JS8_DECODE_THREAD - auto thread = new QThread(nullptr); -#endif - - auto proc = new QProcess(nullptr); - - connect(proc, &QProcess::readyReadStandardOutput, this, - [this, proc](){ -#if JS8_DECODE_THREAD - while(proc->canReadLine()){ - emit decodedLineReady(proc->readLine()); - } -#else - readFromStdout(proc); -#endif - }); - - connect(proc, static_cast (&QProcess::error), - [this, proc] (QProcess::ProcessError error) { - subProcessError (proc, error); - }); - - connect(proc, static_cast (&QProcess::finished), - [this, proc] (int exitCode, QProcess::ExitStatus status) { -#if JS8_DECODE_THREAD - proc->deleteLater(); - proc->thread()->quit(); -#endif - subProcessFailed (proc, exitCode, status); - }); - - proc->setProcessEnvironment (env); - proc->start(QDir::toNativeSeparators (m_appDir) + QDir::separator () + - "js8", js8_args, QIODevice::ReadWrite | QIODevice::Unbuffered); - -#if JS8_DECODE_THREAD - if(JS8_DEBUG_DECODE) qDebug() << "decoder subprocess moving to new thread..."; - // move process handling into its own thread - proc->moveToThread(thread); - connect(thread, &QThread::finished, thread, &QObject::deleteLater); - thread->moveToThread(qApp->thread()); - thread->start(QThread::HighPriority); -#endif - - // create a process watcher looking for stdout read... - // seems like we're starving the event loop or something? - // auto watcher = new QTimer(proc); - // watcher->setInterval(500); - // connect(watcher, &QTimer::timeout, this, - // [this, proc](){ - // if(proc->canReadLine()){ - // if(JS8_DEBUG_DECODE) qDebug() << "decode process watcher intercepted readline"; - // readFromStdout(proc); - // } - // }); - // watcher->start(); - - // kill the previous proc and set the new one - m_valid = false; - { - if(!proc_js8.isNull()){ - proc_js8->kill(); - } - proc_js8.reset(proc); - } - m_valid = true; + // initialize + m_decoder.processStart(path, args); // reset decode busy if(m_decoderBusy){ @@ -2194,6 +2134,9 @@ MainWindow::~MainWindow() m_notificationAudioThread.quit(); m_notificationAudioThread.wait(); + m_decoder.quit(); + m_decoder.wait(); + remove_child_from_event_filter (this); } @@ -2468,6 +2411,8 @@ void MainWindow::readSettings() m_msAudioOutputBuffered = m_settings->value ("Audio/OutputBufferMs").toInt (); m_framesAudioInputBuffered = m_settings->value ("Audio/InputBufferFrames", RX_SAMPLE_RATE / 10).toInt (); m_audioThreadPriority = static_cast (m_settings->value ("Audio/ThreadPriority", QThread::HighPriority).toInt () % 8); + m_notificationAudioThreadPriority = static_cast (m_settings->value ("Audio/NotificationThreadPriority", QThread::LowPriority).toInt () % 8); + m_decoderThreadPriority = static_cast (m_settings->value ("Audio/DecoderThreadPriority", QThread::HighPriority).toInt () % 8); m_settings->endGroup (); if(m_config.reset_activity()){ @@ -3600,14 +3545,12 @@ void MainWindow::closeEvent(QCloseEvent * e) mem_js8->detach(); QFile quitFile {m_config.temp_dir ().absoluteFilePath (".quit")}; quitFile.open(QIODevice::ReadWrite); - if(JS8_DEBUG_DECODE) qDebug() << "decoder lock remove"; - QFile {m_config.temp_dir ().absoluteFilePath (".lock")}.remove(); // Allow jt9 to terminate - if(!proc_js8.isNull()){ - bool b=proc_js8->waitForFinished(1000); - if(!b) proc_js8->close(); + { + if(JS8_DEBUG_DECODE) qDebug() << "decoder lock remove"; + QFile {m_config.temp_dir ().absoluteFilePath (".lock")}.remove(); // Allow jt9 to terminate + m_decoder.processQuit(); } quitFile.remove(); - Q_EMIT finished (); QMainWindow::closeEvent (e); diff --git a/mainwindow.h b/mainwindow.h index 43612bb..63f0abf 100644 --- a/mainwindow.h +++ b/mainwindow.h @@ -48,6 +48,8 @@ #include "SpotClient.h" #include "keyeater.h" #include "NotificationAudio.h" +#include "ProcessThread.h" +#include "Decoder.h" #define NUM_JT4_SYMBOLS 206 //(72+31)*2, embedded sync #define NUM_JT65_SYMBOLS 126 //63 data + 63 sync @@ -500,8 +502,10 @@ private: Modulator * m_modulator; SoundOutput * m_soundOutput; NotificationAudio * m_notification; + QThread m_audioThread; QThread m_notificationAudioThread; + Decoder m_decoder; qint64 m_msErase; qint64 m_secBandChanged; @@ -665,7 +669,7 @@ private: QFutureWatcher watcher3; QFutureWatcher m_saveWAVWatcher; - QScopedPointer proc_js8; + //QPointer proc_js8; QTimer m_guiTimer; QTimer ptt1Timer; //StartTx delay @@ -897,6 +901,7 @@ private: unsigned m_downSampleFactor; QThread::Priority m_audioThreadPriority; QThread::Priority m_notificationAudioThreadPriority; + QThread::Priority m_decoderThreadPriority; bool m_bandEdited; bool m_splitMode; bool m_monitoring;