2026-05-02 15:12:50 +02:00
|
|
|
#include "ReactiveObject.h"
|
|
|
|
|
|
|
|
|
|
#include "MercureClient.h"
|
|
|
|
|
|
|
|
|
|
#include <QJsonDocument>
|
|
|
|
|
#include <QJsonObject>
|
|
|
|
|
#include <QJsonValue>
|
|
|
|
|
#include <QNetworkAccessManager>
|
|
|
|
|
#include <QNetworkReply>
|
|
|
|
|
#include <QNetworkRequest>
|
|
|
|
|
#include <QRandomGenerator>
|
|
|
|
|
#include <QTimer>
|
|
|
|
|
#include <QUrl>
|
|
|
|
|
|
|
|
|
|
namespace PhpQml::Bridge {
|
|
|
|
|
|
|
|
|
|
ReactiveObject::ReactiveObject(QObject* parent)
|
|
|
|
|
: QObject(parent)
|
|
|
|
|
, m_data(new QQmlPropertyMap(this))
|
|
|
|
|
, m_nam(new QNetworkAccessManager(this))
|
|
|
|
|
, m_mercure(new MercureClient(this))
|
|
|
|
|
{
|
|
|
|
|
connect(m_mercure, &MercureClient::update,
|
|
|
|
|
this, &ReactiveObject::onMercureUpdate);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Deletes every echo-timeout timer still registered in m_echoTimers.
ReactiveObject::~ReactiveObject()
{
    qDeleteAll(m_echoTimers.cbegin(), m_echoTimers.cend());
}
|
|
|
|
|
|
2026-05-05 19:19:56 +02:00
|
|
|
void ReactiveObject::componentComplete()
|
|
|
|
|
{
|
|
|
|
|
m_complete = true;
|
|
|
|
|
if (!m_baseUrl.isEmpty() && !m_source.isEmpty()) {
|
|
|
|
|
refresh();
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2026-05-02 15:12:50 +02:00
|
|
|
void ReactiveObject::setBaseUrl(const QString& v)
|
|
|
|
|
{
|
|
|
|
|
if (m_baseUrl == v) return;
|
|
|
|
|
m_baseUrl = v;
|
|
|
|
|
rewireMercure();
|
|
|
|
|
emit baseUrlChanged();
|
2026-05-05 19:19:56 +02:00
|
|
|
if (m_complete && !m_source.isEmpty()) refresh();
|
2026-05-02 15:12:50 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void ReactiveObject::setToken(const QString& v)
|
|
|
|
|
{
|
|
|
|
|
if (m_token == v) return;
|
|
|
|
|
m_token = v;
|
|
|
|
|
emit tokenChanged();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void ReactiveObject::setSource(const QString& v)
|
|
|
|
|
{
|
|
|
|
|
if (m_source == v) return;
|
|
|
|
|
m_source = v;
|
|
|
|
|
emit sourceChanged();
|
2026-05-05 19:19:56 +02:00
|
|
|
if (m_complete && !m_baseUrl.isEmpty()) refresh();
|
2026-05-02 15:12:50 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void ReactiveObject::setTopic(const QString& v)
|
|
|
|
|
{
|
|
|
|
|
if (m_topic == v) return;
|
|
|
|
|
m_topic = v;
|
|
|
|
|
rewireMercure();
|
|
|
|
|
emit topicChanged();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void ReactiveObject::rewireMercure()
|
|
|
|
|
{
|
|
|
|
|
if (m_baseUrl.isEmpty() || m_topic.isEmpty()) return;
|
|
|
|
|
m_mercure->stop();
|
|
|
|
|
m_mercure->setHubUrl(m_baseUrl + QStringLiteral("/.well-known/mercure"));
|
|
|
|
|
m_mercure->setTopic(m_topic);
|
|
|
|
|
m_mercure->start();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void ReactiveObject::refresh()
|
|
|
|
|
{
|
|
|
|
|
fetchInitial();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void ReactiveObject::fetchInitial()
|
|
|
|
|
{
|
|
|
|
|
if (m_baseUrl.isEmpty() || m_source.isEmpty()) return;
|
|
|
|
|
if (m_pendingFetch) {
|
|
|
|
|
m_pendingFetch->abort();
|
|
|
|
|
m_pendingFetch->deleteLater();
|
|
|
|
|
m_pendingFetch = nullptr;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
QNetworkRequest req(QUrl(m_baseUrl + m_source));
|
|
|
|
|
req.setRawHeader("Accept", "application/json");
|
|
|
|
|
if (!m_token.isEmpty()) {
|
|
|
|
|
req.setRawHeader("Authorization", "Bearer " + m_token.toUtf8());
|
|
|
|
|
}
|
|
|
|
|
req.setTransferTimeout(5000);
|
|
|
|
|
|
|
|
|
|
setReady(false);
|
|
|
|
|
m_pendingFetch = m_nam->get(req);
|
|
|
|
|
connect(m_pendingFetch, &QNetworkReply::finished,
|
|
|
|
|
this, &ReactiveObject::onFetchFinished);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void ReactiveObject::onFetchFinished()
|
|
|
|
|
{
|
|
|
|
|
QNetworkReply* reply = m_pendingFetch;
|
|
|
|
|
m_pendingFetch = nullptr;
|
|
|
|
|
if (!reply) return;
|
|
|
|
|
reply->deleteLater();
|
|
|
|
|
|
|
|
|
|
const int status = reply->attribute(QNetworkRequest::HttpStatusCodeAttribute).toInt();
|
|
|
|
|
if (reply->error() != QNetworkReply::NoError) {
|
|
|
|
|
setError(reply->errorString());
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
if (status == 404) {
|
|
|
|
|
clearData();
|
|
|
|
|
setExists(false);
|
|
|
|
|
setError(QString());
|
|
|
|
|
setReady(true);
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
if (status < 200 || status >= 300) {
|
|
|
|
|
setError(QStringLiteral("GET %1 returned HTTP %2").arg(m_source).arg(status));
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
QJsonParseError err{};
|
|
|
|
|
const QJsonDocument doc = QJsonDocument::fromJson(reply->readAll(), &err);
|
|
|
|
|
if (err.error != QJsonParseError::NoError || !doc.isObject()) {
|
|
|
|
|
setError(QStringLiteral("Initial fetch did not return a JSON object."));
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
applyPayload(doc.object());
|
|
|
|
|
setExists(true);
|
|
|
|
|
setError(QString());
|
|
|
|
|
setReady(true);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void ReactiveObject::onMercureUpdate(const QString& data, const QString& /*id*/)
|
|
|
|
|
{
|
|
|
|
|
QJsonParseError err{};
|
|
|
|
|
const QJsonDocument doc = QJsonDocument::fromJson(data.toUtf8(), &err);
|
|
|
|
|
if (err.error != QJsonParseError::NoError || !doc.isObject()) return;
|
|
|
|
|
const QJsonObject envelope = doc.object();
|
|
|
|
|
|
|
|
|
|
const int version = envelope.value(QStringLiteral("version")).toInt();
|
|
|
|
|
if (m_lastVersion > 0 && version > m_lastVersion + 1) {
|
|
|
|
|
// gap — re-fetch to recover
|
|
|
|
|
m_lastVersion = 0;
|
|
|
|
|
refresh();
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
m_lastVersion = std::max(m_lastVersion, version);
|
|
|
|
|
|
|
|
|
|
const QString op = envelope.value(QStringLiteral("op")).toString();
|
|
|
|
|
const QString key = envelope.value(QStringLiteral("correlationKey")).toString();
|
|
|
|
|
|
|
|
|
|
if (op == QStringLiteral("delete")) {
|
|
|
|
|
clearData();
|
|
|
|
|
setExists(false);
|
|
|
|
|
} else if (op == QStringLiteral("upsert") || op == QStringLiteral("replace")) {
|
|
|
|
|
applyPayload(envelope.value(QStringLiteral("data")).toObject());
|
|
|
|
|
setExists(true);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (!key.isEmpty()) {
|
|
|
|
|
const QByteArray k = key.toUtf8();
|
|
|
|
|
if (m_inFlight.contains(k)) {
|
|
|
|
|
m_inFlight.remove(k);
|
|
|
|
|
if (auto* t = m_echoTimers.take(k)) t->deleteLater();
|
|
|
|
|
if (m_inFlight.isEmpty()) setPending(false);
|
|
|
|
|
emit commandSucceeded(key, {});
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void ReactiveObject::applyPayload(const QJsonObject& payload)
|
|
|
|
|
{
|
|
|
|
|
for (auto it = payload.constBegin(); it != payload.constEnd(); ++it) {
|
|
|
|
|
m_data->insert(it.key(), it.value().toVariant());
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void ReactiveObject::clearData()
|
|
|
|
|
{
|
|
|
|
|
const QStringList keys = m_data->keys();
|
|
|
|
|
for (const QString& k : keys) {
|
|
|
|
|
m_data->clear(k);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void ReactiveObject::setReady(bool r)
|
|
|
|
|
{
|
|
|
|
|
if (m_ready == r) return;
|
|
|
|
|
m_ready = r;
|
|
|
|
|
emit readyChanged();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void ReactiveObject::setPending(bool p)
|
|
|
|
|
{
|
|
|
|
|
if (m_pending == p) return;
|
|
|
|
|
m_pending = p;
|
|
|
|
|
emit pendingChanged();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void ReactiveObject::setExists(bool e)
|
|
|
|
|
{
|
|
|
|
|
if (m_exists == e) return;
|
|
|
|
|
m_exists = e;
|
|
|
|
|
emit existsChanged();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void ReactiveObject::setError(const QString& e)
|
|
|
|
|
{
|
|
|
|
|
if (m_error == e) return;
|
|
|
|
|
m_error = e;
|
|
|
|
|
emit errorChanged();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void ReactiveObject::invoke(const QString& method,
|
|
|
|
|
const QString& path,
|
|
|
|
|
const QVariant& body,
|
|
|
|
|
const QVariantMap& optimistic)
|
|
|
|
|
{
|
|
|
|
|
if (m_baseUrl.isEmpty()) return;
|
|
|
|
|
|
|
|
|
|
const QByteArray key = buildIdempotencyKey();
|
|
|
|
|
|
|
|
|
|
InFlight in;
|
|
|
|
|
for (auto it = optimistic.constBegin(); it != optimistic.constEnd(); ++it) {
|
|
|
|
|
if (m_data->keys().contains(it.key())) {
|
|
|
|
|
in.backup.insert(it.key(), m_data->value(it.key()));
|
|
|
|
|
} else {
|
|
|
|
|
in.addedKeys << it.key();
|
|
|
|
|
}
|
|
|
|
|
m_data->insert(it.key(), it.value());
|
|
|
|
|
}
|
|
|
|
|
m_inFlight.insert(key, in);
|
|
|
|
|
setPending(true);
|
|
|
|
|
|
|
|
|
|
QNetworkRequest req(QUrl(m_baseUrl + path));
|
|
|
|
|
req.setRawHeader("Accept", "application/json");
|
|
|
|
|
req.setRawHeader("Idempotency-Key", key);
|
|
|
|
|
if (!m_token.isEmpty()) {
|
|
|
|
|
req.setRawHeader("Authorization", "Bearer " + m_token.toUtf8());
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
QByteArray bytes;
|
|
|
|
|
if (body.isValid() && !body.isNull()) {
|
|
|
|
|
bytes = QJsonDocument(QJsonObject::fromVariantMap(body.toMap())).toJson(QJsonDocument::Compact);
|
|
|
|
|
req.setHeader(QNetworkRequest::ContentTypeHeader, "application/json");
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
QNetworkReply* reply = nullptr;
|
|
|
|
|
if (method == QStringLiteral("POST")) {
|
|
|
|
|
reply = m_nam->post(req, bytes);
|
|
|
|
|
} else if (method == QStringLiteral("PATCH")) {
|
|
|
|
|
reply = m_nam->sendCustomRequest(req, "PATCH", bytes);
|
|
|
|
|
} else if (method == QStringLiteral("DELETE")) {
|
|
|
|
|
reply = m_nam->deleteResource(req);
|
|
|
|
|
} else {
|
|
|
|
|
rollbackOptimistic(key);
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
m_replyToKey.insert(reply, key);
|
|
|
|
|
connect(reply, &QNetworkReply::finished,
|
|
|
|
|
this, &ReactiveObject::onCommandReplyFinished);
|
|
|
|
|
|
|
|
|
|
auto* t = new QTimer(this);
|
|
|
|
|
t->setSingleShot(true);
|
|
|
|
|
t->setInterval(kEchoTimeoutMs);
|
|
|
|
|
connect(t, &QTimer::timeout, this, [this, key]() {
|
|
|
|
|
if (!m_inFlight.contains(key)) return;
|
|
|
|
|
rollbackOptimistic(key);
|
|
|
|
|
emit commandTimedOut(QString::fromUtf8(key));
|
|
|
|
|
});
|
|
|
|
|
t->start();
|
|
|
|
|
m_echoTimers.insert(key, t);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void ReactiveObject::rollbackOptimistic(const QByteArray& key)
|
|
|
|
|
{
|
|
|
|
|
auto it = m_inFlight.find(key);
|
|
|
|
|
if (it == m_inFlight.end()) return;
|
|
|
|
|
InFlight in = it.value();
|
|
|
|
|
m_inFlight.erase(it);
|
|
|
|
|
if (auto* t = m_echoTimers.take(key)) t->deleteLater();
|
|
|
|
|
|
|
|
|
|
for (auto bit = in.backup.constBegin(); bit != in.backup.constEnd(); ++bit) {
|
|
|
|
|
m_data->insert(bit.key(), bit.value());
|
|
|
|
|
}
|
|
|
|
|
for (const QString& k : in.addedKeys) {
|
|
|
|
|
m_data->clear(k);
|
|
|
|
|
}
|
|
|
|
|
if (m_inFlight.isEmpty()) setPending(false);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void ReactiveObject::onCommandReplyFinished()
|
|
|
|
|
{
|
|
|
|
|
auto* reply = qobject_cast<QNetworkReply*>(sender());
|
|
|
|
|
if (!reply) return;
|
|
|
|
|
const QByteArray key = m_replyToKey.take(reply);
|
|
|
|
|
reply->deleteLater();
|
|
|
|
|
|
|
|
|
|
const int status = reply->attribute(QNetworkRequest::HttpStatusCodeAttribute).toInt();
|
|
|
|
|
if (reply->error() == QNetworkReply::NoError && status >= 200 && status < 300) {
|
|
|
|
|
// Wait for Mercure echo to clear pending (server's view of truth).
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
QJsonParseError err{};
|
|
|
|
|
const QJsonDocument doc = QJsonDocument::fromJson(reply->readAll(), &err);
|
|
|
|
|
QVariant problem;
|
|
|
|
|
if (err.error == QJsonParseError::NoError && doc.isObject()) {
|
|
|
|
|
problem = doc.object().toVariantMap();
|
|
|
|
|
}
|
|
|
|
|
rollbackOptimistic(key);
|
|
|
|
|
emit commandFailed(QString::fromUtf8(key), status, problem);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Produces a random key in the textual 8-4-4-4-12 lowercase-hex layout.
//
// Two 64-bit values are drawn from the system random generator. The high
// nibble of the third group is forced to 4 and the top two bits of the
// fourth group to binary 10, i.e. the version/variant bits of a version-4
// UUID. Bits 48..61 of the second value only feed the fourth group; the
// fifth group uses its low 48 bits.
QByteArray ReactiveObject::buildIdempotencyKey() const
{
    using ull = unsigned long long;

    QRandomGenerator* const rng = QRandomGenerator::system();
    const quint64 rawHigh = rng->generate64();
    const quint64 rawLow = rng->generate64();

    const quint64 high = (rawHigh & 0xFFFFFFFFFFFF0FFFULL) | 0x0000000000004000ULL;
    const quint64 low = (rawLow & 0x3FFFFFFFFFFFFFFFULL) | 0x8000000000000000ULL;

    const ull group1 = static_cast<ull>((high >> 32) & 0xFFFFFFFFULL);
    const ull group2 = static_cast<ull>((high >> 16) & 0xFFFFULL);
    const ull group3 = static_cast<ull>(high & 0xFFFFULL);
    const ull group4 = static_cast<ull>((low >> 48) & 0xFFFFULL);
    const ull group5 = static_cast<ull>(low & 0xFFFFFFFFFFFFULL);

    const QString text = QString::asprintf(
        "%08llx-%04llx-%04llx-%04llx-%012llx",
        group1, group2, group3, group4, group5);
    return text.toLatin1();
}
|
|
|
|
|
|
|
|
|
|
} // namespace PhpQml::Bridge
|