#include "MercureClient.h" #include #include #include #include #include #include #include namespace PhpQml::Bridge { MercureClient::MercureClient(QObject* parent) : QObject(parent) , m_nam(new QNetworkAccessManager(this)) { } MercureClient::~MercureClient() { teardownReply(); } void MercureClient::setHubUrl(const QString& url) { if (m_hubUrl == url) return; m_hubUrl = url; emit hubUrlChanged(); } void MercureClient::setTopic(const QString& t) { if (m_topic == t) return; m_topic = t; emit topicChanged(); } void MercureClient::setToken(const QString& t) { if (m_token == t) return; m_token = t; emit tokenChanged(); } void MercureClient::start() { m_userStopped = false; m_currentBackoffMs = kInitialBackoffMs; if (m_reconnectTimer) m_reconnectTimer->stop(); doConnect(); } void MercureClient::stop() { m_userStopped = true; if (m_reconnectTimer) m_reconnectTimer->stop(); teardownReply(); setActive(false); } void MercureClient::doConnect() { if (m_userStopped) return; if (m_hubUrl.isEmpty() || m_topic.isEmpty()) return; if (m_reply) return; // already connecting / connected QUrl u(m_hubUrl); QUrlQuery query(u); query.addQueryItem(QStringLiteral("topic"), m_topic); u.setQuery(query); QNetworkRequest req(u); req.setRawHeader("Accept", "text/event-stream"); req.setRawHeader("Cache-Control", "no-cache"); if (!m_lastEventId.isEmpty()) { req.setRawHeader("Last-Event-ID", m_lastEventId.toUtf8()); } if (!m_token.isEmpty()) { req.setRawHeader("Authorization", "Bearer " + m_token.toUtf8()); } // Disable transfer timeout — SSE is a long-lived stream. req.setTransferTimeout(0); m_reply = m_nam->get(req); connect(m_reply, &QNetworkReply::readyRead, this, &MercureClient::onReadyRead); connect(m_reply, &QNetworkReply::finished, this, &MercureClient::onFinished); setActive(true); } void MercureClient::onReadyRead() { if (!m_reply) return; while (m_reply->canReadLine()) { QByteArray line = m_reply->readLine(); // Strip trailing line terminator (handles both \n and \r\n). while (line.endsWith('\n') || line.endsWith('\r')) { line.chop(1); } if (line.isEmpty()) { emitMessage(); continue; } if (line.startsWith(':')) continue; // SSE comment const int colon = line.indexOf(':'); if (colon < 0) continue; // malformed const QByteArray field = line.left(colon); QByteArray value = line.mid(colon + 1); if (value.startsWith(' ')) value = value.mid(1); if (field == "data") { m_dataLines << QString::fromUtf8(value); } else if (field == "id") { m_pendingEventId = value; } // event:, retry: are intentionally ignored for Phase 1. } // Successful read — reset backoff so a future drop reconnects fast. m_currentBackoffMs = kInitialBackoffMs; } void MercureClient::emitMessage() { if (m_dataLines.isEmpty() && m_pendingEventId.isEmpty()) return; if (!m_pendingEventId.isEmpty()) { const QString id = QString::fromUtf8(m_pendingEventId); if (id != m_lastEventId) { m_lastEventId = id; emit lastEventIdChanged(); } } if (!m_dataLines.isEmpty()) { emit update(m_dataLines.join(QChar('\n')), QString::fromUtf8(m_pendingEventId)); } m_dataLines.clear(); m_pendingEventId.clear(); } void MercureClient::onFinished() { if (!m_reply) return; const auto err = m_reply->error(); const QString errStr = m_reply->errorString(); teardownReply(); setActive(false); if (m_userStopped) return; if (err != QNetworkReply::NoError && err != QNetworkReply::OperationCanceledError) { emit error(errStr); } scheduleReconnect(); } void MercureClient::scheduleReconnect() { if (m_userStopped) return; if (m_hubUrl.isEmpty() || m_topic.isEmpty()) return; if (!m_reconnectTimer) { m_reconnectTimer = new QTimer(this); m_reconnectTimer->setSingleShot(true); connect(m_reconnectTimer, &QTimer::timeout, this, &MercureClient::doConnect); } m_reconnectTimer->start(m_currentBackoffMs); m_currentBackoffMs = std::min(m_currentBackoffMs * 2, kMaxBackoffMs); } void MercureClient::teardownReply() { if (m_reply) { disconnect(m_reply, nullptr, this, nullptr); m_reply->abort(); m_reply->deleteLater(); m_reply = nullptr; } m_dataLines.clear(); m_pendingEventId.clear(); } void MercureClient::setActive(bool a) { if (m_active == a) return; m_active = a; emit activeChanged(); } } // namespace PhpQml::Bridge