refatored rust sql and drizzle

This commit is contained in:
2025-10-22 15:05:36 +02:00
parent 9ea057e943
commit f70e924cc3
27 changed files with 3968 additions and 2747 deletions

View File

@ -4,8 +4,8 @@
use crate::crdt::trigger::{HLC_TIMESTAMP_COLUMN, TOMBSTONE_COLUMN};
use crate::database::error::DatabaseError;
use sqlparser::ast::{
Assignment, AssignmentTarget, BinaryOperator, Expr, Ident, Insert, ObjectNamePart,
OnConflict, OnConflictAction, OnInsert, SelectItem, SetExpr, Value,
Assignment, AssignmentTarget, BinaryOperator, Expr, Ident, Insert, ObjectNamePart, OnConflict,
OnConflictAction, OnInsert, SelectItem, SetExpr, Value,
};
use uhlc::Timestamp;
@ -23,6 +23,37 @@ impl InsertTransformer {
}
}
fn find_or_add_column(columns: &mut Vec<Ident>, col_name: &'static str) -> usize {
match columns.iter().position(|c| c.value == col_name) {
Some(index) => index, // Gefunden! Gib Index zurück.
None => {
// Nicht gefunden! Hinzufügen.
columns.push(Ident::new(col_name));
columns.len() - 1 // Der Index des gerade hinzugefügten Elements
}
}
}
/// Wenn der Index == der Länge ist, wird der Wert stattdessen gepusht.
fn set_or_push_value(row: &mut Vec<Expr>, index: usize, value: Expr) {
if index < row.len() {
// Spalte war vorhanden, Wert (wahrscheinlich `?` oder NULL) ersetzen
row[index] = value;
} else {
// Spalte war nicht vorhanden, Wert hinzufügen
row.push(value);
}
}
fn set_or_push_projection(projection: &mut Vec<SelectItem>, index: usize, value: Expr) {
let item = SelectItem::UnnamedExpr(value);
if index < projection.len() {
projection[index] = item;
} else {
projection.push(item);
}
}
/// Transformiert INSERT-Statements (fügt HLC-Timestamp hinzu und behandelt Tombstone-Konflikte)
/// Fügt automatisch RETURNING für Primary Keys hinzu, damit der Executor die tatsächlichen PKs kennt
pub fn transform_insert(
@ -32,11 +63,11 @@ impl InsertTransformer {
primary_keys: &[String],
foreign_keys: &[String],
) -> Result<(), DatabaseError> {
// Add both haex_timestamp and haex_tombstone columns
insert_stmt
.columns
.push(Ident::new(self.hlc_timestamp_column));
insert_stmt.columns.push(Ident::new(self.tombstone_column));
// Add both haex_timestamp and haex_tombstone columns if not exists
let hlc_col_index =
Self::find_or_add_column(&mut insert_stmt.columns, self.hlc_timestamp_column);
let tombstone_col_index =
Self::find_or_add_column(&mut insert_stmt.columns, self.tombstone_column);
// Füge RETURNING für alle Primary Keys hinzu (falls noch nicht vorhanden)
// Dies erlaubt uns, die tatsächlichen PK-Werte nach ON CONFLICT zu kennen
@ -57,7 +88,7 @@ impl InsertTransformer {
// Erstelle UPDATE-Assignments für alle Spalten außer CRDT-Spalten, Primary Keys und Foreign Keys
let mut assignments = Vec::new();
for column in insert_stmt.columns.iter() {
for column in insert_stmt.columns.clone().iter() {
let col_name = &column.value;
// Überspringe CRDT-Spalten
@ -87,17 +118,17 @@ impl InsertTransformer {
// Füge HLC-Timestamp Update hinzu (mit dem übergebenen timestamp)
assignments.push(Assignment {
target: AssignmentTarget::ColumnName(sqlparser::ast::ObjectName(vec![ObjectNamePart::Identifier(
Ident::new(self.hlc_timestamp_column),
)])),
target: AssignmentTarget::ColumnName(sqlparser::ast::ObjectName(vec![
ObjectNamePart::Identifier(Ident::new(self.hlc_timestamp_column)),
])),
value: Expr::Value(Value::SingleQuotedString(timestamp.to_string()).into()),
});
// Setze Tombstone auf 0 (reaktiviere den Eintrag)
assignments.push(Assignment {
target: AssignmentTarget::ColumnName(sqlparser::ast::ObjectName(vec![ObjectNamePart::Identifier(
Ident::new(self.tombstone_column),
)])),
target: AssignmentTarget::ColumnName(sqlparser::ast::ObjectName(vec![
ObjectNamePart::Identifier(Ident::new(self.tombstone_column)),
])),
value: Expr::Value(Value::Number("0".to_string(), false).into()),
});
@ -122,23 +153,26 @@ impl InsertTransformer {
Some(query) => match &mut *query.body {
SetExpr::Values(values) => {
for row in &mut values.rows {
// Add haex_timestamp value
row.push(Expr::Value(
Value::SingleQuotedString(timestamp.to_string()).into(),
));
// Add haex_tombstone value (0 = not deleted)
row.push(Expr::Value(Value::Number("0".to_string(), false).into()));
let hlc_value =
Expr::Value(Value::SingleQuotedString(timestamp.to_string()).into());
let tombstone_value =
Expr::Value(Value::Number("0".to_string(), false).into());
Self::set_or_push_value(row, hlc_col_index, hlc_value);
Self::set_or_push_value(row, tombstone_col_index, tombstone_value);
}
}
SetExpr::Select(select) => {
let hlc_expr =
let hlc_value =
Expr::Value(Value::SingleQuotedString(timestamp.to_string()).into());
select.projection.push(SelectItem::UnnamedExpr(hlc_expr));
// Add haex_tombstone value (0 = not deleted)
let tombstone_expr = Expr::Value(Value::Number("0".to_string(), false).into());
select
.projection
.push(SelectItem::UnnamedExpr(tombstone_expr));
let tombstone_value = Expr::Value(Value::Number("0".to_string(), false).into());
Self::set_or_push_projection(&mut select.projection, hlc_col_index, hlc_value);
Self::set_or_push_projection(
&mut select.projection,
tombstone_col_index,
tombstone_value,
);
}
_ => {
return Err(DatabaseError::UnsupportedStatement {