From 91db0475cd575809d53c578378e92703e9d0bca2 Mon Sep 17 00:00:00 2001 From: haex Date: Thu, 28 Aug 2025 12:16:22 +0200 Subject: [PATCH] fixed crdt --- package.json | 1 + pnpm-lock.yaml | 10 + src-tauri/Cargo.lock | 33 ++ src-tauri/Cargo.toml | 7 +- src-tauri/database/schemas/crdt.ts | 9 +- src-tauri/database/schemas/vault.ts | 9 + src-tauri/src/crdt/hlc.rs | 186 +++++++---- src-tauri/src/crdt/log.rs | 22 -- src-tauri/src/crdt/mod.rs | 1 - src-tauri/src/crdt/proxy.rs | 502 +++++++++++++++++++++------- src-tauri/src/crdt/trigger.rs | 228 ++++++++----- 11 files changed, 703 insertions(+), 305 deletions(-) delete mode 100644 src-tauri/src/crdt/log.rs diff --git a/package.json b/package.json index 8c117ef..65f341d 100644 --- a/package.json +++ b/package.json @@ -50,6 +50,7 @@ "zod": "^3.25.67" }, "devDependencies": { + "@iconify-json/proicons": "^1.2.17", "@iconify/json": "^2.2.351", "@iconify/tailwind4": "^1.0.6", "@tauri-apps/cli": "^2.5.0", diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index a7b2be9..1d33dde 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -105,6 +105,9 @@ importers: specifier: ^3.25.67 version: 3.25.67 devDependencies: + '@iconify-json/proicons': + specifier: ^1.2.17 + version: 1.2.17 '@iconify/json': specifier: ^2.2.351 version: 2.2.355 @@ -687,6 +690,9 @@ packages: resolution: {integrity: sha512-bV0Tgo9K4hfPCek+aMAn81RppFKv2ySDQeMoSZuvTASywNTnVJCArCZE2FWqpvIatKu7VMRLWlR1EazvVhDyhQ==} engines: {node: '>=18.18'} + '@iconify-json/proicons@1.2.17': + resolution: {integrity: sha512-FYnIsbhj91Epr62+QuyVXkljizcqWBHWrK3KGMdKwYxan3PxDZQiOOfKQepLR02Y9PJ4/4N8N22P5F6XS8eCFw==} + '@iconify/collections@1.0.560': resolution: {integrity: sha512-sXm0Io67deUIqWmaSGb+JJ4Y9DJvgT/seaQlCOHUuHhs3qcPuLY6WMlfoUSmmXV5yHFhZprRZEs1np2BZ/BCJw==} @@ -5863,6 +5869,10 @@ snapshots: '@humanwhocodes/retry@0.4.3': {} + '@iconify-json/proicons@1.2.17': + dependencies: + '@iconify/types': 2.0.0 + '@iconify/collections@1.0.560': dependencies: '@iconify/types': 2.0.0 diff --git a/src-tauri/Cargo.lock b/src-tauri/Cargo.lock index d2a04f8..e548ecb 100644 --- a/src-tauri/Cargo.lock +++ b/src-tauri/Cargo.lock @@ -1563,7 +1563,9 @@ dependencies = [ "tauri-plugin-os", "tauri-plugin-persisted-scope", "tauri-plugin-store", + "thiserror 2.0.12", "tokio", + "ts-rs", "uhlc", "uuid", ] @@ -4582,6 +4584,15 @@ dependencies = [ "utf-8", ] +[[package]] +name = "termcolor" +version = "1.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "06794f8f6c5c898b3275aebefa6b8a1cb24cd2c6c79397ab15774837a0bc5755" +dependencies = [ + "winapi-util", +] + [[package]] name = "thiserror" version = "1.0.69" @@ -4874,6 +4885,28 @@ version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" +[[package]] +name = "ts-rs" +version = "11.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6ef1b7a6d914a34127ed8e1fa927eb7088903787bcded4fa3eef8f85ee1568be" +dependencies = [ + "thiserror 2.0.12", + "ts-rs-macros", +] + +[[package]] +name = "ts-rs-macros" +version = "11.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e9d4ed7b4c18cc150a6a0a1e9ea1ecfa688791220781af6e119f9599a8502a0a" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.100", + "termcolor", +] + [[package]] name = "typeid" version = "1.0.3" diff --git a/src-tauri/Cargo.toml b/src-tauri/Cargo.toml index cd69a9e..31bacf0 100644 --- a/src-tauri/Cargo.toml +++ b/src-tauri/Cargo.toml @@ -44,8 +44,9 @@ tauri-plugin-store = "2.3" tauri-plugin-http = "2.5" tauri-plugin-notification = "2.3" tauri-plugin-persisted-scope = "2.0.0" -tauri-plugin-android-fs = "9.5.0" +tauri-plugin-android-fs = "9.5.0" uuid = { version = "1.17.0", features = ["v4"] } +ts-rs = "11.0.1" +thiserror = "2.0.12" + #tauri-plugin-sql = { version = "2", features = ["sqlite"] } - - diff --git a/src-tauri/database/schemas/crdt.ts b/src-tauri/database/schemas/crdt.ts index 4fcb7ef..65da939 100644 --- a/src-tauri/database/schemas/crdt.ts +++ b/src-tauri/database/schemas/crdt.ts @@ -1,6 +1,6 @@ import { blob, integer, sqliteTable, text } from 'drizzle-orm/sqlite-core' -export const haexCrdtMessages = sqliteTable('haex_crdt_messages', { +export const haexCrdtLogs = sqliteTable('haex_crdt_logs', { hlc_timestamp: text().primaryKey(), table_name: text(), row_pks: text({ mode: 'json' }), @@ -9,8 +9,8 @@ export const haexCrdtMessages = sqliteTable('haex_crdt_messages', { new_value: text({ mode: 'json' }), old_value: text({ mode: 'json' }), }) -export type InsertHaexCrdtMessages = typeof haexCrdtMessages.$inferInsert -export type SelectHaexCrdtMessages = typeof haexCrdtMessages.$inferSelect +export type InsertHaexCrdtLogs = typeof haexCrdtLogs.$inferInsert +export type SelectHaexCrdtLogs = typeof haexCrdtLogs.$inferSelect export const haexCrdtSnapshots = sqliteTable('haex_crdt_snapshots', { snapshot_id: text().primaryKey(), @@ -21,7 +21,6 @@ export const haexCrdtSnapshots = sqliteTable('haex_crdt_snapshots', { }) export const haexCrdtSettings = sqliteTable('haex_crdt_settings', { - id: text().primaryKey(), - type: text({ enum: ['hlc_timestamp'] }).unique(), + type: text().primaryKey(), value: text(), }) diff --git a/src-tauri/database/schemas/vault.ts b/src-tauri/database/schemas/vault.ts index 4a656e6..b7d997d 100644 --- a/src-tauri/database/schemas/vault.ts +++ b/src-tauri/database/schemas/vault.ts @@ -13,6 +13,7 @@ export const haexSettings = sqliteTable('haex_settings', { key: text(), type: text(), value: text(), + haex_tombstone: integer({ mode: 'boolean' }), }) export type InsertHaexSettings = typeof haexSettings.$inferInsert export type SelectHaexSettings = typeof haexSettings.$inferSelect @@ -25,6 +26,7 @@ export const haexExtensions = sqliteTable('haex_extensions', { name: text(), url: text(), version: text(), + haex_tombstone: integer({ mode: 'boolean' }), }) export type InsertHaexExtensions = typeof haexExtensions.$inferInsert export type SelectHaexExtensions = typeof haexExtensions.$inferSelect @@ -43,6 +45,7 @@ export const haexExtensionsPermissions = sqliteTable( updateAt: integer('updated_at', { mode: 'timestamp' }).$onUpdate( () => new Date(), ), + haex_tombstone: integer({ mode: 'boolean' }), }, (table) => [ unique().on(table.extensionId, table.resource, table.operation, table.path), @@ -66,6 +69,7 @@ export const haexNotifications = sqliteTable('haex_notifications', { type: text({ enum: ['error', 'success', 'warning', 'info', 'log'], }).notNull(), + haex_tombstone: integer({ mode: 'boolean' }), }) export type InsertHaexNotifications = typeof haexNotifications.$inferInsert export type SelectHaexNotifications = typeof haexNotifications.$inferSelect @@ -85,6 +89,7 @@ export const haexPasswordsItemDetails = sqliteTable( updateAt: integer('updated_at', { mode: 'timestamp' }).$onUpdate( () => new Date(), ), + haex_tombstone: integer({ mode: 'boolean' }), }, ) export type InsertHaexPasswordsItemDetails = @@ -104,6 +109,7 @@ export const haexPasswordsItemKeyValues = sqliteTable( updateAt: integer('updated_at', { mode: 'timestamp' }).$onUpdate( () => new Date(), ), + haex_tombstone: integer({ mode: 'boolean' }), }, ) export type InserthaexPasswordsItemKeyValues = @@ -123,6 +129,7 @@ export const haexPasswordsItemHistory = sqliteTable( oldValue: text('old_value'), newValue: text('new_value'), createdAt: text('created_at').default(sql`(CURRENT_TIMESTAMP)`), + haex_tombstone: integer({ mode: 'boolean' }), }, ) export type InserthaexPasswordsItemHistory = @@ -144,6 +151,7 @@ export const haexPasswordsGroups = sqliteTable('haex_passwords_groups', { updateAt: integer('updated_at', { mode: 'timestamp' }).$onUpdate( () => new Date(), ), + haex_tombstone: integer({ mode: 'boolean' }), }) export type InsertHaexPasswordsGroups = typeof haexPasswordsGroups.$inferInsert export type SelectHaexPasswordsGroups = typeof haexPasswordsGroups.$inferSelect @@ -157,6 +165,7 @@ export const haexPasswordsGroupItems = sqliteTable( itemId: text('item_id').references( (): AnySQLiteColumn => haexPasswordsItemDetails.id, ), + haex_tombstone: integer({ mode: 'boolean' }), }, (table) => [primaryKey({ columns: [table.itemId, table.groupId] })], ) diff --git a/src-tauri/src/crdt/hlc.rs b/src-tauri/src/crdt/hlc.rs index 382f1bc..069f165 100644 --- a/src-tauri/src/crdt/hlc.rs +++ b/src-tauri/src/crdt/hlc.rs @@ -1,67 +1,131 @@ -use rusqlite::{params, Connection, Result}; -use std::sync::{Arc, Mutex}; -use uhlc::{Timestamp, HLC}; +// src/hlc_service.rs + +use rusqlite::{params, Connection, Result as RusqliteResult, Transaction}; +use std::{ + fmt::Debug, + str::FromStr, + sync::{Arc, Mutex}, + time::Duration, +}; +use thiserror::Error; +use uhlc::{HLCBuilder, Timestamp, HLC, ID}; use uuid::Uuid; -const HLC_SETTING_TYPE: &str = "hlc_timestamp"; +const HLC_NODE_ID_TYPE: &str = "hlc_node_id"; +const HLC_TIMESTAMP_TYPE: &str = "hlc_timestamp"; -pub const GET_HLC_FUNCTION: &str = "get_hlc_timestamp"; pub const CRDT_SETTINGS_TABLE: &str = "haex_crdt_settings"; -pub struct HlcService(pub Arc>); -pub fn setup_hlc(conn: &mut Connection) -> Result<()> { - // 1. Lade den letzten HLC-Zustand oder erstelle einen neuen. - let hlc = conn - .query_row( - "SELECT value FROM {CRDT_SETTINGS_TABLE} meta WHERE type = ?1", - params![HLC_SETTING_TYPE], - |row| { - let state_str: String = row.get(0)?; - let timestamp = Timestamp::from_str(&state_str) - .map_err(|_| rusqlite::Error::ExecuteReturnedResults)?; // Konvertiere den Fehler - Ok(HLC::new(timestamp)) - }, - ) - .unwrap_or_else(|_| HLC::default()); // Bei Fehler (z.B. nicht gefunden) -> neuen HLC erstellen. - - let hlc_arc = Arc::new(Mutex::new(hlc)); - - // 2. Erstelle eine Klon für die SQL-Funktion und speichere den Zustand bei jeder neuen Timestamp-Generierung. - let hlc_clone = hlc_arc.clone(); - let db_conn_arc = Arc::new(Mutex::new(conn.try_clone()?)); - - conn.create_scalar_function( - GET_HLC_FUNCTION, - 0, - rusqlite::functions::FunctionFlags::SQLITE_UTF8 - | rusqlite::functions::FunctionFlags::SQLITE_DETERMINISTIC, - move |_| { - let mut hlc = hlc_clone.lock().unwrap(); - let new_timestamp = hlc.new_timestamp(); - let timestamp_str = new_timestamp.to_string(); - - // 3. Speichere den neuen Zustand sofort zurück in die DB. - // UPSERT-Logik: Ersetze den Wert, falls der Schlüssel existiert, sonst füge ihn ein. - let db_conn = db_conn_arc.lock().unwrap(); - db_conn - .execute( - "INSERT INTO {CRDT_SETTINGS_TABLE} (id, type, value) VALUES (?1, ?2, ?3) - ON CONFLICT(type) DO UPDATE SET value = excluded.value", - params![ - Uuid::new_v4().to_string(), // Generiere eine neue ID für den Fall eines INSERTs - HLC_SETTING_TYPE, - ×tamp_str - ], - ) - .expect("HLC state could not be persisted."); // In Prod sollte hier ein besseres Error-Handling hin. - - Ok(timestamp_str) - }, - )?; - - // Hinweis: Den HLC-Service im Tauri-State zu managen ist nicht mehr zwingend, - // da die SQL-Funktion nun alles Notwendige über geklonte Arcs erhält. - // Falls du ihn dennoch für andere Commands brauchst, kannst du ihn im State speichern. - - Ok(()) +#[derive(Error, Debug)] +pub enum HlcError { + #[error("Database error: {0}")] + Database(#[from] rusqlite::Error), + #[error("Failed to parse persisted HLC timestamp: {0}")] + ParseTimestamp(String), + #[error("Failed to parse persisted HLC state: {0}")] + Parse(String), + #[error("HLC mutex was poisoned")] + MutexPoisoned, + #[error("Failed to create node ID: {0}")] + CreateNodeId(#[from] uhlc::SizeError), +} + +/// A thread-safe, persistent HLC service. +#[derive(Clone)] +pub struct HlcService(Arc>); + +impl HlcService { + /// Creates a new HLC service, initializing it from the database or creating a new + /// persistent identity if one does not exist. + pub fn new(conn: &mut Connection) -> Result { + // 1. Manage persistent node identity. + let node_id = Self::get_or_create_node_id(conn)?; + + // 2. Create HLC instance with stable identity using the HLCBuilder. + let hlc = HLCBuilder::new() + .with_id(node_id) + .with_max_delta(Duration::from_secs(1)) // Example of custom configuration + .build(); + + // 3. Load the last persisted timestamp and update the clock. + let last_state_str: RusqliteResult = conn.query_row( + &format!("SELECT value FROM {} WHERE type = ?1", CRDT_SETTINGS_TABLE), + params![HLC_TIMESTAMP_TYPE], + |row| row.get(0), + ); + + if let Ok(state_str) = last_state_str { + let timestamp = + Timestamp::from_str(&state_str).map_err(|e| HlcError::ParseTimestamp(e.cause))?; + + // Update the clock with the persisted state. + // we might want to handle the error case where the clock drifts too far. + hlc.update_with_timestamp(×tamp) + .map_err(|e| HlcError::Parse(e.to_string()))?; + } + + let hlc_arc = Arc::new(Mutex::new(hlc)); + Ok(HlcService(hlc_arc)) + } + + /// Generates a new timestamp and immediately persists the HLC's new state. + /// This method MUST be called within an existing database transaction (`tx`) + /// along with the actual data operation that this timestamp is for. + /// This design ensures atomicity: the data is saved with its timestamp, + /// and the clock state is updated, or none of it is. + pub fn new_timestamp_and_persist<'tx>( + &self, + tx: &Transaction<'tx>, + ) -> Result { + let hlc = self.0.lock().map_err(|_| HlcError::MutexPoisoned)?; + let new_timestamp = hlc.new_timestamp(); + let timestamp_str = new_timestamp.to_string(); + + tx.execute( + &format!( + "INSERT INTO {} (type, value) VALUES (?1,?2) + ON CONFLICT(type) DO UPDATE SET value = excluded.value", + CRDT_SETTINGS_TABLE + ), + params![HLC_TIMESTAMP_TYPE, timestamp_str], + )?; + + Ok(new_timestamp) + } + + /// Retrieves or creates and persists a stable node ID for the HLC. + fn get_or_create_node_id(conn: &mut Connection) -> Result { + let tx = conn.transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)?; + + let query = format!("SELECT value FROM {} WHERE type =?1", CRDT_SETTINGS_TABLE); + + match tx.query_row(&query, params![HLC_NODE_ID_TYPE], |row| { + row.get::<_, String>(0) + }) { + Ok(id_str) => { + // ID exists, parse and return it. + let id_bytes = hex::decode(id_str).map_err(|e| HlcError::Parse(e.to_string()))?; + let id = ID::try_from(id_bytes.as_slice())?; + tx.commit()?; + Ok(id) + } + Err(rusqlite::Error::QueryReturnedNoRows) => { + // No ID found, create, persist, and return a new one. + let new_id_bytes = Uuid::new_v4().as_bytes().to_vec(); + let new_id = ID::try_from(new_id_bytes.as_slice())?; + let new_id_str = hex::encode(new_id.to_le_bytes()); + + tx.execute( + &format!( + "INSERT INTO {} (type, value) VALUES (?1, ?2)", + CRDT_SETTINGS_TABLE + ), + params![HLC_NODE_ID_TYPE, new_id_str], + )?; + tx.commit()?; + Ok(new_id) + } + Err(e) => Err(HlcError::from(e)), + } + } } diff --git a/src-tauri/src/crdt/log.rs b/src-tauri/src/crdt/log.rs deleted file mode 100644 index d3769aa..0000000 --- a/src-tauri/src/crdt/log.rs +++ /dev/null @@ -1,22 +0,0 @@ -// src/entities/crdt_log.rs -use sea_orm::entity::prelude::*; - -#[sea_orm(table_name = "crdt_log")] -pub struct Model { - #[sea_orm(primary_key, auto_increment = true)] - pub id: i64, - pub hlc_timestamp: String, - pub op_type: String, - pub table_name: String, - pub row_pk: String, // Wird als JSON-String gespeichert - #[sea_orm(nullable)] - pub column_name: Option, - #[sea_orm(nullable)] - pub value: Option, - #[sea_orm(nullable)] - pub old_value: Option, -} - -pub enum Relation {} - -impl ActiveModelBehavior for ActiveModel {} diff --git a/src-tauri/src/crdt/mod.rs b/src-tauri/src/crdt/mod.rs index 01843bd..c897ca3 100644 --- a/src-tauri/src/crdt/mod.rs +++ b/src-tauri/src/crdt/mod.rs @@ -1,4 +1,3 @@ pub mod hlc; -pub mod log; pub mod proxy; pub mod trigger; diff --git a/src-tauri/src/crdt/proxy.rs b/src-tauri/src/crdt/proxy.rs index 1bab7be..25a045a 100644 --- a/src-tauri/src/crdt/proxy.rs +++ b/src-tauri/src/crdt/proxy.rs @@ -1,15 +1,47 @@ -// In src-tauri/src/sql_proxy.rs +// In src-tauri/src/crdt/proxy.rs -use rusqlite::Connection; -use sqlparser::ast::{ColumnDef, DataType, Expr, Ident, Query, Statement, TableWithJoins, Value}; +use crate::crdt::hlc::HlcService; +use crate::crdt::trigger::{HLC_TIMESTAMP_COLUMN, TOMBSTONE_COLUMN}; +use serde::{Deserialize, Serialize}; +use sqlparser::ast::{ + Assignment, AssignmentTarget, BinaryOperator, ColumnDef, DataType, Expr, Ident, Insert, + ObjectName, ObjectNamePart, SelectItem, SetExpr, Statement, TableFactor, TableObject, + TableWithJoins, UpdateTableFromKind, Value, ValueWithSpan, +}; use sqlparser::dialect::SQLiteDialect; use sqlparser::parser::Parser; -use sqlparser::visit_mut::{self, VisitorMut}; -use std::ops::ControlFlow; +use std::collections::HashSet; +use ts_rs::TS; +use uhlc::Timestamp; -// Der Name der Tombstone-Spalte als Konstante, um "Magic Strings" zu vermeiden. -pub const TOMBSTONE_COLUMN_NAME: &str = "haex_tombstone"; -const EXCLUDED_TABLES: &[&str] = &["haex_crdt_log"]; +#[derive(Serialize, Deserialize, TS)] +#[ts(export)] +#[serde(tag = "type", content = "details")] +pub enum ProxyError { + /// Der SQL-Code konnte nicht geparst werden. + ParseError { + reason: String, + }, + /// Ein Fehler ist während der Ausführung in der Datenbank aufgetreten. + ExecutionError { + sql: String, + reason: String, + }, + /// Ein Fehler ist beim Verwalten der Transaktion aufgetreten. + TransactionError { + reason: String, + }, + /// Ein SQL-Statement wird vom Proxy nicht unterstützt (z.B. DELETE von einer Subquery). + UnsupportedStatement { + description: String, + }, + HlcError { + reason: String, + }, +} + +// Tabellen, die von der Proxy-Logik ausgeschlossen sind. +const EXCLUDED_TABLES: &[&str] = &["haex_crdt_settings", "haex_crdt_logs"]; pub struct SqlProxy; @@ -18,145 +50,369 @@ impl SqlProxy { Self {} } - // Die zentrale Ausführungsfunktion - pub fn execute(&self, sql: &str, conn: &Connection) -> Result<(), String> { - // 1. Parsen des SQL-Strings in einen oder mehrere ASTs. - // Ein String kann mehrere, durch Semikolon getrennte Anweisungen enthalten. + /// Führt SQL-Anweisungen aus, nachdem sie für CRDT-Konformität transformiert wurden. + pub fn execute( + &self, + sql: &str, + conn: &mut rusqlite::Connection, + hlc_service: &HlcService, + ) -> Result, ProxyError> { let dialect = SQLiteDialect {}; - let mut ast_vec = - Parser::parse_sql(&dialect, sql).map_err(|e| format!("SQL-Parse-Fehler: {}", e))?; + let mut ast_vec = Parser::parse_sql(&dialect, sql).map_err(|e| ProxyError::ParseError { + reason: e.to_string(), + })?; + + let mut modified_schema_tables = HashSet::new(); + + let tx = conn + .transaction() + .map_err(|e| ProxyError::TransactionError { + reason: e.to_string(), + })?; + + let hlc_timestamp = + hlc_service + .new_timestamp_and_persist(&tx) + .map_err(|e| ProxyError::HlcError { + reason: e.to_string(), + })?; - // 2. Wir durchlaufen und transformieren jedes einzelne Statement im AST-Vektor. for statement in &mut ast_vec { - self.transform_statement(statement)?; - } - - // 3. Ausführen der (möglicherweise modifizierten) Anweisungen in einer einzigen Transaktion. - // Dies stellt sicher, dass alle Operationen atomar sind. - let tx = conn.transaction().map_err(|e| e.to_string())?; - for statement in ast_vec { - let final_sql = statement.to_string(); - tx.execute(&final_sql) - .map_err(|e| format!("DB-Ausführungsfehler bei '{}': {}", final_sql, e))?; - - // Wenn es ein CREATE/ALTER TABLE war, müssen die Trigger neu erstellt werden. - // Dies geschieht innerhalb derselben Transaktion. - if let Statement::CreateTable { name, .. } | Statement::AlterTable { name, .. } = - statement - { - let table_name = name.0.last().unwrap().value.clone(); - let trigger_manager = crate::trigger_manager::TriggerManager::new(&tx); - trigger_manager - .setup_triggers_for_table(&table_name) - .map_err(|e| { - format!("Trigger-Setup-Fehler für Tabelle '{}': {}", table_name, e) - })?; + if let Some(table_name) = self.transform_statement(statement, Some(&hlc_timestamp))? { + modified_schema_tables.insert(table_name); } } - tx.commit().map_err(|e| e.to_string())?; - Ok(()) + for statement in ast_vec { + let final_sql = statement.to_string(); + tx.execute(&final_sql, []) + .map_err(|e| ProxyError::ExecutionError { + sql: final_sql, + reason: e.to_string(), + })?; + } + tx.commit().map_err(|e| ProxyError::TransactionError { + reason: e.to_string(), + })?; + + Ok(modified_schema_tables.into_iter().collect()) } - // Diese Methode wendet die Transformation auf ein einzelnes Statement an. - fn transform_statement(&self, statement: &mut Statement) -> Result<(), String> { - let mut visitor = TombstoneVisitor; - // `visit` durchläuft den AST und ruft die entsprechenden `visit_*_mut` Methoden auf. - statement.visit(&mut visitor); - Ok(()) - } -} - -struct TombstoneVisitor; - -impl TombstoneVisitor { - fn is_audited_table(&self, table_name: &str) -> bool { - !EXCLUDED_TABLES.contains(&table_name.to_lowercase().as_str()) - } -} - -impl VisitorMut for TombstoneVisitor { - type Break = (); - - // Diese Methode wird für jedes Statement im AST aufgerufen - fn visit_statement_mut(&mut self, stmt: &mut Statement) -> ControlFlow { + /// Wendet die Transformation auf ein einzelnes Statement an. + fn transform_statement( + &self, + stmt: &mut Statement, + hlc_timestamp: Option<&Timestamp>, + ) -> Result, ProxyError> { match stmt { - // Fall 1: CREATE TABLE - Statement::CreateTable { name, columns, .. } => { - let table_name = name.0.last().unwrap().value.as_str(); - if self.is_audited_table(table_name) { - // Füge die 'tombstone'-Spalte hinzu, wenn sie nicht existiert - if !columns - .iter() - .any(|c| c.name.value.to_lowercase() == TOMBSTONE_COLUMN_NAME) - { - columns.push(ColumnDef { - name: Ident::new(TOMBSTONE_COLUMN_NAME), - data_type: DataType::Integer, - collation: None, - options: vec![], // Default ist 0 - }); + sqlparser::ast::Statement::Query(query) => { + if let SetExpr::Select(select) = &mut *query.body { + let mut tombstone_filters = Vec::new(); + for twj in &select.from { + if let TableFactor::Table { name, alias, .. } = &twj.relation { + if self.is_audited_table(name) { + let table_idents = if let Some(a) = alias { + vec![a.name.clone()] + } else { + name.0 + .iter() + .filter_map(|part| match part { + ObjectNamePart::Identifier(id) => Some(id.clone()), + _ => None, + }) + .collect::>() + }; + let column_ident = Ident::new(TOMBSTONE_COLUMN); + let full_ident = [table_idents, vec![column_ident]].concat(); + let filter = Expr::BinaryOp { + left: Box::new(Expr::CompoundIdentifier(full_ident)), + op: BinaryOperator::Eq, + right: Box::new(Expr::Value( + sqlparser::ast::Value::Number("1".to_string(), false) + .into(), + )), + }; + tombstone_filters.push(filter); + } + } + } + if !tombstone_filters.is_empty() { + let combined_filter = tombstone_filters + .into_iter() + .reduce(|acc, expr| Expr::BinaryOp { + left: Box::new(acc), + op: BinaryOperator::And, + right: Box::new(expr), + }) + .unwrap(); + match &mut select.selection { + Some(existing) => { + *existing = Expr::BinaryOp { + left: Box::new(existing.clone()), + op: BinaryOperator::And, + right: Box::new(combined_filter), + }; + } + None => { + select.selection = Some(combined_filter); + } + } + } + } + + // Hinweis: UNION, EXCEPT etc. werden hier nicht behandelt, was dem bisherigen Code entspricht. + } + Statement::CreateTable(create_table) => { + if self.is_audited_table(&create_table.name) { + self.add_crdt_columns(&mut create_table.columns); + return Ok(Some( + create_table + .name + .to_string() + .trim_matches('`') + .trim_matches('"') + .to_string(), + )); + } + } + Statement::Insert(insert_stmt) => { + if let TableObject::TableName(name) = &insert_stmt.table { + if self.is_audited_table(name) { + if let Some(ts) = hlc_timestamp { + self.add_hlc_to_insert(insert_stmt, ts); + } } } } - - // Fall 2: DELETE - Statement::Delete(del_stmt) => { - // Wandle das DELETE-Statement in ein UPDATE-Statement um - let new_update = Statement::Update { - table: del_stmt.from.clone(), - assignments: vec![], - value: Box::new(Expr::Value(Value::Number("1".to_string(), false))), - from: None, - selection: del_stmt.selection.clone(), - returning: None, + /* Statement::Update(update_stmt) => { + if let TableFactor::Table { name, .. } = &update_stmt.table.relation { + if self.is_audited_table(&name) { + if let Some(ts) = hlc_timestamp { + update_stmt.assignments.push(self.create_hlc_assignment(ts)); + } + } + } + } */ + Statement::Update { + table, + assignments: assignments, + from, + selection, + returning, + or, + } => { + if let TableFactor::Table { name, .. } = &table.relation { + if self.is_audited_table(&name) { + if let Some(ts) = hlc_timestamp { + assignments.push(self.create_hlc_assignment(ts)); + } + } + } + *stmt = Statement::Update { + table: table.clone(), + assignments: assignments.clone(), + from: from.clone(), + selection: selection.clone(), + returning: returning.clone(), + or: *or, }; - // Ersetze das aktuelle Statement im AST - *stmt = new_update; + } + Statement::Delete(del_stmt) => { + let table_name = self.extract_table_name_from_from(&del_stmt.from); + if let Some(name) = table_name { + if self.is_audited_table(&name) { + // GEÄNDERT: Übergibt den Zeitstempel an die Transformationsfunktion + if let Some(ts) = hlc_timestamp { + self.transform_delete_to_update(stmt, ts); + } + } + } else { + return Err(ProxyError::UnsupportedStatement { + description: "DELETE from non-table source or multiple tables".to_string(), + }); + } + } + Statement::AlterTable { name, .. } => { + if self.is_audited_table(name) { + return Ok(Some( + name.to_string() + .trim_matches('`') + .trim_matches('"') + .to_string(), + )); + } } _ => {} } - - // Setze die Traversierung für untergeordnete Knoten fort (z.B. SELECTs) - visit_mut::walk_statement_mut(self, stmt) + Ok(None) } - // Diese Methode wird für jede Query (auch Subqueries) aufgerufen - fn visit_query_mut(&mut self, query: &mut Query) -> ControlFlow { - // Zuerst rekursiv in die Tiefe gehen, um innere Queries zuerst zu bearbeiten - visit_mut::walk_query_mut(self, query); + /// Fügt die Tombstone-Spalte zu einer Liste von Spaltendefinitionen hinzu. + fn add_tombstone_column(&self, columns: &mut Vec) { + if !columns + .iter() + .any(|c| c.name.value.to_lowercase() == TOMBSTONE_COLUMN) + { + columns.push(ColumnDef { + name: Ident::new(TOMBSTONE_COLUMN), + data_type: DataType::Integer(None), + options: vec![], + }); + } + } - // Dann die WHERE-Klausel der aktuellen Query anpassen - if let Some(from_clause) = query.body.as_select_mut().map(|s| &mut s.from) { - // (Hier würde eine komplexere Logik zur Analyse der Joins und Tabellen stehen) - // Vereinfacht nehmen wir an, wir fügen es für die erste Tabelle hinzu. - let table_name = if let Some(relation) = from_clause.get_mut(0) { - // Diese Logik muss verfeinert werden, um Aliase etc. zu behandeln - relation.relation.to_string() + /// Prüft, ob eine Tabelle von der Proxy-Logik betroffen sein soll. + fn is_audited_table(&self, name: &ObjectName) -> bool { + let table_name = name.to_string().to_lowercase(); + let table_name = table_name.trim_matches('`').trim_matches('"'); + !EXCLUDED_TABLES.contains(&table_name) + } + + fn extract_table_name_from_from(&self, from: &sqlparser::ast::FromTable) -> Option { + let tables = match from { + sqlparser::ast::FromTable::WithFromKeyword(from) + | sqlparser::ast::FromTable::WithoutKeyword(from) => from, + }; + if tables.len() == 1 { + if let TableFactor::Table { name, .. } = &tables[0].relation { + Some(name.clone()) } else { - "".to_string() - }; + None + } + } else { + None + } + } - if self.is_audited_table(&table_name) { - let tombstone_check = Expr::BinaryOp { - left: Box::new(Expr::Identifier(Ident::new(TOMBSTONE_COLUMN_NAME))), - op: sqlparser::ast::BinaryOperator::Eq, - right: Box::new(Expr::Value(Value::Number("0".to_string(), false))), - }; + fn extract_table_name(&self, from: &[TableWithJoins]) -> Option { + if from.len() == 1 { + if let TableFactor::Table { name, .. } = &from[0].relation { + Some(name.clone()) + } else { + None + } + } else { + None + } + } - let existing_selection = query.selection.take(); - let new_selection = match existing_selection { - Some(expr) => Expr::BinaryOp { - left: Box::new(expr), - op: sqlparser::ast::BinaryOperator::And, - right: Box::new(tombstone_check), - }, - None => tombstone_check, - }; - query.selection = Some(Box::new(new_selection)); + fn create_tombstone_assignment(&self) -> Assignment { + Assignment { + target: AssignmentTarget::ColumnName(ObjectName(vec![ObjectNamePart::Identifier( + Ident::new(TOMBSTONE_COLUMN), + )])), + value: Expr::Value(sqlparser::ast::Value::Number("1".to_string(), false).into()), + } + } + + fn add_tombstone_filter(&self, selection: &mut Option) { + let tombstone_expr = Expr::BinaryOp { + left: Box::new(Expr::Identifier(Ident::new(TOMBSTONE_COLUMN))), + op: BinaryOperator::Eq, + // HIER IST DIE FINALE KORREKTUR: + right: Box::new(Expr::Value(Value::Number("0".to_string(), false).into())), + }; + + match selection { + Some(existing) => { + // Kombiniere mit AND, wenn eine WHERE-Klausel existiert + *selection = Some(Expr::BinaryOp { + left: Box::new(existing.clone()), + op: BinaryOperator::And, + right: Box::new(tombstone_expr), + }); + } + None => { + // Setze neue WHERE-Klausel, wenn keine existiert + *selection = Some(tombstone_expr); } } + } - ControlFlow::Continue(()) + fn add_crdt_columns(&self, columns: &mut Vec) { + if !columns.iter().any(|c| c.name.value == TOMBSTONE_COLUMN) { + columns.push(ColumnDef { + name: Ident::new(TOMBSTONE_COLUMN), + data_type: DataType::Integer(None), + options: vec![], + }); + } + if !columns.iter().any(|c| c.name.value == HLC_TIMESTAMP_COLUMN) { + columns.push(ColumnDef { + name: Ident::new(HLC_TIMESTAMP_COLUMN), + data_type: DataType::Text, // HLC wird als String gespeichert + options: vec![], + }); + } + } + + fn transform_delete_to_update(&self, stmt: &mut Statement, hlc_timestamp: &Timestamp) { + if let Statement::Delete(del_stmt) = stmt { + let table_to_update = match &del_stmt.from { + sqlparser::ast::FromTable::WithFromKeyword(from) + | sqlparser::ast::FromTable::WithoutKeyword(from) => from[0].clone(), + }; + + // Erstellt beide Zuweisungen + let assignments = vec![ + self.create_tombstone_assignment(), + self.create_hlc_assignment(hlc_timestamp), + ]; + + *stmt = Statement::Update { + table: table_to_update, + assignments, + from: None, + selection: del_stmt.selection.clone(), + returning: None, + or: None, + }; + } + } + + fn add_hlc_to_insert( + &self, + insert_stmt: &mut sqlparser::ast::Insert, + ts: &Timestamp, + ) -> Result<(), ProxyError> { + insert_stmt.columns.push(Ident::new(HLC_TIMESTAMP_COLUMN)); + + match insert_stmt.source.as_mut() { + Some(query) => match &mut *query.body { + // Dereferenziere die Box mit * + SetExpr::Values(values) => { + for row in &mut values.rows { + row.push(Expr::Value( + Value::SingleQuotedString(ts.to_string()).into(), + )); + } + } + SetExpr::Select(select) => { + let hlc_expr = Expr::Value(Value::SingleQuotedString(ts.to_string()).into()); + select.projection.push(SelectItem::UnnamedExpr(hlc_expr)); + } + _ => { + return Err(ProxyError::UnsupportedStatement { + description: "INSERT with unsupported source".to_string(), + }); + } + }, + None => { + return Err(ProxyError::UnsupportedStatement { + description: "INSERT statement has no source".to_string(), + }); + } + } + Ok(()) + } + /// Erstellt eine Zuweisung `haex_modified_hlc = '...'` + // NEU: Hilfsfunktion + fn create_hlc_assignment(&self, ts: &Timestamp) -> Assignment { + Assignment { + target: AssignmentTarget::ColumnName(ObjectName(vec![ObjectNamePart::Identifier( + Ident::new(HLC_TIMESTAMP_COLUMN), + )])), + value: Expr::Value(Value::SingleQuotedString(ts.to_string()).into()), + } } } diff --git a/src-tauri/src/crdt/trigger.rs b/src-tauri/src/crdt/trigger.rs index 531ff76..e5cf842 100644 --- a/src-tauri/src/crdt/trigger.rs +++ b/src-tauri/src/crdt/trigger.rs @@ -1,121 +1,169 @@ -// In src-tauri/src/trigger_manager.rs -> impl<'a> TriggerManager<'a> +use crate::crdt::hlc; +use rusqlite::{Connection, Result, Row}; +use serde::Serialize; +use std::fmt::Write; +use ts_rs::TS; -// In einem neuen Modul, z.B. src-tauri/src/trigger_manager.rs -use crate::crdt::proxy::ColumnInfo; -use rusqlite::{params, Connection, Result, Transaction}; -use std::sync::{Arc, Mutex}; -use tauri::AppHandle; +// the z_ prefix should make sure that these triggers are executed lasts +const INSERT_TRIGGER_TPL: &str = "z_crdt_{TABLE_NAME}_insert"; +const UPDATE_TRIGGER_TPL: &str = "z_crdt_{TABLE_NAME}_update"; -pub struct TriggerManager<'a> { - tx: &'a Transaction<'a>, +pub const LOG_TABLE_NAME: &str = "haex_crdt_logs"; +pub const TOMBSTONE_COLUMN: &str = "haex_tombstone"; +pub const HLC_TIMESTAMP_COLUMN: &str = "haex_hlc_timestamp"; +#[derive(Debug, Serialize, TS)] +#[ts(export)] +#[serde(tag = "status", content = "details")] +pub enum TriggerSetupResult { + Success, + TableNotFound, + TombstoneColumnMissing { column_name: String }, + PrimaryKeyMissing, } -impl<'a> TriggerManager<'a> { - pub fn new(tx: &'a Transaction<'a>) -> Self { - Self { tx } +struct ColumnInfo { + name: String, + is_pk: bool, +} + +impl ColumnInfo { + fn from_row(row: &Row) -> Result { + Ok(ColumnInfo { + name: row.get("name")?, + is_pk: row.get::<_, i64>("pk")? > 0, + }) + } +} + +pub struct TriggerManager; + +impl TriggerManager { + pub fn new() -> Self { + TriggerManager {} } - // Die Hauptfunktion, die alles einrichtet - pub fn setup_triggers_for_table(&self, table_name: &str) -> Result<()> { - let columns = self.get_table_schema(table_name)?; - let pk_cols: Vec<_> = columns + pub fn setup_triggers_for_table( + &self, + conn: &mut Connection, + table_name: &str, + ) -> Result { + let columns = self.get_table_schema(conn, table_name)?; + + if columns.is_empty() { + return Ok(TriggerSetupResult::TableNotFound); + } + + if !columns.iter().any(|c| c.name == TOMBSTONE_COLUMN) { + return Ok(TriggerSetupResult::TombstoneColumnMissing { + column_name: TOMBSTONE_COLUMN.to_string(), + }); + } + + let pks: Vec = columns .iter() .filter(|c| c.is_pk) - .map(|c| c.name.as_str()) + .map(|c| c.name.clone()) .collect(); - let other_cols: Vec<_> = columns - .iter() - .filter(|c| !c.is_pk && c.name != "tombstone") - .map(|c| c.name.as_str()) - .collect(); - - let drop_sql = self.generate_drop_triggers_sql(table_name); - let insert_sql = self.generate_insert_trigger_sql(table_name, &pk_cols, &other_cols); - let update_sql = self.generate_update_trigger_sql(table_name, &pk_cols, &other_cols); - - self.tx.execute_batch(&drop_sql)?; - self.tx.execute_batch(&insert_sql)?; - self.tx.execute_batch(&update_sql)?; - - Ok(()) - } - - fn get_table_schema(&self, table_name: &str) -> Result> { - let sql = format!("PRAGMA table_info('{}')", table_name); - let mut stmt = self.tx.prepare(&sql)?; - let rows = stmt.query_map(|row| { - let pk_val: i64 = row.get(5)?; - Ok(ColumnInfo { - name: row.get(1)?, - is_pk: pk_val > 0, - }) - })?; - - let mut columns = Vec::new(); - for row_result in rows { - columns.push(row_result?); + if pks.is_empty() { + return Ok(TriggerSetupResult::PrimaryKeyMissing); } - Ok(columns) + + let cols_to_track: Vec = columns + .iter() + .filter(|c| !c.is_pk && c.name != TOMBSTONE_COLUMN) + .map(|c| c.name.clone()) + .collect(); + + let insert_trigger_sql = self.generate_insert_trigger_sql(table_name, &pks, &cols_to_track); + let update_trigger_sql = self.generate_update_trigger_sql(table_name, &pks, &cols_to_track); + + let tx = conn.transaction()?; + tx.execute_batch(&format!("{}\n{}", insert_trigger_sql, update_trigger_sql))?; + tx.commit()?; + + Ok(TriggerSetupResult::Success) } - //... Implementierung der SQL-Generierungsfunktionen... + fn get_table_schema(&self, conn: &Connection, table_name: &str) -> Result> { + let sql = format!("PRAGMA table_info('{}');", table_name); + let mut stmt = conn.prepare(&sql)?; + let rows = stmt.query_map([], ColumnInfo::from_row)?; + rows.collect() + } - fn generate_update_trigger_sql(&self, table_name: &str, pks: &[&str], cols: &[&str]) -> String { - // Erstellt dynamisch die Key-Value-Paare für das JSON-Objekt des Primärschlüssels. - let pk_json_payload_new = pks + fn generate_insert_trigger_sql( + &self, + table_name: &str, + pks: &[String], + cols: &[String], + ) -> String { + let pk_json_payload = pks .iter() .map(|pk| format!("'{}', NEW.\"{}\"", pk, pk)) .collect::>() .join(", "); - let pk_json_payload_old = pks + let column_inserts = cols.iter().fold(String::new(), |mut acc, col| { + writeln!(&mut acc, "INSERT INTO {log_table} (hlc_timestamp, op_type, table_name, row_pk, column_name, value) VALUES (NEW.\"{hlc_col}\", 'INSERT', '{table}', json_object({pk_payload}), '{column}', json_object('value', NEW.\"{column}\"));", + log_table = LOG_TABLE_NAME, + hlc_col = HLC_TIMESTAMP_COLUMN, + table = table_name, + pk_payload = pk_json_payload, + column = col + ).unwrap(); + acc + }); + + // Verwende die neue Konstante für den Trigger-Namen + let trigger_name = INSERT_TRIGGER_TPL.replace("{TABLE_NAME}", table_name); + + format!( + "CREATE TRIGGER IF NOT EXISTS {trigger_name} + AFTER INSERT ON \"{table_name}\" + FOR EACH ROW + BEGIN + {column_inserts} + END;" + ) + } + + fn generate_update_trigger_sql( + &self, + table_name: &str, + pks: &[String], + cols: &[String], + ) -> String { + let pk_json_payload = pks .iter() - .map(|pk| format!("'{}', OLD.\"{}\"", pk, pk)) + .map(|pk| format!("'{}', NEW.\"{}\"", pk, pk)) .collect::>() .join(", "); - // Erstellt die einzelnen INSERT-Anweisungen für jede Spalte - let column_updates = cols.iter().map(|col| format!( - r#" - -- Protokolliere die Spaltenänderung, wenn sie stattgefunden hat und es kein Soft-Delete ist - INSERT INTO crdt_log (hlc_timestamp, op_type, table_name, row_pk, column_name, value, old_value) - SELECT - 'placeholder_hlc', -- TODO: HLC-Funktion hier aufrufen - 'UPDATE', - '{table}', - json_object({pk_payload_new}), - '{column}', - json_object('value', NEW."{column}"), - json_object('value', OLD."{column}") - WHERE - NEW."{column}" IS NOT OLD."{column}" - "#, - table = table_name, - pk_payload_new = pk_json_payload_new, - column = col - )).collect::>().join("\n"); + let column_updates = cols.iter().fold(String::new(), |mut acc, col| { + writeln!(&mut acc, "IF NEW.\"{column}\" IS NOT OLD.\"{column}\" THEN INSERT INTO {log_table} (hlc_timestamp, op_type, table_name, row_pk, column_name, value, old_value) VALUES (NEW.\"{hlc_col}\", 'UPDATE', '{table}', json_object({pk_payload}), '{column}', json_object('value', NEW.\"{column}\"), json_object('value', OLD.\"{column}\")); END IF;", + log_table = LOG_TABLE_NAME, + hlc_col = HLC_TIMESTAMP_COLUMN, + table = table_name, + pk_payload = pk_json_payload, + column = col).unwrap(); + acc + }); - // Erstellt die Logik für den Soft-Delete let soft_delete_logic = format!( - r#" - -- Protokolliere den Soft-Delete - INSERT INTO crdt_log (hlc_timestamp, op_type, table_name, row_pk) - SELECT - 'placeholder_hlc', -- TODO: HLC-Funktion hier aufrufen - 'DELETE', - '{table}', - json_object({pk_payload_old}) - WHERE - OLD.{tombstone_col} = 0 - "#, + "IF NEW.{tombstone_col} = 1 AND OLD.{tombstone_col} = 0 THEN INSERT INTO {log_table} (hlc_timestamp, op_type, table_name, row_pk) VALUES (NEW.\"{hlc_col}\", 'DELETE', '{table}', json_object({pk_payload})); END IF;", + log_table = LOG_TABLE_NAME, + hlc_col = HLC_TIMESTAMP_COLUMN, + tombstone_col = TOMBSTONE_COLUMN, table = table_name, - pk_payload_old = pk_json_payload_old + pk_payload = pk_json_payload ); - // Kombiniert alles zu einem einzigen Trigger + // Verwende die neue Konstante für den Trigger-Namen + let trigger_name = UPDATE_TRIGGER_TPL.replace("{TABLE_NAME}", table_name); + format!( - "CREATE TRIGGER IF NOT EXISTS {table_name}_crdt_update - AFTER UPDATE ON {table_name} + "CREATE TRIGGER IF NOT EXISTS {trigger_name} + AFTER UPDATE ON \"{table_name}\" FOR EACH ROW BEGIN {column_updates}