use window system

This commit is contained in:
2025-10-20 19:14:05 +02:00
parent a291619f63
commit 2b8f1781f3
51 changed files with 6687 additions and 2070 deletions

View File

@ -1,9 +1,12 @@
use crate::crdt::insert_transformer::InsertTransformer;
use crate::crdt::query_transformer::QueryTransformer;
use crate::crdt::trigger::{HLC_TIMESTAMP_COLUMN, TOMBSTONE_COLUMN};
use crate::database::error::DatabaseError;
use crate::table_names::{TABLE_CRDT_CONFIGS, TABLE_CRDT_LOGS};
use sqlparser::ast::{
Assignment, AssignmentTarget, BinaryOperator, ColumnDef, DataType, Expr, Ident, Insert,
ObjectName, ObjectNamePart, SelectItem, SetExpr, Statement, TableFactor, TableObject, Value,
Assignment, AssignmentTarget, BinaryOperator, ColumnDef, DataType, Expr, Ident,
ObjectName, ObjectNamePart, Statement, TableFactor, TableObject,
Value,
};
use std::borrow::Cow;
use std::collections::HashSet;
@ -112,7 +115,10 @@ impl CrdtTransformer {
pub fn transform_select_statement(&self, stmt: &mut Statement) -> Result<(), DatabaseError> {
match stmt {
Statement::Query(query) => self.transform_query_recursive(query),
Statement::Query(query) => {
let query_transformer = QueryTransformer::new();
query_transformer.transform_query_recursive(query, &self.excluded_tables)
}
// Fange alle anderen Fälle ab und gib einen Fehler zurück
_ => Err(DatabaseError::UnsupportedStatement {
sql: stmt.to_string(),
@ -121,10 +127,12 @@ impl CrdtTransformer {
}
}
pub fn transform_execute_statement(
/// Transformiert Statements MIT Zugriff auf Tabelleninformationen (empfohlen)
pub fn transform_execute_statement_with_table_info(
&self,
stmt: &mut Statement,
hlc_timestamp: &Timestamp,
tx: &rusqlite::Transaction,
) -> Result<Option<String>, DatabaseError> {
match stmt {
Statement::CreateTable(create_table) => {
@ -141,7 +149,100 @@ impl CrdtTransformer {
Statement::Insert(insert_stmt) => {
if let TableObject::TableName(name) = &insert_stmt.table {
if self.is_crdt_sync_table(name) {
self.transform_insert(insert_stmt, hlc_timestamp)?;
// Hole die Tabelleninformationen um PKs und FKs zu identifizieren
let table_name_str = self.normalize_table_name(name);
let columns = crate::crdt::trigger::get_table_schema(tx, &table_name_str)
.map_err(|e| DatabaseError::ExecutionError {
sql: format!("PRAGMA table_info('{}')", table_name_str),
reason: e.to_string(),
table: Some(table_name_str.to_string()),
})?;
let primary_keys: Vec<String> = columns
.iter()
.filter(|c| c.is_pk)
.map(|c| c.name.clone())
.collect();
let foreign_keys = crate::crdt::trigger::get_foreign_key_columns(tx, &table_name_str)
.map_err(|e| DatabaseError::ExecutionError {
sql: format!("PRAGMA foreign_key_list('{}')", table_name_str),
reason: e.to_string(),
table: Some(table_name_str.to_string()),
})?;
let insert_transformer = InsertTransformer::new();
insert_transformer.transform_insert(insert_stmt, hlc_timestamp, &primary_keys, &foreign_keys)?;
}
}
Ok(None)
}
Statement::Update {
table, assignments, ..
} => {
if let TableFactor::Table { name, .. } = &table.relation {
if self.is_crdt_sync_table(name) {
assignments.push(self.columns.create_hlc_assignment(hlc_timestamp));
}
}
Ok(None)
}
Statement::Delete(del_stmt) => {
if let Some(table_name) = self.extract_table_name_from_delete(del_stmt) {
let table_name_str = self.normalize_table_name(&table_name);
let is_crdt = self.is_crdt_sync_table(&table_name);
eprintln!("DEBUG DELETE (with_table_info): table='{}', is_crdt_sync={}, normalized='{}'",
table_name, is_crdt, table_name_str);
if is_crdt {
eprintln!("DEBUG: Transforming DELETE to UPDATE for table '{}'", table_name_str);
self.transform_delete_to_update(stmt, hlc_timestamp)?;
}
Ok(None)
} else {
Err(DatabaseError::UnsupportedStatement {
sql: del_stmt.to_string(),
reason: "DELETE from non-table source or multiple tables".to_string(),
})
}
}
Statement::AlterTable { name, .. } => {
if self.is_crdt_sync_table(name) {
Ok(Some(self.normalize_table_name(name).into_owned()))
} else {
Ok(None)
}
}
_ => Ok(None),
}
}
pub fn transform_execute_statement(
&self,
stmt: &mut Statement,
hlc_timestamp: &Timestamp,
) -> Result<Option<String>, DatabaseError> {
// Für INSERT-Statements ohne Connection nutzen wir eine leere PK-Liste
// Das bedeutet ALLE Spalten werden im ON CONFLICT UPDATE gesetzt
// Dies ist ein Fallback für den Fall, dass keine Connection verfügbar ist
match stmt {
Statement::CreateTable(create_table) => {
if self.is_crdt_sync_table(&create_table.name) {
self.columns
.add_to_table_definition(&mut create_table.columns);
Ok(Some(
self.normalize_table_name(&create_table.name).into_owned(),
))
} else {
Ok(None)
}
}
Statement::Insert(insert_stmt) => {
if let TableObject::TableName(name) = &insert_stmt.table {
if self.is_crdt_sync_table(name) {
// Ohne Connection: leere PK- und FK-Listen (alle Spalten werden upgedatet)
let insert_transformer = InsertTransformer::new();
insert_transformer.transform_insert(insert_stmt, hlc_timestamp, &[], &[])?;
}
}
Ok(None)
@ -180,560 +281,6 @@ impl CrdtTransformer {
}
}
/// Transformiert Query-Statements (fügt Tombstone-Filter hinzu)
fn transform_query_recursive(
&self,
query: &mut sqlparser::ast::Query,
) -> Result<(), DatabaseError> {
self.add_tombstone_filters_recursive(&mut query.body)
}
/// Rekursive Behandlung aller SetExpr-Typen mit vollständiger Subquery-Unterstützung
fn add_tombstone_filters_recursive(&self, set_expr: &mut SetExpr) -> Result<(), DatabaseError> {
match set_expr {
SetExpr::Select(select) => {
self.add_tombstone_filters_to_select(select)?;
// Transformiere auch Subqueries in Projektionen
for projection in &mut select.projection {
match projection {
SelectItem::UnnamedExpr(expr) | SelectItem::ExprWithAlias { expr, .. } => {
self.transform_expression_subqueries(expr)?;
}
_ => {} // Wildcard projections ignorieren
}
}
// Transformiere Subqueries in WHERE
if let Some(where_clause) = &mut select.selection {
self.transform_expression_subqueries(where_clause)?;
}
// Transformiere Subqueries in GROUP BY
match &mut select.group_by {
sqlparser::ast::GroupByExpr::All(_) => {
// GROUP BY ALL - keine Expressions zu transformieren
}
sqlparser::ast::GroupByExpr::Expressions(exprs, _) => {
for group_expr in exprs {
self.transform_expression_subqueries(group_expr)?;
}
}
}
// Transformiere Subqueries in HAVING
if let Some(having) = &mut select.having {
self.transform_expression_subqueries(having)?;
}
}
SetExpr::SetOperation { left, right, .. } => {
self.add_tombstone_filters_recursive(left)?;
self.add_tombstone_filters_recursive(right)?;
}
SetExpr::Query(query) => {
self.add_tombstone_filters_recursive(&mut query.body)?;
}
SetExpr::Values(values) => {
// Transformiere auch Subqueries in Values-Listen
for row in &mut values.rows {
for expr in row {
self.transform_expression_subqueries(expr)?;
}
}
}
_ => {} // Andere Fälle
}
Ok(())
}
/// Transformiert Subqueries innerhalb von Expressions
fn transform_expression_subqueries(&self, expr: &mut Expr) -> Result<(), DatabaseError> {
match expr {
// Einfache Subqueries
Expr::Subquery(query) => {
self.add_tombstone_filters_recursive(&mut query.body)?;
}
// EXISTS Subqueries
Expr::Exists { subquery, .. } => {
self.add_tombstone_filters_recursive(&mut subquery.body)?;
}
// IN Subqueries
Expr::InSubquery {
expr: left_expr,
subquery,
..
} => {
self.transform_expression_subqueries(left_expr)?;
self.add_tombstone_filters_recursive(&mut subquery.body)?;
}
// ANY/ALL Subqueries
Expr::AnyOp { left, right, .. } | Expr::AllOp { left, right, .. } => {
self.transform_expression_subqueries(left)?;
self.transform_expression_subqueries(right)?;
}
// Binäre Operationen
Expr::BinaryOp { left, right, .. } => {
self.transform_expression_subqueries(left)?;
self.transform_expression_subqueries(right)?;
}
// Unäre Operationen
Expr::UnaryOp {
expr: inner_expr, ..
} => {
self.transform_expression_subqueries(inner_expr)?;
}
// Verschachtelte Ausdrücke
Expr::Nested(nested) => {
self.transform_expression_subqueries(nested)?;
}
// CASE-Ausdrücke
Expr::Case {
operand,
conditions,
else_result,
..
} => {
if let Some(op) = operand {
self.transform_expression_subqueries(op)?;
}
for case_when in conditions {
self.transform_expression_subqueries(&mut case_when.condition)?;
self.transform_expression_subqueries(&mut case_when.result)?;
}
if let Some(else_res) = else_result {
self.transform_expression_subqueries(else_res)?;
}
}
// Funktionsaufrufe
Expr::Function(func) => match &mut func.args {
sqlparser::ast::FunctionArguments::List(sqlparser::ast::FunctionArgumentList {
args,
..
}) => {
for arg in args {
if let sqlparser::ast::FunctionArg::Unnamed(
sqlparser::ast::FunctionArgExpr::Expr(expr),
) = arg
{
self.transform_expression_subqueries(expr)?;
}
}
}
_ => {}
},
// BETWEEN
Expr::Between {
expr: main_expr,
low,
high,
..
} => {
self.transform_expression_subqueries(main_expr)?;
self.transform_expression_subqueries(low)?;
self.transform_expression_subqueries(high)?;
}
// IN Liste
Expr::InList {
expr: main_expr,
list,
..
} => {
self.transform_expression_subqueries(main_expr)?;
for list_expr in list {
self.transform_expression_subqueries(list_expr)?;
}
}
// IS NULL/IS NOT NULL
Expr::IsNull(inner) | Expr::IsNotNull(inner) => {
self.transform_expression_subqueries(inner)?;
}
// Andere Expression-Typen benötigen keine Transformation
_ => {}
}
Ok(())
}
/// Fügt Tombstone-Filter zu SELECT-Statements hinzu (nur wenn nicht explizit in WHERE gesetzt)
fn add_tombstone_filters_to_select(
&self,
select: &mut sqlparser::ast::Select,
) -> Result<(), DatabaseError> {
// Sammle alle CRDT-Tabellen mit ihren Aliasen
let mut crdt_tables = Vec::new();
for twj in &select.from {
if let TableFactor::Table { name, alias, .. } = &twj.relation {
if self.is_crdt_sync_table(name) {
let table_alias = alias.as_ref().map(|a| a.name.value.as_str());
crdt_tables.push((name.clone(), table_alias));
}
}
}
if crdt_tables.is_empty() {
return Ok(());
}
// Prüfe, welche Tombstone-Spalten bereits in der WHERE-Klausel referenziert werden
let explicitly_filtered_tables = if let Some(where_clause) = &select.selection {
self.find_explicitly_filtered_tombstone_tables(where_clause, &crdt_tables)
} else {
HashSet::new()
};
// Erstelle Filter nur für Tabellen, die noch nicht explizit gefiltert werden
let mut tombstone_filters = Vec::new();
for (table_name, table_alias) in crdt_tables {
let table_name_string = table_name.to_string();
let table_key = table_alias.unwrap_or(&table_name_string);
if !explicitly_filtered_tables.contains(table_key) {
tombstone_filters.push(self.columns.create_tombstone_filter(table_alias));
}
}
// Füge die automatischen Filter hinzu
if !tombstone_filters.is_empty() {
let combined_filter = tombstone_filters
.into_iter()
.reduce(|acc, expr| Expr::BinaryOp {
left: Box::new(acc),
op: BinaryOperator::And,
right: Box::new(expr),
})
.unwrap();
match &mut select.selection {
Some(existing) => {
*existing = Expr::BinaryOp {
left: Box::new(existing.clone()),
op: BinaryOperator::And,
right: Box::new(combined_filter),
};
}
None => {
select.selection = Some(combined_filter);
}
}
}
Ok(())
}
/// Findet alle Tabellen, die bereits explizit Tombstone-Filter in der WHERE-Klausel haben
fn find_explicitly_filtered_tombstone_tables(
&self,
where_expr: &Expr,
crdt_tables: &[(ObjectName, Option<&str>)],
) -> HashSet<String> {
let mut filtered_tables = HashSet::new();
self.scan_expression_for_tombstone_references(
where_expr,
crdt_tables,
&mut filtered_tables,
);
filtered_tables
}
/// Rekursiv durchsucht einen Expression-Baum nach Tombstone-Spalten-Referenzen
fn scan_expression_for_tombstone_references(
&self,
expr: &Expr,
crdt_tables: &[(ObjectName, Option<&str>)],
filtered_tables: &mut HashSet<String>,
) {
match expr {
// Einfache Spaltenreferenz: tombstone = ?
Expr::Identifier(ident) => {
if ident.value == self.columns.tombstone {
// Wenn keine Tabelle spezifiziert ist und es nur eine CRDT-Tabelle gibt
if crdt_tables.len() == 1 {
let table_name_str = crdt_tables[0].0.to_string();
let table_key = crdt_tables[0].1.unwrap_or(&table_name_str);
filtered_tables.insert(table_key.to_string());
}
}
}
// Qualifizierte Spaltenreferenz: table.tombstone = ? oder alias.tombstone = ?
Expr::CompoundIdentifier(idents) => {
if idents.len() == 2 && idents[1].value == self.columns.tombstone {
let table_ref = &idents[0].value;
// Prüfe, ob es eine unserer CRDT-Tabellen ist (nach Name oder Alias)
for (table_name, alias) in crdt_tables {
let table_name_str = table_name.to_string();
if table_ref == &table_name_str || alias.map_or(false, |a| a == table_ref) {
filtered_tables.insert(table_ref.clone());
break;
}
}
}
}
// Binäre Operationen: AND, OR, etc.
Expr::BinaryOp { left, right, .. } => {
self.scan_expression_for_tombstone_references(left, crdt_tables, filtered_tables);
self.scan_expression_for_tombstone_references(right, crdt_tables, filtered_tables);
}
// Unäre Operationen: NOT, etc.
Expr::UnaryOp { expr, .. } => {
self.scan_expression_for_tombstone_references(expr, crdt_tables, filtered_tables);
}
// Verschachtelte Ausdrücke
Expr::Nested(nested) => {
self.scan_expression_for_tombstone_references(nested, crdt_tables, filtered_tables);
}
// IN-Klauseln
Expr::InList { expr, .. } => {
self.scan_expression_for_tombstone_references(expr, crdt_tables, filtered_tables);
}
// BETWEEN-Klauseln
Expr::Between { expr, .. } => {
self.scan_expression_for_tombstone_references(expr, crdt_tables, filtered_tables);
}
// IS NULL/IS NOT NULL
Expr::IsNull(expr) | Expr::IsNotNull(expr) => {
self.scan_expression_for_tombstone_references(expr, crdt_tables, filtered_tables);
}
// Funktionsaufrufe - KORRIGIERT
Expr::Function(func) => {
match &func.args {
sqlparser::ast::FunctionArguments::List(
sqlparser::ast::FunctionArgumentList { args, .. },
) => {
for arg in args {
if let sqlparser::ast::FunctionArg::Unnamed(
sqlparser::ast::FunctionArgExpr::Expr(expr),
) = arg
{
self.scan_expression_for_tombstone_references(
expr,
crdt_tables,
filtered_tables,
);
}
}
}
_ => {} // Andere FunctionArguments-Varianten ignorieren
}
}
// CASE-Ausdrücke - KORRIGIERT
Expr::Case {
operand,
conditions,
else_result,
..
} => {
if let Some(op) = operand {
self.scan_expression_for_tombstone_references(op, crdt_tables, filtered_tables);
}
for case_when in conditions {
self.scan_expression_for_tombstone_references(
&case_when.condition,
crdt_tables,
filtered_tables,
);
self.scan_expression_for_tombstone_references(
&case_when.result,
crdt_tables,
filtered_tables,
);
}
if let Some(else_res) = else_result {
self.scan_expression_for_tombstone_references(
else_res,
crdt_tables,
filtered_tables,
);
}
}
// Subqueries mit vollständiger Unterstützung
Expr::Subquery(query) => {
self.transform_query_recursive_for_tombstone_analysis(
query,
crdt_tables,
filtered_tables,
)
.ok();
}
// EXISTS/NOT EXISTS Subqueries
Expr::Exists { subquery, .. } => {
self.transform_query_recursive_for_tombstone_analysis(
subquery,
crdt_tables,
filtered_tables,
)
.ok();
}
// IN/NOT IN Subqueries
Expr::InSubquery { expr, subquery, .. } => {
self.scan_expression_for_tombstone_references(expr, crdt_tables, filtered_tables);
self.transform_query_recursive_for_tombstone_analysis(
subquery,
crdt_tables,
filtered_tables,
)
.ok();
}
// ANY/ALL Subqueries
Expr::AnyOp { left, right, .. } | Expr::AllOp { left, right, .. } => {
self.scan_expression_for_tombstone_references(left, crdt_tables, filtered_tables);
self.scan_expression_for_tombstone_references(right, crdt_tables, filtered_tables);
}
// Andere Expression-Typen ignorieren wir für jetzt
_ => {}
}
}
/// Analysiert eine Subquery und sammelt Tombstone-Referenzen
fn transform_query_recursive_for_tombstone_analysis(
&self,
query: &sqlparser::ast::Query,
crdt_tables: &[(ObjectName, Option<&str>)],
filtered_tables: &mut HashSet<String>,
) -> Result<(), DatabaseError> {
self.analyze_set_expr_for_tombstone_references(&query.body, crdt_tables, filtered_tables)
}
/// Rekursiv analysiert SetExpr für Tombstone-Referenzen
fn analyze_set_expr_for_tombstone_references(
&self,
set_expr: &SetExpr,
crdt_tables: &[(ObjectName, Option<&str>)],
filtered_tables: &mut HashSet<String>,
) -> Result<(), DatabaseError> {
match set_expr {
SetExpr::Select(select) => {
// Analysiere WHERE-Klausel
if let Some(where_clause) = &select.selection {
self.scan_expression_for_tombstone_references(
where_clause,
crdt_tables,
filtered_tables,
);
}
// Analysiere alle Projektionen (können auch Subqueries enthalten)
for projection in &select.projection {
match projection {
SelectItem::UnnamedExpr(expr) | SelectItem::ExprWithAlias { expr, .. } => {
self.scan_expression_for_tombstone_references(
expr,
crdt_tables,
filtered_tables,
);
}
_ => {} // Wildcard projections ignorieren
}
}
// Analysiere GROUP BY
match &select.group_by {
sqlparser::ast::GroupByExpr::All(_) => {
// GROUP BY ALL - keine Expressions zu analysieren
}
sqlparser::ast::GroupByExpr::Expressions(exprs, _) => {
for group_expr in exprs {
self.scan_expression_for_tombstone_references(
group_expr,
crdt_tables,
filtered_tables,
);
}
}
}
// Analysiere HAVING
if let Some(having) = &select.having {
self.scan_expression_for_tombstone_references(
having,
crdt_tables,
filtered_tables,
);
}
}
SetExpr::SetOperation { left, right, .. } => {
self.analyze_set_expr_for_tombstone_references(left, crdt_tables, filtered_tables)?;
self.analyze_set_expr_for_tombstone_references(
right,
crdt_tables,
filtered_tables,
)?;
}
SetExpr::Query(query) => {
self.analyze_set_expr_for_tombstone_references(
&query.body,
crdt_tables,
filtered_tables,
)?;
}
SetExpr::Values(values) => {
// Analysiere Values-Listen
for row in &values.rows {
for expr in row {
self.scan_expression_for_tombstone_references(
expr,
crdt_tables,
filtered_tables,
);
}
}
}
_ => {} // Andere Varianten
}
Ok(())
}
/// Transformiert INSERT-Statements (fügt HLC-Timestamp hinzu)
fn transform_insert(
&self,
insert_stmt: &mut Insert,
timestamp: &Timestamp,
) -> Result<(), DatabaseError> {
// Add both haex_timestamp and haex_tombstone columns
insert_stmt
.columns
.push(Ident::new(self.columns.hlc_timestamp));
insert_stmt
.columns
.push(Ident::new(self.columns.tombstone));
match insert_stmt.source.as_mut() {
Some(query) => match &mut *query.body {
SetExpr::Values(values) => {
for row in &mut values.rows {
// Add haex_timestamp value
row.push(Expr::Value(
Value::SingleQuotedString(timestamp.to_string()).into(),
));
// Add haex_tombstone value (0 = not deleted)
row.push(Expr::Value(
Value::Number("0".to_string(), false).into(),
));
}
}
SetExpr::Select(select) => {
let hlc_expr =
Expr::Value(Value::SingleQuotedString(timestamp.to_string()).into());
select.projection.push(SelectItem::UnnamedExpr(hlc_expr));
// Add haex_tombstone value (0 = not deleted)
let tombstone_expr =
Expr::Value(Value::Number("0".to_string(), false).into());
select.projection.push(SelectItem::UnnamedExpr(tombstone_expr));
}
_ => {
return Err(DatabaseError::UnsupportedStatement {
sql: insert_stmt.to_string(),
reason: "INSERT with unsupported source type".to_string(),
});
}
},
None => {
return Err(DatabaseError::UnsupportedStatement {
reason: "INSERT statement has no source".to_string(),
sql: insert_stmt.to_string(),
});
}
}
Ok(())
}
/// Transformiert DELETE zu UPDATE (soft delete)
fn transform_delete_to_update(