#include "ReactiveObject.h"
#include "MercureClient.h"

// NOTE(review): the angle-bracket header names were missing from this file;
// the list below is reconstructed from the identifiers used in this
// translation unit — confirm against the project's original include list.
#include <QJsonDocument>
#include <QJsonObject>
#include <QNetworkAccessManager>
#include <QNetworkReply>
#include <QNetworkRequest>
#include <QQmlPropertyMap>
#include <QRandomGenerator>
#include <QTimer>
#include <QUrl>

#include <algorithm>  // std::max

namespace PhpQml::Bridge {

/// Creates the property map, network access manager and Mercure client as
/// children of this object and routes Mercure updates to onMercureUpdate().
ReactiveObject::ReactiveObject(QObject* parent)
    : QObject(parent)
    , m_data(new QQmlPropertyMap(this))
    , m_nam(new QNetworkAccessManager(this))
    , m_mercure(new MercureClient(this))
{
    connect(m_mercure, &MercureClient::update, this, &ReactiveObject::onMercureUpdate);
}

/// Deletes any echo timers that are still registered.
ReactiveObject::~ReactiveObject()
{
    qDeleteAll(m_echoTimers);
}

/// Sets the base URL; on change, rewires Mercure, notifies, and re-fetches
/// if a source is already configured.
void ReactiveObject::setBaseUrl(const QString& v)
{
    if (m_baseUrl == v) return;
    m_baseUrl = v;
    rewireMercure();
    emit baseUrlChanged();
    if (!m_source.isEmpty()) refresh();
}

/// Sets the bearer token used for subsequent requests; notifies on change.
void ReactiveObject::setToken(const QString& v)
{
    if (m_token == v) return;
    m_token = v;
    emit tokenChanged();
}

/// Sets the source path; on change, notifies and re-fetches if a base URL
/// is already configured.
void ReactiveObject::setSource(const QString& v)
{
    if (m_source == v) return;
    m_source = v;
    emit sourceChanged();
    if (!m_baseUrl.isEmpty()) refresh();
}

/// Sets the Mercure topic; on change, rewires Mercure and notifies.
void ReactiveObject::setTopic(const QString& v)
{
    if (m_topic == v) return;
    m_topic = v;
    rewireMercure();
    emit topicChanged();
}

/// Restarts the Mercure client against "<baseUrl>/.well-known/mercure" with
/// the current topic. Does nothing while either value is empty.
void ReactiveObject::rewireMercure()
{
    if (m_baseUrl.isEmpty() || m_topic.isEmpty()) return;
    m_mercure->stop();
    m_mercure->setHubUrl(m_baseUrl + QStringLiteral("/.well-known/mercure"));
    m_mercure->setTopic(m_topic);
    m_mercure->start();
}

/// Re-runs the initial fetch.
void ReactiveObject::refresh()
{
    fetchInitial();
}

/// Issues a GET for baseUrl + source, replacing any fetch still in flight.
/// Clears `ready` until onFetchFinished() completes successfully.
void ReactiveObject::fetchInitial()
{
    if (m_baseUrl.isEmpty() || m_source.isEmpty()) return;

    if (m_pendingFetch) {
        // abort() emits finished() synchronously. Detach the reply from this
        // object first so onFetchFinished() neither runs for the superseded
        // request nor resets m_pendingFetch underneath us (the previous code
        // called deleteLater() through the pointer after it had been nulled).
        QNetworkReply* stale = m_pendingFetch;
        m_pendingFetch = nullptr;
        stale->disconnect(this);
        stale->abort();
        stale->deleteLater();
    }

    QNetworkRequest req(QUrl(m_baseUrl + m_source));
    req.setRawHeader("Accept", "application/json");
    if (!m_token.isEmpty()) {
        req.setRawHeader("Authorization", "Bearer " + m_token.toUtf8());
    }
    req.setTransferTimeout(5000);

    setReady(false);
    m_pendingFetch = m_nam->get(req);
    connect(m_pendingFetch, &QNetworkReply::finished, this, &ReactiveObject::onFetchFinished);
}

/// Handles completion of the initial fetch: 404 means "object does not
/// exist", other failures set `error`, and a JSON object body is applied to
/// the property map.
void ReactiveObject::onFetchFinished()
{
    QNetworkReply* reply = m_pendingFetch;
    m_pendingFetch = nullptr;
    if (!reply) return;
    reply->deleteLater();

    const int status = reply->attribute(QNetworkRequest::HttpStatusCodeAttribute).toInt();

    // Must precede the generic error check: Qt reports HTTP 404 as
    // QNetworkReply::ContentNotFoundError, so testing error() first would
    // make this branch unreachable.
    if (status == 404) {
        clearData();
        setExists(false);
        setError(QString());
        setReady(true);
        return;
    }
    if (reply->error() != QNetworkReply::NoError) {
        setError(reply->errorString());
        return;
    }
    if (status < 200 || status >= 300) {
        setError(QStringLiteral("GET %1 returned HTTP %2").arg(m_source).arg(status));
        return;
    }

    QJsonParseError err{};
    const QJsonDocument doc = QJsonDocument::fromJson(reply->readAll(), &err);
    if (err.error != QJsonParseError::NoError || !doc.isObject()) {
        setError(QStringLiteral("Initial fetch did not return a JSON object."));
        return;
    }

    applyPayload(doc.object());
    setExists(true);
    setError(QString());
    setReady(true);
}

/// Applies one Mercure update envelope. The envelope fields read here are
/// "version", "op", "correlationKey" and "data". Non-object or unparsable
/// payloads are ignored.
void ReactiveObject::onMercureUpdate(const QString& data, const QString& /*id*/)
{
    QJsonParseError err{};
    const QJsonDocument doc = QJsonDocument::fromJson(data.toUtf8(), &err);
    if (err.error != QJsonParseError::NoError || !doc.isObject()) return;

    const QJsonObject envelope = doc.object();
    const int version = envelope.value(QStringLiteral("version")).toInt();
    if (m_lastVersion > 0 && version > m_lastVersion + 1) {
        // gap — re-fetch to recover
        m_lastVersion = 0;
        refresh();
        return;
    }
    m_lastVersion = std::max(m_lastVersion, version);

    const QString op = envelope.value(QStringLiteral("op")).toString();
    const QString key = envelope.value(QStringLiteral("correlationKey")).toString();

    if (op == QStringLiteral("delete")) {
        clearData();
        setExists(false);
    } else if (op == QStringLiteral("upsert") || op == QStringLiteral("replace")) {
        applyPayload(envelope.value(QStringLiteral("data")).toObject());
        setExists(true);
    }

    // An update carrying the correlation key of one of our in-flight commands
    // confirms that command: drop its bookkeeping and report success.
    if (!key.isEmpty()) {
        const QByteArray k = key.toUtf8();
        if (m_inFlight.contains(k)) {
            m_inFlight.remove(k);
            if (auto* t = m_echoTimers.take(k)) t->deleteLater();
            if (m_inFlight.isEmpty()) setPending(false);
            emit commandSucceeded(key, {});
        }
    }
}

/// Copies every member of `payload` into the property map (existing keys not
/// present in `payload` are left untouched).
void ReactiveObject::applyPayload(const QJsonObject& payload)
{
    for (auto it = payload.constBegin(); it != payload.constEnd(); ++it) {
        m_data->insert(it.key(), it.value().toVariant());
    }
}

/// Clears the value of every key currently in the property map.
void ReactiveObject::clearData()
{
    const QStringList keys = m_data->keys();
    for (const QString& k : keys) {
        m_data->clear(k);
    }
}

/// Updates `ready`, emitting readyChanged() only on an actual change.
void ReactiveObject::setReady(bool r)
{
    if (m_ready == r) return;
    m_ready = r;
    emit readyChanged();
}

/// Updates `pending`, emitting pendingChanged() only on an actual change.
void ReactiveObject::setPending(bool p)
{
    if (m_pending == p) return;
    m_pending = p;
    emit pendingChanged();
}

/// Updates `exists`, emitting existsChanged() only on an actual change.
void ReactiveObject::setExists(bool e)
{
    if (m_exists == e) return;
    m_exists = e;
    emit existsChanged();
}

/// Updates `error`, emitting errorChanged() only on an actual change.
void ReactiveObject::setError(const QString& e)
{
    if (m_error == e) return;
    m_error = e;
    emit errorChanged();
}

/// Sends a command (POST, PATCH or DELETE) to baseUrl + path.
///
/// @param method     HTTP verb; any value other than "POST", "PATCH" or
///                   "DELETE" undoes the optimistic changes and returns.
/// @param path       Path appended to the base URL.
/// @param body       Optional payload; when valid and non-null it is
///                   converted via toMap() and sent as compact JSON.
/// @param optimistic Values written to the property map immediately; they
///                   are restored by rollbackOptimistic() on failure/timeout.
///
/// The request carries a fresh Idempotency-Key, which is also the key under
/// which the in-flight state and the echo timer are tracked.
void ReactiveObject::invoke(const QString& method, const QString& path,
                            const QVariant& body, const QVariantMap& optimistic)
{
    if (m_baseUrl.isEmpty()) return;

    const QByteArray key = buildIdempotencyKey();

    // Remember what each optimistic write replaces so it can be undone.
    InFlight in;
    for (auto it = optimistic.constBegin(); it != optimistic.constEnd(); ++it) {
        // contains() avoids materialising the whole key list per entry.
        if (m_data->contains(it.key())) {
            in.backup.insert(it.key(), m_data->value(it.key()));
        } else {
            in.addedKeys << it.key();
        }
        m_data->insert(it.key(), it.value());
    }
    m_inFlight.insert(key, in);
    setPending(true);

    QNetworkRequest req(QUrl(m_baseUrl + path));
    req.setRawHeader("Accept", "application/json");
    req.setRawHeader("Idempotency-Key", key);
    if (!m_token.isEmpty()) {
        req.setRawHeader("Authorization", "Bearer " + m_token.toUtf8());
    }

    QByteArray bytes;
    if (body.isValid() && !body.isNull()) {
        bytes = QJsonDocument(QJsonObject::fromVariantMap(body.toMap())).toJson(QJsonDocument::Compact);
        req.setHeader(QNetworkRequest::ContentTypeHeader, "application/json");
    }

    QNetworkReply* reply = nullptr;
    if (method == QStringLiteral("POST")) {
        reply = m_nam->post(req, bytes);
    } else if (method == QStringLiteral("PATCH")) {
        reply = m_nam->sendCustomRequest(req, "PATCH", bytes);
    } else if (method == QStringLiteral("DELETE")) {
        reply = m_nam->deleteResource(req);
    } else {
        // Unsupported verb: nothing was sent, so just undo the optimistic state.
        rollbackOptimistic(key);
        return;
    }

    m_replyToKey.insert(reply, key);
    connect(reply, &QNetworkReply::finished, this, &ReactiveObject::onCommandReplyFinished);

    // If no Mercure update with this key arrives within kEchoTimeoutMs, undo
    // the optimistic changes and report a timeout.
    auto* t = new QTimer(this);
    t->setSingleShot(true);
    t->setInterval(kEchoTimeoutMs);
    connect(t, &QTimer::timeout, this, [this, key]() {
        if (!m_inFlight.contains(key)) return;
        rollbackOptimistic(key);
        emit commandTimedOut(QString::fromUtf8(key));
    });
    t->start();
    m_echoTimers.insert(key, t);
}

/// Undoes the optimistic changes recorded for `key`: restores backed-up
/// values, clears keys the command added, drops the echo timer, and clears
/// `pending` when no command remains in flight. Unknown keys are ignored.
void ReactiveObject::rollbackOptimistic(const QByteArray& key)
{
    auto it = m_inFlight.find(key);
    if (it == m_inFlight.end()) return;

    InFlight in = it.value();
    m_inFlight.erase(it);
    if (auto* t = m_echoTimers.take(key)) t->deleteLater();

    for (auto bit = in.backup.constBegin(); bit != in.backup.constEnd(); ++bit) {
        m_data->insert(bit.key(), bit.value());
    }
    for (const QString& k : in.addedKeys) {
        m_data->clear(k);
    }
    if (m_inFlight.isEmpty()) setPending(false);
}

/// Handles completion of a command request. A 2xx reply is left pending
/// until the Mercure update arrives; anything else rolls back and emits
/// commandFailed() with the parsed JSON object body (if any) as `problem`.
void ReactiveObject::onCommandReplyFinished()
{
    auto* reply = qobject_cast<QNetworkReply*>(sender());
    if (!reply) return;

    const QByteArray key = m_replyToKey.take(reply);
    reply->deleteLater();

    const int status = reply->attribute(QNetworkRequest::HttpStatusCodeAttribute).toInt();
    if (reply->error() == QNetworkReply::NoError && status >= 200 && status < 300) {
        // Wait for Mercure echo to clear pending (server's view of truth).
        return;
    }

    QJsonParseError err{};
    const QJsonDocument doc = QJsonDocument::fromJson(reply->readAll(), &err);
    QVariant problem;
    if (err.error == QJsonParseError::NoError && doc.isObject()) {
        problem = doc.object().toVariantMap();
    }

    rollbackOptimistic(key);
    emit commandFailed(QString::fromUtf8(key), status, problem);
}

/// Builds a random key formatted as 8-4-4-4-12 lowercase hex groups from two
/// 64-bit values drawn from QRandomGenerator::system(). The masks force the
/// top nibble of the third group to 4 and the top two bits of the fourth
/// group to binary 10 (the UUID version-4 / RFC 4122 variant layout).
QByteArray ReactiveObject::buildIdempotencyKey() const
{
    auto* gen = QRandomGenerator::system();
    quint64 a = gen->generate64();
    quint64 b = gen->generate64();

    a = (a & 0xFFFFFFFFFFFF0FFFULL) | 0x0000000000004000ULL;
    b = (b & 0x3FFFFFFFFFFFFFFFULL) | 0x8000000000000000ULL;

    return QString::asprintf(
        "%08llx-%04llx-%04llx-%04llx-%012llx",
        static_cast<unsigned long long>((a >> 32) & 0xFFFFFFFFULL),
        static_cast<unsigned long long>((a >> 16) & 0xFFFFULL),
        static_cast<unsigned long long>(a & 0xFFFFULL),
        static_cast<unsigned long long>((b >> 48) & 0xFFFFULL),
        static_cast<unsigned long long>(b & 0xFFFFFFFFFFFFULL)
    ).toLatin1();
}

} // namespace PhpQml::Bridge