Source code
Revision control
Copy as Markdown
Other Tools
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
//! A single store.
use std::{
borrow::Cow,
ffi::OsStr,
fmt::Write,
io, mem,
ops::Deref,
path::{Path, PathBuf},
sync::{
atomic::{self, AtomicUsize},
Arc, Condvar, Mutex,
},
time::SystemTime,
};
use chrono::{DateTime, Utc};
use rusqlite::OpenFlags;
use crate::{
fs::WidePathBuf,
skv::{
checker::{Checker, CheckerAction, IntoChecker},
connection::{
Connection, ConnectionIncident, ConnectionIncidents, ConnectionMaintenanceTask,
ConnectionPath, ConnectionType, ToConnectionIncident,
},
schema::{Schema, SchemaError},
},
};
/// A persistent store backed by a physical SQLite database.
///
/// Under the hood, a store holds two connections to the same physical database:
///
/// * A **read-write** connection for queries and updates. This connection
/// runs operations serially, and those operations can't be interrupted.
/// * A **read-only** connection for concurrent reads. This connection can
/// read from the physical database even if the read-write connection is busy,
/// and those reads can be interrupted. Reads on this connection won't see any
/// uncommitted changes on the read-write connection.
#[derive(Debug)]
pub struct Store {
path: StorePath,
state: Mutex<StoreState>,
waiter: OperationWaiter,
}
impl Store {
pub fn new(path: StorePath) -> Self {
Self {
path,
state: Mutex::new(StoreState::Created),
waiter: OperationWaiter::new(),
}
}
/// Gets or opens both connections to the physical database.
fn open<C>(&self) -> Result<OpenStoreGuard<'_>, StoreError>
where
for<'a> ConnectionIncidents<'a>: IntoChecker<C>,
C: ConnectionMaintenanceTask,
C::Error: std::error::Error + Send + Sync + 'static,
{
let guard = {
// Scope for the locked state.
let mut state = self.state.lock().unwrap();
loop {
let result = match &*state {
StoreState::Created => {
let store = Arc::new(OpenStore::new(&self.path)?);
*state = StoreState::Open(store);
// Advance the state machine, so that the checker can
// check the database on first use.
continue;
}
StoreState::Open(store) => {
let store = store.clone();
match IntoChecker::<C>::into_checker(store.writer.incidents()) {
CheckerAction::Skip => {
let guard = OpenStoreGuard::new(store, self.waiter.guard());
Ok(CheckedStore::Healthy(guard))
}
CheckerAction::Check(checker) => {
// Take the store temporarily out of service.
// Clients won't be able to read from or
// write to the store during maintenance, but
// will be able to close it.
let writer =
Writer(OpenStoreGuard::new(store.clone(), self.waiter.guard()));
*state = StoreState::Maintenance(store);
Err(UnhealthyStore::Check(writer, checker))
}
CheckerAction::Replace => {
// Take the store permanently out of service.
*state = StoreState::Corrupt;
Err(UnhealthyStore::Replace(store))
}
}
}
StoreState::Maintenance(_) => return Err(StoreError::Busy),
StoreState::Corrupt => return Err(StoreError::Corrupt),
StoreState::Closed => return Err(StoreError::Closed),
};
break result;
}
}
.or_else(|store| {
match store {
UnhealthyStore::Replace(store) => {
Ok(CheckedStore::Corrupt(store, StoreError::Corrupt))
}
UnhealthyStore::Check(writer, checker) => {
// Check for database corruption.
let result = writer
.maintenance(checker)
.map_err(|err| StoreError::Maintenance(err.into()));
{
// Scope for the locked state.
let mut state = self.state.lock().unwrap();
let StoreState::Maintenance(store) = &*state else {
// The store was closed during maintenance.
return result.and_then(|_| {
// The checker ran to completion, but
// the store is closed now.
Err(StoreError::Closed)
});
};
let store = store.clone();
match result {
Ok(()) => {
// If the checker succeeded, put the store
// back into service.
let guard = OpenStoreGuard::new(store.clone(), self.waiter.guard());
*state = StoreState::Open(store);
Ok(CheckedStore::Healthy(guard))
}
Err(err) => {
// If the checker failed, take the store
// permanently out of service.
*state = StoreState::Corrupt;
Ok(CheckedStore::Corrupt(store, err))
}
}
}
}
}
})?;
match guard {
CheckedStore::Healthy(guard) => Ok(guard),
CheckedStore::Corrupt(store, err) => {
// Interrupt all connection operations. Since we're about
// to replace the database, interrupting writes here is OK.
store.reader.interrupt();
store.writer.interrupt();
// Wait for all (now-interrupted) operations to finish and
// drop their strong references to the store.
self.waiter.wait();
// Invariant: Changing the state to `Corrupt`, and waiting for
// all operations to finish, should have dropped all other
// strong references.
let store = Arc::into_inner(store).expect("invariant violation");
store.close();
// A corrupt database file might be salvageable, so
// move it aside.
if let Some(path) = self.path.on_disk() {
rename_corrupt_database_file(&path);
}
Err(err)
}
}
}
/// Returns the read-write connection to use for queries and updates.
pub fn writer(&self) -> Result<Writer<'_>, StoreError> {
Ok(Writer(self.open::<Checker>()?))
}
/// Returns the read-only connection to use for concurrent reads.
pub fn reader(&self) -> Result<Reader<'_>, StoreError> {
Ok(Reader(self.open::<Checker>()?))
}
#[cfg(feature = "gtest")]
pub fn check<C>(&self) -> Result<(), StoreError>
where
for<'a> ConnectionIncidents<'a>: IntoChecker<C>,
C: ConnectionMaintenanceTask,
C::Error: std::error::Error + Send + Sync + 'static,
{
// We discard the guard because we only want to advance the
// state machine, not return a connection.
let _ = self.open::<C>()?;
Ok(())
}
/// Closes both connections to the physical database.
pub fn close(&self) {
// Take ownership of the connections, so that we can close them and
// prevent any new reads or writes from starting.
let store = match mem::replace(&mut *self.state.lock().unwrap(), StoreState::Closed) {
StoreState::Created | StoreState::Closed | StoreState::Corrupt => return,
StoreState::Open(store) => {
// If the store is working normally, interrupt concurrent reads,
// but let writes finish.
store.reader.interrupt();
store
}
StoreState::Maintenance(store) => {
// If the store is unhealthy, interrupt reads and writes.
// There's no risk of data loss, because the writer is
// only running maintenance operations, and we don't want
// closing to wait on those.
store.reader.interrupt();
store.writer.interrupt();
store
}
};
// Wait for all connection operations to finish and drop their
// strong references to the store.
self.waiter.wait();
// Invariant: Waiting for all operations to finish should have dropped
// all other strong references.
let store = Arc::into_inner(store).expect("invariant violation");
store.close();
}
}
/// Either a path to a physical SQLite database file on disk, or
/// a reference to a unique in-memory database.
#[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
pub enum StorePath {
OnDisk(PathBuf),
InMemory(usize),
}
impl StorePath {
pub const IN_MEMORY_DATABASE_NAME: &'static str = ":memory:";
const DEFAULT_DATABASE_FILE_NAME: &'static str = "kvstore.sqlite";
/// Returns the canonical [`StorePath`] for a [`WidePathBuf`]. This
/// method normalizes string paths passed to the XPCOM
/// [`crate::skv::interface`] methods.
///
/// **Canonicalization can accesses the filesystem**, so this method
/// should not be called on the main thread.
pub fn canonicalizing(path: WidePathBuf) -> Result<Self, StoreError> {
Ok(if path == StorePath::IN_MEMORY_DATABASE_NAME {
StorePath::for_in_memory()
} else {
// Concurrently accessing the same physical SQLite database
// through different links can corrupt its WAL file,
// especially when done from multiple processes.
// Mitigate that by canonicalizing the path.
let dir = path.canonicalize().map_err(StoreError::StorageDir)?;
StorePath::for_storage_dir(dir)
})
}
/// Returns the path to the physical database file in the given
/// storage directory.
pub fn for_storage_dir(dir: impl Into<PathBuf>) -> Self {
let mut path = dir.into();
path.push(Self::DEFAULT_DATABASE_FILE_NAME);
Self::OnDisk(path)
}
/// Returns a path to a unique in-memory physical database.
pub fn for_in_memory() -> Self {
static NEXT_IN_MEMORY_DATABASE_ID: AtomicUsize = AtomicUsize::new(1);
let id = NEXT_IN_MEMORY_DATABASE_ID.fetch_add(1, atomic::Ordering::Relaxed);
Self::InMemory(id)
}
/// If this path is to a physical database file on disk,
/// returns a reference to the path.
pub fn on_disk(&self) -> Option<OnDiskStorePath<'_>> {
match self {
Self::OnDisk(buf) => buf
.file_name()
.map(|name| OnDiskStorePath::new(buf.parent(), name.into())),
Self::InMemory(_) => None,
}
}
}
impl ConnectionPath for StorePath {
fn as_path(&self) -> Cow<'_, Path> {
match self {
Self::OnDisk(buf) => Cow::Borrowed(buf.as_path()),
Self::InMemory(id) => {
// A store opens two connections to the same physical database.
// To make this work for in-memory databases, we use a URI that
// names the database and enables shared-cache mode.
Cow::Owned(format!("file:kvstore-{id}?mode=memory&cache=shared").into())
}
}
}
fn flags(&self) -> OpenFlags {
match self {
Self::OnDisk(_) => OpenFlags::empty(),
Self::InMemory(_) => {
// Note that we must use a URI filename to open an
// in-memory database in shared-cache mode.
// SQLite will return a "library used incorrectly" error if
// we try to open a named in-memory database with
// `SQLITE_OPEN_MEMORY | SQLITE_OPEN_SHARED_CACHE`.
OpenFlags::SQLITE_OPEN_URI
}
}
}
}
/// A path to an SQLite database file and its related files on disk.
#[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
pub struct OnDiskStorePath<'a> {
dir: Option<&'a Path>,
name: Cow<'a, OsStr>,
}
impl<'a> OnDiskStorePath<'a> {
fn new(dir: Option<&'a Path>, name: Cow<'a, OsStr>) -> Self {
Self { dir, name }
}
/// Returns the path to the temporary WAL file.
pub fn wal(&self) -> PathBuf {
let mut name = self.name.clone().into_owned();
write!(&mut name, "-wal").unwrap();
self.dir.map(|dir| dir.join(&name)).unwrap_or(name.into())
}
/// Returns the path to the temporary shared-memory file.
pub fn shm(&self) -> PathBuf {
let mut name = self.name.clone().into_owned();
write!(&mut name, "-shm").unwrap();
self.dir.map(|dir| dir.join(&name)).unwrap_or(name.into())
}
/// Returns the path to use for backing up a corrupt database file
/// and its related files.
pub fn to_corrupt(&self) -> OnDiskStorePath<'a> {
let now = DateTime::<Utc>::from(SystemTime::now());
let mut name = self.name.clone().into_owned();
write!(&mut name, ".corrupt-{}", now.format("%Y%m%d%H%M%S")).unwrap();
Self::new(self.dir, name.into())
}
}
impl<'a> ConnectionPath for OnDiskStorePath<'a> {
fn as_path(&self) -> Cow<'_, Path> {
match self.dir {
Some(dir) => Cow::Owned(dir.join(&self.name)),
None => Cow::Borrowed(Path::new(&self.name)),
}
}
fn flags(&self) -> OpenFlags {
OpenFlags::empty()
}
}
/// Backs up a corrupt SQLite database file and its related files.
fn rename_corrupt_database_file(source: &OnDiskStorePath<'_>) {
let destination = source.to_corrupt();
let _ = std::fs::rename(source.as_path(), destination.as_path());
let _ = std::fs::rename(source.wal(), destination.wal());
let _ = std::fs::rename(source.shm(), destination.shm());
}
/// A strong reference to an open store.
struct OpenStoreGuard<'a> {
store: Arc<OpenStore>,
// Field order is important here: struct fields are dropped in
// declaration order, and we want to ensure that the strong reference
// to the open store is dropped before the pending operation guard.
// Otherwise, the strong reference count will race with the operation count,
// and might violate the invariant in `Store::close()`.
_guard: OperationGuard<'a>,
}
impl<'a> OpenStoreGuard<'a> {
fn new(store: Arc<OpenStore>, guard: OperationGuard<'a>) -> Self {
Self {
store,
_guard: guard,
}
}
}
/// A read-write connection to an SQLite database.
pub struct Writer<'a>(OpenStoreGuard<'a>);
impl<'a> Deref for Writer<'a> {
type Target = Connection;
fn deref(&self) -> &Self::Target {
&self.0.store.writer
}
}
/// A read-only connection to an SQLite database.
pub struct Reader<'a>(OpenStoreGuard<'a>);
impl<'a> Deref for Reader<'a> {
type Target = Connection;
fn deref(&self) -> &Self::Target {
&self.0.store.reader
}
}
/// The internal state of a [`Store`].
///
/// ## State diagram
///
/// ```custom
/// +---------+
/// | Created +-----------------------------------+
/// +--+------+ |
/// | |
/// +--v---+ +-------------+ +---------+ |
/// | Open +----> Maintenance +----> Corrupt | |
/// +-+--^-+ +---v--v------+ +----+----+ |
/// | | | | | |
/// | +----------+ | | |
/// | | | |
/// | +--------------+ | |
/// | | | |
/// | | +-----------------------------+ |
/// | | | |
/// +-v-v-v--+ |
/// | Closed <------------------------------------+
/// +--------+
/// ```
#[derive(Debug)]
enum StoreState {
Created,
Open(Arc<OpenStore>),
Maintenance(Arc<OpenStore>),
Corrupt,
Closed,
}
#[derive(Debug)]
struct OpenStore {
writer: Connection,
reader: Connection,
}
impl OpenStore {
fn new(path: &StorePath) -> Result<Self, StoreError> {
Ok(match Self::open(path) {
Ok(store) => store,
Err(StoreError::Sqlite(err)) => {
let (Some(code), Some(path)) = (err.sqlite_error_code(), path.on_disk()) else {
return Err(err.into());
};
match code {
rusqlite::ErrorCode::NotADatabase | rusqlite::ErrorCode::DatabaseCorrupt => {
// If SQLite can't open the database file, it's unlikely
// that we can salvage it, but move it aside anyway.
rename_corrupt_database_file(&path);
Self::open(&path)?
}
_ => return Err(err.into()),
}
}
Err(err) => return Err(err),
})
}
fn open(path: &impl ConnectionPath) -> Result<Self, StoreError> {
// Order is important here: the writer must be opened first,
// so that it can initialize the schema.
let writer = Connection::new::<Schema>(path, ConnectionType::ReadWrite)?;
let reader = Connection::new::<Schema>(path, ConnectionType::ReadOnly)?;
Ok(Self { writer, reader })
}
fn close(self) {
self.reader.close();
self.writer.close();
}
}
/// A temporarily out-of-service store.
enum UnhealthyStore<'a, C> {
Check(Writer<'a>, C),
Replace(Arc<OpenStore>),
}
/// An out-of-service store that was checked for corruption.
enum CheckedStore<'a> {
Healthy(OpenStoreGuard<'a>),
Corrupt(Arc<OpenStore>, StoreError),
}
#[derive(Debug)]
struct OperationWaiter {
count: Mutex<usize>,
cvar: Condvar,
}
impl OperationWaiter {
fn new() -> Self {
Self {
count: Mutex::new(0),
cvar: Condvar::new(),
}
}
/// Increments the pending operation count, and returns a guard
/// that decrements the count when dropped.
fn guard(&self) -> OperationGuard<'_> {
*self.count.lock().unwrap() += 1;
OperationGuard(self)
}
/// Waits for the pending operation count to reach zero.
fn wait(&self) {
let mut count = self.count.lock().unwrap();
while *count > 0 {
count = self.cvar.wait(count).unwrap();
}
}
}
struct OperationGuard<'a>(&'a OperationWaiter);
impl<'a> Drop for OperationGuard<'a> {
fn drop(&mut self) {
let mut count = self.0.count.lock().unwrap();
*count -= 1;
if *count == 0 {
self.0.cvar.notify_all();
}
}
}
#[derive(thiserror::Error, Debug)]
pub enum StoreError {
#[error("schema: {0}")]
Schema(#[from] SchemaError),
#[error("busy")]
Busy,
#[error("maintenance: {0}")]
Maintenance(#[source] Box<dyn std::error::Error + Send + Sync + 'static>),
#[error("closed")]
Closed,
#[error("corrupt")]
Corrupt,
#[error("sqlite: {0}")]
Sqlite(#[from] rusqlite::Error),
#[error("storage dir: {0}")]
StorageDir(#[source] io::Error),
}
impl ToConnectionIncident for StoreError {
fn to_incident(&self) -> Option<ConnectionIncident> {
match self {
Self::Sqlite(err) => err.to_incident(),
_ => None,
}
}
}