refatored vault

This commit is contained in:
2025-09-24 11:32:11 +02:00
parent d5670ca470
commit 1a40f9d2aa
16 changed files with 2364 additions and 1481 deletions

View File

@ -1,6 +1,7 @@
// src/hlc_service.rs
// src-tauri/src/crdt/hlc.rs
use crate::table_names::TABLE_CRDT_CONFIGS;
use rusqlite::{params, Connection, Result as RusqliteResult, Transaction};
use rusqlite::{params, Connection, Transaction};
use std::{
fmt::Debug,
str::FromStr,
@ -14,8 +15,6 @@ use uuid::Uuid;
const HLC_NODE_ID_TYPE: &str = "hlc_node_id";
const HLC_TIMESTAMP_TYPE: &str = "hlc_timestamp";
//pub const TABLE_CRDT_CONFIGS: &str = "haex_crdt_settings";
#[derive(Error, Debug)]
pub enum HlcError {
#[error("Database error: {0}")]
@ -28,104 +27,215 @@ pub enum HlcError {
MutexPoisoned,
#[error("Failed to create node ID: {0}")]
CreateNodeId(#[from] uhlc::SizeError),
#[error("No database connection available")]
NoConnection,
#[error("HLC service not initialized")]
NotInitialized,
#[error("Hex decode error: {0}")]
HexDecode(String),
#[error("UTF-8 conversion error: {0}")]
Utf8Error(String),
}
/// A thread-safe, persistent HLC service.
#[derive(Clone)]
pub struct HlcService(Arc<Mutex<HLC>>);
pub struct HlcService {
hlc: Arc<Mutex<Option<HLC>>>,
}
impl HlcService {
/// Creates a new HLC service, initializing it from the database or creating a new
/// persistent identity if one does not exist.
pub fn new(conn: &mut Connection) -> Result<Self, HlcError> {
// 1. Manage persistent node identity.
let node_id = Self::get_or_create_node_id(conn)?;
// 2. Create HLC instance with stable identity using the HLCBuilder.
let hlc = HLCBuilder::new()
.with_id(node_id)
.with_max_delta(Duration::from_secs(1)) // Example of custom configuration
.build();
// 3. Load the last persisted timestamp and update the clock.
let last_state_str: RusqliteResult<String> = conn.query_row(
&format!("SELECT value FROM {} WHERE key = ?1", TABLE_CRDT_CONFIGS),
params![HLC_TIMESTAMP_TYPE],
|row| row.get(0),
);
if let Ok(state_str) = last_state_str {
let timestamp =
Timestamp::from_str(&state_str).map_err(|e| HlcError::ParseTimestamp(e.cause))?;
// Update the clock with the persisted state.
// we might want to handle the error case where the clock drifts too far.
hlc.update_with_timestamp(&timestamp)
.map_err(|e| HlcError::Parse(e.to_string()))?;
/// Creates a new HLC service. The HLC will be initialized on first database access.
pub fn new() -> Self {
HlcService {
hlc: Arc::new(Mutex::new(None)),
}
let hlc_arc = Arc::new(Mutex::new(hlc));
Ok(HlcService(hlc_arc))
}
/// Generates a new timestamp and immediately persists the HLC's new state.
/// This method MUST be called within an existing database transaction (`tx`)
/// along with the actual data operation that this timestamp is for.
/// This design ensures atomicity: the data is saved with its timestamp,
/// and the clock state is updated, or none of it is.
/// Factory-Funktion: Erstellt und initialisiert einen neuen HLC-Service aus einer bestehenden DB-Verbindung.
/// Dies ist die bevorzugte Methode zur Instanziierung.
pub fn new_from_connection(conn: &mut Connection) -> Result<Self, HlcError> {
// 1. Hole oder erstelle eine persistente Node-ID
let node_id = Self::get_or_create_node_id(conn)?;
// 2. Erstelle eine HLC-Instanz mit stabiler Identität
let hlc = HLCBuilder::new()
.with_id(node_id)
.with_max_delta(Duration::from_secs(1))
.build();
// 3. Lade und wende den letzten persistenten Zeitstempel an
if let Some(last_timestamp) = Self::load_last_timestamp(conn)? {
hlc.update_with_timestamp(&last_timestamp).map_err(|e| {
HlcError::Parse(format!(
"Failed to update HLC with persisted timestamp: {:?}",
e
))
})?;
}
Ok(HlcService {
hlc: Arc::new(Mutex::new(Some(hlc))),
})
}
/* /// Initializes the HLC service with data from the database.
/// This should be called once after the database connection is available.
pub fn initialize(&self, conn: &mut Connection) -> Result<(), HlcError> {
let mut initialized = self
.initialized
.lock()
.map_err(|_| HlcError::MutexPoisoned)?;
if *initialized {
return Ok(()); // Already initialized
}
let mut hlc_guard = self.hlc.lock().map_err(|_| HlcError::MutexPoisoned)?;
// 1. Get or create persistent node ID
let node_id = Self::get_or_create_node_id(conn)?;
// 2. Create HLC instance with stable identity
let hlc = HLCBuilder::new()
.with_id(node_id)
.with_max_delta(Duration::from_secs(1))
.build();
// 3. Load and apply last persisted timestamp
if let Some(last_timestamp) = Self::load_last_timestamp(conn)? {
hlc.update_with_timestamp(&last_timestamp).map_err(|e| {
HlcError::Parse(format!(
"Failed to update HLC with persisted timestamp: {:?}",
e
))
})?;
}
*hlc_guard = Some(hlc);
*initialized = true;
Ok(())
} */
/* /// Ensures the HLC service is initialized, calling initialize if needed.
pub fn ensure_initialized(&self, conn: &mut Connection) -> Result<(), HlcError> {
let initialized = self
.initialized
.lock()
.map_err(|_| HlcError::MutexPoisoned)?;
if !*initialized {
drop(initialized); // Release lock before calling initialize
self.initialize(conn)?;
}
Ok(())
} */
/* /// Checks if the service is initialized without requiring a database connection.
pub fn is_initialized(&self) -> Result<bool, HlcError> {
let initialized = self
.initialized
.lock()
.map_err(|_| HlcError::MutexPoisoned)?;
Ok(*initialized)
} */
/// Generiert einen neuen Zeitstempel und persistiert den neuen Zustand des HLC sofort.
/// Muss innerhalb einer bestehenden Datenbanktransaktion aufgerufen werden.
pub fn new_timestamp_and_persist<'tx>(
&self,
tx: &Transaction<'tx>,
) -> Result<Timestamp, HlcError> {
let hlc = self.0.lock().map_err(|_| HlcError::MutexPoisoned)?;
let new_timestamp = hlc.new_timestamp();
let timestamp_str = new_timestamp.to_string();
let mut hlc_guard = self.hlc.lock().map_err(|_| HlcError::MutexPoisoned)?;
let hlc = hlc_guard.as_mut().ok_or(HlcError::NotInitialized)?;
let new_timestamp = hlc.new_timestamp();
Self::persist_timestamp(tx, &new_timestamp)?;
Ok(new_timestamp)
}
/// Erstellt einen neuen Zeitstempel, ohne ihn zu persistieren (z.B. für Leseoperationen).
pub fn new_timestamp(&self) -> Result<Timestamp, HlcError> {
let mut hlc_guard = self.hlc.lock().map_err(|_| HlcError::MutexPoisoned)?;
let hlc = hlc_guard.as_mut().ok_or(HlcError::NotInitialized)?;
Ok(hlc.new_timestamp())
}
/// Aktualisiert den HLC mit einem externen Zeitstempel (für die Synchronisation).
pub fn update_with_timestamp(&self, timestamp: &Timestamp) -> Result<(), HlcError> {
let mut hlc_guard = self.hlc.lock().map_err(|_| HlcError::MutexPoisoned)?;
let hlc = hlc_guard.as_mut().ok_or(HlcError::NotInitialized)?;
hlc.update_with_timestamp(timestamp)
.map_err(|e| HlcError::Parse(format!("Failed to update HLC: {:?}", e)))
}
/// Lädt den letzten persistierten Zeitstempel aus der Datenbank.
fn load_last_timestamp(conn: &Connection) -> Result<Option<Timestamp>, HlcError> {
let query = format!("SELECT value FROM {} WHERE key = ?1", TABLE_CRDT_CONFIGS);
match conn.query_row(&query, params![HLC_TIMESTAMP_TYPE], |row| {
row.get::<_, String>(0)
}) {
Ok(state_str) => {
let timestamp = Timestamp::from_str(&state_str).map_err(|e| {
HlcError::ParseTimestamp(format!("Invalid timestamp format: {:?}", e))
})?;
Ok(Some(timestamp))
}
Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
Err(e) => Err(HlcError::Database(e)),
}
}
/// Persistiert einen Zeitstempel in der Datenbank innerhalb einer Transaktion.
fn persist_timestamp(tx: &Transaction, timestamp: &Timestamp) -> Result<(), HlcError> {
let timestamp_str = timestamp.to_string();
tx.execute(
&format!(
"INSERT INTO {} (key, value) VALUES (?1,?2)
"INSERT INTO {} (key, value) VALUES (?1, ?2)
ON CONFLICT(key) DO UPDATE SET value = excluded.value",
TABLE_CRDT_CONFIGS
),
params![HLC_TIMESTAMP_TYPE, timestamp_str],
)?;
Ok(new_timestamp)
Ok(())
}
/// Retrieves or creates and persists a stable node ID for the HLC.
/// Holt oder erstellt und persistiert eine stabile Node-ID für den HLC.
fn get_or_create_node_id(conn: &mut Connection) -> Result<ID, HlcError> {
let tx = conn.transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)?;
let query = format!("SELECT value FROM {} WHERE key = ?1", TABLE_CRDT_CONFIGS);
let query = format!("SELECT value FROM {} WHERE key =?1", TABLE_CRDT_CONFIGS);
match tx.query_row(&query, params![HLC_NODE_ID_TYPE], |row| {
row.get::<_, String>(0)
let id = match tx.query_row(&query, params![HLC_NODE_ID_TYPE], |row| {
row.get::<_, Vec<u8>>(0)
}) {
Ok(id_str) => {
// ID exists, parse and return it.
let id_bytes = hex::decode(id_str).map_err(|e| HlcError::Parse(e.to_string()))?;
let id = ID::try_from(id_bytes.as_slice())?;
tx.commit()?;
Ok(id)
}
Ok(id_bytes) => ID::try_from(id_bytes.as_slice())
.map_err(|e| HlcError::Parse(format!("Invalid node ID format: {:?}", e)))?,
Err(rusqlite::Error::QueryReturnedNoRows) => {
// No ID found, create, persist, and return a new one.
let new_id_bytes = Uuid::new_v4().as_bytes().to_vec();
let new_id = ID::try_from(new_id_bytes.as_slice())?;
let new_id_str = hex::encode(new_id.to_le_bytes());
tx.execute(
&format!(
"INSERT INTO {} (key, value) VALUES (?1, ?2)",
TABLE_CRDT_CONFIGS
),
params![HLC_NODE_ID_TYPE, new_id_str],
params![HLC_NODE_ID_TYPE, new_id_bytes.as_slice()],
)?;
tx.commit()?;
Ok(new_id)
new_id
}
Err(e) => Err(HlcError::from(e)),
}
Err(e) => return Err(HlcError::Database(e)),
};
tx.commit()?;
Ok(id)
}
}
impl Default for HlcService {
fn default() -> Self {
Self::new()
}
}

View File

@ -1,3 +1,3 @@
pub mod hlc;
pub mod proxy;
pub mod transformer;
pub mod trigger;

View File

@ -1,416 +0,0 @@
// In src-tauri/src/crdt/proxy.rs
use crate::crdt::hlc::HlcService;
use crate::crdt::trigger::{HLC_TIMESTAMP_COLUMN, TOMBSTONE_COLUMN};
use crate::table_names::{TABLE_CRDT_CONFIGS, TABLE_CRDT_LOGS};
use rusqlite::Connection;
use serde::{Deserialize, Serialize};
use serde_json::Value as JsonValue;
use sqlparser::ast::{
Assignment, AssignmentTarget, BinaryOperator, ColumnDef, DataType, Expr, Ident, Insert,
ObjectName, ObjectNamePart, SelectItem, SetExpr, Statement, TableFactor, TableObject,
TableWithJoins, UpdateTableFromKind, Value, ValueWithSpan,
};
use sqlparser::dialect::SQLiteDialect;
use sqlparser::parser::Parser;
use std::collections::HashSet;
use std::sync::{Arc, Mutex};
use tauri::{path::BaseDirectory, AppHandle, Manager, State};
use ts_rs::TS;
use uhlc::Timestamp;
pub struct DbConnection(pub Arc<Mutex<Option<Connection>>>);
#[derive(Serialize, Deserialize, TS)]
#[ts(export)]
#[serde(tag = "type", content = "details")]
pub enum ProxyError {
/// Der SQL-Code konnte nicht geparst werden.
ParseError {
reason: String,
},
/// Ein Fehler ist während der Ausführung in der Datenbank aufgetreten.
ExecutionError {
sql: String,
reason: String,
},
/// Ein Fehler ist beim Verwalten der Transaktion aufgetreten.
TransactionError {
reason: String,
},
/// Ein SQL-Statement wird vom Proxy nicht unterstützt (z.B. DELETE von einer Subquery).
UnsupportedStatement {
description: String,
},
HlcError {
reason: String,
},
}
// Tabellen, die von der Proxy-Logik ausgeschlossen sind.
const EXCLUDED_TABLES: &[&str] = &[TABLE_CRDT_CONFIGS, TABLE_CRDT_LOGS];
pub struct SqlProxy;
impl SqlProxy {
pub fn new() -> Self {
Self {}
}
/// Führt SQL-Anweisungen aus, nachdem sie für CRDT-Konformität transformiert wurden.
pub fn execute(
&self,
sql: &str,
params: Vec<JsonValue>,
state: State<'_, DbConnection>,
hlc_service: &HlcService,
) -> Result<Vec<String>, ProxyError> {
let dialect = SQLiteDialect {};
let mut ast_vec = Parser::parse_sql(&dialect, sql).map_err(|e| ProxyError::ParseError {
reason: e.to_string(),
})?;
let mut modified_schema_tables = HashSet::new();
let db_lock = state
.0
.lock()
.map_err(|e| format!("Mutex Lock Fehler: {}", e))?;
let conn = db_lock.as_ref().ok_or("Keine Datenbankverbindung")?;
let tx = conn
.transaction()
.map_err(|e| ProxyError::TransactionError {
reason: e.to_string(),
})?;
/* let hlc_timestamp =
hlc_service
.new_timestamp_and_persist(&tx)
.map_err(|e| ProxyError::HlcError {
reason: e.to_string(),
})?; */
for statement in &mut ast_vec {
if let Some(table_name) = self.transform_statement(statement)? {
modified_schema_tables.insert(table_name);
}
}
for statement in ast_vec {
let final_sql = statement.to_string();
tx.execute(&final_sql, [])
.map_err(|e| ProxyError::ExecutionError {
sql: final_sql,
reason: e.to_string(),
})?;
}
tx.commit().map_err(|e| ProxyError::TransactionError {
reason: e.to_string(),
})?;
Ok(modified_schema_tables.into_iter().collect())
}
/// Wendet die Transformation auf ein einzelnes Statement an.
fn transform_statement(&self, stmt: &mut Statement) -> Result<Option<String>, ProxyError> {
match stmt {
Statement::Query(query) => {
if let SetExpr::Select(select) = &mut *query.body {
let mut tombstone_filters = Vec::new();
for twj in &select.from {
if let TableFactor::Table { name, alias, .. } = &twj.relation {
if self.is_audited_table(name) {
let table_idents = if let Some(a) = alias {
vec![a.name.clone()]
} else {
name.0
.iter()
.filter_map(|part| match part {
ObjectNamePart::Identifier(id) => Some(id.clone()),
_ => None,
})
.collect::<Vec<_>>()
};
let column_ident = Ident::new(TOMBSTONE_COLUMN);
let full_ident = [table_idents, vec![column_ident]].concat();
let filter = Expr::BinaryOp {
left: Box::new(Expr::CompoundIdentifier(full_ident)),
op: BinaryOperator::Eq,
right: Box::new(Expr::Value(
sqlparser::ast::Value::Number("1".to_string(), false)
.into(),
)),
};
tombstone_filters.push(filter);
}
}
}
if !tombstone_filters.is_empty() {
let combined_filter = tombstone_filters
.into_iter()
.reduce(|acc, expr| Expr::BinaryOp {
left: Box::new(acc),
op: BinaryOperator::And,
right: Box::new(expr),
})
.unwrap();
match &mut select.selection {
Some(existing) => {
*existing = Expr::BinaryOp {
left: Box::new(existing.clone()),
op: BinaryOperator::And,
right: Box::new(combined_filter),
};
}
None => {
select.selection = Some(combined_filter);
}
}
}
}
// TODO: UNION, EXCEPT etc. werden hier nicht behandelt
}
Statement::CreateTable(create_table) => {
if self.is_audited_table(&create_table.name) {
self.add_crdt_columns(&mut create_table.columns);
return Ok(Some(
create_table
.name
.to_string()
.trim_matches('`')
.trim_matches('"')
.to_string(),
));
}
}
Statement::Insert(insert_stmt) => {
if let TableObject::TableName(name) = &insert_stmt.table {
if self.is_audited_table(name) {
self.add_hlc_to_insert(insert_stmt);
}
}
}
Statement::Update {
table,
assignments,
from,
selection,
returning,
or,
} => {
if let TableFactor::Table { name, .. } = &table.relation {
if self.is_audited_table(&name) {
if let Some(ts) = hlc_timestamp {
assignments.push(self.create_hlc_assignment(ts));
}
}
}
*stmt = Statement::Update {
table: table.clone(),
assignments: assignments.clone(),
from: from.clone(),
selection: selection.clone(),
returning: returning.clone(),
or: *or,
};
}
Statement::Delete(del_stmt) => {
let table_name = self.extract_table_name_from_from(&del_stmt.from);
if let Some(name) = table_name {
if self.is_audited_table(&name) {
// GEÄNDERT: Übergibt den Zeitstempel an die Transformationsfunktion
self.transform_delete_to_update(stmt);
}
} else {
return Err(ProxyError::UnsupportedStatement {
description: "DELETE from non-table source or multiple tables".to_string(),
});
}
}
Statement::AlterTable { name, .. } => {
if self.is_audited_table(name) {
return Ok(Some(
name.to_string()
.trim_matches('`')
.trim_matches('"')
.to_string(),
));
}
}
_ => {}
}
Ok(None)
}
/// Fügt die Tombstone-Spalte zu einer Liste von Spaltendefinitionen hinzu.
fn add_tombstone_column(&self, columns: &mut Vec<ColumnDef>) {
if !columns
.iter()
.any(|c| c.name.value.to_lowercase() == TOMBSTONE_COLUMN)
{
columns.push(ColumnDef {
name: Ident::new(TOMBSTONE_COLUMN),
data_type: DataType::Integer(None),
options: vec![],
});
}
}
/// Prüft, ob eine Tabelle von der Proxy-Logik betroffen sein soll.
fn is_audited_table(&self, name: &ObjectName) -> bool {
let table_name = name.to_string().to_lowercase();
let table_name = table_name.trim_matches('`').trim_matches('"');
!EXCLUDED_TABLES.contains(&table_name)
}
fn extract_table_name_from_from(&self, from: &sqlparser::ast::FromTable) -> Option<ObjectName> {
let tables = match from {
sqlparser::ast::FromTable::WithFromKeyword(from)
| sqlparser::ast::FromTable::WithoutKeyword(from) => from,
};
if tables.len() == 1 {
if let TableFactor::Table { name, .. } = &tables[0].relation {
Some(name.clone())
} else {
None
}
} else {
None
}
}
fn extract_table_name(&self, from: &[TableWithJoins]) -> Option<ObjectName> {
if from.len() == 1 {
if let TableFactor::Table { name, .. } = &from[0].relation {
Some(name.clone())
} else {
None
}
} else {
None
}
}
fn create_tombstone_assignment(&self) -> Assignment {
Assignment {
target: AssignmentTarget::ColumnName(ObjectName(vec![ObjectNamePart::Identifier(
Ident::new(TOMBSTONE_COLUMN),
)])),
value: Expr::Value(sqlparser::ast::Value::Number("1".to_string(), false).into()),
}
}
fn add_tombstone_filter(&self, selection: &mut Option<Expr>) {
let tombstone_expr = Expr::BinaryOp {
left: Box::new(Expr::Identifier(Ident::new(TOMBSTONE_COLUMN))),
op: BinaryOperator::Eq,
// HIER IST DIE FINALE KORREKTUR:
right: Box::new(Expr::Value(Value::Number("0".to_string(), false).into())),
};
match selection {
Some(existing) => {
// Kombiniere mit AND, wenn eine WHERE-Klausel existiert
*selection = Some(Expr::BinaryOp {
left: Box::new(existing.clone()),
op: BinaryOperator::And,
right: Box::new(tombstone_expr),
});
}
None => {
// Setze neue WHERE-Klausel, wenn keine existiert
*selection = Some(tombstone_expr);
}
}
}
fn add_crdt_columns(&self, columns: &mut Vec<ColumnDef>) {
if !columns.iter().any(|c| c.name.value == TOMBSTONE_COLUMN) {
columns.push(ColumnDef {
name: Ident::new(TOMBSTONE_COLUMN),
data_type: DataType::Integer(None),
options: vec![],
});
}
if !columns.iter().any(|c| c.name.value == HLC_TIMESTAMP_COLUMN) {
columns.push(ColumnDef {
name: Ident::new(HLC_TIMESTAMP_COLUMN),
data_type: DataType::String(None),
options: vec![],
});
}
}
fn transform_delete_to_update(&self, stmt: &mut Statement) {
if let Statement::Delete(del_stmt) = stmt {
let table_to_update = match &del_stmt.from {
sqlparser::ast::FromTable::WithFromKeyword(from)
| sqlparser::ast::FromTable::WithoutKeyword(from) => from[0].clone(),
};
let assignments = vec![self.create_tombstone_assignment()];
*stmt = Statement::Update {
table: table_to_update,
assignments,
from: None,
selection: del_stmt.selection.clone(),
returning: None,
or: None,
};
}
}
fn add_hlc_to_insert(
&self,
insert_stmt: &mut sqlparser::ast::Insert,
ts: &Timestamp,
) -> Result<(), ProxyError> {
insert_stmt.columns.push(Ident::new(HLC_TIMESTAMP_COLUMN));
match insert_stmt.source.as_mut() {
Some(query) => match &mut *query.body {
// Dereferenziere die Box mit *
SetExpr::Values(values) => {
for row in &mut values.rows {
row.push(Expr::Value(
Value::SingleQuotedString(ts.to_string()).into(),
));
}
}
SetExpr::Select(select) => {
let hlc_expr = Expr::Value(Value::SingleQuotedString(ts.to_string()).into());
select.projection.push(SelectItem::UnnamedExpr(hlc_expr));
}
_ => {
return Err(ProxyError::UnsupportedStatement {
description: "INSERT with unsupported source".to_string(),
});
}
},
None => {
return Err(ProxyError::UnsupportedStatement {
description: "INSERT statement has no source".to_string(),
});
}
}
Ok(())
}
/// Erstellt eine Zuweisung `haex_modified_hlc = '...'`
// NEU: Hilfsfunktion
fn create_hlc_assignment(&self, ts: &Timestamp) -> Assignment {
Assignment {
target: AssignmentTarget::ColumnName(ObjectName(vec![ObjectNamePart::Identifier(
Ident::new(HLC_TIMESTAMP_COLUMN),
)])),
value: Expr::Value(Value::SingleQuotedString(ts.to_string()).into()),
}
}
}

View File

@ -0,0 +1,787 @@
use crate::crdt::trigger::{HLC_TIMESTAMP_COLUMN, TOMBSTONE_COLUMN};
use crate::database::error::DatabaseError;
use crate::table_names::{TABLE_CRDT_CONFIGS, TABLE_CRDT_LOGS};
use sqlparser::ast::{
Assignment, AssignmentTarget, BinaryOperator, ColumnDef, DataType, Expr, Ident, Insert,
ObjectName, ObjectNamePart, SelectItem, SetExpr, Statement, TableFactor, TableObject, Value,
};
use std::borrow::Cow;
use std::collections::HashSet;
use uhlc::Timestamp;
/// Konfiguration für CRDT-Spalten
#[derive(Clone)]
struct CrdtColumns {
tombstone: &'static str,
hlc_timestamp: &'static str,
}
impl CrdtColumns {
const DEFAULT: Self = Self {
tombstone: TOMBSTONE_COLUMN,
hlc_timestamp: HLC_TIMESTAMP_COLUMN,
};
/// Erstellt einen Tombstone-Filter für eine Tabelle
fn create_tombstone_filter(&self, table_alias: Option<&str>) -> Expr {
let column_expr = match table_alias {
Some(alias) => {
// Qualifizierte Referenz: alias.tombstone
Expr::CompoundIdentifier(vec![Ident::new(alias), Ident::new(self.tombstone)])
}
None => {
// Einfache Referenz: tombstone
Expr::Identifier(Ident::new(self.tombstone))
}
};
Expr::BinaryOp {
left: Box::new(column_expr),
op: BinaryOperator::NotEq,
right: Box::new(Expr::Value(Value::Number("1".to_string(), false).into())),
}
}
/// Erstellt eine Tombstone-Zuweisung für UPDATE/DELETE
fn create_tombstone_assignment(&self) -> Assignment {
Assignment {
target: AssignmentTarget::ColumnName(ObjectName(vec![ObjectNamePart::Identifier(
Ident::new(self.tombstone),
)])),
value: Expr::Value(Value::Number("1".to_string(), false).into()),
}
}
/// Erstellt eine HLC-Zuweisung für UPDATE/DELETE
fn create_hlc_assignment(&self, timestamp: &Timestamp) -> Assignment {
Assignment {
target: AssignmentTarget::ColumnName(ObjectName(vec![ObjectNamePart::Identifier(
Ident::new(self.hlc_timestamp),
)])),
value: Expr::Value(Value::SingleQuotedString(timestamp.to_string()).into()),
}
}
/// Fügt CRDT-Spalten zu einer Tabellendefinition hinzu
fn add_to_table_definition(&self, columns: &mut Vec<ColumnDef>) {
if !columns.iter().any(|c| c.name.value == self.tombstone) {
columns.push(ColumnDef {
name: Ident::new(self.tombstone),
data_type: DataType::Integer(None),
options: vec![],
});
}
if !columns.iter().any(|c| c.name.value == self.hlc_timestamp) {
columns.push(ColumnDef {
name: Ident::new(self.hlc_timestamp),
data_type: DataType::String(None),
options: vec![],
});
}
}
}
pub struct CrdtTransformer {
columns: CrdtColumns,
excluded_tables: HashSet<&'static str>,
}
impl CrdtTransformer {
pub fn new() -> Self {
let mut excluded_tables = HashSet::new();
excluded_tables.insert(TABLE_CRDT_CONFIGS);
excluded_tables.insert(TABLE_CRDT_LOGS);
Self {
columns: CrdtColumns::DEFAULT,
excluded_tables,
}
}
/// Prüft, ob eine Tabelle CRDT-Synchronisation unterstützen soll
fn is_crdt_sync_table(&self, name: &ObjectName) -> bool {
let table_name = self.normalize_table_name(name);
!self.excluded_tables.contains(table_name.as_ref())
}
/// Normalisiert Tabellennamen (entfernt Anführungszeichen)
fn normalize_table_name(&self, name: &ObjectName) -> Cow<str> {
let name_str = name.to_string().to_lowercase();
Cow::Owned(name_str.trim_matches('`').trim_matches('"').to_string())
}
pub fn transform_select_statement(&self, stmt: &mut Statement) -> Result<(), DatabaseError> {
match stmt {
Statement::Query(query) => self.transform_query_recursive(query),
// Fange alle anderen Fälle ab und gib einen Fehler zurück
_ => Err(DatabaseError::UnsupportedStatement {
statement_type: format!("{:?}", stmt)
.split('(')
.next()
.unwrap_or("")
.to_string(),
description: "This operation only accepts SELECT statements.".to_string(),
}),
}
}
pub fn transform_execute_statement(
&self,
stmt: &mut Statement,
hlc_timestamp: &Timestamp,
) -> Result<Option<String>, DatabaseError> {
match stmt {
Statement::CreateTable(create_table) => {
if self.is_crdt_sync_table(&create_table.name) {
self.columns
.add_to_table_definition(&mut create_table.columns);
Ok(Some(
self.normalize_table_name(&create_table.name).into_owned(),
))
} else {
Ok(None)
}
}
Statement::Insert(insert_stmt) => {
if let TableObject::TableName(name) = &insert_stmt.table {
if self.is_crdt_sync_table(name) {
self.transform_insert(insert_stmt, hlc_timestamp)?;
}
}
Ok(None)
}
Statement::Update {
table, assignments, ..
} => {
if let TableFactor::Table { name, .. } = &table.relation {
if self.is_crdt_sync_table(name) {
assignments.push(self.columns.create_hlc_assignment(hlc_timestamp));
}
}
Ok(None)
}
Statement::Delete(del_stmt) => {
if let Some(table_name) = self.extract_table_name_from_delete(del_stmt) {
if self.is_crdt_sync_table(&table_name) {
self.transform_delete_to_update(stmt, hlc_timestamp)?;
}
Ok(None)
} else {
Err(DatabaseError::UnsupportedStatement {
statement_type: "DELETE".to_string(),
description: "DELETE from non-table source or multiple tables".to_string(),
})
}
}
Statement::AlterTable { name, .. } => {
if self.is_crdt_sync_table(name) {
Ok(Some(self.normalize_table_name(name).into_owned()))
} else {
Ok(None)
}
}
_ => Ok(None),
}
}
/// Transformiert Query-Statements (fügt Tombstone-Filter hinzu)
fn transform_query_recursive(
&self,
query: &mut sqlparser::ast::Query,
) -> Result<(), DatabaseError> {
self.add_tombstone_filters_recursive(&mut query.body)
}
/// Rekursive Behandlung aller SetExpr-Typen mit vollständiger Subquery-Unterstützung
fn add_tombstone_filters_recursive(&self, set_expr: &mut SetExpr) -> Result<(), DatabaseError> {
match set_expr {
SetExpr::Select(select) => {
self.add_tombstone_filters_to_select(select)?;
// Transformiere auch Subqueries in Projektionen
for projection in &mut select.projection {
match projection {
SelectItem::UnnamedExpr(expr) | SelectItem::ExprWithAlias { expr, .. } => {
self.transform_expression_subqueries(expr)?;
}
_ => {} // Wildcard projections ignorieren
}
}
// Transformiere Subqueries in WHERE
if let Some(where_clause) = &mut select.selection {
self.transform_expression_subqueries(where_clause)?;
}
// Transformiere Subqueries in GROUP BY
match &mut select.group_by {
sqlparser::ast::GroupByExpr::All(_) => {
// GROUP BY ALL - keine Expressions zu transformieren
}
sqlparser::ast::GroupByExpr::Expressions(exprs, _) => {
for group_expr in exprs {
self.transform_expression_subqueries(group_expr)?;
}
}
}
// Transformiere Subqueries in HAVING
if let Some(having) = &mut select.having {
self.transform_expression_subqueries(having)?;
}
}
SetExpr::SetOperation { left, right, .. } => {
self.add_tombstone_filters_recursive(left)?;
self.add_tombstone_filters_recursive(right)?;
}
SetExpr::Query(query) => {
self.add_tombstone_filters_recursive(&mut query.body)?;
}
SetExpr::Values(values) => {
// Transformiere auch Subqueries in Values-Listen
for row in &mut values.rows {
for expr in row {
self.transform_expression_subqueries(expr)?;
}
}
}
_ => {} // Andere Fälle
}
Ok(())
}
/// Transformiert Subqueries innerhalb von Expressions
fn transform_expression_subqueries(&self, expr: &mut Expr) -> Result<(), DatabaseError> {
match expr {
// Einfache Subqueries
Expr::Subquery(query) => {
self.add_tombstone_filters_recursive(&mut query.body)?;
}
// EXISTS Subqueries
Expr::Exists { subquery, .. } => {
self.add_tombstone_filters_recursive(&mut subquery.body)?;
}
// IN Subqueries
Expr::InSubquery {
expr: left_expr,
subquery,
..
} => {
self.transform_expression_subqueries(left_expr)?;
self.add_tombstone_filters_recursive(&mut subquery.body)?;
}
// ANY/ALL Subqueries
Expr::AnyOp { left, right, .. } | Expr::AllOp { left, right, .. } => {
self.transform_expression_subqueries(left)?;
self.transform_expression_subqueries(right)?;
}
// Binäre Operationen
Expr::BinaryOp { left, right, .. } => {
self.transform_expression_subqueries(left)?;
self.transform_expression_subqueries(right)?;
}
// Unäre Operationen
Expr::UnaryOp {
expr: inner_expr, ..
} => {
self.transform_expression_subqueries(inner_expr)?;
}
// Verschachtelte Ausdrücke
Expr::Nested(nested) => {
self.transform_expression_subqueries(nested)?;
}
// CASE-Ausdrücke
Expr::Case {
operand,
conditions,
else_result,
..
} => {
if let Some(op) = operand {
self.transform_expression_subqueries(op)?;
}
for case_when in conditions {
self.transform_expression_subqueries(&mut case_when.condition)?;
self.transform_expression_subqueries(&mut case_when.result)?;
}
if let Some(else_res) = else_result {
self.transform_expression_subqueries(else_res)?;
}
}
// Funktionsaufrufe
Expr::Function(func) => match &mut func.args {
sqlparser::ast::FunctionArguments::List(sqlparser::ast::FunctionArgumentList {
args,
..
}) => {
for arg in args {
if let sqlparser::ast::FunctionArg::Unnamed(
sqlparser::ast::FunctionArgExpr::Expr(expr),
) = arg
{
self.transform_expression_subqueries(expr)?;
}
}
}
_ => {}
},
// BETWEEN
Expr::Between {
expr: main_expr,
low,
high,
..
} => {
self.transform_expression_subqueries(main_expr)?;
self.transform_expression_subqueries(low)?;
self.transform_expression_subqueries(high)?;
}
// IN Liste
Expr::InList {
expr: main_expr,
list,
..
} => {
self.transform_expression_subqueries(main_expr)?;
for list_expr in list {
self.transform_expression_subqueries(list_expr)?;
}
}
// IS NULL/IS NOT NULL
Expr::IsNull(inner) | Expr::IsNotNull(inner) => {
self.transform_expression_subqueries(inner)?;
}
// Andere Expression-Typen benötigen keine Transformation
_ => {}
}
Ok(())
}
/// Fügt Tombstone-Filter zu SELECT-Statements hinzu (nur wenn nicht explizit in WHERE gesetzt)
fn add_tombstone_filters_to_select(
&self,
select: &mut sqlparser::ast::Select,
) -> Result<(), DatabaseError> {
// Sammle alle CRDT-Tabellen mit ihren Aliasen
let mut crdt_tables = Vec::new();
for twj in &select.from {
if let TableFactor::Table { name, alias, .. } = &twj.relation {
if self.is_crdt_sync_table(name) {
let table_alias = alias.as_ref().map(|a| a.name.value.as_str());
crdt_tables.push((name.clone(), table_alias));
}
}
}
if crdt_tables.is_empty() {
return Ok(());
}
// Prüfe, welche Tombstone-Spalten bereits in der WHERE-Klausel referenziert werden
let explicitly_filtered_tables = if let Some(where_clause) = &select.selection {
self.find_explicitly_filtered_tombstone_tables(where_clause, &crdt_tables)
} else {
HashSet::new()
};
// Erstelle Filter nur für Tabellen, die noch nicht explizit gefiltert werden
let mut tombstone_filters = Vec::new();
for (table_name, table_alias) in crdt_tables {
let table_name_string = table_name.to_string();
let table_key = table_alias.unwrap_or(&table_name_string);
if !explicitly_filtered_tables.contains(table_key) {
tombstone_filters.push(self.columns.create_tombstone_filter(table_alias));
}
}
// Füge die automatischen Filter hinzu
if !tombstone_filters.is_empty() {
let combined_filter = tombstone_filters
.into_iter()
.reduce(|acc, expr| Expr::BinaryOp {
left: Box::new(acc),
op: BinaryOperator::And,
right: Box::new(expr),
})
.unwrap();
match &mut select.selection {
Some(existing) => {
*existing = Expr::BinaryOp {
left: Box::new(existing.clone()),
op: BinaryOperator::And,
right: Box::new(combined_filter),
};
}
None => {
select.selection = Some(combined_filter);
}
}
}
Ok(())
}
/// Findet alle Tabellen, die bereits explizit Tombstone-Filter in der WHERE-Klausel haben
fn find_explicitly_filtered_tombstone_tables(
&self,
where_expr: &Expr,
crdt_tables: &[(ObjectName, Option<&str>)],
) -> HashSet<String> {
let mut filtered_tables = HashSet::new();
self.scan_expression_for_tombstone_references(
where_expr,
crdt_tables,
&mut filtered_tables,
);
filtered_tables
}
/// Rekursiv durchsucht einen Expression-Baum nach Tombstone-Spalten-Referenzen
fn scan_expression_for_tombstone_references(
&self,
expr: &Expr,
crdt_tables: &[(ObjectName, Option<&str>)],
filtered_tables: &mut HashSet<String>,
) {
match expr {
// Einfache Spaltenreferenz: tombstone = ?
Expr::Identifier(ident) => {
if ident.value == self.columns.tombstone {
// Wenn keine Tabelle spezifiziert ist und es nur eine CRDT-Tabelle gibt
if crdt_tables.len() == 1 {
let table_name_str = crdt_tables[0].0.to_string();
let table_key = crdt_tables[0].1.unwrap_or(&table_name_str);
filtered_tables.insert(table_key.to_string());
}
}
}
// Qualifizierte Spaltenreferenz: table.tombstone = ? oder alias.tombstone = ?
Expr::CompoundIdentifier(idents) => {
if idents.len() == 2 && idents[1].value == self.columns.tombstone {
let table_ref = &idents[0].value;
// Prüfe, ob es eine unserer CRDT-Tabellen ist (nach Name oder Alias)
for (table_name, alias) in crdt_tables {
let table_name_str = table_name.to_string();
if table_ref == &table_name_str || alias.map_or(false, |a| a == table_ref) {
filtered_tables.insert(table_ref.clone());
break;
}
}
}
}
// Binäre Operationen: AND, OR, etc.
Expr::BinaryOp { left, right, .. } => {
self.scan_expression_for_tombstone_references(left, crdt_tables, filtered_tables);
self.scan_expression_for_tombstone_references(right, crdt_tables, filtered_tables);
}
// Unäre Operationen: NOT, etc.
Expr::UnaryOp { expr, .. } => {
self.scan_expression_for_tombstone_references(expr, crdt_tables, filtered_tables);
}
// Verschachtelte Ausdrücke
Expr::Nested(nested) => {
self.scan_expression_for_tombstone_references(nested, crdt_tables, filtered_tables);
}
// IN-Klauseln
Expr::InList { expr, .. } => {
self.scan_expression_for_tombstone_references(expr, crdt_tables, filtered_tables);
}
// BETWEEN-Klauseln
Expr::Between { expr, .. } => {
self.scan_expression_for_tombstone_references(expr, crdt_tables, filtered_tables);
}
// IS NULL/IS NOT NULL
Expr::IsNull(expr) | Expr::IsNotNull(expr) => {
self.scan_expression_for_tombstone_references(expr, crdt_tables, filtered_tables);
}
// Funktionsaufrufe - KORRIGIERT
Expr::Function(func) => {
match &func.args {
sqlparser::ast::FunctionArguments::List(
sqlparser::ast::FunctionArgumentList { args, .. },
) => {
for arg in args {
if let sqlparser::ast::FunctionArg::Unnamed(
sqlparser::ast::FunctionArgExpr::Expr(expr),
) = arg
{
self.scan_expression_for_tombstone_references(
expr,
crdt_tables,
filtered_tables,
);
}
}
}
_ => {} // Andere FunctionArguments-Varianten ignorieren
}
}
// CASE-Ausdrücke - KORRIGIERT
Expr::Case {
operand,
conditions,
else_result,
..
} => {
if let Some(op) = operand {
self.scan_expression_for_tombstone_references(op, crdt_tables, filtered_tables);
}
for case_when in conditions {
self.scan_expression_for_tombstone_references(
&case_when.condition,
crdt_tables,
filtered_tables,
);
self.scan_expression_for_tombstone_references(
&case_when.result,
crdt_tables,
filtered_tables,
);
}
if let Some(else_res) = else_result {
self.scan_expression_for_tombstone_references(
else_res,
crdt_tables,
filtered_tables,
);
}
}
// Subqueries mit vollständiger Unterstützung
Expr::Subquery(query) => {
self.transform_query_recursive_for_tombstone_analysis(
query,
crdt_tables,
filtered_tables,
)
.ok();
}
// EXISTS/NOT EXISTS Subqueries
Expr::Exists { subquery, .. } => {
self.transform_query_recursive_for_tombstone_analysis(
subquery,
crdt_tables,
filtered_tables,
)
.ok();
}
// IN/NOT IN Subqueries
Expr::InSubquery { expr, subquery, .. } => {
self.scan_expression_for_tombstone_references(expr, crdt_tables, filtered_tables);
self.transform_query_recursive_for_tombstone_analysis(
subquery,
crdt_tables,
filtered_tables,
)
.ok();
}
// ANY/ALL Subqueries
Expr::AnyOp { left, right, .. } | Expr::AllOp { left, right, .. } => {
self.scan_expression_for_tombstone_references(left, crdt_tables, filtered_tables);
self.scan_expression_for_tombstone_references(right, crdt_tables, filtered_tables);
}
// Andere Expression-Typen ignorieren wir für jetzt
_ => {}
}
}
/// Analysiert eine Subquery und sammelt Tombstone-Referenzen
fn transform_query_recursive_for_tombstone_analysis(
&self,
query: &sqlparser::ast::Query,
crdt_tables: &[(ObjectName, Option<&str>)],
filtered_tables: &mut HashSet<String>,
) -> Result<(), DatabaseError> {
self.analyze_set_expr_for_tombstone_references(&query.body, crdt_tables, filtered_tables)
}
/// Rekursiv analysiert SetExpr für Tombstone-Referenzen
fn analyze_set_expr_for_tombstone_references(
&self,
set_expr: &SetExpr,
crdt_tables: &[(ObjectName, Option<&str>)],
filtered_tables: &mut HashSet<String>,
) -> Result<(), DatabaseError> {
match set_expr {
SetExpr::Select(select) => {
// Analysiere WHERE-Klausel
if let Some(where_clause) = &select.selection {
self.scan_expression_for_tombstone_references(
where_clause,
crdt_tables,
filtered_tables,
);
}
// Analysiere alle Projektionen (können auch Subqueries enthalten)
for projection in &select.projection {
match projection {
SelectItem::UnnamedExpr(expr) | SelectItem::ExprWithAlias { expr, .. } => {
self.scan_expression_for_tombstone_references(
expr,
crdt_tables,
filtered_tables,
);
}
_ => {} // Wildcard projections ignorieren
}
}
// Analysiere GROUP BY
match &select.group_by {
sqlparser::ast::GroupByExpr::All(_) => {
// GROUP BY ALL - keine Expressions zu analysieren
}
sqlparser::ast::GroupByExpr::Expressions(exprs, _) => {
for group_expr in exprs {
self.scan_expression_for_tombstone_references(
group_expr,
crdt_tables,
filtered_tables,
);
}
}
}
// Analysiere HAVING
if let Some(having) = &select.having {
self.scan_expression_for_tombstone_references(
having,
crdt_tables,
filtered_tables,
);
}
}
SetExpr::SetOperation { left, right, .. } => {
self.analyze_set_expr_for_tombstone_references(left, crdt_tables, filtered_tables)?;
self.analyze_set_expr_for_tombstone_references(
right,
crdt_tables,
filtered_tables,
)?;
}
SetExpr::Query(query) => {
self.analyze_set_expr_for_tombstone_references(
&query.body,
crdt_tables,
filtered_tables,
)?;
}
SetExpr::Values(values) => {
// Analysiere Values-Listen
for row in &values.rows {
for expr in row {
self.scan_expression_for_tombstone_references(
expr,
crdt_tables,
filtered_tables,
);
}
}
}
_ => {} // Andere Varianten
}
Ok(())
}
/// Transformiert INSERT-Statements (fügt HLC-Timestamp hinzu)
fn transform_insert(
&self,
insert_stmt: &mut Insert,
timestamp: &Timestamp,
) -> Result<(), DatabaseError> {
insert_stmt
.columns
.push(Ident::new(self.columns.hlc_timestamp));
match insert_stmt.source.as_mut() {
Some(query) => match &mut *query.body {
SetExpr::Values(values) => {
for row in &mut values.rows {
row.push(Expr::Value(
Value::SingleQuotedString(timestamp.to_string()).into(),
));
}
}
SetExpr::Select(select) => {
let hlc_expr =
Expr::Value(Value::SingleQuotedString(timestamp.to_string()).into());
select.projection.push(SelectItem::UnnamedExpr(hlc_expr));
}
_ => {
return Err(DatabaseError::UnsupportedStatement {
statement_type: "INSERT".to_string(),
description: "INSERT with unsupported source type".to_string(),
});
}
},
None => {
return Err(DatabaseError::UnsupportedStatement {
statement_type: "INSERT".to_string(),
description: "INSERT statement has no source".to_string(),
});
}
}
Ok(())
}
/// Transformiert DELETE zu UPDATE (soft delete)
fn transform_delete_to_update(
&self,
stmt: &mut Statement,
timestamp: &Timestamp,
) -> Result<(), DatabaseError> {
if let Statement::Delete(del_stmt) = stmt {
let table_to_update = match &del_stmt.from {
sqlparser::ast::FromTable::WithFromKeyword(from)
| sqlparser::ast::FromTable::WithoutKeyword(from) => {
if from.len() == 1 {
from[0].clone()
} else {
return Err(DatabaseError::UnsupportedStatement {
statement_type: "DELETE".to_string(),
description: "DELETE with multiple tables not supported".to_string(),
});
}
}
};
let assignments = vec![
self.columns.create_tombstone_assignment(),
self.columns.create_hlc_assignment(timestamp),
];
*stmt = Statement::Update {
table: table_to_update,
assignments,
from: None,
selection: del_stmt.selection.clone(),
returning: None,
or: None,
};
}
Ok(())
}
/// Extrahiert Tabellennamen aus DELETE-Statement
fn extract_table_name_from_delete(
&self,
del_stmt: &sqlparser::ast::Delete,
) -> Option<ObjectName> {
let tables = match &del_stmt.from {
sqlparser::ast::FromTable::WithFromKeyword(from)
| sqlparser::ast::FromTable::WithoutKeyword(from) => from,
};
if tables.len() == 1 {
if let TableFactor::Table { name, .. } = &tables[0].relation {
Some(name.clone())
} else {
None
}
} else {
None
}
}
}

View File

@ -77,53 +77,6 @@ pub enum TriggerSetupResult {
TableNotFound,
}
/* fn set_sync_active(conn: &mut Connection) -> RusqliteResult<()> {
let sql = format!(
"INSERT OR REPLACE INTO \"{meta_table}\" (key, value) VALUES (?, '1');",
meta_table = TABLE_CRDT_CONFIGS
);
conn.execute(&sql, [SYNC_ACTIVE_KEY])?;
Ok(())
} */
/* fn clear_sync_active(conn: &mut Connection) -> RusqliteResult<()> {
let sql = format!(
"DELETE FROM \"{meta_table}\" WHERE key = ?;",
meta_table = TABLE_CRDT_CONFIGS
);
conn.execute(&sql, [SYNC_ACTIVE_KEY])?;
Ok(())
} */
/// Führt eine Aktion aus, während die Trigger temporär deaktiviert sind.
/// Diese Funktion stellt sicher, dass die Trigger auch bei einem Absturz (Panic)
/// wieder aktiviert werden.
/* pub fn with_triggers_paused<F, R>(conn: &mut Connection, action: F) -> RusqliteResult<R>
where
F: FnOnce(&mut Connection) -> RusqliteResult<R>,
{
// AssertUnwindSafe wird benötigt, um den Mutex über eine Panic-Grenze hinweg zu verwenden.
// Wir fangen einen möglichen Panic in `action` ab.
let result = panic::catch_unwind(AssertUnwindSafe(|| action(conn)));
// Diese Aktion MUSS immer ausgeführt werden, egal ob `action` erfolgreich war oder nicht.
match result {
Ok(res) => res, // Alles gut, gib das Ergebnis von `action` zurück.
Err(e) => panic::resume_unwind(e), // Ein Panic ist aufgetreten, wir geben ihn weiter, nachdem wir aufgeräumt haben.
}
} */
/// Erstellt die benötigte Meta-Tabelle, falls sie nicht existiert.
/* pub fn setup_meta_table(conn: &mut Connection) -> RusqliteResult<()> {
let sql = format!(
"CREATE TABLE IF NOT EXISTS \"{meta_table}\" (key TEXT PRIMARY KEY, value TEXT) WITHOUT ROWID;",
meta_table = TABLE_CRDT_CONFIGS
);
conn.execute(&sql, [])?;
Ok(())
} */
#[derive(Debug)]
struct ColumnInfo {
name: String,
@ -145,19 +98,11 @@ fn is_safe_identifier(name: &str) -> bool {
/// Richtet CRDT-Trigger für eine einzelne Tabelle ein.
pub fn setup_triggers_for_table(
conn: &mut Connection,
tx: &Transaction,
table_name: &str,
recreate: &bool,
recreate: bool,
) -> Result<TriggerSetupResult, CrdtSetupError> {
if !is_safe_identifier(table_name) {
return Err(rusqlite::Error::InvalidParameterName(format!(
"Invalid or unsafe table name provided: {}",
table_name
))
.into());
}
let columns = get_table_schema(conn, table_name)?;
let columns = get_table_schema(tx, table_name)?;
if columns.is_empty() {
return Ok(TriggerSetupResult::TableNotFound);
@ -198,23 +143,26 @@ pub fn setup_triggers_for_table(
let insert_trigger_sql = generate_insert_trigger_sql(table_name, &pks, &cols_to_track);
let update_trigger_sql = generate_update_trigger_sql(table_name, &pks, &cols_to_track);
let sql_batch = format!("{}\n{}", insert_trigger_sql, update_trigger_sql);
// Führe die Erstellung innerhalb einer Transaktion aus
let tx = conn.transaction()?;
if *recreate {
if recreate {
drop_triggers_for_table(&tx, table_name)?;
}
tx.execute_batch(&sql_batch)?;
tx.commit()?;
tx.execute_batch(&insert_trigger_sql)?;
tx.execute_batch(&update_trigger_sql)?;
Ok(TriggerSetupResult::Success)
}
/// Holt das Schema für eine gegebene Tabelle.
/// WICHTIG: Dies ist eine private Hilfsfunktion. Sie geht davon aus, dass `table_name`
/// bereits vom öffentlichen Aufrufer (setup_triggers_for_table) validiert wurde.
fn get_table_schema(conn: &Connection, table_name: &str) -> RusqliteResult<Vec<ColumnInfo>> {
if !is_safe_identifier(table_name) {
return Err(rusqlite::Error::InvalidParameterName(format!(
"Invalid or unsafe table name provided: {}",
table_name
))
.into());
}
let sql = format!("PRAGMA table_info(\"{}\");", table_name);
let mut stmt = conn.prepare(&sql)?;
let rows = stmt.query_map([], ColumnInfo::from_row)?;
@ -399,74 +347,3 @@ fn generate_update_trigger_sql(table_name: &str, pks: &[String], cols: &[String]
END;"
)
}
/* fn generate_update_trigger_sql(table_name: &str, pks: &[String], cols: &[String]) -> String {
let pk_json_payload = pks
.iter()
.map(|pk| format!("'{}', NEW.\"{}\"", pk, pk))
.collect::<Vec<_>>()
.join(", ");
let column_updates = cols.iter().fold(String::new(), |mut acc, col| {
writeln!(&mut acc, " IF NEW.\"{column}\" IS NOT OLD.\"{column}\" THEN INSERT INTO {log_table} (hlc_timestamp, op_type, table_name, row_pk, column_name, value, old_value) VALUES (NEW.\"{hlc_col}\", 'UPDATE', '{table}', json_object({pk_payload}), '{column}', json_object('value', NEW.\"{column}\"), json_object('value', OLD.\"{column}\")); END IF;",
log_table = TABLE_CRDT_LOGS,
hlc_col = HLC_TIMESTAMP_COLUMN,
table = table_name,
pk_payload = pk_json_payload,
column = col
).unwrap();
acc
});
let soft_delete_logic = format!(
" IF NEW.\"{tombstone_col}\" = 1 AND OLD.\"{tombstone_col}\" = 0 THEN INSERT INTO {log_table} (hlc_timestamp, op_type, table_name, row_pk) VALUES (NEW.\"{hlc_col}\", 'DELETE', '{table}', json_object({pk_payload})); END IF;",
log_table = TABLE_CRDT_LOGS,
hlc_col = HLC_TIMESTAMP_COLUMN,
tombstone_col = TOMBSTONE_COLUMN,
table = table_name,
pk_payload = pk_json_payload
);
let trigger_name = UPDATE_TRIGGER_TPL.replace("{TABLE_NAME}", table_name);
format!(
"CREATE TRIGGER IF NOT EXISTS \"{trigger_name}\"
AFTER UPDATE ON \"{table_name}\"
WHEN (SELECT value FROM \"{config_table}\" WHERE key = '{sync_key}') IS NOT '1'
FOR EACH ROW
BEGIN
{column_updates}
{soft_delete_logic}
END;",
config_table = TABLE_CRDT_CONFIGS,
sync_key = SYNC_ACTIVE_KEY
)
}
*/
/*
/// Durchläuft alle `haex_`-Tabellen und richtet die CRDT-Trigger ein.
pub fn generate_haex_triggers(conn: &mut Connection) -> Result<(), rusqlite::Error> {
println!("🔄 Setup CRDT triggers...");
let table_names: Vec<String> = {
let mut stmt = conn.prepare("SELECT name FROM sqlite_master WHERE type='table' AND name LIKE 'haex_%' AND name NOT LIKE 'haex_crdt_%';")?;
let rows = stmt.query_map([], |row| row.get::<_, String>(0))?;
rows.collect::<RusqliteResult<Vec<String>>>()?
};
for table_name in table_names {
if table_name == TABLE_CRDT_CONFIGS {
continue;
}
println!("➡️ Processing table: {}", table_name);
match setup_triggers_for_table(conn, &table_name) {
Ok(TriggerSetupResult::Success) => {
println!(" ✅ Triggers created for {}", table_name)
}
Ok(TriggerSetupResult::TableNotFound) => {
println!(" Table {} not found, skipping.", table_name)
}
Err(e) => println!(" ❌ Could not set up triggers for {}: {}", table_name, e),
}
}
println!("✨ Done setting up CRDT triggers.");
Ok(())
} */

View File

@ -1,276 +0,0 @@
// Wir binden die Konstanten aus unserem generierten Modul ein.
// `crate` bezieht sich auf das Wurzelverzeichnis unseres Crates (src-tauri/src).
use crate::tableNames::*;
use rusqlite::{Connection, Result as RusqliteResult, Row};
use serde::Serialize;
use std::error::Error;
use std::fmt::{self, Display, Formatter, Write};
use std::panic::{self, AssertUnwindSafe};
use ts_rs::TS;
// Harte Konstanten, die nicht aus der JSON-Datei kommen, da sie Teil der internen Logik sind.
const SYNC_ACTIVE_KEY: &str = "sync_active";
const TOMBSTONE_COLUMN: &str = "haex_tombstone";
const HLC_TIMESTAMP_COLUMN: &str = "haex_hlc_timestamp";
const INSERT_TRIGGER_TPL: &str = "z_crdt_{TABLE_NAME}_insert";
const UPDATE_TRIGGER_TPL: &str = "z_crdt_{TABLE_NAME}_update";
// --- Eigener Error-Typ für klares Fehler-Handling ---
#[derive(Debug)]
pub enum CrdtSetupError {
DatabaseError(rusqlite::Error),
TombstoneColumnMissing {
table_name: String,
column_name: String,
},
PrimaryKeyMissing {
table_name: String,
},
}
impl Display for CrdtSetupError {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
match self {
CrdtSetupError::DatabaseError(e) => write!(f, "Database error: {}", e),
CrdtSetupError::TombstoneColumnMissing {
table_name,
column_name,
} => write!(
f,
"Table '{}' is missing the required tombstone column '{}'",
table_name, column_name
),
CrdtSetupError::PrimaryKeyMissing { table_name } => {
write!(f, "Table '{}' has no primary key", table_name)
}
}
}
}
impl Error for CrdtSetupError {}
impl From<rusqlite::Error> for CrdtSetupError {
fn from(err: rusqlite::Error) -> Self {
CrdtSetupError::DatabaseError(err)
}
}
// --- Öffentliche Structs und Enums ---
#[derive(Debug, Serialize, TS)]
#[ts(export)]
pub enum TriggerSetupResult {
Success,
TableNotFound,
}
#[derive(Debug)]
struct ColumnInfo {
name: String,
is_pk: bool,
}
impl ColumnInfo {
fn from_row(row: &Row) -> RusqliteResult<Self> {
Ok(ColumnInfo {
name: row.get("name")?,
is_pk: row.get::<_, i64>("pk")? > 0,
})
}
}
// --- Öffentliche Funktionen für die Anwendungslogik ---
/// Erstellt die benötigten CRDT-Systemtabellen (z.B. die Config-Tabelle), falls sie nicht existieren.
/// Sollte beim Anwendungsstart einmalig aufgerufen werden.
pub fn setup_crdt_tables(conn: &mut Connection) -> RusqliteResult<()> {
let config_sql = format!(
"CREATE TABLE IF NOT EXISTS \"{config_table}\" (key TEXT PRIMARY KEY, value TEXT) WITHOUT ROWID;",
config_table = TABLE_CRDT_CONFIGS
);
conn.execute(&config_sql, [])?;
Ok(())
}
/// Führt eine Aktion aus, während die Trigger temporär deaktiviert sind.
/// Stellt sicher, dass die Trigger auch bei einem Absturz (Panic) wieder aktiviert werden.
pub fn with_triggers_paused<F, R>(conn: &mut Connection, action: F) -> RusqliteResult<R>
where
F: FnOnce(&mut Connection) -> RusqliteResult<R>,
{
set_sync_active(conn)?;
// `catch_unwind` fängt einen möglichen Panic in `action` ab.
let result = panic::catch_unwind(AssertUnwindSafe(|| action(conn)));
// Diese Aufräumaktion wird immer ausgeführt.
clear_sync_active(conn)?;
match result {
Ok(res) => res, // Alles gut, gib das Ergebnis von `action` zurück.
Err(e) => panic::resume_unwind(e), // Ein Panic ist aufgetreten, wir geben ihn weiter, nachdem wir aufgeräumt haben.
}
}
/// Analysiert alle `haex_`-Tabellen in der Datenbank und erstellt die notwendigen CRDT-Trigger.
pub fn generate_haex_triggers(conn: &mut Connection) -> RusqliteResult<()> {
println!("🔄 Setup CRDT triggers...");
let table_names: Vec<String> = {
let mut stmt = conn.prepare("SELECT name FROM sqlite_master WHERE type='table' AND name LIKE 'haex_%' AND name NOT LIKE 'haex_crdt_%';")?;
let rows = stmt.query_map([], |row| row.get::<_, String>(0))?;
rows.collect::<RusqliteResult<Vec<String>>>()?
};
for table_name in table_names {
// Überspringe die Config-Tabelle selbst, sie braucht keine Trigger.
if table_name == TABLE_CRDT_CONFIGS {
continue;
}
println!("➡️ Processing table: {}", table_name);
match setup_triggers_for_table(conn, &table_name) {
Ok(TriggerSetupResult::Success) => {
println!(" ✅ Triggers created for {}", table_name)
}
Ok(TriggerSetupResult::TableNotFound) => {
println!(" Table {} not found, skipping.", table_name)
}
Err(e) => println!(" ❌ Could not set up triggers for {}: {}", table_name, e),
}
}
println!("✨ Done setting up CRDT triggers.");
Ok(())
}
// --- Private Hilfsfunktionen ---
fn set_sync_active(conn: &mut Connection) -> RusqliteResult<()> {
let sql = format!(
"INSERT OR REPLACE INTO \"{config_table}\" (key, value) VALUES (?, '1');",
config_table = TABLE_CRDT_CONFIGS
);
conn.execute(&sql, [SYNC_ACTIVE_KEY])?;
Ok(())
}
fn clear_sync_active(conn: &mut Connection) -> RusqliteResult<()> {
let sql = format!(
"DELETE FROM \"{config_table}\" WHERE key = ?;",
config_table = TABLE_CRDT_CONFIGS
);
conn.execute(&sql, [SYNC_ACTIVE_KEY])?;
Ok(())
}
fn is_safe_identifier(name: &str) -> bool {
!name.is_empty() && name.chars().all(|c| c.is_alphanumeric() || c == '_')
}
fn setup_triggers_for_table(
conn: &mut Connection,
table_name: &str,
) -> Result<TriggerSetupResult, CrdtSetupError> {
if !is_safe_identifier(table_name) {
return Err(rusqlite::Error::InvalidParameterName(format!(
"Invalid table name: {}",
table_name
))
.into());
}
let columns = get_table_schema(conn, table_name)?;
if columns.is_empty() {
return Ok(TriggerSetupResult::TableNotFound);
}
if !columns.iter().any(|c| c.name == TOMBSTONE_COLUMN) {
return Err(CrdtSetupError::TombstoneColumnMissing {
table_name: table_name.to_string(),
column_name: TOMBSTONE_COLUMN.to_string(),
});
}
let pks: Vec<String> = columns
.iter()
.filter(|c| c.is_pk)
.map(|c| c.name.clone())
.collect();
if pks.is_empty() {
return Err(CrdtSetupError::PrimaryKeyMissing {
table_name: table_name.to_string(),
});
}
let cols_to_track: Vec<String> = columns
.iter()
.filter(|c| !c.is_pk && c.name != TOMBSTONE_COLUMN && c.name != HLC_TIMESTAMP_COLUMN)
.map(|c| c.name.clone())
.collect();
let insert_trigger_sql = generate_insert_trigger_sql(table_name, &pks, &cols_to_track);
let update_trigger_sql = generate_update_trigger_sql(table_name, &pks, &cols_to_track);
let drop_insert_trigger_sql =
drop_trigger_sql(INSERT_TRIGGER_TPL.replace("{TABLE_NAME}", table_name));
let drop_update_trigger_sql =
drop_trigger_sql(UPDATE_TRIGGER_TPL.replace("{TABLE_NAME}", table_name));
let tx = conn.transaction()?;
tx.execute_batch(&format!(
"{}\n{}\n{}\n{}",
drop_insert_trigger_sql, drop_update_trigger_sql, insert_trigger_sql, update_trigger_sql
))?;
tx.commit()?;
Ok(TriggerSetupResult::Success)
}
fn get_table_schema(conn: &Connection, table_name: &str) -> RusqliteResult<Vec<ColumnInfo>> {
let sql = format!("PRAGMA table_info(\"{}\");", table_name);
let mut stmt = conn.prepare(&sql)?;
let rows = stmt.query_map([], ColumnInfo::from_row)?;
rows.collect()
}
fn drop_trigger_sql(trigger_name: String) -> String {
format!("DROP TRIGGER IF EXISTS \"{}\";", trigger_name)
}
fn generate_insert_trigger_sql(table_name: &str, pks: &[String], cols: &[String]) -> String {
let pk_json_payload = pks
.iter()
.map(|pk| format!("'{}', NEW.\"{}\"", pk, pk))
.collect::<Vec<_>>()
.join(", ");
let column_inserts = cols.iter().fold(String::new(), |mut acc, col| {
writeln!(&mut acc, " INSERT INTO \"{log_table}\" (hlc_timestamp, op_type, table_name, row_pk, column_name, value) VALUES (NEW.\"{hlc_col}\", 'INSERT', '{table}', json_object({pk_payload}), '{column}', json_object('value', NEW.\"{column}\"));", log_table = TABLE_CRDT_LOGS, hlc_col = HLC_TIMESTAMP_COLUMN, table = table_name, pk_payload = pk_json_payload, column = col).unwrap();
acc
});
let trigger_name = INSERT_TRIGGER_TPL.replace("{TABLE_NAME}", table_name);
format!(
"CREATE TRIGGER IF NOT EXISTS \"{trigger_name}\"\n"
+ " AFTER INSERT ON \"{table_name}\"\n"
+ " WHEN (SELECT value FROM \"{config_table}\" WHERE key = '{sync_key}') IS NOT '1'\n"
+ " FOR EACH ROW\n"
+ " BEGIN\n"
+ " {column_inserts}\n"
+ " END;",
config_table = TABLE_CRDT_CONFIGS,
sync_key = SYNC_ACTIVE_KEY
)
}
fn generate_update_trigger_sql(table_name: &str, pks: &[String], cols: &[String]) -> String {
let pk_json_payload = pks
.iter()
.map(|pk| format!("'{}', NEW.\"{}\"", pk, pk))
.collect::<Vec<_>>()
.join(", ");
let column_updates = cols.iter().fold(String::new(), |mut acc, col| {
writeln!(&mut acc, " IF NEW.\"{column}\" IS NOT OLD.\"{column}\" THEN INSERT INTO \"{log_table}\" (hlc_timestamp, op_type, table_name, row_pk, column_name, value, old_value) VALUES (NEW.\"{hlc_col}\", 'UPDATE', '{table}', json_object({pk_payload}), '{column}', json_object('value', NEW.\"{column}\"), json_object('value', OLD.\"{column}\")); END IF;", log_table = TABLE_CRDT_LOGS, hlc_col = HLC_TIMESTAMP_COLUMN, table = table_name, pk_payload = pk_json_payload, column = col).unwrap();
acc
});
let soft_delete_logic = format!(
" IF NEW.\"{tombstone_col}\" = 1 AND OLD.\"{tombstone_col}\" = 0 THEN INSERT INTO \"{log_table}\" (hlc_timestamp, op_type, table_name, row_pk) VALUES (NEW.\"{hlc_col}\", 'DELETE', '{table}', json_object({pk_payload})); END IF;", log_table = TABLE_CRDT_LOGS, hlc_col = HLC_TIMESTAMP_COLUMN, tombstone_col = TOMBSTONE_COLUMN, table = table_name, pk_payload = pk_json_payload);
let trigger_name = UPDATE_TRIGGER_TPL.replace("{TABLE_NAME}", table_name);
format!(
"CREATE TRIGGER IF NOT EXISTS \"{trigger_name}\"\n"
+ " AFTER UPDATE ON \"{table_name}\"\n"
+ " WHEN (SELECT value FROM \"{config_table}\" WHERE key = '{sync_key}') IS NOT '1'\n"
+ " FOR EACH ROW\n"
+ " BEGIN\n"
+ " {column_updates}\n"
+ " {soft_delete_logic}\n"
+ " END;",
config_table = TABLE_CRDT_CONFIGS,
sync_key = SYNC_ACTIVE_KEY
)
}