fixed crdt

This commit is contained in:
2025-08-28 12:16:22 +02:00
parent 0c304d7900
commit 91db0475cd
11 changed files with 703 additions and 305 deletions

View File

@ -50,6 +50,7 @@
"zod": "^3.25.67"
},
"devDependencies": {
"@iconify-json/proicons": "^1.2.17",
"@iconify/json": "^2.2.351",
"@iconify/tailwind4": "^1.0.6",
"@tauri-apps/cli": "^2.5.0",

pnpm-lock.yaml generated
View File

@ -105,6 +105,9 @@ importers:
specifier: ^3.25.67
version: 3.25.67
devDependencies:
'@iconify-json/proicons':
specifier: ^1.2.17
version: 1.2.17
'@iconify/json':
specifier: ^2.2.351
version: 2.2.355
@ -687,6 +690,9 @@ packages:
resolution: {integrity: sha512-bV0Tgo9K4hfPCek+aMAn81RppFKv2ySDQeMoSZuvTASywNTnVJCArCZE2FWqpvIatKu7VMRLWlR1EazvVhDyhQ==}
engines: {node: '>=18.18'}
'@iconify-json/proicons@1.2.17':
resolution: {integrity: sha512-FYnIsbhj91Epr62+QuyVXkljizcqWBHWrK3KGMdKwYxan3PxDZQiOOfKQepLR02Y9PJ4/4N8N22P5F6XS8eCFw==}
'@iconify/collections@1.0.560':
resolution: {integrity: sha512-sXm0Io67deUIqWmaSGb+JJ4Y9DJvgT/seaQlCOHUuHhs3qcPuLY6WMlfoUSmmXV5yHFhZprRZEs1np2BZ/BCJw==}
@ -5863,6 +5869,10 @@ snapshots:
'@humanwhocodes/retry@0.4.3': {}
'@iconify-json/proicons@1.2.17':
dependencies:
'@iconify/types': 2.0.0
'@iconify/collections@1.0.560':
dependencies:
'@iconify/types': 2.0.0

src-tauri/Cargo.lock generated
View File

@ -1563,7 +1563,9 @@ dependencies = [
"tauri-plugin-os",
"tauri-plugin-persisted-scope",
"tauri-plugin-store",
"thiserror 2.0.12",
"tokio",
"ts-rs",
"uhlc",
"uuid",
]
@ -4582,6 +4584,15 @@ dependencies = [
"utf-8",
]
[[package]]
name = "termcolor"
version = "1.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "06794f8f6c5c898b3275aebefa6b8a1cb24cd2c6c79397ab15774837a0bc5755"
dependencies = [
"winapi-util",
]
[[package]]
name = "thiserror"
version = "1.0.69"
@ -4874,6 +4885,28 @@ version = "0.2.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b"
[[package]]
name = "ts-rs"
version = "11.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6ef1b7a6d914a34127ed8e1fa927eb7088903787bcded4fa3eef8f85ee1568be"
dependencies = [
"thiserror 2.0.12",
"ts-rs-macros",
]
[[package]]
name = "ts-rs-macros"
version = "11.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e9d4ed7b4c18cc150a6a0a1e9ea1ecfa688791220781af6e119f9599a8502a0a"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.100",
"termcolor",
]
[[package]]
name = "typeid"
version = "1.0.3"

View File

@ -44,8 +44,9 @@ tauri-plugin-store = "2.3"
tauri-plugin-http = "2.5"
tauri-plugin-notification = "2.3"
tauri-plugin-persisted-scope = "2.0.0"
tauri-plugin-android-fs = "9.5.0"
tauri-plugin-android-fs = "9.5.0"
uuid = { version = "1.17.0", features = ["v4"] }
ts-rs = "11.0.1"
thiserror = "2.0.12"
#tauri-plugin-sql = { version = "2", features = ["sqlite"] }

View File

@ -1,6 +1,6 @@
import { blob, integer, sqliteTable, text } from 'drizzle-orm/sqlite-core'
export const haexCrdtMessages = sqliteTable('haex_crdt_messages', {
export const haexCrdtLogs = sqliteTable('haex_crdt_logs', {
hlc_timestamp: text().primaryKey(),
table_name: text(),
row_pks: text({ mode: 'json' }),
@ -9,8 +9,8 @@ export const haexCrdtMessages = sqliteTable('haex_crdt_messages', {
new_value: text({ mode: 'json' }),
old_value: text({ mode: 'json' }),
})
export type InsertHaexCrdtMessages = typeof haexCrdtMessages.$inferInsert
export type SelectHaexCrdtMessages = typeof haexCrdtMessages.$inferSelect
export type InsertHaexCrdtLogs = typeof haexCrdtLogs.$inferInsert
export type SelectHaexCrdtLogs = typeof haexCrdtLogs.$inferSelect
export const haexCrdtSnapshots = sqliteTable('haex_crdt_snapshots', {
snapshot_id: text().primaryKey(),
@ -21,7 +21,6 @@ export const haexCrdtSnapshots = sqliteTable('haex_crdt_snapshots', {
})
export const haexCrdtSettings = sqliteTable('haex_crdt_settings', {
id: text().primaryKey(),
type: text({ enum: ['hlc_timestamp'] }).unique(),
type: text().primaryKey(),
value: text(),
})

View File

@ -13,6 +13,7 @@ export const haexSettings = sqliteTable('haex_settings', {
key: text(),
type: text(),
value: text(),
haex_tombstone: integer({ mode: 'boolean' }),
})
export type InsertHaexSettings = typeof haexSettings.$inferInsert
export type SelectHaexSettings = typeof haexSettings.$inferSelect
@ -25,6 +26,7 @@ export const haexExtensions = sqliteTable('haex_extensions', {
name: text(),
url: text(),
version: text(),
haex_tombstone: integer({ mode: 'boolean' }),
})
export type InsertHaexExtensions = typeof haexExtensions.$inferInsert
export type SelectHaexExtensions = typeof haexExtensions.$inferSelect
@ -43,6 +45,7 @@ export const haexExtensionsPermissions = sqliteTable(
updateAt: integer('updated_at', { mode: 'timestamp' }).$onUpdate(
() => new Date(),
),
haex_tombstone: integer({ mode: 'boolean' }),
},
(table) => [
unique().on(table.extensionId, table.resource, table.operation, table.path),
@ -66,6 +69,7 @@ export const haexNotifications = sqliteTable('haex_notifications', {
type: text({
enum: ['error', 'success', 'warning', 'info', 'log'],
}).notNull(),
haex_tombstone: integer({ mode: 'boolean' }),
})
export type InsertHaexNotifications = typeof haexNotifications.$inferInsert
export type SelectHaexNotifications = typeof haexNotifications.$inferSelect
@ -85,6 +89,7 @@ export const haexPasswordsItemDetails = sqliteTable(
updateAt: integer('updated_at', { mode: 'timestamp' }).$onUpdate(
() => new Date(),
),
haex_tombstone: integer({ mode: 'boolean' }),
},
)
export type InsertHaexPasswordsItemDetails =
@ -104,6 +109,7 @@ export const haexPasswordsItemKeyValues = sqliteTable(
updateAt: integer('updated_at', { mode: 'timestamp' }).$onUpdate(
() => new Date(),
),
haex_tombstone: integer({ mode: 'boolean' }),
},
)
export type InserthaexPasswordsItemKeyValues =
@ -123,6 +129,7 @@ export const haexPasswordsItemHistory = sqliteTable(
oldValue: text('old_value'),
newValue: text('new_value'),
createdAt: text('created_at').default(sql`(CURRENT_TIMESTAMP)`),
haex_tombstone: integer({ mode: 'boolean' }),
},
)
export type InserthaexPasswordsItemHistory =
@ -144,6 +151,7 @@ export const haexPasswordsGroups = sqliteTable('haex_passwords_groups', {
updateAt: integer('updated_at', { mode: 'timestamp' }).$onUpdate(
() => new Date(),
),
haex_tombstone: integer({ mode: 'boolean' }),
})
export type InsertHaexPasswordsGroups = typeof haexPasswordsGroups.$inferInsert
export type SelectHaexPasswordsGroups = typeof haexPasswordsGroups.$inferSelect
@ -157,6 +165,7 @@ export const haexPasswordsGroupItems = sqliteTable(
itemId: text('item_id').references(
(): AnySQLiteColumn => haexPasswordsItemDetails.id,
),
haex_tombstone: integer({ mode: 'boolean' }),
},
(table) => [primaryKey({ columns: [table.itemId, table.groupId] })],
)

View File

@ -1,67 +1,131 @@
use rusqlite::{params, Connection, Result};
use std::sync::{Arc, Mutex};
use uhlc::{Timestamp, HLC};
// src/hlc_service.rs
use rusqlite::{params, Connection, Result as RusqliteResult, Transaction};
use std::{
fmt::Debug,
str::FromStr,
sync::{Arc, Mutex},
time::Duration,
};
use thiserror::Error;
use uhlc::{HLCBuilder, Timestamp, HLC, ID};
use uuid::Uuid;
const HLC_SETTING_TYPE: &str = "hlc_timestamp";
const HLC_NODE_ID_TYPE: &str = "hlc_node_id";
const HLC_TIMESTAMP_TYPE: &str = "hlc_timestamp";
pub const GET_HLC_FUNCTION: &str = "get_hlc_timestamp";
pub const CRDT_SETTINGS_TABLE: &str = "haex_crdt_settings";
pub struct HlcService(pub Arc<Mutex<HLC>>);
pub fn setup_hlc(conn: &mut Connection) -> Result<()> {
// 1. Load the last HLC state or create a new one.
let hlc = conn
.query_row(
"SELECT value FROM {CRDT_SETTINGS_TABLE} meta WHERE type = ?1",
params![HLC_SETTING_TYPE],
|row| {
let state_str: String = row.get(0)?;
let timestamp = Timestamp::from_str(&state_str)
.map_err(|_| rusqlite::Error::ExecuteReturnedResults)?; // convert the error
Ok(HLC::new(timestamp))
},
)
.unwrap_or_else(|_| HLC::default()); // On error (e.g. not found), create a new HLC.
let hlc_arc = Arc::new(Mutex::new(hlc));
// 2. Create a clone for the SQL function and persist the state each time a new timestamp is generated.
let hlc_clone = hlc_arc.clone();
let db_conn_arc = Arc::new(Mutex::new(conn.try_clone()?));
conn.create_scalar_function(
GET_HLC_FUNCTION,
0,
rusqlite::functions::FunctionFlags::SQLITE_UTF8
| rusqlite::functions::FunctionFlags::SQLITE_DETERMINISTIC,
move |_| {
let mut hlc = hlc_clone.lock().unwrap();
let new_timestamp = hlc.new_timestamp();
let timestamp_str = new_timestamp.to_string();
// 3. Immediately persist the new state back to the DB.
// UPSERT logic: replace the value if the key exists, otherwise insert it.
let db_conn = db_conn_arc.lock().unwrap();
db_conn
.execute(
"INSERT INTO {CRDT_SETTINGS_TABLE} (id, type, value) VALUES (?1, ?2, ?3)
ON CONFLICT(type) DO UPDATE SET value = excluded.value",
params![
Uuid::new_v4().to_string(), // generate a new ID in case this is an INSERT
HLC_SETTING_TYPE,
&timestamp_str
],
)
.expect("HLC state could not be persisted."); // In Prod sollte hier ein besseres Error-Handling hin.
Ok(timestamp_str)
},
)?;
// Note: managing the HLC service in Tauri state is no longer strictly required,
// since the SQL function now receives everything it needs via cloned Arcs.
// If you still need it for other commands, you can keep it in the state.
Ok(())
#[derive(Error, Debug)]
pub enum HlcError {
#[error("Database error: {0}")]
Database(#[from] rusqlite::Error),
#[error("Failed to parse persisted HLC timestamp: {0}")]
ParseTimestamp(String),
#[error("Failed to parse persisted HLC state: {0}")]
Parse(String),
#[error("HLC mutex was poisoned")]
MutexPoisoned,
#[error("Failed to create node ID: {0}")]
CreateNodeId(#[from] uhlc::SizeError),
}
/// A thread-safe, persistent HLC service.
#[derive(Clone)]
pub struct HlcService(Arc<Mutex<HLC>>);
impl HlcService {
/// Creates a new HLC service, initializing it from the database or creating a new
/// persistent identity if one does not exist.
pub fn new(conn: &mut Connection) -> Result<Self, HlcError> {
// 1. Manage persistent node identity.
let node_id = Self::get_or_create_node_id(conn)?;
// 2. Create HLC instance with stable identity using the HLCBuilder.
let hlc = HLCBuilder::new()
.with_id(node_id)
.with_max_delta(Duration::from_secs(1)) // Example of custom configuration
.build();
// 3. Load the last persisted timestamp and update the clock.
let last_state_str: RusqliteResult<String> = conn.query_row(
&format!("SELECT value FROM {} WHERE type = ?1", CRDT_SETTINGS_TABLE),
params![HLC_TIMESTAMP_TYPE],
|row| row.get(0),
);
if let Ok(state_str) = last_state_str {
let timestamp =
Timestamp::from_str(&state_str).map_err(|e| HlcError::ParseTimestamp(e.cause))?;
// Update the clock with the persisted state.
// We might want to handle the case where the clock has drifted too far.
hlc.update_with_timestamp(&timestamp)
.map_err(|e| HlcError::Parse(e.to_string()))?;
}
let hlc_arc = Arc::new(Mutex::new(hlc));
Ok(HlcService(hlc_arc))
}
/// Generates a new timestamp and immediately persists the HLC's new state.
/// This method MUST be called within an existing database transaction (`tx`)
/// along with the actual data operation that this timestamp is for.
/// This design ensures atomicity: the data is saved with its timestamp,
/// and the clock state is updated, or none of it is.
pub fn new_timestamp_and_persist<'tx>(
&self,
tx: &Transaction<'tx>,
) -> Result<Timestamp, HlcError> {
let hlc = self.0.lock().map_err(|_| HlcError::MutexPoisoned)?;
let new_timestamp = hlc.new_timestamp();
let timestamp_str = new_timestamp.to_string();
tx.execute(
&format!(
"INSERT INTO {} (type, value) VALUES (?1,?2)
ON CONFLICT(type) DO UPDATE SET value = excluded.value",
CRDT_SETTINGS_TABLE
),
params![HLC_TIMESTAMP_TYPE, timestamp_str],
)?;
Ok(new_timestamp)
}
/// Retrieves or creates and persists a stable node ID for the HLC.
fn get_or_create_node_id(conn: &mut Connection) -> Result<ID, HlcError> {
let tx = conn.transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)?;
let query = format!("SELECT value FROM {} WHERE type =?1", CRDT_SETTINGS_TABLE);
match tx.query_row(&query, params![HLC_NODE_ID_TYPE], |row| {
row.get::<_, String>(0)
}) {
Ok(id_str) => {
// ID exists, parse and return it.
let id_bytes = hex::decode(id_str).map_err(|e| HlcError::Parse(e.to_string()))?;
let id = ID::try_from(id_bytes.as_slice())?;
tx.commit()?;
Ok(id)
}
Err(rusqlite::Error::QueryReturnedNoRows) => {
// No ID found, create, persist, and return a new one.
let new_id_bytes = Uuid::new_v4().as_bytes().to_vec();
let new_id = ID::try_from(new_id_bytes.as_slice())?;
let new_id_str = hex::encode(new_id.to_le_bytes());
tx.execute(
&format!(
"INSERT INTO {} (type, value) VALUES (?1, ?2)",
CRDT_SETTINGS_TABLE
),
params![HLC_NODE_ID_TYPE, new_id_str],
)?;
tx.commit()?;
Ok(new_id)
}
Err(e) => Err(HlcError::from(e)),
}
}
}
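
A minimal usage sketch (hypothetical caller; the `notes` table is illustrative and not part of this commit, while the `haex_tombstone` and `haex_hlc_timestamp` columns match the trigger constants below): the row write, its HLC column, and the persisted clock state share one transaction, so they commit or roll back together.

fn soft_delete_note(conn: &mut rusqlite::Connection, hlc: &HlcService, id: &str) -> Result<(), HlcError> {
    let tx = conn.transaction()?;
    let ts = hlc.new_timestamp_and_persist(&tx)?;
    tx.execute(
        "UPDATE notes SET haex_tombstone = 1, haex_hlc_timestamp = ?1 WHERE id = ?2",
        params![ts.to_string(), id],
    )?;
    // Row update, HLC column, and clock state land atomically, or not at all.
    tx.commit()?;
    Ok(())
}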

View File

@ -1,22 +0,0 @@
// src/entities/crdt_log.rs
use sea_orm::entity::prelude::*;
#[sea_orm(table_name = "crdt_log")]
pub struct Model {
#[sea_orm(primary_key, auto_increment = true)]
pub id: i64,
pub hlc_timestamp: String,
pub op_type: String,
pub table_name: String,
pub row_pk: String, // stored as a JSON string
#[sea_orm(nullable)]
pub column_name: Option<String>,
#[sea_orm(nullable)]
pub value: Option<String>,
#[sea_orm(nullable)]
pub old_value: Option<String>,
}
pub enum Relation {}
impl ActiveModelBehavior for ActiveModel {}

View File

@ -1,4 +1,3 @@
pub mod hlc;
pub mod log;
pub mod proxy;
pub mod trigger;

View File

@ -1,15 +1,47 @@
// In src-tauri/src/sql_proxy.rs
// In src-tauri/src/crdt/proxy.rs
use rusqlite::Connection;
use sqlparser::ast::{ColumnDef, DataType, Expr, Ident, Query, Statement, TableWithJoins, Value};
use crate::crdt::hlc::HlcService;
use crate::crdt::trigger::{HLC_TIMESTAMP_COLUMN, TOMBSTONE_COLUMN};
use serde::{Deserialize, Serialize};
use sqlparser::ast::{
Assignment, AssignmentTarget, BinaryOperator, ColumnDef, DataType, Expr, Ident, Insert,
ObjectName, ObjectNamePart, SelectItem, SetExpr, Statement, TableFactor, TableObject,
TableWithJoins, UpdateTableFromKind, Value, ValueWithSpan,
};
use sqlparser::dialect::SQLiteDialect;
use sqlparser::parser::Parser;
use sqlparser::visit_mut::{self, VisitorMut};
use std::ops::ControlFlow;
use std::collections::HashSet;
use ts_rs::TS;
use uhlc::Timestamp;
// The tombstone column name as a constant, to avoid "magic strings".
pub const TOMBSTONE_COLUMN_NAME: &str = "haex_tombstone";
const EXCLUDED_TABLES: &[&str] = &["haex_crdt_log"];
#[derive(Serialize, Deserialize, TS)]
#[ts(export)]
#[serde(tag = "type", content = "details")]
pub enum ProxyError {
/// The SQL could not be parsed.
ParseError {
reason: String,
},
/// An error occurred while executing against the database.
ExecutionError {
sql: String,
reason: String,
},
/// An error occurred while managing the transaction.
TransactionError {
reason: String,
},
/// A SQL statement is not supported by the proxy (e.g. DELETE from a subquery).
UnsupportedStatement {
description: String,
},
HlcError {
reason: String,
},
}
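
Because the enum is adjacently tagged (`tag = "type", content = "details"`) and exported through ts-rs, the frontend receives a discriminated union. A sketch of the wire shape, assuming the serde_json crate is available:

let err = ProxyError::ParseError { reason: "unexpected token".to_string() };
assert_eq!(
    serde_json::to_string(&err).unwrap(),
    r#"{"type":"ParseError","details":{"reason":"unexpected token"}}"#
);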
// Tables excluded from the proxy logic.
const EXCLUDED_TABLES: &[&str] = &["haex_crdt_settings", "haex_crdt_logs"];
pub struct SqlProxy;
@ -18,145 +50,369 @@ impl SqlProxy {
Self {}
}
// The central execution function
pub fn execute(&self, sql: &str, conn: &Connection) -> Result<(), String> {
// 1. Parse the SQL string into one or more ASTs.
// A single string can contain several statements separated by semicolons.
/// Executes SQL statements after they have been transformed for CRDT conformance.
pub fn execute(
&self,
sql: &str,
conn: &mut rusqlite::Connection,
hlc_service: &HlcService,
) -> Result<Vec<String>, ProxyError> {
let dialect = SQLiteDialect {};
let mut ast_vec =
Parser::parse_sql(&dialect, sql).map_err(|e| format!("SQL-Parse-Fehler: {}", e))?;
let mut ast_vec = Parser::parse_sql(&dialect, sql).map_err(|e| ProxyError::ParseError {
reason: e.to_string(),
})?;
let mut modified_schema_tables = HashSet::new();
let tx = conn
.transaction()
.map_err(|e| ProxyError::TransactionError {
reason: e.to_string(),
})?;
let hlc_timestamp =
hlc_service
.new_timestamp_and_persist(&tx)
.map_err(|e| ProxyError::HlcError {
reason: e.to_string(),
})?;
// 2. Walk and transform each individual statement in the AST vector.
for statement in &mut ast_vec {
self.transform_statement(statement)?;
}
// 3. Execute the (possibly modified) statements in a single transaction.
// This ensures that all operations are atomic.
let tx = conn.transaction().map_err(|e| e.to_string())?;
for statement in ast_vec {
let final_sql = statement.to_string();
tx.execute(&final_sql)
.map_err(|e| format!("DB-Ausführungsfehler bei '{}': {}", final_sql, e))?;
// If it was a CREATE/ALTER TABLE, the triggers must be recreated.
// This happens within the same transaction.
if let Statement::CreateTable { name, .. } | Statement::AlterTable { name, .. } =
statement
{
let table_name = name.0.last().unwrap().value.clone();
let trigger_manager = crate::trigger_manager::TriggerManager::new(&tx);
trigger_manager
.setup_triggers_for_table(&table_name)
.map_err(|e| {
format!("Trigger-Setup-Fehler für Tabelle '{}': {}", table_name, e)
})?;
if let Some(table_name) = self.transform_statement(statement, Some(&hlc_timestamp))? {
modified_schema_tables.insert(table_name);
}
}
tx.commit().map_err(|e| e.to_string())?;
Ok(())
for statement in ast_vec {
let final_sql = statement.to_string();
tx.execute(&final_sql, [])
.map_err(|e| ProxyError::ExecutionError {
sql: final_sql,
reason: e.to_string(),
})?;
}
tx.commit().map_err(|e| ProxyError::TransactionError {
reason: e.to_string(),
})?;
Ok(modified_schema_tables.into_iter().collect())
}
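
A hedged wiring sketch (`conn`, `hlc_service`, and `trigger_manager` are assumed context from the commands layer; the exact rewritten SQL depends on sqlparser's Display output): a DELETE on an audited table runs as a tombstone UPDATE, and the returned table names let the caller refresh triggers after DDL.

let proxy = SqlProxy::new();
// Executes roughly: UPDATE notes SET haex_tombstone = 1, haex_hlc_timestamp = '<hlc>' WHERE id = 'abc'
let changed = proxy.execute("DELETE FROM notes WHERE id = 'abc'", &mut conn, &hlc_service)?;
for table in &changed {
    // Recreate the CRDT triggers for any table whose schema was just created or altered.
    trigger_manager.setup_triggers_for_table(&mut conn, table)?;
}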
// This method applies the transformation to a single statement.
fn transform_statement(&self, statement: &mut Statement) -> Result<(), String> {
let mut visitor = TombstoneVisitor;
// `visit` walks the AST and invokes the corresponding `visit_*_mut` methods.
statement.visit(&mut visitor);
Ok(())
}
}
struct TombstoneVisitor;
impl TombstoneVisitor {
fn is_audited_table(&self, table_name: &str) -> bool {
!EXCLUDED_TABLES.contains(&table_name.to_lowercase().as_str())
}
}
impl VisitorMut for TombstoneVisitor {
type Break = ();
// This method is called for every statement in the AST
fn visit_statement_mut(&mut self, stmt: &mut Statement) -> ControlFlow<Self::Break> {
/// Applies the transformation to a single statement.
fn transform_statement(
&self,
stmt: &mut Statement,
hlc_timestamp: Option<&Timestamp>,
) -> Result<Option<String>, ProxyError> {
match stmt {
// Case 1: CREATE TABLE
Statement::CreateTable { name, columns, .. } => {
let table_name = name.0.last().unwrap().value.as_str();
if self.is_audited_table(table_name) {
// Add the 'tombstone' column if it does not exist
if !columns
.iter()
.any(|c| c.name.value.to_lowercase() == TOMBSTONE_COLUMN_NAME)
{
columns.push(ColumnDef {
name: Ident::new(TOMBSTONE_COLUMN_NAME),
data_type: DataType::Integer,
collation: None,
options: vec![], // default is 0
});
sqlparser::ast::Statement::Query(query) => {
if let SetExpr::Select(select) = &mut *query.body {
let mut tombstone_filters = Vec::new();
for twj in &select.from {
if let TableFactor::Table { name, alias, .. } = &twj.relation {
if self.is_audited_table(name) {
let table_idents = if let Some(a) = alias {
vec![a.name.clone()]
} else {
name.0
.iter()
.filter_map(|part| match part {
ObjectNamePart::Identifier(id) => Some(id.clone()),
_ => None,
})
.collect::<Vec<_>>()
};
let column_ident = Ident::new(TOMBSTONE_COLUMN);
let full_ident = [table_idents, vec![column_ident]].concat();
let filter = Expr::BinaryOp {
left: Box::new(Expr::CompoundIdentifier(full_ident)),
op: BinaryOperator::Eq,
right: Box::new(Expr::Value(
sqlparser::ast::Value::Number("1".to_string(), false)
.into(),
)),
};
tombstone_filters.push(filter);
}
}
}
if !tombstone_filters.is_empty() {
let combined_filter = tombstone_filters
.into_iter()
.reduce(|acc, expr| Expr::BinaryOp {
left: Box::new(acc),
op: BinaryOperator::And,
right: Box::new(expr),
})
.unwrap();
match &mut select.selection {
Some(existing) => {
*existing = Expr::BinaryOp {
left: Box::new(existing.clone()),
op: BinaryOperator::And,
right: Box::new(combined_filter),
};
}
None => {
select.selection = Some(combined_filter);
}
}
}
}
// Note: UNION, EXCEPT etc. are not handled here, matching the previous code.
}
Statement::CreateTable(create_table) => {
if self.is_audited_table(&create_table.name) {
self.add_crdt_columns(&mut create_table.columns);
return Ok(Some(
create_table
.name
.to_string()
.trim_matches('`')
.trim_matches('"')
.to_string(),
));
}
}
Statement::Insert(insert_stmt) => {
if let TableObject::TableName(name) = &insert_stmt.table {
if self.is_audited_table(name) {
if let Some(ts) = hlc_timestamp {
self.add_hlc_to_insert(insert_stmt, ts)?;
}
}
}
}
// Case 2: DELETE
Statement::Delete(del_stmt) => {
// Convert the DELETE statement into an UPDATE statement
let new_update = Statement::Update {
table: del_stmt.from.clone(),
assignments: vec![],
value: Box::new(Expr::Value(Value::Number("1".to_string(), false))),
from: None,
selection: del_stmt.selection.clone(),
returning: None,
/* Statement::Update(update_stmt) => {
if let TableFactor::Table { name, .. } = &update_stmt.table.relation {
if self.is_audited_table(&name) {
if let Some(ts) = hlc_timestamp {
update_stmt.assignments.push(self.create_hlc_assignment(ts));
}
}
}
} */
Statement::Update {
table, assignments, ..
} => {
if let TableFactor::Table { name, .. } = &table.relation {
if self.is_audited_table(name) {
if let Some(ts) = hlc_timestamp {
// `assignments` borrows from `stmt`, so this push edits the statement
// in place; rebuilding the whole Statement::Update afterwards would
// both be redundant and conflict with the active borrows.
assignments.push(self.create_hlc_assignment(ts));
}
}
}
}
// Replace the current statement in the AST
*stmt = new_update;
}
Statement::Delete(del_stmt) => {
let table_name = self.extract_table_name_from_from(&del_stmt.from);
if let Some(name) = table_name {
if self.is_audited_table(&name) {
// CHANGED: passes the timestamp to the transformation function
if let Some(ts) = hlc_timestamp {
self.transform_delete_to_update(stmt, ts);
}
}
} else {
return Err(ProxyError::UnsupportedStatement {
description: "DELETE from non-table source or multiple tables".to_string(),
});
}
}
Statement::AlterTable { name, .. } => {
if self.is_audited_table(name) {
return Ok(Some(
name.to_string()
.trim_matches('`')
.trim_matches('"')
.to_string(),
));
}
}
_ => {}
}
// Continue the traversal for child nodes (e.g. SELECTs)
visit_mut::walk_statement_mut(self, stmt)
Ok(None)
}
// This method is called for every query (including subqueries)
fn visit_query_mut(&mut self, query: &mut Query) -> ControlFlow<Self::Break> {
// Recurse depth-first so that inner queries are processed first
visit_mut::walk_query_mut(self, query);
/// Adds the tombstone column to a list of column definitions.
fn add_tombstone_column(&self, columns: &mut Vec<ColumnDef>) {
if !columns
.iter()
.any(|c| c.name.value.to_lowercase() == TOMBSTONE_COLUMN)
{
columns.push(ColumnDef {
name: Ident::new(TOMBSTONE_COLUMN),
data_type: DataType::Integer(None),
options: vec![],
});
}
}
// Then adjust the WHERE clause of the current query
if let Some(from_clause) = query.body.as_select_mut().map(|s| &mut s.from) {
// (More complex logic for analyzing joins and tables would go here)
// Simplified: we assume the filter is added for the first table.
let table_name = if let Some(relation) = from_clause.get_mut(0) {
// This logic needs refinement to handle aliases etc.
relation.relation.to_string()
/// Checks whether a table should be covered by the proxy logic.
fn is_audited_table(&self, name: &ObjectName) -> bool {
let table_name = name.to_string().to_lowercase();
let table_name = table_name.trim_matches('`').trim_matches('"');
!EXCLUDED_TABLES.contains(&table_name)
}
fn extract_table_name_from_from(&self, from: &sqlparser::ast::FromTable) -> Option<ObjectName> {
let tables = match from {
sqlparser::ast::FromTable::WithFromKeyword(from)
| sqlparser::ast::FromTable::WithoutKeyword(from) => from,
};
if tables.len() == 1 {
if let TableFactor::Table { name, .. } = &tables[0].relation {
Some(name.clone())
} else {
"".to_string()
};
None
}
} else {
None
}
}
if self.is_audited_table(&table_name) {
let tombstone_check = Expr::BinaryOp {
left: Box::new(Expr::Identifier(Ident::new(TOMBSTONE_COLUMN_NAME))),
op: sqlparser::ast::BinaryOperator::Eq,
right: Box::new(Expr::Value(Value::Number("0".to_string(), false))),
};
fn extract_table_name(&self, from: &[TableWithJoins]) -> Option<ObjectName> {
if from.len() == 1 {
if let TableFactor::Table { name, .. } = &from[0].relation {
Some(name.clone())
} else {
None
}
} else {
None
}
}
let existing_selection = query.selection.take();
let new_selection = match existing_selection {
Some(expr) => Expr::BinaryOp {
left: Box::new(expr),
op: sqlparser::ast::BinaryOperator::And,
right: Box::new(tombstone_check),
},
None => tombstone_check,
};
query.selection = Some(Box::new(new_selection));
fn create_tombstone_assignment(&self) -> Assignment {
Assignment {
target: AssignmentTarget::ColumnName(ObjectName(vec![ObjectNamePart::Identifier(
Ident::new(TOMBSTONE_COLUMN),
)])),
value: Expr::Value(sqlparser::ast::Value::Number("1".to_string(), false).into()),
}
}
fn add_tombstone_filter(&self, selection: &mut Option<Expr>) {
let tombstone_expr = Expr::BinaryOp {
left: Box::new(Expr::Identifier(Ident::new(TOMBSTONE_COLUMN))),
op: BinaryOperator::Eq,
// This is the final fix:
right: Box::new(Expr::Value(Value::Number("0".to_string(), false).into())),
};
match selection {
Some(existing) => {
// Combine with AND if a WHERE clause already exists
*selection = Some(Expr::BinaryOp {
left: Box::new(existing.clone()),
op: BinaryOperator::And,
right: Box::new(tombstone_expr),
});
}
None => {
// Set a new WHERE clause if none exists
*selection = Some(tombstone_expr);
}
}
}
ControlFlow::Continue(())
fn add_crdt_columns(&self, columns: &mut Vec<ColumnDef>) {
if !columns.iter().any(|c| c.name.value == TOMBSTONE_COLUMN) {
columns.push(ColumnDef {
name: Ident::new(TOMBSTONE_COLUMN),
data_type: DataType::Integer(None),
options: vec![],
});
}
if !columns.iter().any(|c| c.name.value == HLC_TIMESTAMP_COLUMN) {
columns.push(ColumnDef {
name: Ident::new(HLC_TIMESTAMP_COLUMN),
data_type: DataType::Text, // the HLC is stored as a string
options: vec![],
});
}
}
fn transform_delete_to_update(&self, stmt: &mut Statement, hlc_timestamp: &Timestamp) {
if let Statement::Delete(del_stmt) = stmt {
let table_to_update = match &del_stmt.from {
sqlparser::ast::FromTable::WithFromKeyword(from)
| sqlparser::ast::FromTable::WithoutKeyword(from) => from[0].clone(),
};
// Create both assignments
let assignments = vec![
self.create_tombstone_assignment(),
self.create_hlc_assignment(hlc_timestamp),
];
*stmt = Statement::Update {
table: table_to_update,
assignments,
from: None,
selection: del_stmt.selection.clone(),
returning: None,
or: None,
};
}
}
fn add_hlc_to_insert(
&self,
insert_stmt: &mut sqlparser::ast::Insert,
ts: &Timestamp,
) -> Result<(), ProxyError> {
// NOTE: assumes the INSERT lists its columns explicitly; a bare
// `INSERT INTO t VALUES (...)` would end up with only the HLC column named.
insert_stmt.columns.push(Ident::new(HLC_TIMESTAMP_COLUMN));
match insert_stmt.source.as_mut() {
Some(query) => match &mut *query.body {
// dereference the Box with *
SetExpr::Values(values) => {
for row in &mut values.rows {
row.push(Expr::Value(
Value::SingleQuotedString(ts.to_string()).into(),
));
}
}
SetExpr::Select(select) => {
let hlc_expr = Expr::Value(Value::SingleQuotedString(ts.to_string()).into());
select.projection.push(SelectItem::UnnamedExpr(hlc_expr));
}
_ => {
return Err(ProxyError::UnsupportedStatement {
description: "INSERT with unsupported source".to_string(),
});
}
},
None => {
return Err(ProxyError::UnsupportedStatement {
description: "INSERT statement has no source".to_string(),
});
}
}
Ok(())
}
/// Creates an assignment `haex_hlc_timestamp = '...'` (see HLC_TIMESTAMP_COLUMN).
// NEW: helper function
fn create_hlc_assignment(&self, ts: &Timestamp) -> Assignment {
Assignment {
target: AssignmentTarget::ColumnName(ObjectName(vec![ObjectNamePart::Identifier(
Ident::new(HLC_TIMESTAMP_COLUMN),
)])),
value: Expr::Value(Value::SingleQuotedString(ts.to_string()).into()),
}
}
}
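
A test sketch for the SELECT rewrite (assumes it lives in this module so it can reach the private `transform_statement`; the assertion checks substring presence rather than exact formatting):

#[cfg(test)]
mod tests {
    use super::*;
    use sqlparser::dialect::SQLiteDialect;
    use sqlparser::parser::Parser;

    #[test]
    fn select_is_filtered_by_tombstone() {
        let proxy = SqlProxy::new();
        let mut ast =
            Parser::parse_sql(&SQLiteDialect {}, "SELECT id FROM notes WHERE kind = 'x'").unwrap();
        // A read needs no HLC timestamp, so None is passed.
        assert!(proxy.transform_statement(&mut ast[0], None).is_ok());
        // The soft-delete filter is ANDed onto the existing WHERE clause.
        assert!(ast[0].to_string().contains("haex_tombstone = 0"));
    }
}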

View File

@ -1,121 +1,169 @@
// In src-tauri/src/trigger_manager.rs -> impl<'a> TriggerManager<'a>
use crate::crdt::hlc;
use rusqlite::{Connection, Result, Row};
use serde::Serialize;
use std::fmt::Write;
use ts_rs::TS;
// In a new module, e.g. src-tauri/src/trigger_manager.rs
use crate::crdt::proxy::ColumnInfo;
use rusqlite::{params, Connection, Result, Transaction};
use std::sync::{Arc, Mutex};
use tauri::AppHandle;
// The z_ prefix should ensure that these triggers are executed last.
const INSERT_TRIGGER_TPL: &str = "z_crdt_{TABLE_NAME}_insert";
const UPDATE_TRIGGER_TPL: &str = "z_crdt_{TABLE_NAME}_update";
pub struct TriggerManager<'a> {
tx: &'a Transaction<'a>,
pub const LOG_TABLE_NAME: &str = "haex_crdt_logs";
pub const TOMBSTONE_COLUMN: &str = "haex_tombstone";
pub const HLC_TIMESTAMP_COLUMN: &str = "haex_hlc_timestamp";
#[derive(Debug, Serialize, TS)]
#[ts(export)]
#[serde(tag = "status", content = "details")]
pub enum TriggerSetupResult {
Success,
TableNotFound,
TombstoneColumnMissing { column_name: String },
PrimaryKeyMissing,
}
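
A hypothetical caller can branch on this result instead of treating every non-success case as a hard error (`trigger_manager` and `conn` are assumed context):

match trigger_manager.setup_triggers_for_table(&mut conn, "notes")? {
    TriggerSetupResult::Success => {}
    TriggerSetupResult::TableNotFound => eprintln!("table vanished before trigger setup"),
    TriggerSetupResult::TombstoneColumnMissing { column_name } => {
        eprintln!("skipping CRDT triggers: missing column {column_name}")
    }
    TriggerSetupResult::PrimaryKeyMissing => eprintln!("skipping CRDT triggers: no primary key"),
}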
impl<'a> TriggerManager<'a> {
pub fn new(tx: &'a Transaction<'a>) -> Self {
Self { tx }
struct ColumnInfo {
name: String,
is_pk: bool,
}
impl ColumnInfo {
fn from_row(row: &Row) -> Result<Self> {
Ok(ColumnInfo {
name: row.get("name")?,
is_pk: row.get::<_, i64>("pk")? > 0,
})
}
}
pub struct TriggerManager;
impl TriggerManager {
pub fn new() -> Self {
TriggerManager {}
}
// The main function that sets everything up
pub fn setup_triggers_for_table(&self, table_name: &str) -> Result<()> {
let columns = self.get_table_schema(table_name)?;
let pk_cols: Vec<_> = columns
pub fn setup_triggers_for_table(
&self,
conn: &mut Connection,
table_name: &str,
) -> Result<TriggerSetupResult, rusqlite::Error> {
let columns = self.get_table_schema(conn, table_name)?;
if columns.is_empty() {
return Ok(TriggerSetupResult::TableNotFound);
}
if !columns.iter().any(|c| c.name == TOMBSTONE_COLUMN) {
return Ok(TriggerSetupResult::TombstoneColumnMissing {
column_name: TOMBSTONE_COLUMN.to_string(),
});
}
let pks: Vec<String> = columns
.iter()
.filter(|c| c.is_pk)
.map(|c| c.name.as_str())
.map(|c| c.name.clone())
.collect();
let other_cols: Vec<_> = columns
.iter()
.filter(|c| !c.is_pk && c.name != "tombstone")
.map(|c| c.name.as_str())
.collect();
let drop_sql = self.generate_drop_triggers_sql(table_name);
let insert_sql = self.generate_insert_trigger_sql(table_name, &pk_cols, &other_cols);
let update_sql = self.generate_update_trigger_sql(table_name, &pk_cols, &other_cols);
self.tx.execute_batch(&drop_sql)?;
self.tx.execute_batch(&insert_sql)?;
self.tx.execute_batch(&update_sql)?;
Ok(())
}
fn get_table_schema(&self, table_name: &str) -> Result<Vec<ColumnInfo>> {
let sql = format!("PRAGMA table_info('{}')", table_name);
let mut stmt = self.tx.prepare(&sql)?;
let rows = stmt.query_map(|row| {
let pk_val: i64 = row.get(5)?;
Ok(ColumnInfo {
name: row.get(1)?,
is_pk: pk_val > 0,
})
})?;
let mut columns = Vec::new();
for row_result in rows {
columns.push(row_result?);
if pks.is_empty() {
return Ok(TriggerSetupResult::PrimaryKeyMissing);
}
Ok(columns)
let cols_to_track: Vec<String> = columns
.iter()
.filter(|c| !c.is_pk && c.name != TOMBSTONE_COLUMN)
.map(|c| c.name.clone())
.collect();
let insert_trigger_sql = self.generate_insert_trigger_sql(table_name, &pks, &cols_to_track);
let update_trigger_sql = self.generate_update_trigger_sql(table_name, &pks, &cols_to_track);
let tx = conn.transaction()?;
tx.execute_batch(&format!("{}\n{}", insert_trigger_sql, update_trigger_sql))?;
tx.commit()?;
Ok(TriggerSetupResult::Success)
}
//... implementation of the SQL generation functions ...
fn get_table_schema(&self, conn: &Connection, table_name: &str) -> Result<Vec<ColumnInfo>> {
let sql = format!("PRAGMA table_info('{}');", table_name);
let mut stmt = conn.prepare(&sql)?;
let rows = stmt.query_map([], ColumnInfo::from_row)?;
rows.collect()
}
fn generate_update_trigger_sql(&self, table_name: &str, pks: &[&str], cols: &[&str]) -> String {
// Dynamically builds the key-value pairs for the primary key's JSON object.
let pk_json_payload_new = pks
fn generate_insert_trigger_sql(
&self,
table_name: &str,
pks: &[String],
cols: &[String],
) -> String {
let pk_json_payload = pks
.iter()
.map(|pk| format!("'{}', NEW.\"{}\"", pk, pk))
.collect::<Vec<_>>()
.join(", ");
let pk_json_payload_old = pks
let column_inserts = cols.iter().fold(String::new(), |mut acc, col| {
writeln!(&mut acc, "INSERT INTO {log_table} (hlc_timestamp, op_type, table_name, row_pk, column_name, value) VALUES (NEW.\"{hlc_col}\", 'INSERT', '{table}', json_object({pk_payload}), '{column}', json_object('value', NEW.\"{column}\"));",
log_table = LOG_TABLE_NAME,
hlc_col = HLC_TIMESTAMP_COLUMN,
table = table_name,
pk_payload = pk_json_payload,
column = col
).unwrap();
acc
});
// Use the new constant for the trigger name
let trigger_name = INSERT_TRIGGER_TPL.replace("{TABLE_NAME}", table_name);
format!(
"CREATE TRIGGER IF NOT EXISTS {trigger_name}
AFTER INSERT ON \"{table_name}\"
FOR EACH ROW
BEGIN
{column_inserts}
END;"
)
}
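
A sketch of the generator's output for a hypothetical `notes` table with primary key `id` and a single tracked column `body` (same-module access assumed, since the method is private):

let manager = TriggerManager::new();
let sql = manager.generate_insert_trigger_sql("notes", &["id".to_string()], &["body".to_string()]);
// `sql` is roughly:
//   CREATE TRIGGER IF NOT EXISTS z_crdt_notes_insert
//   AFTER INSERT ON "notes"
//   FOR EACH ROW
//   BEGIN
//   INSERT INTO haex_crdt_logs (hlc_timestamp, op_type, table_name, row_pk, column_name, value)
//   VALUES (NEW."haex_hlc_timestamp", 'INSERT', 'notes', json_object('id', NEW."id"), 'body', json_object('value', NEW."body"));
//   END;
assert!(sql.contains("z_crdt_notes_insert"));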
fn generate_update_trigger_sql(
&self,
table_name: &str,
pks: &[String],
cols: &[String],
) -> String {
let pk_json_payload = pks
.iter()
.map(|pk| format!("'{}', OLD.\"{}\"", pk, pk))
.map(|pk| format!("'{}', NEW.\"{}\"", pk, pk))
.collect::<Vec<_>>()
.join(", ");
// Builds the individual INSERT statements for each column
let column_updates = cols.iter().map(|col| format!(
r#"
-- Log the column change if one happened and it is not a soft delete
INSERT INTO crdt_log (hlc_timestamp, op_type, table_name, row_pk, column_name, value, old_value)
SELECT
'placeholder_hlc', -- TODO: call the HLC function here
'UPDATE',
'{table}',
json_object({pk_payload_new}),
'{column}',
json_object('value', NEW."{column}"),
json_object('value', OLD."{column}")
WHERE
NEW."{column}" IS NOT OLD."{column}"
"#,
table = table_name,
pk_payload_new = pk_json_payload_new,
column = col
)).collect::<Vec<_>>().join("\n");
let column_updates = cols.iter().fold(String::new(), |mut acc, col| {
writeln!(&mut acc, "IF NEW.\"{column}\" IS NOT OLD.\"{column}\" THEN INSERT INTO {log_table} (hlc_timestamp, op_type, table_name, row_pk, column_name, value, old_value) VALUES (NEW.\"{hlc_col}\", 'UPDATE', '{table}', json_object({pk_payload}), '{column}', json_object('value', NEW.\"{column}\"), json_object('value', OLD.\"{column}\")); END IF;",
log_table = LOG_TABLE_NAME,
hlc_col = HLC_TIMESTAMP_COLUMN,
table = table_name,
pk_payload = pk_json_payload,
column = col).unwrap();
acc
});
// Builds the soft-delete logic
let soft_delete_logic = format!(
r#"
-- Log the soft delete
INSERT INTO crdt_log (hlc_timestamp, op_type, table_name, row_pk)
SELECT
'placeholder_hlc', -- TODO: call the HLC function here
'DELETE',
'{table}',
json_object({pk_payload_old})
WHERE
OLD.{tombstone_col} = 0
"#,
"IF NEW.{tombstone_col} = 1 AND OLD.{tombstone_col} = 0 THEN INSERT INTO {log_table} (hlc_timestamp, op_type, table_name, row_pk) VALUES (NEW.\"{hlc_col}\", 'DELETE', '{table}', json_object({pk_payload})); END IF;",
log_table = LOG_TABLE_NAME,
hlc_col = HLC_TIMESTAMP_COLUMN,
tombstone_col = TOMBSTONE_COLUMN,
table = table_name,
pk_payload_old = pk_json_payload_old
pk_payload = pk_json_payload
);
// Combines everything into a single trigger
// Use the new constant for the trigger name
let trigger_name = UPDATE_TRIGGER_TPL.replace("{TABLE_NAME}", table_name);
format!(
"CREATE TRIGGER IF NOT EXISTS {table_name}_crdt_update
AFTER UPDATE ON {table_name}
"CREATE TRIGGER IF NOT EXISTS {trigger_name}
AFTER UPDATE ON \"{table_name}\"
FOR EACH ROW
BEGIN
{column_updates}