mirror of
https://github.com/haexhub/haex-hub.git
synced 2025-12-17 06:30:50 +01:00
zwischenstand
This commit is contained in:
@ -1,5 +1,5 @@
|
||||
// src/hlc_service.rs
|
||||
|
||||
use crate::table_names::TABLE_CRDT_CONFIGS;
|
||||
use rusqlite::{params, Connection, Result as RusqliteResult, Transaction};
|
||||
use std::{
|
||||
fmt::Debug,
|
||||
@ -14,7 +14,7 @@ use uuid::Uuid;
|
||||
const HLC_NODE_ID_TYPE: &str = "hlc_node_id";
|
||||
const HLC_TIMESTAMP_TYPE: &str = "hlc_timestamp";
|
||||
|
||||
pub const CRDT_SETTINGS_TABLE: &str = "haex_crdt_settings";
|
||||
//pub const TABLE_CRDT_CONFIGS: &str = "haex_crdt_settings";
|
||||
|
||||
#[derive(Error, Debug)]
|
||||
pub enum HlcError {
|
||||
@ -49,7 +49,7 @@ impl HlcService {
|
||||
|
||||
// 3. Load the last persisted timestamp and update the clock.
|
||||
let last_state_str: RusqliteResult<String> = conn.query_row(
|
||||
&format!("SELECT value FROM {} WHERE type = ?1", CRDT_SETTINGS_TABLE),
|
||||
&format!("SELECT value FROM {} WHERE key = ?1", TABLE_CRDT_CONFIGS),
|
||||
params![HLC_TIMESTAMP_TYPE],
|
||||
|row| row.get(0),
|
||||
);
|
||||
@ -83,9 +83,9 @@ impl HlcService {
|
||||
|
||||
tx.execute(
|
||||
&format!(
|
||||
"INSERT INTO {} (type, value) VALUES (?1,?2)
|
||||
ON CONFLICT(type) DO UPDATE SET value = excluded.value",
|
||||
CRDT_SETTINGS_TABLE
|
||||
"INSERT INTO {} (key, value) VALUES (?1,?2)
|
||||
ON CONFLICT(key) DO UPDATE SET value = excluded.value",
|
||||
TABLE_CRDT_CONFIGS
|
||||
),
|
||||
params![HLC_TIMESTAMP_TYPE, timestamp_str],
|
||||
)?;
|
||||
@ -97,7 +97,7 @@ impl HlcService {
|
||||
fn get_or_create_node_id(conn: &mut Connection) -> Result<ID, HlcError> {
|
||||
let tx = conn.transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)?;
|
||||
|
||||
let query = format!("SELECT value FROM {} WHERE type =?1", CRDT_SETTINGS_TABLE);
|
||||
let query = format!("SELECT value FROM {} WHERE key =?1", TABLE_CRDT_CONFIGS);
|
||||
|
||||
match tx.query_row(&query, params![HLC_NODE_ID_TYPE], |row| {
|
||||
row.get::<_, String>(0)
|
||||
@ -117,8 +117,8 @@ impl HlcService {
|
||||
|
||||
tx.execute(
|
||||
&format!(
|
||||
"INSERT INTO {} (type, value) VALUES (?1, ?2)",
|
||||
CRDT_SETTINGS_TABLE
|
||||
"INSERT INTO {} (key, value) VALUES (?1, ?2)",
|
||||
TABLE_CRDT_CONFIGS
|
||||
),
|
||||
params![HLC_NODE_ID_TYPE, new_id_str],
|
||||
)?;
|
||||
|
||||
@ -1,8 +1,10 @@
|
||||
// In src-tauri/src/crdt/proxy.rs
|
||||
|
||||
use crate::crdt::hlc::HlcService;
|
||||
use crate::crdt::trigger::{HLC_TIMESTAMP_COLUMN, TOMBSTONE_COLUMN};
|
||||
use crate::table_names::{TABLE_CRDT_CONFIGS, TABLE_CRDT_LOGS};
|
||||
use rusqlite::Connection;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde_json::Value as JsonValue;
|
||||
use sqlparser::ast::{
|
||||
Assignment, AssignmentTarget, BinaryOperator, ColumnDef, DataType, Expr, Ident, Insert,
|
||||
ObjectName, ObjectNamePart, SelectItem, SetExpr, Statement, TableFactor, TableObject,
|
||||
@ -11,8 +13,11 @@ use sqlparser::ast::{
|
||||
use sqlparser::dialect::SQLiteDialect;
|
||||
use sqlparser::parser::Parser;
|
||||
use std::collections::HashSet;
|
||||
use std::sync::{Arc, Mutex};
|
||||
use tauri::{path::BaseDirectory, AppHandle, Manager, State};
|
||||
use ts_rs::TS;
|
||||
use uhlc::Timestamp;
|
||||
pub struct DbConnection(pub Arc<Mutex<Option<Connection>>>);
|
||||
|
||||
#[derive(Serialize, Deserialize, TS)]
|
||||
#[ts(export)]
|
||||
@ -41,7 +46,7 @@ pub enum ProxyError {
|
||||
}
|
||||
|
||||
// Tabellen, die von der Proxy-Logik ausgeschlossen sind.
|
||||
const EXCLUDED_TABLES: &[&str] = &["haex_crdt_settings", "haex_crdt_logs"];
|
||||
const EXCLUDED_TABLES: &[&str] = &[TABLE_CRDT_CONFIGS, TABLE_CRDT_LOGS];
|
||||
|
||||
pub struct SqlProxy;
|
||||
|
||||
@ -54,7 +59,8 @@ impl SqlProxy {
|
||||
pub fn execute(
|
||||
&self,
|
||||
sql: &str,
|
||||
conn: &mut rusqlite::Connection,
|
||||
params: Vec<JsonValue>,
|
||||
state: State<'_, DbConnection>,
|
||||
hlc_service: &HlcService,
|
||||
) -> Result<Vec<String>, ProxyError> {
|
||||
let dialect = SQLiteDialect {};
|
||||
@ -64,21 +70,27 @@ impl SqlProxy {
|
||||
|
||||
let mut modified_schema_tables = HashSet::new();
|
||||
|
||||
let db_lock = state
|
||||
.0
|
||||
.lock()
|
||||
.map_err(|e| format!("Mutex Lock Fehler: {}", e))?;
|
||||
let conn = db_lock.as_ref().ok_or("Keine Datenbankverbindung")?;
|
||||
|
||||
let tx = conn
|
||||
.transaction()
|
||||
.map_err(|e| ProxyError::TransactionError {
|
||||
reason: e.to_string(),
|
||||
})?;
|
||||
|
||||
let hlc_timestamp =
|
||||
hlc_service
|
||||
.new_timestamp_and_persist(&tx)
|
||||
.map_err(|e| ProxyError::HlcError {
|
||||
reason: e.to_string(),
|
||||
})?;
|
||||
/* let hlc_timestamp =
|
||||
hlc_service
|
||||
.new_timestamp_and_persist(&tx)
|
||||
.map_err(|e| ProxyError::HlcError {
|
||||
reason: e.to_string(),
|
||||
})?; */
|
||||
|
||||
for statement in &mut ast_vec {
|
||||
if let Some(table_name) = self.transform_statement(statement, Some(&hlc_timestamp))? {
|
||||
if let Some(table_name) = self.transform_statement(statement)? {
|
||||
modified_schema_tables.insert(table_name);
|
||||
}
|
||||
}
|
||||
@ -99,15 +111,12 @@ impl SqlProxy {
|
||||
}
|
||||
|
||||
/// Wendet die Transformation auf ein einzelnes Statement an.
|
||||
fn transform_statement(
|
||||
&self,
|
||||
stmt: &mut Statement,
|
||||
hlc_timestamp: Option<&Timestamp>,
|
||||
) -> Result<Option<String>, ProxyError> {
|
||||
fn transform_statement(&self, stmt: &mut Statement) -> Result<Option<String>, ProxyError> {
|
||||
match stmt {
|
||||
Statement::Query(query) => {
|
||||
if let SetExpr::Select(select) = &mut *query.body {
|
||||
let mut tombstone_filters = Vec::new();
|
||||
|
||||
for twj in &select.from {
|
||||
if let TableFactor::Table { name, alias, .. } = &twj.relation {
|
||||
if self.is_audited_table(name) {
|
||||
@ -160,7 +169,7 @@ impl SqlProxy {
|
||||
}
|
||||
}
|
||||
|
||||
// Hinweis: UNION, EXCEPT etc. werden hier nicht behandelt, was dem bisherigen Code entspricht.
|
||||
// TODO: UNION, EXCEPT etc. werden hier nicht behandelt
|
||||
}
|
||||
|
||||
Statement::CreateTable(create_table) => {
|
||||
@ -180,9 +189,7 @@ impl SqlProxy {
|
||||
Statement::Insert(insert_stmt) => {
|
||||
if let TableObject::TableName(name) = &insert_stmt.table {
|
||||
if self.is_audited_table(name) {
|
||||
if let Some(ts) = hlc_timestamp {
|
||||
self.add_hlc_to_insert(insert_stmt, ts);
|
||||
}
|
||||
self.add_hlc_to_insert(insert_stmt);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -217,9 +224,8 @@ impl SqlProxy {
|
||||
if let Some(name) = table_name {
|
||||
if self.is_audited_table(&name) {
|
||||
// GEÄNDERT: Übergibt den Zeitstempel an die Transformationsfunktion
|
||||
if let Some(ts) = hlc_timestamp {
|
||||
self.transform_delete_to_update(stmt, ts);
|
||||
}
|
||||
|
||||
self.transform_delete_to_update(stmt);
|
||||
}
|
||||
} else {
|
||||
return Err(ProxyError::UnsupportedStatement {
|
||||
@ -336,24 +342,20 @@ impl SqlProxy {
|
||||
if !columns.iter().any(|c| c.name.value == HLC_TIMESTAMP_COLUMN) {
|
||||
columns.push(ColumnDef {
|
||||
name: Ident::new(HLC_TIMESTAMP_COLUMN),
|
||||
data_type: DataType::Text, // HLC wird als String gespeichert
|
||||
data_type: DataType::String(None),
|
||||
options: vec![],
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
fn transform_delete_to_update(&self, stmt: &mut Statement, hlc_timestamp: &Timestamp) {
|
||||
fn transform_delete_to_update(&self, stmt: &mut Statement) {
|
||||
if let Statement::Delete(del_stmt) = stmt {
|
||||
let table_to_update = match &del_stmt.from {
|
||||
sqlparser::ast::FromTable::WithFromKeyword(from)
|
||||
| sqlparser::ast::FromTable::WithoutKeyword(from) => from[0].clone(),
|
||||
};
|
||||
|
||||
// Erstellt beide Zuweisungen
|
||||
let assignments = vec![
|
||||
self.create_tombstone_assignment(),
|
||||
self.create_hlc_assignment(hlc_timestamp),
|
||||
];
|
||||
let assignments = vec![self.create_tombstone_assignment()];
|
||||
|
||||
*stmt = Statement::Update {
|
||||
table: table_to_update,
|
||||
|
||||
@ -1,16 +1,15 @@
|
||||
use crate::table_names::{TABLE_CRDT_CONFIGS, TABLE_CRDT_LOGS};
|
||||
use crate::table_names::TABLE_CRDT_LOGS;
|
||||
use rusqlite::{Connection, Result as RusqliteResult, Row, Transaction};
|
||||
use serde::Serialize;
|
||||
use std::error::Error;
|
||||
use std::fmt::{self, Display, Formatter, Write};
|
||||
use std::panic::{self, AssertUnwindSafe};
|
||||
use ts_rs::TS;
|
||||
|
||||
// Die "z_"-Präfix soll sicherstellen, dass diese Trigger als Letzte ausgeführt werden
|
||||
// Der "z_"-Präfix soll sicherstellen, dass diese Trigger als Letzte ausgeführt werden
|
||||
const INSERT_TRIGGER_TPL: &str = "z_crdt_{TABLE_NAME}_insert";
|
||||
const UPDATE_TRIGGER_TPL: &str = "z_crdt_{TABLE_NAME}_update";
|
||||
|
||||
const SYNC_ACTIVE_KEY: &str = "sync_active";
|
||||
//const SYNC_ACTIVE_KEY: &str = "sync_active";
|
||||
pub const TOMBSTONE_COLUMN: &str = "haex_tombstone";
|
||||
pub const HLC_TIMESTAMP_COLUMN: &str = "haex_hlc_timestamp";
|
||||
|
||||
@ -23,6 +22,10 @@ pub enum CrdtSetupError {
|
||||
table_name: String,
|
||||
column_name: String,
|
||||
},
|
||||
HlcColumnMissing {
|
||||
table_name: String,
|
||||
column_name: String,
|
||||
},
|
||||
/// Die Tabelle hat keinen Primärschlüssel, was eine CRDT-Voraussetzung ist.
|
||||
PrimaryKeyMissing { table_name: String },
|
||||
}
|
||||
@ -40,6 +43,14 @@ impl Display for CrdtSetupError {
|
||||
"Table '{}' is missing the required tombstone column '{}'",
|
||||
table_name, column_name
|
||||
),
|
||||
CrdtSetupError::HlcColumnMissing {
|
||||
table_name,
|
||||
column_name,
|
||||
} => write!(
|
||||
f,
|
||||
"Table '{}' is missing the required hlc column '{}'",
|
||||
table_name, column_name
|
||||
),
|
||||
CrdtSetupError::PrimaryKeyMissing { table_name } => {
|
||||
write!(f, "Table '{}' has no primary key", table_name)
|
||||
}
|
||||
@ -66,55 +77,52 @@ pub enum TriggerSetupResult {
|
||||
TableNotFound,
|
||||
}
|
||||
|
||||
fn set_sync_active(conn: &mut Connection) -> RusqliteResult<()> {
|
||||
/* fn set_sync_active(conn: &mut Connection) -> RusqliteResult<()> {
|
||||
let sql = format!(
|
||||
"INSERT OR REPLACE INTO \"{meta_table}\" (key, value) VALUES (?, '1');",
|
||||
meta_table = TABLE_CRDT_CONFIGS
|
||||
);
|
||||
conn.execute(&sql, [SYNC_ACTIVE_KEY])?;
|
||||
Ok(())
|
||||
}
|
||||
} */
|
||||
|
||||
fn clear_sync_active(conn: &mut Connection) -> RusqliteResult<()> {
|
||||
/* fn clear_sync_active(conn: &mut Connection) -> RusqliteResult<()> {
|
||||
let sql = format!(
|
||||
"DELETE FROM \"{meta_table}\" WHERE key = ?;",
|
||||
meta_table = TABLE_CRDT_CONFIGS
|
||||
);
|
||||
conn.execute(&sql, [SYNC_ACTIVE_KEY])?;
|
||||
Ok(())
|
||||
}
|
||||
} */
|
||||
|
||||
/// Führt eine Aktion aus, während die Trigger temporär deaktiviert sind.
|
||||
/// Diese Funktion stellt sicher, dass die Trigger auch bei einem Absturz (Panic)
|
||||
/// wieder aktiviert werden.
|
||||
pub fn with_triggers_paused<F, R>(conn: &mut Connection, action: F) -> RusqliteResult<R>
|
||||
/* pub fn with_triggers_paused<F, R>(conn: &mut Connection, action: F) -> RusqliteResult<R>
|
||||
where
|
||||
F: FnOnce(&mut Connection) -> RusqliteResult<R>,
|
||||
{
|
||||
set_sync_active(conn)?;
|
||||
|
||||
// AssertUnwindSafe wird benötigt, um den Mutex über eine Panic-Grenze hinweg zu verwenden.
|
||||
// Wir fangen einen möglichen Panic in `action` ab.
|
||||
let result = panic::catch_unwind(AssertUnwindSafe(|| action(conn)));
|
||||
|
||||
// Diese Aktion MUSS immer ausgeführt werden, egal ob `action` erfolgreich war oder nicht.
|
||||
clear_sync_active(conn)?;
|
||||
|
||||
match result {
|
||||
Ok(res) => res, // Alles gut, gib das Ergebnis von `action` zurück.
|
||||
Err(e) => panic::resume_unwind(e), // Ein Panic ist aufgetreten, wir geben ihn weiter, nachdem wir aufgeräumt haben.
|
||||
}
|
||||
}
|
||||
} */
|
||||
|
||||
/// Erstellt die benötigte Meta-Tabelle, falls sie nicht existiert.
|
||||
pub fn setup_meta_table(conn: &mut Connection) -> RusqliteResult<()> {
|
||||
/* pub fn setup_meta_table(conn: &mut Connection) -> RusqliteResult<()> {
|
||||
let sql = format!(
|
||||
"CREATE TABLE IF NOT EXISTS \"{meta_table}\" (key TEXT PRIMARY KEY, value TEXT) WITHOUT ROWID;",
|
||||
meta_table = TABLE_CRDT_CONFIGS
|
||||
);
|
||||
conn.execute(&sql, [])?;
|
||||
Ok(())
|
||||
}
|
||||
} */
|
||||
|
||||
#[derive(Debug)]
|
||||
struct ColumnInfo {
|
||||
@ -139,6 +147,7 @@ fn is_safe_identifier(name: &str) -> bool {
|
||||
pub fn setup_triggers_for_table(
|
||||
conn: &mut Connection,
|
||||
table_name: &str,
|
||||
recreate: &bool,
|
||||
) -> Result<TriggerSetupResult, CrdtSetupError> {
|
||||
if !is_safe_identifier(table_name) {
|
||||
return Err(rusqlite::Error::InvalidParameterName(format!(
|
||||
@ -161,6 +170,13 @@ pub fn setup_triggers_for_table(
|
||||
});
|
||||
}
|
||||
|
||||
if !columns.iter().any(|c| c.name == HLC_TIMESTAMP_COLUMN) {
|
||||
return Err(CrdtSetupError::HlcColumnMissing {
|
||||
table_name: table_name.to_string(),
|
||||
column_name: HLC_TIMESTAMP_COLUMN.to_string(),
|
||||
});
|
||||
}
|
||||
|
||||
let pks: Vec<String> = columns
|
||||
.iter()
|
||||
.filter(|c| c.is_pk)
|
||||
@ -175,7 +191,7 @@ pub fn setup_triggers_for_table(
|
||||
|
||||
let cols_to_track: Vec<String> = columns
|
||||
.iter()
|
||||
.filter(|c| !c.is_pk && c.name != TOMBSTONE_COLUMN && c.name != HLC_TIMESTAMP_COLUMN)
|
||||
.filter(|c| !c.is_pk) //&& c.name != TOMBSTONE_COLUMN && c.name != HLC_TIMESTAMP_COLUMN
|
||||
.map(|c| c.name.clone())
|
||||
.collect();
|
||||
|
||||
@ -186,6 +202,10 @@ pub fn setup_triggers_for_table(
|
||||
|
||||
// Führe die Erstellung innerhalb einer Transaktion aus
|
||||
let tx = conn.transaction()?;
|
||||
|
||||
if *recreate {
|
||||
drop_triggers_for_table(&tx, table_name)?;
|
||||
}
|
||||
tx.execute_batch(&sql_batch)?;
|
||||
tx.commit()?;
|
||||
|
||||
@ -224,7 +244,7 @@ pub fn drop_triggers_for_table(
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn recreate_triggers_for_table(
|
||||
/* pub fn recreate_triggers_for_table(
|
||||
conn: &mut Connection,
|
||||
table_name: &str,
|
||||
) -> Result<TriggerSetupResult, CrdtSetupError> {
|
||||
@ -278,7 +298,7 @@ pub fn recreate_triggers_for_table(
|
||||
|
||||
Ok(TriggerSetupResult::Success)
|
||||
}
|
||||
|
||||
*/
|
||||
/// Generiert das SQL für den INSERT-Trigger.
|
||||
fn generate_insert_trigger_sql(table_name: &str, pks: &[String], cols: &[String]) -> String {
|
||||
let pk_json_payload = pks
|
||||
@ -287,29 +307,39 @@ fn generate_insert_trigger_sql(table_name: &str, pks: &[String], cols: &[String]
|
||||
.collect::<Vec<_>>()
|
||||
.join(", ");
|
||||
|
||||
let column_inserts = cols.iter().fold(String::new(), |mut acc, col| {
|
||||
writeln!(&mut acc, " INSERT INTO {log_table} (hlc_timestamp, op_type, table_name, row_pk, column_name, value) VALUES (NEW.\"{hlc_col}\", 'INSERT', '{table}', json_object({pk_payload}), '{column}', json_object('value', NEW.\"{column}\"));",
|
||||
let column_inserts = if cols.is_empty() {
|
||||
// Nur PKs -> einfacher Insert ins Log
|
||||
format!(
|
||||
"INSERT INTO {log_table} (hlc_timestamp, op_type, table_name, row_pks)
|
||||
VALUES (hlc_new_timestamp(), 'INSERT', '{table}', json_object({pk_payload}));",
|
||||
log_table = TABLE_CRDT_LOGS,
|
||||
hlc_col = HLC_TIMESTAMP_COLUMN,
|
||||
table = table_name,
|
||||
pk_payload = pk_json_payload,
|
||||
column = col
|
||||
).unwrap();
|
||||
acc
|
||||
});
|
||||
pk_payload = pk_json_payload
|
||||
)
|
||||
} else {
|
||||
cols.iter().fold(String::new(), |mut acc, col| {
|
||||
writeln!(
|
||||
&mut acc,
|
||||
"INSERT INTO {log_table} (hlc_timestamp, op_type, table_name, row_pks, column_name, new_value)
|
||||
VALUES (hlc_new_timestamp(), 'INSERT', '{table}', json_object({pk_payload}), '{column}', json_object('value', NEW.\"{column}\"));",
|
||||
log_table = TABLE_CRDT_LOGS,
|
||||
table = table_name,
|
||||
pk_payload = pk_json_payload,
|
||||
column = col
|
||||
).unwrap();
|
||||
acc
|
||||
})
|
||||
};
|
||||
|
||||
let trigger_name = INSERT_TRIGGER_TPL.replace("{TABLE_NAME}", table_name);
|
||||
|
||||
format!(
|
||||
"CREATE TRIGGER IF NOT EXISTS \"{trigger_name}\"
|
||||
AFTER INSERT ON \"{table_name}\"
|
||||
WHEN (SELECT value FROM \"{config_table}\" WHERE key = '{sync_key}') IS NOT '1'
|
||||
FOR EACH ROW
|
||||
BEGIN
|
||||
{column_inserts}
|
||||
END;",
|
||||
config_table = TABLE_CRDT_CONFIGS,
|
||||
sync_key = SYNC_ACTIVE_KEY
|
||||
{column_inserts}
|
||||
END;"
|
||||
)
|
||||
}
|
||||
|
||||
@ -326,6 +356,57 @@ fn generate_update_trigger_sql(table_name: &str, pks: &[String], cols: &[String]
|
||||
.collect::<Vec<_>>()
|
||||
.join(", ");
|
||||
|
||||
let mut body = String::new();
|
||||
|
||||
// Spaltenänderungen loggen
|
||||
if !cols.is_empty() {
|
||||
for col in cols {
|
||||
writeln!(
|
||||
&mut body,
|
||||
"INSERT INTO {log_table} (hlc_timestamp, op_type, table_name, row_pks, column_name, new_value, old_value)
|
||||
SELECT hlc_new_timestamp(), 'UPDATE', '{table}', json_object({pk_payload}), '{column}',
|
||||
json_object('value', NEW.\"{column}\"), json_object('value', OLD.\"{column}\")
|
||||
WHERE NEW.\"{column}\" IS NOT OLD.\"{column}\";",
|
||||
log_table = TABLE_CRDT_LOGS,
|
||||
table = table_name,
|
||||
pk_payload = pk_json_payload,
|
||||
column = col
|
||||
).unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
// Soft-delete loggen
|
||||
writeln!(
|
||||
&mut body,
|
||||
"INSERT INTO {log_table} (hlc_timestamp, op_type, table_name, row_pks)
|
||||
SELECT hlc_new_timestamp(), 'DELETE', '{table}', json_object({pk_payload})
|
||||
WHERE NEW.\"{tombstone_col}\" = 1 AND OLD.\"{tombstone_col}\" = 0;",
|
||||
log_table = TABLE_CRDT_LOGS,
|
||||
table = table_name,
|
||||
pk_payload = pk_json_payload,
|
||||
tombstone_col = TOMBSTONE_COLUMN
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let trigger_name = UPDATE_TRIGGER_TPL.replace("{TABLE_NAME}", table_name);
|
||||
|
||||
format!(
|
||||
"CREATE TRIGGER IF NOT EXISTS \"{trigger_name}\"
|
||||
AFTER UPDATE ON \"{table_name}\"
|
||||
FOR EACH ROW
|
||||
BEGIN
|
||||
{body}
|
||||
END;"
|
||||
)
|
||||
}
|
||||
|
||||
/* fn generate_update_trigger_sql(table_name: &str, pks: &[String], cols: &[String]) -> String {
|
||||
let pk_json_payload = pks
|
||||
.iter()
|
||||
.map(|pk| format!("'{}', NEW.\"{}\"", pk, pk))
|
||||
.collect::<Vec<_>>()
|
||||
.join(", ");
|
||||
|
||||
let column_updates = cols.iter().fold(String::new(), |mut acc, col| {
|
||||
writeln!(&mut acc, " IF NEW.\"{column}\" IS NOT OLD.\"{column}\" THEN INSERT INTO {log_table} (hlc_timestamp, op_type, table_name, row_pk, column_name, value, old_value) VALUES (NEW.\"{hlc_col}\", 'UPDATE', '{table}', json_object({pk_payload}), '{column}', json_object('value', NEW.\"{column}\"), json_object('value', OLD.\"{column}\")); END IF;",
|
||||
log_table = TABLE_CRDT_LOGS,
|
||||
@ -361,7 +442,8 @@ fn generate_update_trigger_sql(table_name: &str, pks: &[String], cols: &[String]
|
||||
sync_key = SYNC_ACTIVE_KEY
|
||||
)
|
||||
}
|
||||
|
||||
*/
|
||||
/*
|
||||
/// Durchläuft alle `haex_`-Tabellen und richtet die CRDT-Trigger ein.
|
||||
pub fn generate_haex_triggers(conn: &mut Connection) -> Result<(), rusqlite::Error> {
|
||||
println!("🔄 Setup CRDT triggers...");
|
||||
@ -387,4 +469,4 @@ pub fn generate_haex_triggers(conn: &mut Connection) -> Result<(), rusqlite::Err
|
||||
}
|
||||
println!("✨ Done setting up CRDT triggers.");
|
||||
Ok(())
|
||||
}
|
||||
} */
|
||||
|
||||
276
src-tauri/src/crdt/trigger_alter.rs
Normal file
276
src-tauri/src/crdt/trigger_alter.rs
Normal file
@ -0,0 +1,276 @@
|
||||
// Wir binden die Konstanten aus unserem generierten Modul ein.
|
||||
// `crate` bezieht sich auf das Wurzelverzeichnis unseres Crates (src-tauri/src).
|
||||
use crate::tableNames::*;
|
||||
|
||||
use rusqlite::{Connection, Result as RusqliteResult, Row};
|
||||
use serde::Serialize;
|
||||
use std::error::Error;
|
||||
use std::fmt::{self, Display, Formatter, Write};
|
||||
use std::panic::{self, AssertUnwindSafe};
|
||||
use ts_rs::TS;
|
||||
|
||||
// Harte Konstanten, die nicht aus der JSON-Datei kommen, da sie Teil der internen Logik sind.
|
||||
const SYNC_ACTIVE_KEY: &str = "sync_active";
|
||||
const TOMBSTONE_COLUMN: &str = "haex_tombstone";
|
||||
const HLC_TIMESTAMP_COLUMN: &str = "haex_hlc_timestamp";
|
||||
const INSERT_TRIGGER_TPL: &str = "z_crdt_{TABLE_NAME}_insert";
|
||||
const UPDATE_TRIGGER_TPL: &str = "z_crdt_{TABLE_NAME}_update";
|
||||
|
||||
// --- Eigener Error-Typ für klares Fehler-Handling ---
|
||||
#[derive(Debug)]
|
||||
pub enum CrdtSetupError {
|
||||
DatabaseError(rusqlite::Error),
|
||||
TombstoneColumnMissing {
|
||||
table_name: String,
|
||||
column_name: String,
|
||||
},
|
||||
PrimaryKeyMissing {
|
||||
table_name: String,
|
||||
},
|
||||
}
|
||||
|
||||
impl Display for CrdtSetupError {
|
||||
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
|
||||
match self {
|
||||
CrdtSetupError::DatabaseError(e) => write!(f, "Database error: {}", e),
|
||||
CrdtSetupError::TombstoneColumnMissing {
|
||||
table_name,
|
||||
column_name,
|
||||
} => write!(
|
||||
f,
|
||||
"Table '{}' is missing the required tombstone column '{}'",
|
||||
table_name, column_name
|
||||
),
|
||||
CrdtSetupError::PrimaryKeyMissing { table_name } => {
|
||||
write!(f, "Table '{}' has no primary key", table_name)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
impl Error for CrdtSetupError {}
|
||||
impl From<rusqlite::Error> for CrdtSetupError {
|
||||
fn from(err: rusqlite::Error) -> Self {
|
||||
CrdtSetupError::DatabaseError(err)
|
||||
}
|
||||
}
|
||||
|
||||
// --- Öffentliche Structs und Enums ---
|
||||
#[derive(Debug, Serialize, TS)]
|
||||
#[ts(export)]
|
||||
pub enum TriggerSetupResult {
|
||||
Success,
|
||||
TableNotFound,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct ColumnInfo {
|
||||
name: String,
|
||||
is_pk: bool,
|
||||
}
|
||||
impl ColumnInfo {
|
||||
fn from_row(row: &Row) -> RusqliteResult<Self> {
|
||||
Ok(ColumnInfo {
|
||||
name: row.get("name")?,
|
||||
is_pk: row.get::<_, i64>("pk")? > 0,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// --- Öffentliche Funktionen für die Anwendungslogik ---
|
||||
|
||||
/// Erstellt die benötigten CRDT-Systemtabellen (z.B. die Config-Tabelle), falls sie nicht existieren.
|
||||
/// Sollte beim Anwendungsstart einmalig aufgerufen werden.
|
||||
pub fn setup_crdt_tables(conn: &mut Connection) -> RusqliteResult<()> {
|
||||
let config_sql = format!(
|
||||
"CREATE TABLE IF NOT EXISTS \"{config_table}\" (key TEXT PRIMARY KEY, value TEXT) WITHOUT ROWID;",
|
||||
config_table = TABLE_CRDT_CONFIGS
|
||||
);
|
||||
conn.execute(&config_sql, [])?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Führt eine Aktion aus, während die Trigger temporär deaktiviert sind.
|
||||
/// Stellt sicher, dass die Trigger auch bei einem Absturz (Panic) wieder aktiviert werden.
|
||||
pub fn with_triggers_paused<F, R>(conn: &mut Connection, action: F) -> RusqliteResult<R>
|
||||
where
|
||||
F: FnOnce(&mut Connection) -> RusqliteResult<R>,
|
||||
{
|
||||
set_sync_active(conn)?;
|
||||
// `catch_unwind` fängt einen möglichen Panic in `action` ab.
|
||||
let result = panic::catch_unwind(AssertUnwindSafe(|| action(conn)));
|
||||
// Diese Aufräumaktion wird immer ausgeführt.
|
||||
clear_sync_active(conn)?;
|
||||
match result {
|
||||
Ok(res) => res, // Alles gut, gib das Ergebnis von `action` zurück.
|
||||
Err(e) => panic::resume_unwind(e), // Ein Panic ist aufgetreten, wir geben ihn weiter, nachdem wir aufgeräumt haben.
|
||||
}
|
||||
}
|
||||
|
||||
/// Analysiert alle `haex_`-Tabellen in der Datenbank und erstellt die notwendigen CRDT-Trigger.
|
||||
pub fn generate_haex_triggers(conn: &mut Connection) -> RusqliteResult<()> {
|
||||
println!("🔄 Setup CRDT triggers...");
|
||||
let table_names: Vec<String> = {
|
||||
let mut stmt = conn.prepare("SELECT name FROM sqlite_master WHERE type='table' AND name LIKE 'haex_%' AND name NOT LIKE 'haex_crdt_%';")?;
|
||||
let rows = stmt.query_map([], |row| row.get::<_, String>(0))?;
|
||||
rows.collect::<RusqliteResult<Vec<String>>>()?
|
||||
};
|
||||
|
||||
for table_name in table_names {
|
||||
// Überspringe die Config-Tabelle selbst, sie braucht keine Trigger.
|
||||
if table_name == TABLE_CRDT_CONFIGS {
|
||||
continue;
|
||||
}
|
||||
println!("➡️ Processing table: {}", table_name);
|
||||
match setup_triggers_for_table(conn, &table_name) {
|
||||
Ok(TriggerSetupResult::Success) => {
|
||||
println!(" ✅ Triggers created for {}", table_name)
|
||||
}
|
||||
Ok(TriggerSetupResult::TableNotFound) => {
|
||||
println!(" ℹ️ Table {} not found, skipping.", table_name)
|
||||
}
|
||||
Err(e) => println!(" ❌ Could not set up triggers for {}: {}", table_name, e),
|
||||
}
|
||||
}
|
||||
println!("✨ Done setting up CRDT triggers.");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// --- Private Hilfsfunktionen ---
|
||||
|
||||
fn set_sync_active(conn: &mut Connection) -> RusqliteResult<()> {
|
||||
let sql = format!(
|
||||
"INSERT OR REPLACE INTO \"{config_table}\" (key, value) VALUES (?, '1');",
|
||||
config_table = TABLE_CRDT_CONFIGS
|
||||
);
|
||||
conn.execute(&sql, [SYNC_ACTIVE_KEY])?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn clear_sync_active(conn: &mut Connection) -> RusqliteResult<()> {
|
||||
let sql = format!(
|
||||
"DELETE FROM \"{config_table}\" WHERE key = ?;",
|
||||
config_table = TABLE_CRDT_CONFIGS
|
||||
);
|
||||
conn.execute(&sql, [SYNC_ACTIVE_KEY])?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn is_safe_identifier(name: &str) -> bool {
|
||||
!name.is_empty() && name.chars().all(|c| c.is_alphanumeric() || c == '_')
|
||||
}
|
||||
|
||||
fn setup_triggers_for_table(
|
||||
conn: &mut Connection,
|
||||
table_name: &str,
|
||||
) -> Result<TriggerSetupResult, CrdtSetupError> {
|
||||
if !is_safe_identifier(table_name) {
|
||||
return Err(rusqlite::Error::InvalidParameterName(format!(
|
||||
"Invalid table name: {}",
|
||||
table_name
|
||||
))
|
||||
.into());
|
||||
}
|
||||
let columns = get_table_schema(conn, table_name)?;
|
||||
if columns.is_empty() {
|
||||
return Ok(TriggerSetupResult::TableNotFound);
|
||||
}
|
||||
if !columns.iter().any(|c| c.name == TOMBSTONE_COLUMN) {
|
||||
return Err(CrdtSetupError::TombstoneColumnMissing {
|
||||
table_name: table_name.to_string(),
|
||||
column_name: TOMBSTONE_COLUMN.to_string(),
|
||||
});
|
||||
}
|
||||
let pks: Vec<String> = columns
|
||||
.iter()
|
||||
.filter(|c| c.is_pk)
|
||||
.map(|c| c.name.clone())
|
||||
.collect();
|
||||
if pks.is_empty() {
|
||||
return Err(CrdtSetupError::PrimaryKeyMissing {
|
||||
table_name: table_name.to_string(),
|
||||
});
|
||||
}
|
||||
let cols_to_track: Vec<String> = columns
|
||||
.iter()
|
||||
.filter(|c| !c.is_pk && c.name != TOMBSTONE_COLUMN && c.name != HLC_TIMESTAMP_COLUMN)
|
||||
.map(|c| c.name.clone())
|
||||
.collect();
|
||||
|
||||
let insert_trigger_sql = generate_insert_trigger_sql(table_name, &pks, &cols_to_track);
|
||||
let update_trigger_sql = generate_update_trigger_sql(table_name, &pks, &cols_to_track);
|
||||
let drop_insert_trigger_sql =
|
||||
drop_trigger_sql(INSERT_TRIGGER_TPL.replace("{TABLE_NAME}", table_name));
|
||||
let drop_update_trigger_sql =
|
||||
drop_trigger_sql(UPDATE_TRIGGER_TPL.replace("{TABLE_NAME}", table_name));
|
||||
|
||||
let tx = conn.transaction()?;
|
||||
tx.execute_batch(&format!(
|
||||
"{}\n{}\n{}\n{}",
|
||||
drop_insert_trigger_sql, drop_update_trigger_sql, insert_trigger_sql, update_trigger_sql
|
||||
))?;
|
||||
tx.commit()?;
|
||||
|
||||
Ok(TriggerSetupResult::Success)
|
||||
}
|
||||
|
||||
fn get_table_schema(conn: &Connection, table_name: &str) -> RusqliteResult<Vec<ColumnInfo>> {
|
||||
let sql = format!("PRAGMA table_info(\"{}\");", table_name);
|
||||
let mut stmt = conn.prepare(&sql)?;
|
||||
let rows = stmt.query_map([], ColumnInfo::from_row)?;
|
||||
rows.collect()
|
||||
}
|
||||
|
||||
fn drop_trigger_sql(trigger_name: String) -> String {
|
||||
format!("DROP TRIGGER IF EXISTS \"{}\";", trigger_name)
|
||||
}
|
||||
|
||||
fn generate_insert_trigger_sql(table_name: &str, pks: &[String], cols: &[String]) -> String {
|
||||
let pk_json_payload = pks
|
||||
.iter()
|
||||
.map(|pk| format!("'{}', NEW.\"{}\"", pk, pk))
|
||||
.collect::<Vec<_>>()
|
||||
.join(", ");
|
||||
let column_inserts = cols.iter().fold(String::new(), |mut acc, col| {
|
||||
writeln!(&mut acc, " INSERT INTO \"{log_table}\" (hlc_timestamp, op_type, table_name, row_pk, column_name, value) VALUES (NEW.\"{hlc_col}\", 'INSERT', '{table}', json_object({pk_payload}), '{column}', json_object('value', NEW.\"{column}\"));", log_table = TABLE_CRDT_LOGS, hlc_col = HLC_TIMESTAMP_COLUMN, table = table_name, pk_payload = pk_json_payload, column = col).unwrap();
|
||||
acc
|
||||
});
|
||||
let trigger_name = INSERT_TRIGGER_TPL.replace("{TABLE_NAME}", table_name);
|
||||
format!(
|
||||
"CREATE TRIGGER IF NOT EXISTS \"{trigger_name}\"\n"
|
||||
+ " AFTER INSERT ON \"{table_name}\"\n"
|
||||
+ " WHEN (SELECT value FROM \"{config_table}\" WHERE key = '{sync_key}') IS NOT '1'\n"
|
||||
+ " FOR EACH ROW\n"
|
||||
+ " BEGIN\n"
|
||||
+ " {column_inserts}\n"
|
||||
+ " END;",
|
||||
config_table = TABLE_CRDT_CONFIGS,
|
||||
sync_key = SYNC_ACTIVE_KEY
|
||||
)
|
||||
}
|
||||
|
||||
fn generate_update_trigger_sql(table_name: &str, pks: &[String], cols: &[String]) -> String {
|
||||
let pk_json_payload = pks
|
||||
.iter()
|
||||
.map(|pk| format!("'{}', NEW.\"{}\"", pk, pk))
|
||||
.collect::<Vec<_>>()
|
||||
.join(", ");
|
||||
let column_updates = cols.iter().fold(String::new(), |mut acc, col| {
|
||||
writeln!(&mut acc, " IF NEW.\"{column}\" IS NOT OLD.\"{column}\" THEN INSERT INTO \"{log_table}\" (hlc_timestamp, op_type, table_name, row_pk, column_name, value, old_value) VALUES (NEW.\"{hlc_col}\", 'UPDATE', '{table}', json_object({pk_payload}), '{column}', json_object('value', NEW.\"{column}\"), json_object('value', OLD.\"{column}\")); END IF;", log_table = TABLE_CRDT_LOGS, hlc_col = HLC_TIMESTAMP_COLUMN, table = table_name, pk_payload = pk_json_payload, column = col).unwrap();
|
||||
acc
|
||||
});
|
||||
let soft_delete_logic = format!(
|
||||
" IF NEW.\"{tombstone_col}\" = 1 AND OLD.\"{tombstone_col}\" = 0 THEN INSERT INTO \"{log_table}\" (hlc_timestamp, op_type, table_name, row_pk) VALUES (NEW.\"{hlc_col}\", 'DELETE', '{table}', json_object({pk_payload})); END IF;", log_table = TABLE_CRDT_LOGS, hlc_col = HLC_TIMESTAMP_COLUMN, tombstone_col = TOMBSTONE_COLUMN, table = table_name, pk_payload = pk_json_payload);
|
||||
let trigger_name = UPDATE_TRIGGER_TPL.replace("{TABLE_NAME}", table_name);
|
||||
format!(
|
||||
"CREATE TRIGGER IF NOT EXISTS \"{trigger_name}\"\n"
|
||||
+ " AFTER UPDATE ON \"{table_name}\"\n"
|
||||
+ " WHEN (SELECT value FROM \"{config_table}\" WHERE key = '{sync_key}') IS NOT '1'\n"
|
||||
+ " FOR EACH ROW\n"
|
||||
+ " BEGIN\n"
|
||||
+ " {column_updates}\n"
|
||||
+ " {soft_delete_logic}\n"
|
||||
+ " END;",
|
||||
config_table = TABLE_CRDT_CONFIGS,
|
||||
sync_key = SYNC_ACTIVE_KEY
|
||||
)
|
||||
}
|
||||
Reference in New Issue
Block a user