• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4829321272

pending completion
4829321272

Pull #1515

github

GitHub
Merge b6d982211 into f2ab0e6ce
Pull Request #1515: feat: Run migration only when necessary

193 of 193 new or added lines in 11 files covered. (100.0%)

35565 of 45252 relevant lines covered (78.59%)

16462.37 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

1.08
/dozer-log-js/src/lib.rs
1
use std::{path::Path, sync::Arc};
2✔
2

3
use dozer_log::{
4
    home_dir::HomeDir,
5
    reader::LogReader as RustLogReader,
6
    schemas::load_schema,
7
    tokio::{runtime::Runtime as TokioRuntime, sync::Mutex},
8
};
9
use dozer_types::types::Schema;
10
use neon::prelude::*;
11

12
const EXTERNAL_PROPERTY_NAME: &str = "__external__";
13

14
#[derive(Debug, Clone)]
×
15
struct Runtime {
16
    runtime: Arc<TokioRuntime>,
17
    channel: Channel,
18
}
19

20
impl Finalize for Runtime {}
21

22
fn new_runtime(mut cx: FunctionContext) -> JsResult<JsObject> {
×
23
    // Create the object that will be returned.
×
24
    let runtime_object = JsObject::new(&mut cx);
×
25

26
    // Create the runtime and store it in the object.
27
    let runtime = match TokioRuntime::new() {
×
28
        Ok(runtime) => runtime,
×
29
        Err(error) => return cx.throw_error(error.to_string()),
×
30
    };
31
    let channel = Channel::new(&mut cx);
×
32
    let managed_runtime = cx.boxed(Runtime {
×
33
        runtime: Arc::new(runtime),
×
34
        channel,
×
35
    });
×
36
    runtime_object.set(&mut cx, EXTERNAL_PROPERTY_NAME, managed_runtime)?;
×
37

38
    // Create the `create_reader` function.
39
    let create_reader = JsFunction::new(&mut cx, runtime_create_reader)?;
×
40
    runtime_object.set(&mut cx, "create_reader", create_reader)?;
×
41
    Ok(runtime_object)
×
42
}
×
43

44
struct LogReader {
45
    runtime: Runtime,
46
    reader: Arc<Mutex<RustLogReader>>,
47
    schema: Arc<Schema>,
48
}
49

50
impl Finalize for LogReader {}
51

52
fn runtime_create_reader(mut cx: FunctionContext) -> JsResult<JsPromise> {
×
53
    // Extract runtime from `this`.
×
54
    let this = cx.this();
×
55
    let runtime_object = this.downcast_or_throw::<JsObject, _>(&mut cx)?;
×
56
    let runtime = runtime_object.get::<JsBox<Runtime>, _, _>(&mut cx, EXTERNAL_PROPERTY_NAME)?;
×
57

58
    // Extract `home_dir` from the first argument.
59
    let home_dir = cx.argument::<JsString>(0)?.value(&mut cx);
×
60

61
    // Extract `endpoint_name` from the second argument.
62
    let endpoint_name = cx.argument::<JsString>(1)?.value(&mut cx);
×
63

×
64
    // Find latest migration.
×
65
    let home_dir = HomeDir::new(home_dir.as_ref(), Default::default());
×
66
    let migration_path = match home_dir.find_latest_migration_path(&endpoint_name) {
×
67
        Ok(Some(migration_path)) => migration_path,
×
68
        Ok(None) => return cx.throw_error("No migration found"),
×
69
        Err((path, error)) => return cx.throw_error(format!("Failed to read {path:?}: {error}")),
×
70
    };
71

72
    // Load schema.
73
    let schema = match load_schema(&migration_path.schema_path) {
×
74
        Ok(schema) => schema,
×
75
        Err(error) => return cx.throw_error(error.to_string()),
×
76
    };
77

78
    // Create the reader.
79
    let (deferred, promise) = cx.promise();
×
80
    let runtime_for_reader = (**runtime).clone();
×
81
    let channel = runtime.channel.clone();
×
82
    runtime.runtime.spawn(async move {
×
83
        // Create the reader.
×
84
        let log_path = migration_path.log_path;
×
85
        let name = AsRef::<Path>::as_ref(&log_path)
×
86
            .parent()
×
87
            .and_then(|parent| parent.file_name().and_then(|file_name| file_name.to_str()))
×
88
            .unwrap_or("unknown".as_ref())
×
89
            .to_string();
×
90
        let reader = RustLogReader::new(log_path.as_ref(), &name, 0, None).await;
×
91

92
        // Resolve the promise.
93
        deferred.settle_with(&channel, move |mut cx| match reader {
×
94
            Ok(reader) => new_reader(&mut cx, runtime_for_reader, reader, schema.schema),
×
95
            Err(error) => cx.throw_error(error.to_string()),
×
96
        });
×
97
    });
×
98
    Ok(promise)
×
99
}
×
100

101
fn new_reader<'a, C: Context<'a>>(
×
102
    cx: &mut C,
×
103
    runtime: Runtime,
×
104
    reader: RustLogReader,
×
105
    schema: Schema,
×
106
) -> JsResult<'a, JsObject> {
×
107
    // Create the object that will be returned.
×
108
    let reader_object = JsObject::new(cx);
×
109

×
110
    // Store the reader in the object.
×
111
    let managed_reader = cx.boxed(LogReader {
×
112
        runtime,
×
113
        reader: Arc::new(Mutex::new(reader)),
×
114
        schema: Arc::new(schema),
×
115
    });
×
116
    reader_object.set(cx, EXTERNAL_PROPERTY_NAME, managed_reader)?;
×
117

118
    // Create the `next_op` function.
119
    let next_op = JsFunction::new(cx, reader_next_op)?;
×
120
    reader_object.set(cx, "next_op", next_op)?;
×
121

122
    Ok(reader_object)
×
123
}
×
124

125
fn reader_next_op(mut cx: FunctionContext) -> JsResult<JsPromise> {
×
126
    // Extract reader from `this`.
×
127
    let this = cx.this();
×
128
    let reader_object = this.downcast_or_throw::<JsObject, _>(&mut cx)?;
×
129
    let reader = reader_object.get::<JsBox<LogReader>, _, _>(&mut cx, EXTERNAL_PROPERTY_NAME)?;
×
130

131
    // Create the promise.
132
    let (deferred, promise) = cx.promise();
×
133
    let runtime = &reader.runtime;
×
134
    let channel = runtime.channel.clone();
×
135
    let schema = reader.schema.clone();
×
136
    let reader = reader.reader.clone();
×
137
    runtime.runtime.spawn(async move {
×
138
        // Read the next operation.
139
        let op = reader.lock().await.next_op().await;
×
140

141
        // Resolve the promise.
142
        deferred.settle_with(&channel, move |mut cx| {
×
143
            mapper::map_executor_operation(op, &schema, &mut cx)
×
144
        });
×
145
    });
×
146
    Ok(promise)
×
147
}
×
148

149
#[neon::main]
×
150
fn main(mut cx: ModuleContext) -> NeonResult<()> {
151
    cx.export_function("Runtime", new_runtime)?;
152
    Ok(())
153
}
154

155
mod mapper;
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc