object/read/
read_cache.rs

1use alloc::boxed::Box;
2use alloc::vec::Vec;
3use core::cell::RefCell;
4use core::convert::TryInto;
5use core::mem;
6use core::ops::Range;
7#[cfg(feature = "std")]
8use std::io::{Read, Seek, SeekFrom};
9
10#[cfg(not(feature = "std"))]
11use alloc::collections::btree_map::{BTreeMap as Map, Entry};
12#[cfg(feature = "std")]
13use std::collections::hash_map::{Entry, HashMap as Map};
14
15use crate::read::ReadRef;
16
17/// An implementation of [`ReadRef`] for data in a stream that implements
18/// `Read + Seek`.
19///
20/// Contains a cache of read-only blocks of data, allowing references to
21/// them to be returned. Entries in the cache are never removed.
22/// Entries are keyed on the offset and size of the read.
23/// Currently overlapping reads are considered separate reads.
24///
25/// This is primarily intended for environments where memory mapped files
26/// are not available or not suitable, such as WebAssembly.
27///
28/// Note that malformed files can cause the cache to grow much larger than
29/// the file size.
30#[derive(Debug)]
31pub struct ReadCache<R: ReadCacheOps> {
32    cache: RefCell<ReadCacheInternal<R>>,
33}
34
35#[derive(Debug)]
36struct ReadCacheInternal<R: ReadCacheOps> {
37    read: R,
38    bufs: Map<(u64, u64), Box<[u8]>>,
39    strings: Map<(u64, u8), Box<[u8]>>,
40    len: Option<u64>,
41}
42
43impl<R: ReadCacheOps> ReadCacheInternal<R> {
44    /// Ensures this range is contained in the len of the file
45    fn range_in_bounds(&mut self, range: &Range<u64>) -> Result<(), ()> {
46        if range.start <= range.end && range.end <= self.len()? {
47            Ok(())
48        } else {
49            Err(())
50        }
51    }
52
53    /// The length of the underlying read, memoized
54    fn len(&mut self) -> Result<u64, ()> {
55        match self.len {
56            Some(len) => Ok(len),
57            None => {
58                let len = self.read.len()?;
59                self.len = Some(len);
60                Ok(len)
61            }
62        }
63    }
64}
65
66impl<R: ReadCacheOps> ReadCache<R> {
67    /// Create an empty `ReadCache` for the given stream.
68    pub fn new(read: R) -> Self {
69        ReadCache {
70            cache: RefCell::new(ReadCacheInternal {
71                read,
72                bufs: Map::new(),
73                strings: Map::new(),
74                len: None,
75            }),
76        }
77    }
78
79    /// Return an implementation of `ReadRef` that restricts reads
80    /// to the given range of the stream.
81    pub fn range(&self, offset: u64, size: u64) -> ReadCacheRange<'_, R> {
82        ReadCacheRange {
83            r: self,
84            offset,
85            size,
86        }
87    }
88
89    /// Free buffers used by the cache.
90    pub fn clear(&mut self) {
91        self.cache.borrow_mut().bufs.clear();
92    }
93
94    /// Unwrap this `ReadCache<R>`, returning the underlying reader.
95    pub fn into_inner(self) -> R {
96        self.cache.into_inner().read
97    }
98}
99
100impl<'a, R: ReadCacheOps> ReadRef<'a> for &'a ReadCache<R> {
101    fn len(self) -> Result<u64, ()> {
102        self.cache.borrow_mut().len()
103    }
104
105    fn read_bytes_at(self, offset: u64, size: u64) -> Result<&'a [u8], ()> {
106        if size == 0 {
107            return Ok(&[]);
108        }
109        let cache = &mut *self.cache.borrow_mut();
110        cache.range_in_bounds(&(offset..(offset.saturating_add(size))))?;
111        let buf = match cache.bufs.entry((offset, size)) {
112            Entry::Occupied(entry) => entry.into_mut(),
113            Entry::Vacant(entry) => {
114                let size = size.try_into().map_err(|_| ())?;
115                cache.read.seek(offset)?;
116                let mut bytes = Vec::new();
117                bytes.try_reserve_exact(size).map_err(|_| ())?;
118                bytes.resize(size, 0);
119                let mut bytes = bytes.into_boxed_slice();
120                cache.read.read_exact(&mut bytes)?;
121                entry.insert(bytes)
122            }
123        };
124        // Extend the lifetime to that of self.
125        // This is OK because we never mutate or remove entries.
126        Ok(unsafe { mem::transmute::<&[u8], &[u8]>(buf) })
127    }
128
129    fn read_bytes_at_until(self, range: Range<u64>, delimiter: u8) -> Result<&'a [u8], ()> {
130        let cache = &mut *self.cache.borrow_mut();
131        cache.range_in_bounds(&range)?;
132        let buf = match cache.strings.entry((range.start, delimiter)) {
133            Entry::Occupied(entry) => entry.into_mut(),
134            Entry::Vacant(entry) => {
135                cache.read.seek(range.start)?;
136
137                let max_check: usize = (range.end - range.start).try_into().map_err(|_| ())?;
138                // Strings should be relatively small.
139                // TODO: make this configurable?
140                let max_check = max_check.min(4096);
141
142                let mut bytes = Vec::new();
143                let mut checked = 0;
144                loop {
145                    bytes.resize((checked + 256).min(max_check), 0);
146                    let read = cache.read.read(&mut bytes[checked..])?;
147                    if read == 0 {
148                        return Err(());
149                    }
150                    if let Some(len) = memchr::memchr(delimiter, &bytes[checked..][..read]) {
151                        bytes.truncate(checked + len);
152                        break entry.insert(bytes.into_boxed_slice());
153                    }
154                    checked += read;
155                    if checked >= max_check {
156                        return Err(());
157                    }
158                }
159            }
160        };
161        // Extend the lifetime to that of self.
162        // This is OK because we never mutate or remove entries.
163        Ok(unsafe { mem::transmute::<&[u8], &[u8]>(buf) })
164    }
165}
166
167/// An implementation of [`ReadRef`] for a range of data in a stream that
168/// implements `Read + Seek`.
169///
170/// Shares an underlying [`ReadCache`] with a lifetime of `'a`.
171#[derive(Debug)]
172pub struct ReadCacheRange<'a, R: ReadCacheOps> {
173    r: &'a ReadCache<R>,
174    offset: u64,
175    size: u64,
176}
177
178impl<'a, R: ReadCacheOps> Clone for ReadCacheRange<'a, R> {
179    fn clone(&self) -> Self {
180        *self
181    }
182}
183
184impl<'a, R: ReadCacheOps> Copy for ReadCacheRange<'a, R> {}
185
186impl<'a, R: ReadCacheOps> ReadRef<'a> for ReadCacheRange<'a, R> {
187    fn len(self) -> Result<u64, ()> {
188        Ok(self.size)
189    }
190
191    fn read_bytes_at(self, offset: u64, size: u64) -> Result<&'a [u8], ()> {
192        if size == 0 {
193            return Ok(&[]);
194        }
195        let end = offset.checked_add(size).ok_or(())?;
196        if end > self.size {
197            return Err(());
198        }
199        let r_offset = self.offset.checked_add(offset).ok_or(())?;
200        self.r.read_bytes_at(r_offset, size)
201    }
202
203    fn read_bytes_at_until(self, range: Range<u64>, delimiter: u8) -> Result<&'a [u8], ()> {
204        let r_start = self.offset.checked_add(range.start).ok_or(())?;
205        let r_end = self.offset.checked_add(range.end).ok_or(())?;
206        let bytes = self.r.read_bytes_at_until(r_start..r_end, delimiter)?;
207        let size = bytes.len().try_into().map_err(|_| ())?;
208        let end = range.start.checked_add(size).ok_or(())?;
209        if end > self.size {
210            return Err(());
211        }
212        Ok(bytes)
213    }
214}
215
216/// Operations required to implement [`ReadCache`].
217///
218/// This is a subset of the `Read` and `Seek` traits.
219/// A blanket implementation is provided for all types that implement
220/// `Read + Seek`.
221#[allow(clippy::len_without_is_empty)]
222pub trait ReadCacheOps {
223    /// Return the length of the stream.
224    ///
225    /// Equivalent to `std::io::Seek::seek(SeekFrom::End(0))`.
226    fn len(&mut self) -> Result<u64, ()>;
227
228    /// Seek to the given position in the stream.
229    ///
230    /// Equivalent to `std::io::Seek::seek` with `SeekFrom::Start(pos)`.
231    fn seek(&mut self, pos: u64) -> Result<u64, ()>;
232
233    /// Read up to `buf.len()` bytes into `buf`.
234    ///
235    /// Equivalent to `std::io::Read::read`.
236    fn read(&mut self, buf: &mut [u8]) -> Result<usize, ()>;
237
238    /// Read exactly `buf.len()` bytes into `buf`.
239    ///
240    /// Equivalent to `std::io::Read::read_exact`.
241    fn read_exact(&mut self, buf: &mut [u8]) -> Result<(), ()>;
242}
243
244#[cfg(feature = "std")]
245impl<T: Read + Seek> ReadCacheOps for T {
246    fn len(&mut self) -> Result<u64, ()> {
247        self.seek(SeekFrom::End(0)).map_err(|_| ())
248    }
249
250    fn seek(&mut self, pos: u64) -> Result<u64, ()> {
251        self.seek(SeekFrom::Start(pos)).map_err(|_| ())
252    }
253
254    fn read(&mut self, buf: &mut [u8]) -> Result<usize, ()> {
255        Read::read(self, buf).map_err(|_| ())
256    }
257
258    fn read_exact(&mut self, buf: &mut [u8]) -> Result<(), ()> {
259        Read::read_exact(self, buf).map_err(|_| ())
260    }
261}