1use std::cell::RefCell;
2use std::sync::Arc;
3use std::sync::mpsc::{Receiver, Sender};
4use std::sync::{Mutex, RwLock};
5use std::time::Duration;
6
7use crossbeam::queue::ArrayQueue;
8use log::{Level, LevelFilter, Log, Metadata, Record, SetLoggerError};
9use logger as c_log;
10use once_cell::sync::{Lazy, OnceCell};
11use shadow_shim_helper_rs::emulated_time::EmulatedTime;
12use shadow_shim_helper_rs::util::time::TimeParts;
13
14use crate::core::worker::Worker;
15use crate::host::host::HostInfo;
16
17const ASYNC_FLUSH_QD_LINES_THRESHOLD: usize = 100_000;
19
20const SYNC_FLUSH_QD_LINES_THRESHOLD: usize = 10 * ASYNC_FLUSH_QD_LINES_THRESHOLD;
26
27const MIN_FLUSH_FREQUENCY: Duration = Duration::from_secs(10);
29
30static SHADOW_LOGGER: Lazy<ShadowLogger> = Lazy::new(ShadowLogger::new);
31
32pub fn init(
34    max_log_level: LevelFilter,
35    report_errors_to_stderr: bool,
36) -> Result<(), SetLoggerError> {
37    SHADOW_LOGGER.set_max_level(max_log_level);
38    SHADOW_LOGGER.set_report_errors_to_stderr(report_errors_to_stderr);
39
40    log::set_logger(&*SHADOW_LOGGER)?;
41
42    log::set_max_level(log::LevelFilter::Trace);
45
46    std::thread::Builder::new()
48        .name("shadow-logger".to_string())
49        .spawn(move || SHADOW_LOGGER.logger_thread_fn())
50        .unwrap();
51
52    let default_panic_handler = std::panic::take_hook();
54    std::panic::set_hook(Box::new(move |panic_info| {
55        SHADOW_LOGGER.flush_records(None).ok();
61        default_panic_handler(panic_info);
62    }));
63
64    Ok(())
65}
66
67pub struct ShadowLogger {
73    command_sender: Mutex<Sender<LoggerCommand>>,
79
80    command_receiver: Mutex<Receiver<LoggerCommand>>,
84
85    records: ArrayQueue<ShadowLogRecord>,
95
96    buffering_enabled: RwLock<bool>,
99
100    max_log_level: OnceCell<LevelFilter>,
102
103    report_errors_to_stderr: OnceCell<bool>,
105}
106
107thread_local!(static SENDER: RefCell<Option<Sender<LoggerCommand>>> = const{ RefCell::new(None)});
108thread_local!(static THREAD_NAME: String = get_thread_name());
109thread_local!(static THREAD_ID: nix::unistd::Pid = nix::unistd::gettid());
110
111fn get_thread_name() -> String {
112    let mut thread_name = Vec::<i8>::with_capacity(16);
113    let res = unsafe {
114        thread_name.set_len(thread_name.capacity());
115        libc::pthread_getname_np(
117            libc::pthread_self(),
118            thread_name.as_mut_ptr(),
119            thread_name.len(),
120        )
121    };
122    debug_assert_eq!(
124        res,
125        0,
126        "pthread_getname_np: {}",
127        nix::errno::Errno::from_raw(res),
128    );
129    if res == 0 {
130        let thread_name_cstr = unsafe { std::ffi::CStr::from_ptr(thread_name.as_ptr()) };
133        return thread_name_cstr.to_owned().to_string_lossy().to_string();
134    }
135
136    String::new()
142}
143
144impl ShadowLogger {
145    fn new() -> ShadowLogger {
146        let (sender, receiver) = std::sync::mpsc::channel();
147
148        ShadowLogger {
149            records: ArrayQueue::new(SYNC_FLUSH_QD_LINES_THRESHOLD),
150            command_sender: Mutex::new(sender),
151            command_receiver: Mutex::new(receiver),
152            buffering_enabled: RwLock::new(false),
153            max_log_level: OnceCell::new(),
154            report_errors_to_stderr: OnceCell::new(),
155        }
156    }
157
158    fn logger_thread_fn(&self) {
161        let command_receiver = self.command_receiver.lock().unwrap();
162
163        loop {
164            use std::sync::mpsc::RecvTimeoutError;
165            match command_receiver.recv_timeout(MIN_FLUSH_FREQUENCY) {
166                Ok(LoggerCommand::Flush(done_sender)) => self.flush_records(done_sender).unwrap(),
167                Err(RecvTimeoutError::Timeout) => {
168                    self.flush_records(None).unwrap();
170                }
171                Err(e) => panic!("Unexpected error {e}"),
172            }
173        }
174    }
175
176    fn flush_records(&self, done_sender: Option<Sender<()>>) -> std::io::Result<()> {
180        use std::io::Write;
181
182        let mut toflush = self.records.len();
188
189        let stdout_unlocked = std::io::stdout();
190        let stdout_locked = stdout_unlocked.lock();
191        let mut stdout = std::io::BufWriter::new(stdout_locked);
192
193        while toflush > 0 {
194            let record = match self.records.pop() {
195                Some(r) => r,
196                None => {
197                    break;
201                }
202            };
203            toflush -= 1;
204
205            write!(stdout, "{record}")?;
206
207            if record.level <= Level::Error && *self.report_errors_to_stderr.get().unwrap() {
208                stdout.flush()?;
212
213                let stderr_unlocked = std::io::stderr();
218                let stderr_locked = stderr_unlocked.lock();
219                let mut stderr = std::io::BufWriter::new(stderr_locked);
220                writeln!(stderr, "Error: {}", record.message)?;
221
222                stderr.flush()?;
224                drop(stderr);
225            }
226        }
227
228        stdout.flush()?;
230        drop(stdout);
231
232        if let Some(done_sender) = done_sender {
233            done_sender.send(()).unwrap_or_else(|e| {
237                println!(
238                    "WARNING: Logger couldn't notify
239                calling thread: {e:?}",
240                )
241            });
242        }
243        Ok(())
244    }
245
246    pub fn set_buffering_enabled(&self, buffering_enabled: bool) {
250        let mut writer = self.buffering_enabled.write().unwrap();
251        *writer = buffering_enabled;
252    }
253
254    pub fn max_level(&self) -> LevelFilter {
256        self.max_log_level
257            .get()
258            .copied()
259            .unwrap_or(LevelFilter::Trace)
260    }
261
262    fn set_max_level(&self, level: LevelFilter) {
265        self.max_log_level.set(level).unwrap()
266    }
267
268    fn set_report_errors_to_stderr(&self, val: bool) {
273        self.report_errors_to_stderr.set(val).unwrap()
274    }
275
276    fn flush_impl(&self, notify_done: Option<Sender<()>>) {
278        self.send_command(LoggerCommand::Flush(notify_done))
279    }
280
281    fn flush_sync(&self) {
283        let (done_sender, done_receiver) = std::sync::mpsc::channel();
284        self.flush_impl(Some(done_sender));
285        done_receiver.recv().unwrap();
286    }
287
288    fn flush_async(&self) {
290        self.flush_impl(None);
291    }
292
293    fn send_command(&self, cmd: LoggerCommand) {
295        SENDER
296            .try_with(|thread_sender| {
297                if thread_sender.borrow().is_none() {
298                    let lock = self.command_sender.lock().unwrap();
299                    *thread_sender.borrow_mut() = Some(lock.clone());
300                }
301                thread_sender
302                    .borrow()
303                    .as_ref()
304                    .unwrap()
305                    .send(cmd)
306                    .unwrap_or_else(|e| {
307                        println!("WARNING: Couldn't send command to logger thread: {e:?}");
308                    });
309            })
310            .unwrap_or_else(|e| {
311                println!("WARNING: Couldn't get sender channel to logger thread: {e:?}");
312            });
313    }
314}
315
316impl Log for ShadowLogger {
317    fn enabled(&self, metadata: &Metadata) -> bool {
318        let filter = match Worker::with_active_host(|host| host.info().log_level) {
319            Some(Some(level)) => level,
320            _ => self.max_level(),
321        };
322        metadata.level() <= filter
323    }
324
325    fn log(&self, record: &Record) {
326        if !self.enabled(record.metadata()) {
327            return;
328        }
329
330        let message = std::fmt::format(*record.args());
331
332        let host_info = Worker::with_active_host(|host| host.info().clone());
333
334        let mut shadowrecord = ShadowLogRecord {
335            level: record.level(),
336            file: record.file_static(),
337            module_path: record.module_path_static(),
338            line: record.line(),
339            message,
340            wall_time: Duration::from_micros(unsafe {
341                u64::try_from(c_log::logger_elapsed_micros()).unwrap()
342            }),
343
344            emu_time: Worker::current_time(),
345            thread_name: THREAD_NAME
346                .try_with(|name| (*name).clone())
347                .unwrap_or_else(|_| get_thread_name()),
348            thread_id: THREAD_ID
349                .try_with(|id| *id)
350                .unwrap_or_else(|_| nix::unistd::gettid()),
351            host_info,
352        };
353
354        loop {
355            match self.records.push(shadowrecord) {
356                Ok(()) => break,
357                Err(r) => {
358                    shadowrecord = r;
360                    self.flush_sync();
361                }
362            }
363        }
364
365        if record.level() == Level::Error {
366            self.flush_sync();
372        } else if self.records.len() > ASYNC_FLUSH_QD_LINES_THRESHOLD
373            || !*self.buffering_enabled.read().unwrap()
374        {
375            self.flush_async();
376        }
377    }
378
379    fn flush(&self) {
380        self.flush_sync();
381    }
382}
383
384struct ShadowLogRecord {
385    level: Level,
386    file: Option<&'static str>,
387    module_path: Option<&'static str>,
388    line: Option<u32>,
389    message: String,
390    wall_time: Duration,
391
392    emu_time: Option<EmulatedTime>,
393    thread_name: String,
394    thread_id: nix::unistd::Pid,
395    host_info: Option<Arc<HostInfo>>,
396}
397
398impl std::fmt::Display for ShadowLogRecord {
399    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
400        {
401            let parts = TimeParts::from_nanos(self.wall_time.as_nanos());
402            write!(
403                f,
404                "{:02}:{:02}:{:02}.{:06}",
405                parts.hours,
406                parts.mins,
407                parts.secs,
408                parts.nanos / 1000
409            )?;
410        }
411        write!(f, " [{}:{}]", self.thread_id, self.thread_name)?;
412        if let Some(emu_time) = self.emu_time {
413            let sim_time = emu_time.duration_since(&EmulatedTime::SIMULATION_START);
414            let parts = TimeParts::from_nanos(sim_time.as_nanos());
415            write!(
416                f,
417                " {:02}:{:02}:{:02}.{:09}",
418                parts.hours, parts.mins, parts.secs, parts.nanos
419            )?;
420        } else {
421            write!(f, " n/a")?;
422        }
423        write!(f, " [{level}]", level = self.level)?;
424        if let Some(host) = &self.host_info {
425            write!(
426                f,
427                " [{hostname}:{ip}]",
428                hostname = host.name,
429                ip = host.default_ip,
430            )?;
431        } else {
432            write!(f, " [n/a]",)?;
433        }
434        write!(
435            f,
436            " [{file}:",
437            file = self
438                .file
439                .map(|f| if let Some(sep_pos) = f.rfind('/') {
440                    &f[(sep_pos + 1)..]
441                } else {
442                    f
443                })
444                .unwrap_or("n/a"),
445        )?;
446        if let Some(line) = self.line {
447            write!(f, "{line}")?;
448        } else {
449            write!(f, "n/a")?;
450        }
451        writeln!(
452            f,
453            "] [{module}] {msg}",
454            module = self.module_path.unwrap_or("n/a"),
455            msg = self.message
456        )?;
457        Ok(())
458    }
459}
460
461enum LoggerCommand {
462    Flush(Option<Sender<()>>),
464}
465
466pub fn set_buffering_enabled(buffering_enabled: bool) {
467    SHADOW_LOGGER.set_buffering_enabled(buffering_enabled);
468}
469
470mod export {
471    use super::*;
472
473    #[unsafe(no_mangle)]
477    pub unsafe extern "C-unwind" fn shadow_logger_setEnableBuffering(buffering_enabled: i32) {
478        set_buffering_enabled(buffering_enabled != 0)
479    }
480}