Initial reworking of js8 decoder threading and scheduling

This commit is contained in:
Jordan Sherer 2019-11-20 00:11:30 -05:00
parent 1a03619a2f
commit 8ec88432bd
8 changed files with 356 additions and 91 deletions

View File

@ -253,6 +253,8 @@ set (wsjtx_CXXSRCS
WaveFile.cpp
AudioDecoder.cpp
NotificationAudio.cpp
ProcessThread.cpp
Decoder.cpp
)
set (wsjt_CXXSRCS

163
Decoder.cpp Normal file
View File

@ -0,0 +1,163 @@
/**
* This file is part of JS8Call.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
* (C) 2019 Jordan Sherer <kn4crd@gmail.com> - All Rights Reserved
*
**/
#include "Decoder.h"
#include "commons.h"
#include <QTimer>
Decoder::Decoder(QObject *parent):
QObject(parent)
{
}
Decoder::~Decoder(){
}
//
void Decoder::lock(){
// NOOP
}
//
void Decoder::unlock(){
// NOOP
}
//
Worker* Decoder::createWorker(){
auto worker = new Worker();
worker->moveToThread(&m_thread);
connect(&m_thread, &QThread::finished, worker, &QObject::deleteLater);
connect(this, &Decoder::startWorker, worker, &Worker::start);
connect(this, &Decoder::quitWorker, worker, &Worker::quit);
connect(worker, &Worker::ready, this, &Decoder::ready);
return worker;
}
//
void Decoder::start(QThread::Priority priority){
m_thread.start(priority);
}
//
void Decoder::quit(){
m_thread.quit();
}
//
bool Decoder::wait(){
return m_thread.wait();
}
//
void Decoder::processStart(QString path, QStringList args){
if(m_worker.isNull()){
m_worker = createWorker();
}
emit startWorker(path, args);
}
//
void Decoder::processReady(QByteArray t){
emit ready(t);
}
//
void Decoder::processQuit(){
emit quitWorker();
}
////////////////////////////////////////
//////////////// WORKER ////////////////
////////////////////////////////////////
//
Worker::~Worker(){
}
//
void Worker::setProcess(QProcess *proc, int msecs){
if(!m_proc.isNull()){
bool b = m_proc->waitForFinished(msecs);
if(!b) m_proc->close();
m_proc.reset();
}
if(proc){
m_proc.reset(proc);
}
}
//
void Worker::start(QString path, QStringList args){
if(JS8_DEBUG_DECODE) qDebug() << "decoder process starting...";
auto proc = new QProcess(this);
connect(proc, &QProcess::readyReadStandardOutput,
[this, proc](){
while(proc->canReadLine()){
emit ready(proc->readLine());
}
});
connect(proc, static_cast<void (QProcess::*) (QProcess::ProcessError)> (&QProcess::error),
[this, proc] (QProcess::ProcessError e) {
emit error();
});
connect(proc, static_cast<void (QProcess::*) (int, QProcess::ExitStatus)> (&QProcess::finished),
[this, proc] (int exitCode, QProcess::ExitStatus status) {
emit finished();
});
auto watcher = new QTimer(this);
connect(proc, static_cast<void (QProcess::*) (int, QProcess::ExitStatus)> (&QProcess::finished),
[this, watcher] (int /*exitCode*/, QProcess::ExitStatus /*status*/) {
watcher->stop();
});
connect(watcher, &QTimer::timeout,
[this, proc](){
if(JS8_DEBUG_DECODE) qDebug() << "decode process" << proc->state() << "can readline?" << proc->canReadLine();
});
watcher->setInterval(500);
watcher->start();
QProcessEnvironment env {QProcessEnvironment::systemEnvironment ()};
env.insert ("OMP_STACKSIZE", "4M");
proc->setProcessEnvironment (env);
proc->start(path, args, QIODevice::ReadWrite | QIODevice::Unbuffered);
setProcess(proc);
}
//
void Worker::quit(){
setProcess(nullptr);
}

71
Decoder.h Normal file
View File

@ -0,0 +1,71 @@
#ifndef DECODER_H
#define DECODER_H
/**
* (C) 2019 Jordan Sherer <kn4crd@gmail.com> - All Rights Reserved
**/
#include "ProcessThread.h"
#include <QDebug>
#include <QByteArray>
#include <QPointer>
#include <QProcess>
class Worker : public QObject{
Q_OBJECT
public:
~Worker();
public slots:
void start(QString path, QStringList args);
void quit();
private:
void setProcess(QProcess *proc, int msecs=1000);
signals:
void ready(QByteArray t);
void error();
void finished();
private:
QScopedPointer<QProcess> m_proc;
};
class Decoder: public QObject
{
Q_OBJECT
public:
Decoder(QObject *parent=nullptr);
~Decoder();
void lock();
void unlock();
private:
Worker* createWorker();
public slots:
void start(QThread::Priority priority);
void quit();
bool wait();
void processStart(QString path, QStringList args);
void processReady(QByteArray t);
void processQuit();
signals:
void startWorker(QString path, QStringList args);
void quitWorker();
void ready(QByteArray t);
private:
QPointer<Worker> m_worker;
QThread m_thread;
};
#endif // DECODER_H

46
ProcessThread.cpp Normal file
View File

@ -0,0 +1,46 @@
/**
* This file is part of JS8Call.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
* (C) 2019 Jordan Sherer <kn4crd@gmail.com> - All Rights Reserved
*
**/
#include "ProcessThread.h"
ProcessThread::ProcessThread(QObject *parent):
QThread(parent)
{
}
ProcessThread::~ProcessThread(){
setProcess(nullptr);
}
/**
* @brief ProcessThread::setProcess
* @param proc - process to move to this thread and take ownership
*/
void ProcessThread::setProcess(QProcess *proc, int msecs){
if(!m_proc.isNull()){
bool b = m_proc->waitForFinished(msecs);
if(!b) m_proc->close();
m_proc.reset();
}
if(proc){
proc->moveToThread(this);
m_proc.reset(proc);
}
}

29
ProcessThread.h Normal file
View File

@ -0,0 +1,29 @@
#ifndef PROCESSTHREAD_H
#define PROCESSTHREAD_H
/**
* (C) 2019 Jordan Sherer <kn4crd@gmail.com> - All Rights Reserved
**/
#include <QScopedPointer>
#include <QProcess>
#include <QThread>
class ProcessThread : public QThread
{
Q_OBJECT
public:
ProcessThread(QObject *parent=nullptr);
~ProcessThread();
void setProcess(QProcess *proc, int msecs=1000);
QProcess * process(){
return m_proc.data();
}
protected:
QScopedPointer<QProcess> m_proc;
};
#endif // PROCESSTHREAD_H

View File

@ -88,7 +88,10 @@ SOURCES += \
CallsignValidator.cpp \
AudioDecoder.cpp \
WaveFile.cpp \
WaveUtils.cpp
WaveUtils.cpp \
ProcessThread.cpp \
DecoderThread.cpp \
Decoder.cpp
HEADERS += qt_helpers.hpp \
pimpl_h.hpp pimpl_impl.hpp \
@ -127,7 +130,10 @@ HEADERS += qt_helpers.hpp \
NotificationAudio.h \
AudioDecoder.h \
WaveFile.h \
WaveUtils.h
WaveUtils.h \
ProcessThread.h \
DecoderThread.h \
Decoder.h
INCLUDEPATH += qmake_only

View File

@ -419,6 +419,8 @@ MainWindow::MainWindow(QDir const& temp_directory, bool multiple,
m_downSampleFactor (downSampleFactor),
m_audioThreadPriority (QThread::HighPriority),
m_notificationAudioThreadPriority (QThread::LowPriority),
m_decoderThreadPriority (QThread::HighPriority),
m_decoder {this},
m_bandEdited {false},
m_splitMode {false},
m_monitoring {false},
@ -537,7 +539,9 @@ MainWindow::MainWindow(QDir const& temp_directory, bool multiple,
#endif
// decoder queue handler
connect(this, &MainWindow::decodedLineReady, this, &MainWindow::processDecodedLine);
//connect (&m_decodeThread, &QThread::finished, m_notification, &QObject::deleteLater);
//connect(this, &MainWindow::decodedLineReady, this, &MainWindow::processDecodedLine);
connect(&m_decoder, &Decoder::ready, this, &MainWindow::processDecodedLine);
on_EraseButton_clicked ();
@ -807,6 +811,7 @@ MainWindow::MainWindow(QDir const& temp_directory, bool multiple,
m_audioThread.start (m_audioThreadPriority);
m_notificationAudioThread.start(m_notificationAudioThreadPriority);
m_decoder.start(m_decoderThreadPriority);
#ifdef WIN32
if (!m_multiple)
@ -1561,10 +1566,10 @@ MainWindow::MainWindow(QDir const& temp_directory, bool multiple,
}
void MainWindow::initDecoderSubprocess(){
{
//delete any .quit file that might have been left lying around
//since its presence will cause jt9 to exit a soon as we start it
//and decodes will hang
{
QFile quitFile {m_config.temp_dir ().absoluteFilePath (".quit")};
while (quitFile.exists ())
{
@ -1580,7 +1585,11 @@ void MainWindow::initDecoderSubprocess(){
if(JS8_DEBUG_DECODE) qDebug() << "decoder lock create";
QFile {m_config.temp_dir ().absoluteFilePath (".lock")}.open(QIODevice::ReadWrite);
QStringList js8_args {
// create path
QString path = QDir::toNativeSeparators(m_appDir) + QDir::separator() + "js8";
// create args
QStringList args {
"-s", QApplication::applicationName () // shared memory key,
// includes rig
#ifdef NDEBUG
@ -1601,77 +1610,8 @@ void MainWindow::initDecoderSubprocess(){
, "-t", QDir::toNativeSeparators (m_config.temp_dir ().absolutePath ())
};
QProcessEnvironment env {QProcessEnvironment::systemEnvironment ()};
env.insert ("OMP_STACKSIZE", "4M");
if(JS8_DEBUG_DECODE) qDebug() << "decoder subprocess starting...";
#if JS8_DECODE_THREAD
auto thread = new QThread(nullptr);
#endif
auto proc = new QProcess(nullptr);
connect(proc, &QProcess::readyReadStandardOutput, this,
[this, proc](){
#if JS8_DECODE_THREAD
while(proc->canReadLine()){
emit decodedLineReady(proc->readLine());
}
#else
readFromStdout(proc);
#endif
});
connect(proc, static_cast<void (QProcess::*) (QProcess::ProcessError)> (&QProcess::error),
[this, proc] (QProcess::ProcessError error) {
subProcessError (proc, error);
});
connect(proc, static_cast<void (QProcess::*) (int, QProcess::ExitStatus)> (&QProcess::finished),
[this, proc] (int exitCode, QProcess::ExitStatus status) {
#if JS8_DECODE_THREAD
proc->deleteLater();
proc->thread()->quit();
#endif
subProcessFailed (proc, exitCode, status);
});
proc->setProcessEnvironment (env);
proc->start(QDir::toNativeSeparators (m_appDir) + QDir::separator () +
"js8", js8_args, QIODevice::ReadWrite | QIODevice::Unbuffered);
#if JS8_DECODE_THREAD
if(JS8_DEBUG_DECODE) qDebug() << "decoder subprocess moving to new thread...";
// move process handling into its own thread
proc->moveToThread(thread);
connect(thread, &QThread::finished, thread, &QObject::deleteLater);
thread->moveToThread(qApp->thread());
thread->start(QThread::HighPriority);
#endif
// create a process watcher looking for stdout read...
// seems like we're starving the event loop or something?
// auto watcher = new QTimer(proc);
// watcher->setInterval(500);
// connect(watcher, &QTimer::timeout, this,
// [this, proc](){
// if(proc->canReadLine()){
// if(JS8_DEBUG_DECODE) qDebug() << "decode process watcher intercepted readline";
// readFromStdout(proc);
// }
// });
// watcher->start();
// kill the previous proc and set the new one
m_valid = false;
{
if(!proc_js8.isNull()){
proc_js8->kill();
}
proc_js8.reset(proc);
}
m_valid = true;
// initialize
m_decoder.processStart(path, args);
// reset decode busy
if(m_decoderBusy){
@ -2194,6 +2134,9 @@ MainWindow::~MainWindow()
m_notificationAudioThread.quit();
m_notificationAudioThread.wait();
m_decoder.quit();
m_decoder.wait();
remove_child_from_event_filter (this);
}
@ -2468,6 +2411,8 @@ void MainWindow::readSettings()
m_msAudioOutputBuffered = m_settings->value ("Audio/OutputBufferMs").toInt ();
m_framesAudioInputBuffered = m_settings->value ("Audio/InputBufferFrames", RX_SAMPLE_RATE / 10).toInt ();
m_audioThreadPriority = static_cast<QThread::Priority> (m_settings->value ("Audio/ThreadPriority", QThread::HighPriority).toInt () % 8);
m_notificationAudioThreadPriority = static_cast<QThread::Priority> (m_settings->value ("Audio/NotificationThreadPriority", QThread::LowPriority).toInt () % 8);
m_decoderThreadPriority = static_cast<QThread::Priority> (m_settings->value ("Audio/DecoderThreadPriority", QThread::HighPriority).toInt () % 8);
m_settings->endGroup ();
if(m_config.reset_activity()){
@ -3600,14 +3545,12 @@ void MainWindow::closeEvent(QCloseEvent * e)
mem_js8->detach();
QFile quitFile {m_config.temp_dir ().absoluteFilePath (".quit")};
quitFile.open(QIODevice::ReadWrite);
{
if(JS8_DEBUG_DECODE) qDebug() << "decoder lock remove";
QFile {m_config.temp_dir ().absoluteFilePath (".lock")}.remove(); // Allow jt9 to terminate
if(!proc_js8.isNull()){
bool b=proc_js8->waitForFinished(1000);
if(!b) proc_js8->close();
m_decoder.processQuit();
}
quitFile.remove();
Q_EMIT finished ();
QMainWindow::closeEvent (e);

View File

@ -48,6 +48,8 @@
#include "SpotClient.h"
#include "keyeater.h"
#include "NotificationAudio.h"
#include "ProcessThread.h"
#include "Decoder.h"
#define NUM_JT4_SYMBOLS 206 //(72+31)*2, embedded sync
#define NUM_JT65_SYMBOLS 126 //63 data + 63 sync
@ -500,8 +502,10 @@ private:
Modulator * m_modulator;
SoundOutput * m_soundOutput;
NotificationAudio * m_notification;
QThread m_audioThread;
QThread m_notificationAudioThread;
Decoder m_decoder;
qint64 m_msErase;
qint64 m_secBandChanged;
@ -665,7 +669,7 @@ private:
QFutureWatcher<void> watcher3;
QFutureWatcher<QString> m_saveWAVWatcher;
QScopedPointer<QProcess> proc_js8;
//QPointer<QProcess> proc_js8;
QTimer m_guiTimer;
QTimer ptt1Timer; //StartTx delay
@ -897,6 +901,7 @@ private:
unsigned m_downSampleFactor;
QThread::Priority m_audioThreadPriority;
QThread::Priority m_notificationAudioThreadPriority;
QThread::Priority m_decoderThreadPriority;
bool m_bandEdited;
bool m_splitMode;
bool m_monitoring;