mirror of
https://github.com/haexhub/haex-hub.git
synced 2025-12-17 06:30:50 +01:00
zwischenstand
This commit is contained in:
22
src-tauri/src/crdt/log.rs
Normal file
22
src-tauri/src/crdt/log.rs
Normal file
@ -0,0 +1,22 @@
|
||||
// src/entities/crdt_log.rs
|
||||
use sea_orm::entity::prelude::*;
|
||||
|
||||
#[sea_orm(table_name = "crdt_log")]
|
||||
pub struct Model {
|
||||
#[sea_orm(primary_key, auto_increment = true)]
|
||||
pub id: i64,
|
||||
pub hlc_timestamp: String,
|
||||
pub op_type: String,
|
||||
pub table_name: String,
|
||||
pub row_pk: String, // Wird als JSON-String gespeichert
|
||||
#[sea_orm(nullable)]
|
||||
pub column_name: Option<String>,
|
||||
#[sea_orm(nullable)]
|
||||
pub value: Option<String>,
|
||||
#[sea_orm(nullable)]
|
||||
pub old_value: Option<String>,
|
||||
}
|
||||
|
||||
pub enum Relation {}
|
||||
|
||||
impl ActiveModelBehavior for ActiveModel {}
|
||||
165
src-tauri/src/crdt/proxy.rs
Normal file
165
src-tauri/src/crdt/proxy.rs
Normal file
@ -0,0 +1,165 @@
|
||||
// In src-tauri/src/sql_proxy.rs
|
||||
|
||||
use rusqlite::Connection;
use sqlparser::ast::{ColumnDef, DataType, Expr, Ident, Query, Statement, TableWithJoins, Value};
use sqlparser::dialect::SQLiteDialect;
use sqlparser::parser::Parser;
use sqlparser::visit_mut::{self, VisitorMut};
use std::ops::ControlFlow;
|
||||
|
||||
// Der Name der Tombstone-Spalte als Konstante, um "Magic Strings" zu vermeiden.
|
||||
pub const TOMBSTONE_COLUMN_NAME: &str = "tombstone";
|
||||
const EXCLUDED_TABLES: &[&str] = &["crdt_log"];
|
||||
|
||||
// Die Hauptstruktur unseres Proxys.
|
||||
// Sie ist zustandslos, da wir uns gegen einen Schema-Cache entschieden haben.
|
||||
pub struct SqlProxy;
|
||||
|
||||
impl SqlProxy {
|
||||
pub fn new() -> Self {
|
||||
Self {}
|
||||
}
|
||||
|
||||
// Die zentrale Ausführungsfunktion
|
||||
pub fn execute(&self, sql: &str, conn: &Connection) -> Result<(), String> {
|
||||
// 1. Parsen des SQL-Strings in einen oder mehrere ASTs.
|
||||
// Ein String kann mehrere, durch Semikolon getrennte Anweisungen enthalten.
|
||||
let dialect = SQLiteDialect {};
|
||||
let mut ast_vec =
|
||||
Parser::parse_sql(&dialect, sql).map_err(|e| format!("SQL-Parse-Fehler: {}", e))?;
|
||||
|
||||
// 2. Wir durchlaufen und transformieren jedes einzelne Statement im AST-Vektor.
|
||||
for statement in &mut ast_vec {
|
||||
self.transform_statement(statement)?;
|
||||
}
|
||||
|
||||
// 3. Ausführen der (möglicherweise modifizierten) Anweisungen in einer einzigen Transaktion.
|
||||
// Dies stellt sicher, dass alle Operationen atomar sind.
|
||||
let tx = conn.transaction().map_err(|e| e.to_string())?;
|
||||
for statement in ast_vec {
|
||||
let final_sql = statement.to_string();
|
||||
tx.execute(&final_sql)
|
||||
.map_err(|e| format!("DB-Ausführungsfehler bei '{}': {}", final_sql, e))?;
|
||||
|
||||
// Wenn es ein CREATE/ALTER TABLE war, müssen die Trigger neu erstellt werden.
|
||||
// Dies geschieht innerhalb derselben Transaktion.
|
||||
if let Statement::CreateTable { name, .. } | Statement::AlterTable { name, .. } =
|
||||
statement
|
||||
{
|
||||
let table_name = name.0.last().unwrap().value.clone();
|
||||
let trigger_manager = crate::trigger_manager::TriggerManager::new(&tx);
|
||||
trigger_manager
|
||||
.setup_triggers_for_table(&table_name)
|
||||
.map_err(|e| {
|
||||
format!("Trigger-Setup-Fehler für Tabelle '{}': {}", table_name, e)
|
||||
})?;
|
||||
}
|
||||
}
|
||||
tx.commit().map_err(|e| e.to_string())?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// Diese Methode wendet die Transformation auf ein einzelnes Statement an.
|
||||
fn transform_statement(&self, statement: &mut Statement) -> Result<(), String> {
|
||||
let mut visitor = TombstoneVisitor;
|
||||
// `visit` durchläuft den AST und ruft die entsprechenden `visit_*_mut` Methoden auf.
|
||||
statement.visit(&mut visitor);
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
struct TombstoneVisitor;
|
||||
|
||||
impl TombstoneVisitor {
|
||||
fn is_audited_table(&self, table_name: &str) -> bool {
|
||||
!EXCLUDED_TABLES.contains(&table_name.to_lowercase().as_str())
|
||||
}
|
||||
}
|
||||
|
||||
// AST rewriting rules: add the tombstone column on CREATE TABLE, turn DELETE
// into a soft-delete UPDATE, and filter tombstoned rows out of SELECTs.
//
// NOTE(review): sqlparser's published `VisitorMut` trait exposes
// `pre_visit_statement`/`post_visit_statement` (and query counterparts), not
// `visit_statement_mut`/`visit_query_mut`, and there is no
// `visit_mut::walk_statement_mut` — confirm which sqlparser version/fork
// Cargo.toml pins, as this block will not compile against crates.io sqlparser.
impl VisitorMut for TombstoneVisitor {
    type Break = ();

    // Called for every statement in the AST.
    fn visit_statement_mut(&mut self, stmt: &mut Statement) -> ControlFlow<Self::Break> {
        match stmt {
            // Case 1: CREATE TABLE — inject the tombstone column.
            Statement::CreateTable { name, columns, .. } => {
                let table_name = name.0.last().unwrap().value.as_str();
                if self.is_audited_table(table_name) {
                    // Add the 'tombstone' column if it does not exist yet
                    // (case-insensitive check against the declared columns).
                    if !columns
                        .iter()
                        .any(|c| c.name.value.to_lowercase() == TOMBSTONE_COLUMN_NAME)
                    {
                        // NOTE(review): recent sqlparser versions spell this
                        // `DataType::Integer(Option<u64>)` — confirm the
                        // pinned version accepts a unit `Integer`.
                        columns.push(ColumnDef {
                            name: Ident::new(TOMBSTONE_COLUMN_NAME),
                            data_type: DataType::Integer,
                            collation: None,
                            options: vec![], // default is 0
                        });
                    }
                }
            }

            // Case 2: DELETE — rewrite into a soft-delete UPDATE.
            Statement::Delete(del_stmt) => {
                // NOTE(review): `Statement::Update` has no `value` field in
                // sqlparser, and with empty `assignments` the rewritten UPDATE
                // would set nothing. The intent is presumably
                // `assignments: vec![tombstone = 1]` — confirm the AST shape.
                let new_update = Statement::Update {
                    table: del_stmt.from.clone(),
                    assignments: vec![],
                    value: Box::new(Expr::Value(Value::Number("1".to_string(), false))),
                    from: None,
                    selection: del_stmt.selection.clone(),
                    returning: None,
                };
                // Replace the current statement in the AST.
                *stmt = new_update;
            }
            _ => {}
        }

        // Continue traversal into child nodes (e.g. nested SELECTs).
        visit_mut::walk_statement_mut(self, stmt)
    }

    // Called for every query, including subqueries.
    fn visit_query_mut(&mut self, query: &mut Query) -> ControlFlow<Self::Break> {
        // Recurse first so inner queries are rewritten before the outer one.
        visit_mut::walk_query_mut(self, query);

        // Then adjust the WHERE clause of the current query.
        if let Some(from_clause) = query.body.as_select_mut().map(|s| &mut s.from) {
            // (More elaborate join/table analysis would go here.)
            // Simplified: only the first FROM relation is considered.
            let table_name = if let Some(relation) = from_clause.get_mut(0) {
                // This logic still needs refinement to handle aliases etc.
                relation.relation.to_string()
            } else {
                "".to_string()
            };

            if self.is_audited_table(&table_name) {
                // Build `tombstone = 0`.
                // NOTE(review): unqualified identifier — ambiguous once joins
                // are supported; verify against multi-table queries.
                let tombstone_check = Expr::BinaryOp {
                    left: Box::new(Expr::Identifier(Ident::new(TOMBSTONE_COLUMN_NAME))),
                    op: sqlparser::ast::BinaryOperator::Eq,
                    right: Box::new(Expr::Value(Value::Number("0".to_string(), false))),
                };

                // AND the tombstone filter onto any pre-existing WHERE clause.
                // NOTE(review): in sqlparser the WHERE clause lives on
                // `Select` (`SetExpr::Select`), not on `Query` — confirm that
                // `query.selection` exists in the pinned version.
                let existing_selection = query.selection.take();
                let new_selection = match existing_selection {
                    Some(expr) => Expr::BinaryOp {
                        left: Box::new(expr),
                        op: sqlparser::ast::BinaryOperator::And,
                        right: Box::new(tombstone_check),
                    },
                    None => tombstone_check,
                };
                query.selection = Some(Box::new(new_selection));
            }
        }

        ControlFlow::Continue(())
    }
}
|
||||
124
src-tauri/src/crdt/trigger.rs
Normal file
124
src-tauri/src/crdt/trigger.rs
Normal file
@ -0,0 +1,124 @@
|
||||
// In src-tauri/src/trigger_manager.rs -> impl<'a> TriggerManager<'a>
|
||||
|
||||
// In einem neuen Modul, z.B. src-tauri/src/trigger_manager.rs
|
||||
use crate::sql_proxy::ColumnInfo;
|
||||
use rusqlite::{Result, Transaction};
|
||||
|
||||
/// (Re)creates the CRDT audit triggers for user tables.
///
/// Borrows an already-open transaction so all trigger DDL joins the caller's
/// atomic unit of work.
pub struct TriggerManager<'a> {
    // Transaction every generated DDL batch is executed on.
    tx: &'a Transaction<'a>,
}
|
||||
|
||||
impl<'a> TriggerManager<'a> {
|
||||
pub fn new(tx: &'a Transaction<'a>) -> Self {
|
||||
Self { tx }
|
||||
}
|
||||
|
||||
// Die Hauptfunktion, die alles einrichtet
|
||||
pub fn setup_triggers_for_table(&self, table_name: &str) -> Result<()> {
|
||||
let columns = self.get_table_schema(table_name)?;
|
||||
let pk_cols: Vec<_> = columns
|
||||
.iter()
|
||||
.filter(|c| c.is_pk)
|
||||
.map(|c| c.name.as_str())
|
||||
.collect();
|
||||
let other_cols: Vec<_> = columns
|
||||
.iter()
|
||||
.filter(|c| !c.is_pk && c.name != "tombstone")
|
||||
.map(|c| c.name.as_str())
|
||||
.collect();
|
||||
|
||||
let drop_sql = self.generate_drop_triggers_sql(table_name);
|
||||
let insert_sql = self.generate_insert_trigger_sql(table_name, &pk_cols, &other_cols);
|
||||
let update_sql = self.generate_update_trigger_sql(table_name, &pk_cols, &other_cols);
|
||||
|
||||
self.tx.execute_batch(&drop_sql)?;
|
||||
self.tx.execute_batch(&insert_sql)?;
|
||||
self.tx.execute_batch(&update_sql)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn get_table_schema(&self, table_name: &str) -> Result<Vec<ColumnInfo>> {
|
||||
let sql = format!("PRAGMA table_info('{}')", table_name);
|
||||
let mut stmt = self.tx.prepare(&sql)?;
|
||||
let rows = stmt.query_map(|row| {
|
||||
let pk_val: i64 = row.get(5)?;
|
||||
Ok(ColumnInfo {
|
||||
name: row.get(1)?,
|
||||
is_pk: pk_val > 0,
|
||||
})
|
||||
})?;
|
||||
|
||||
let mut columns = Vec::new();
|
||||
for row_result in rows {
|
||||
columns.push(row_result?);
|
||||
}
|
||||
Ok(columns)
|
||||
}
|
||||
|
||||
//... Implementierung der SQL-Generierungsfunktionen...
|
||||
|
||||
fn generate_update_trigger_sql(&self, table_name: &str, pks: &[&str], cols: &[&str]) -> String {
|
||||
// Erstellt dynamisch die Key-Value-Paare für das JSON-Objekt des Primärschlüssels.
|
||||
let pk_json_payload_new = pks
|
||||
.iter()
|
||||
.map(|pk| format!("'{}', NEW.\"{}\"", pk, pk))
|
||||
.collect::<Vec<_>>()
|
||||
.join(", ");
|
||||
|
||||
let pk_json_payload_old = pks
|
||||
.iter()
|
||||
.map(|pk| format!("'{}', OLD.\"{}\"", pk, pk))
|
||||
.collect::<Vec<_>>()
|
||||
.join(", ");
|
||||
|
||||
// Erstellt die einzelnen INSERT-Anweisungen für jede Spalte
|
||||
let column_updates = cols.iter().map(|col| format!(
|
||||
r#"
|
||||
-- Protokolliere die Spaltenänderung, wenn sie stattgefunden hat und es kein Soft-Delete ist
|
||||
INSERT INTO crdt_log (hlc_timestamp, op_type, table_name, row_pk, column_name, value, old_value)
|
||||
SELECT
|
||||
'placeholder_hlc', -- TODO: HLC-Funktion hier aufrufen
|
||||
'UPDATE',
|
||||
'{table}',
|
||||
json_object({pk_payload_new}),
|
||||
'{column}',
|
||||
json_object('value', NEW."{column}"),
|
||||
json_object('value', OLD."{column}")
|
||||
WHERE
|
||||
NEW."{column}" IS NOT OLD."{column}"
|
||||
"#,
|
||||
table = table_name,
|
||||
pk_payload_new = pk_json_payload_new,
|
||||
column = col
|
||||
)).collect::<Vec<_>>().join("\n");
|
||||
|
||||
// Erstellt die Logik für den Soft-Delete
|
||||
let delete_logic = format!(
|
||||
r#"
|
||||
-- Protokolliere den Soft-Delete
|
||||
INSERT INTO crdt_log (hlc_timestamp, op_type, table_name, row_pk)
|
||||
SELECT
|
||||
'placeholder_hlc', -- TODO: HLC-Funktion hier aufrufen
|
||||
'DELETE',
|
||||
'{table}',
|
||||
json_object({pk_payload_old})
|
||||
WHERE
|
||||
OLD.{tombstone_col} = 0
|
||||
"#,
|
||||
table = table_name,
|
||||
pk_payload_old = pk_json_payload_old
|
||||
);
|
||||
|
||||
// Kombiniert alles zu einem einzigen Trigger
|
||||
format!(
|
||||
"CREATE TRIGGER IF NOT EXISTS {table_name}_crdt_update
|
||||
AFTER UPDATE ON {table_name}
|
||||
FOR EACH ROW
|
||||
BEGIN
|
||||
{column_updates}
|
||||
{delete_logic}
|
||||
END;"
|
||||
)
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user