fixed trigger

2025-10-23 13:17:58 +02:00
parent 99ccadce00
commit 4f839aa856
42 changed files with 535 additions and 10695 deletions


@@ -1,21 +1,19 @@
// src-tauri/src/crdt/insert_transformer.rs
// INSERT-specific CRDT transformations (ON CONFLICT, RETURNING)
use crate::crdt::trigger::{HLC_TIMESTAMP_COLUMN, TOMBSTONE_COLUMN};
use crate::crdt::trigger::HLC_TIMESTAMP_COLUMN;
use crate::database::error::DatabaseError;
use sqlparser::ast::{Expr, Ident, Insert, SelectItem, SetExpr, Value};
use uhlc::Timestamp;
/// Helper struct for INSERT transformations
pub struct InsertTransformer {
tombstone_column: &'static str,
hlc_timestamp_column: &'static str,
}
impl InsertTransformer {
pub fn new() -> Self {
Self {
tombstone_column: TOMBSTONE_COLUMN,
hlc_timestamp_column: HLC_TIMESTAMP_COLUMN,
}
}
@@ -58,11 +56,9 @@ impl InsertTransformer {
insert_stmt: &mut Insert,
timestamp: &Timestamp,
) -> Result<(), DatabaseError> {
// Add both haex_timestamp and haex_tombstone columns if not exists
// Add haex_timestamp column if not exists
let hlc_col_index =
Self::find_or_add_column(&mut insert_stmt.columns, self.hlc_timestamp_column);
let tombstone_col_index =
Self::find_or_add_column(&mut insert_stmt.columns, self.tombstone_column);
// ON CONFLICT logic removed entirely!
// With hard deletes there are no longer any tombstone entries to reactivate
@@ -74,24 +70,15 @@ impl InsertTransformer {
for row in &mut values.rows {
let hlc_value =
Expr::Value(Value::SingleQuotedString(timestamp.to_string()).into());
let tombstone_value =
Expr::Value(Value::Number("0".to_string(), false).into());
Self::set_or_push_value(row, hlc_col_index, hlc_value);
Self::set_or_push_value(row, tombstone_col_index, tombstone_value);
}
}
SetExpr::Select(select) => {
let hlc_value =
Expr::Value(Value::SingleQuotedString(timestamp.to_string()).into());
let tombstone_value = Expr::Value(Value::Number("0".to_string(), false).into());
Self::set_or_push_projection(&mut select.projection, hlc_col_index, hlc_value);
Self::set_or_push_projection(
&mut select.projection,
tombstone_col_index,
tombstone_value,
);
}
_ => {
return Err(DatabaseError::UnsupportedStatement {

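The effect of this INSERT rewrite can be illustrated with a minimal, self-contained sketch (not part of this commit; the table notes, its id/title columns, the SQLite dialect choice, and the literal HLC string are assumptions for illustration). It parses an INSERT with sqlparser, appends the haex_timestamp column, and pushes a timestamp literal onto every VALUES row, roughly what find_or_add_column and set_or_push_value do above:

// Hypothetical sketch: append haex_timestamp to an INSERT via sqlparser.
use sqlparser::ast::{Expr, Ident, SetExpr, Statement, Value};
use sqlparser::dialect::SQLiteDialect;
use sqlparser::parser::Parser;

fn main() {
    let sql = "INSERT INTO notes (id, title) VALUES ('1', 'hello')";
    let mut stmts = Parser::parse_sql(&SQLiteDialect {}, sql).expect("parse");
    if let Statement::Insert(insert) = &mut stmts[0] {
        // The real transformer reuses the column if it is already present.
        insert.columns.push(Ident::new("haex_timestamp"));
        if let Some(source) = &mut insert.source {
            if let SetExpr::Values(values) = source.body.as_mut() {
                for row in &mut values.rows {
                    // Placeholder literal standing in for timestamp.to_string().
                    row.push(Expr::Value(
                        Value::SingleQuotedString("hlc-placeholder".into()).into(),
                    ));
                }
            }
        }
    }
    println!("{}", stmts[0]);
    // INSERT INTO notes (id, title, haex_timestamp) VALUES ('1', 'hello', 'hlc-placeholder')
}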

@@ -1,12 +1,12 @@
// src-tauri/src/crdt/transformer.rs
use crate::crdt::insert_transformer::InsertTransformer;
use crate::crdt::trigger::{HLC_TIMESTAMP_COLUMN, TOMBSTONE_COLUMN};
use crate::crdt::trigger::HLC_TIMESTAMP_COLUMN;
use crate::database::error::DatabaseError;
use crate::table_names::{TABLE_CRDT_CONFIGS, TABLE_CRDT_LOGS};
use sqlparser::ast::{
Assignment, AssignmentTarget, ColumnDef, DataType, Expr, Ident, ObjectName,
ObjectNamePart, Statement, TableFactor, TableObject, Value,
Assignment, AssignmentTarget, ColumnDef, DataType, Expr, Ident, ObjectName, ObjectNamePart,
Statement, TableFactor, TableObject, Value,
};
use std::borrow::Cow;
use std::collections::HashSet;
@@ -15,13 +15,11 @@ use uhlc::Timestamp;
/// Configuration for CRDT columns
#[derive(Clone)]
struct CrdtColumns {
tombstone: &'static str,
hlc_timestamp: &'static str,
}
impl CrdtColumns {
const DEFAULT: Self = Self {
tombstone: TOMBSTONE_COLUMN,
hlc_timestamp: HLC_TIMESTAMP_COLUMN,
};
@@ -37,13 +35,6 @@ impl CrdtColumns {
/// Adds the CRDT columns to a table definition
fn add_to_table_definition(&self, columns: &mut Vec<ColumnDef>) {
if !columns.iter().any(|c| c.name.value == self.tombstone) {
columns.push(ColumnDef {
name: Ident::new(self.tombstone),
data_type: DataType::Integer(None),
options: vec![],
});
}
if !columns.iter().any(|c| c.name.value == self.hlc_timestamp) {
columns.push(ColumnDef {
name: Ident::new(self.hlc_timestamp),
@@ -86,7 +77,7 @@ impl CrdtTransformer {
// =================================================================
// PUBLIC API METHODS
// =================================================================
pub fn transform_execute_statement_with_table_info(
&self,
stmt: &mut Statement,
@@ -171,7 +162,7 @@ impl CrdtTransformer {
Statement::Update {
table, assignments, ..
} => {
if let TableFactor::Table { name, ..} = &table.relation {
if let TableFactor::Table { name, .. } = &table.relation {
if self.is_crdt_sync_table(name) {
assignments.push(self.columns.create_hlc_assignment(hlc_timestamp));
}

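On the UPDATE side the transformer only appends one extra assignment for CRDT-synced tables. A hedged, standalone sketch of that step (table and column names are invented, the literal stands in for the real HLC Timestamp, and the real create_hlc_assignment, whose body is not shown in this diff, is merely expected to build equivalent AST nodes):

// Hypothetical sketch: append haex_timestamp = '<hlc>' to an UPDATE statement.
use sqlparser::ast::{
    Assignment, AssignmentTarget, Expr, Ident, ObjectName, ObjectNamePart, Statement, Value,
};
use sqlparser::dialect::SQLiteDialect;
use sqlparser::parser::Parser;

fn main() {
    let sql = "UPDATE notes SET title = 'renamed' WHERE id = '1'";
    let mut stmts = Parser::parse_sql(&SQLiteDialect {}, sql).expect("parse");
    if let Statement::Update { assignments, .. } = &mut stmts[0] {
        assignments.push(Assignment {
            target: AssignmentTarget::ColumnName(ObjectName(vec![ObjectNamePart::Identifier(
                Ident::new("haex_timestamp"),
            )])),
            // Placeholder literal standing in for hlc_timestamp.to_string().
            value: Expr::Value(Value::SingleQuotedString("hlc-placeholder".into()).into()),
        });
    }
    println!("{}", stmts[0]);
    // UPDATE notes SET title = 'renamed', haex_timestamp = 'hlc-placeholder' WHERE id = '1'
}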

@@ -12,18 +12,16 @@ const UPDATE_TRIGGER_TPL: &str = "z_crdt_{TABLE_NAME}_update";
const DELETE_TRIGGER_TPL: &str = "z_crdt_{TABLE_NAME}_delete";
//const SYNC_ACTIVE_KEY: &str = "sync_active";
pub const TOMBSTONE_COLUMN: &str = "haex_tombstone";
pub const HLC_TIMESTAMP_COLUMN: &str = "haex_timestamp";
/// Name of the custom UUID generation function (registered in database::core::open_and_init_db)
pub const UUID_FUNCTION_NAME: &str = "gen_uuid";
#[derive(Debug)]
pub enum CrdtSetupError {
/// Wraps an error originating from the rusqlite library.
DatabaseError(rusqlite::Error),
/// The table has no tombstone column, which is a CRDT prerequisite.
TombstoneColumnMissing {
table_name: String,
column_name: String,
},
HlcColumnMissing {
table_name: String,
column_name: String,
@@ -37,14 +35,6 @@ impl Display for CrdtSetupError {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
match self {
CrdtSetupError::DatabaseError(e) => write!(f, "Database error: {}", e),
CrdtSetupError::TombstoneColumnMissing {
table_name,
column_name,
} => write!(
f,
"Table '{}' is missing the required tombstone column '{}'",
table_name, column_name
),
CrdtSetupError::HlcColumnMissing {
table_name,
column_name,
@@ -110,13 +100,6 @@ pub fn setup_triggers_for_table(
return Ok(TriggerSetupResult::TableNotFound);
}
if !columns.iter().any(|c| c.name == TOMBSTONE_COLUMN) {
return Err(CrdtSetupError::TombstoneColumnMissing {
table_name: table_name.to_string(),
column_name: TOMBSTONE_COLUMN.to_string(),
});
}
if !columns.iter().any(|c| c.name == HLC_TIMESTAMP_COLUMN) {
return Err(CrdtSetupError::HlcColumnMissing {
table_name: table_name.to_string(),
@@ -138,7 +121,7 @@ pub fn setup_triggers_for_table(
let cols_to_track: Vec<String> = columns
.iter()
.filter(|c| !c.is_pk) //&& c.name != TOMBSTONE_COLUMN && c.name != HLC_TIMESTAMP_COLUMN
.filter(|c| !c.is_pk)
.map(|c| c.name.clone())
.collect();
@@ -269,9 +252,10 @@ fn generate_insert_trigger_sql(table_name: &str, pks: &[String], cols: &[String]
let column_inserts = if cols.is_empty() {
// Only PKs -> simple insert into the log
format!(
"INSERT INTO {log_table} (haex_timestamp, op_type, table_name, row_pks)
VALUES (NEW.\"{hlc_col}\", 'INSERT', '{table}', json_object({pk_payload}));",
"INSERT INTO {log_table} (id, haex_timestamp, op_type, table_name, row_pks)
VALUES ({uuid_fn}(), NEW.\"{hlc_col}\", 'INSERT', '{table}', json_object({pk_payload}));",
log_table = TABLE_CRDT_LOGS,
uuid_fn = UUID_FUNCTION_NAME,
hlc_col = HLC_TIMESTAMP_COLUMN,
table = table_name,
pk_payload = pk_json_payload
@@ -280,9 +264,10 @@ fn generate_insert_trigger_sql(table_name: &str, pks: &[String], cols: &[String]
cols.iter().fold(String::new(), |mut acc, col| {
writeln!(
&mut acc,
"INSERT INTO {log_table} (haex_timestamp, op_type, table_name, row_pks, column_name, new_value)
VALUES (NEW.\"{hlc_col}\", 'INSERT', '{table}', json_object({pk_payload}), '{column}', json_object('value', NEW.\"{column}\"));",
"INSERT INTO {log_table} (id, haex_timestamp, op_type, table_name, row_pks, column_name, new_value)
VALUES ({uuid_fn}(), NEW.\"{hlc_col}\", 'INSERT', '{table}', json_object({pk_payload}), '{column}', json_object('value', NEW.\"{column}\"));",
log_table = TABLE_CRDT_LOGS,
uuid_fn = UUID_FUNCTION_NAME,
hlc_col = HLC_TIMESTAMP_COLUMN,
table = table_name,
pk_payload = pk_json_payload,
@@ -324,11 +309,12 @@ fn generate_update_trigger_sql(table_name: &str, pks: &[String], cols: &[String]
for col in cols {
writeln!(
&mut body,
"INSERT INTO {log_table} (haex_timestamp, op_type, table_name, row_pks, column_name, new_value, old_value)
SELECT NEW.\"{hlc_col}\", 'UPDATE', '{table}', json_object({pk_payload}), '{column}',
"INSERT INTO {log_table} (id, haex_timestamp, op_type, table_name, row_pks, column_name, new_value, old_value)
SELECT {uuid_fn}(), NEW.\"{hlc_col}\", 'UPDATE', '{table}', json_object({pk_payload}), '{column}',
json_object('value', NEW.\"{column}\"), json_object('value', OLD.\"{column}\")
WHERE NEW.\"{column}\" IS NOT OLD.\"{column}\";",
log_table = TABLE_CRDT_LOGS,
uuid_fn = UUID_FUNCTION_NAME,
hlc_col = HLC_TIMESTAMP_COLUMN,
table = table_name,
pk_payload = pk_json_payload,
@@ -367,10 +353,11 @@ fn generate_delete_trigger_sql(table_name: &str, pks: &[String], cols: &[String]
for col in cols {
writeln!(
&mut body,
"INSERT INTO {log_table} (haex_timestamp, op_type, table_name, row_pks, column_name, old_value)
VALUES (OLD.\"{hlc_col}\", 'DELETE', '{table}', json_object({pk_payload}), '{column}',
"INSERT INTO {log_table} (id, haex_timestamp, op_type, table_name, row_pks, column_name, old_value)
VALUES ({uuid_fn}(), OLD.\"{hlc_col}\", 'DELETE', '{table}', json_object({pk_payload}), '{column}',
json_object('value', OLD.\"{column}\"));",
log_table = TABLE_CRDT_LOGS,
uuid_fn = UUID_FUNCTION_NAME,
hlc_col = HLC_TIMESTAMP_COLUMN,
table = table_name,
pk_payload = pk_json_payload,
@@ -381,13 +368,15 @@ fn generate_delete_trigger_sql(table_name: &str, pks: &[String], cols: &[String]
// Only PKs -> minimal delete log
writeln!(
&mut body,
"INSERT INTO {log_table} (haex_timestamp, op_type, table_name, row_pks)
VALUES (OLD.\"{hlc_col}\", 'DELETE', '{table}', json_object({pk_payload}));",
"INSERT INTO {log_table} (id, haex_timestamp, op_type, table_name, row_pks)
VALUES ({uuid_fn}(), OLD.\"{hlc_col}\", 'DELETE', '{table}', json_object({pk_payload}));",
log_table = TABLE_CRDT_LOGS,
uuid_fn = UUID_FUNCTION_NAME,
hlc_col = HLC_TIMESTAMP_COLUMN,
table = table_name,
pk_payload = pk_json_payload
).unwrap();
)
.unwrap();
}
let trigger_name = DELETE_TRIGGER_TPL.replace("{TABLE_NAME}", table_name);
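The rewritten trigger bodies now write an explicit id through gen_uuid(). Its registration is not part of this diff (the doc comment only points at database::core::open_and_init_db); a minimal sketch of how such a scalar function can be registered, assuming rusqlite with the "functions" feature and the uuid crate, might look like this:

// Hypothetical sketch: register a gen_uuid() scalar function on the connection
// so SQL such as SELECT gen_uuid() (and the generated triggers) can call it.
use rusqlite::{functions::FunctionFlags, Connection, Result};

fn register_gen_uuid(conn: &Connection) -> Result<()> {
    conn.create_scalar_function(
        "gen_uuid",
        0,                          // takes no arguments
        FunctionFlags::SQLITE_UTF8, // deliberately not SQLITE_DETERMINISTIC
        |_ctx| Ok(uuid::Uuid::new_v4().to_string()),
    )
}

fn main() -> Result<()> {
    let conn = Connection::open_in_memory()?;
    register_gen_uuid(&conn)?;
    let id: String = conn.query_row("SELECT gen_uuid()", [], |row| row.get(0))?;
    println!("generated id: {id}");
    Ok(())
}

Generating the id inside SQL keeps the trigger-driven inserts into the log table (TABLE_CRDT_LOGS) self-sufficient: no application code has to run to assign a primary key to each log row.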