Phase 1 sub-commit 5: Qt transport types
All checks were successful
CI / Quality (push) Successful in 5s

MercureClient is a single-topic SSE subscriber: opens a long-lived GET
on the hub URL with the topic query and Accept: text/event-stream,
parses the line protocol into update(data, id) signals, and reconnects
with 1s→2s→…→30s exponential backoff on drop. Tracks lastEventId across
reconnects and sends it as Last-Event-ID so the hub can replay missed
messages — backing the "Sleep / wake" path in PLAN.md §3 *Edge cases*.
One client per topic by design; multi-topic aggregation is Phase 2.

RestClient.qml is a Promise-style XMLHttpRequest wrapper. Auto-attaches
an RFC4122-v4 Idempotency-Key to every non-GET request (PLAN.md §4 and
§7) so retries are safe by default. Maps application/problem+json error
bodies into structured rejections for downstream UI.

Standalone CMake build remains green.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-05-02 01:21:59 +02:00
parent 87b5b2283c
commit 75840a240e
4 changed files with 383 additions and 0 deletions

View File

@@ -0,0 +1,197 @@
#include "MercureClient.h"
#include <QNetworkAccessManager>
#include <QNetworkReply>
#include <QNetworkRequest>
#include <QTimer>
#include <QUrl>
#include <QUrlQuery>
#include <algorithm>
namespace PhpQml::Bridge {
MercureClient::MercureClient(QObject* parent)
: QObject(parent)
, m_nam(new QNetworkAccessManager(this))
{
}
MercureClient::~MercureClient()
{
teardownReply();
}
void MercureClient::setHubUrl(const QString& url)
{
if (m_hubUrl == url) return;
m_hubUrl = url;
emit hubUrlChanged();
}
void MercureClient::setTopic(const QString& t)
{
if (m_topic == t) return;
m_topic = t;
emit topicChanged();
}
void MercureClient::setToken(const QString& t)
{
if (m_token == t) return;
m_token = t;
emit tokenChanged();
}
void MercureClient::start()
{
m_userStopped = false;
m_currentBackoffMs = kInitialBackoffMs;
if (m_reconnectTimer) m_reconnectTimer->stop();
doConnect();
}
void MercureClient::stop()
{
m_userStopped = true;
if (m_reconnectTimer) m_reconnectTimer->stop();
teardownReply();
setActive(false);
}
void MercureClient::doConnect()
{
if (m_userStopped) return;
if (m_hubUrl.isEmpty() || m_topic.isEmpty()) return;
if (m_reply) return; // already connecting / connected
QUrl u(m_hubUrl);
QUrlQuery query(u);
query.addQueryItem(QStringLiteral("topic"), m_topic);
u.setQuery(query);
QNetworkRequest req(u);
req.setRawHeader("Accept", "text/event-stream");
req.setRawHeader("Cache-Control", "no-cache");
if (!m_lastEventId.isEmpty()) {
req.setRawHeader("Last-Event-ID", m_lastEventId.toUtf8());
}
if (!m_token.isEmpty()) {
req.setRawHeader("Authorization", "Bearer " + m_token.toUtf8());
}
// Disable transfer timeout — SSE is a long-lived stream.
req.setTransferTimeout(0);
m_reply = m_nam->get(req);
connect(m_reply, &QNetworkReply::readyRead, this, &MercureClient::onReadyRead);
connect(m_reply, &QNetworkReply::finished, this, &MercureClient::onFinished);
setActive(true);
}
void MercureClient::onReadyRead()
{
if (!m_reply) return;
while (m_reply->canReadLine()) {
QByteArray line = m_reply->readLine();
// Strip trailing line terminator (handles both \n and \r\n).
while (line.endsWith('\n') || line.endsWith('\r')) {
line.chop(1);
}
if (line.isEmpty()) {
emitMessage();
continue;
}
if (line.startsWith(':')) continue; // SSE comment
const int colon = line.indexOf(':');
if (colon < 0) continue; // malformed
const QByteArray field = line.left(colon);
QByteArray value = line.mid(colon + 1);
if (value.startsWith(' ')) value = value.mid(1);
if (field == "data") {
m_dataLines << QString::fromUtf8(value);
} else if (field == "id") {
m_pendingEventId = value;
}
// event:, retry: are intentionally ignored for Phase 1.
}
// Successful read — reset backoff so a future drop reconnects fast.
m_currentBackoffMs = kInitialBackoffMs;
}
void MercureClient::emitMessage()
{
if (m_dataLines.isEmpty() && m_pendingEventId.isEmpty()) return;
if (!m_pendingEventId.isEmpty()) {
const QString id = QString::fromUtf8(m_pendingEventId);
if (id != m_lastEventId) {
m_lastEventId = id;
emit lastEventIdChanged();
}
}
if (!m_dataLines.isEmpty()) {
emit update(m_dataLines.join(QChar('\n')), QString::fromUtf8(m_pendingEventId));
}
m_dataLines.clear();
m_pendingEventId.clear();
}
void MercureClient::onFinished()
{
if (!m_reply) return;
const auto err = m_reply->error();
const QString errStr = m_reply->errorString();
teardownReply();
setActive(false);
if (m_userStopped) return;
if (err != QNetworkReply::NoError && err != QNetworkReply::OperationCanceledError) {
emit error(errStr);
}
scheduleReconnect();
}
void MercureClient::scheduleReconnect()
{
if (m_userStopped) return;
if (m_hubUrl.isEmpty() || m_topic.isEmpty()) return;
if (!m_reconnectTimer) {
m_reconnectTimer = new QTimer(this);
m_reconnectTimer->setSingleShot(true);
connect(m_reconnectTimer, &QTimer::timeout, this, &MercureClient::doConnect);
}
m_reconnectTimer->start(m_currentBackoffMs);
m_currentBackoffMs = std::min(m_currentBackoffMs * 2, kMaxBackoffMs);
}
void MercureClient::teardownReply()
{
if (m_reply) {
disconnect(m_reply, nullptr, this, nullptr);
m_reply->abort();
m_reply->deleteLater();
m_reply = nullptr;
}
m_dataLines.clear();
m_pendingEventId.clear();
}
void MercureClient::setActive(bool a)
{
if (m_active == a) return;
m_active = a;
emit activeChanged();
}
} // namespace PhpQml::Bridge