Revision control
Copy as Markdown
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
use std::{
collections::{HashMap, HashSet},
sync::{Arc, Weak},
};
use camino::Utf8PathBuf;
use error_support::trace;
use parking_lot::Mutex;
use serde::Deserialize;
use viaduct::Request;
use crate::{
client::RemoteState, config::BaseUrl, error::Error, storage::Storage, RemoteSettingsClient,
RemoteSettingsConfig2, RemoteSettingsContext, RemoteSettingsServer, Result,
};
/// Internal Remote settings service API
pub struct RemoteSettingsService {
inner: Mutex<RemoteSettingsServiceInner>,
}
struct RemoteSettingsServiceInner {
storage_dir: Utf8PathBuf,
base_url: BaseUrl,
bucket_name: String,
app_context: Option<RemoteSettingsContext>,
remote_state: RemoteState,
/// Weakrefs for all clients that we've created. Note: this stores the
/// top-level/public `RemoteSettingsClient` structs rather than `client::RemoteSettingsClient`.
/// The reason for this is that we return Arcs to the public struct to the foreign code, so we
/// need to use the same type for our weakrefs. The alternative would be to create 2 Arcs for
/// each client, which is wasteful.
clients: Vec<Weak<RemoteSettingsClient>>,
}
impl RemoteSettingsService {
/// Construct a [RemoteSettingsService]
///
/// This is typically done early in the application-startup process
pub fn new(storage_dir: String, config: RemoteSettingsConfig2) -> Self {
let storage_dir = storage_dir.into();
let base_url = config
.server
.unwrap_or(RemoteSettingsServer::Prod)
.get_base_url_with_prod_fallback();
let bucket_name = config.bucket_name.unwrap_or_else(|| String::from("main"));
Self {
inner: Mutex::new(RemoteSettingsServiceInner {
storage_dir,
base_url,
bucket_name,
app_context: config.app_context,
remote_state: RemoteState::default(),
clients: vec![],
}),
}
}
pub fn make_client(&self, collection_name: String) -> Arc<RemoteSettingsClient> {
let mut inner = self.inner.lock();
// Allow using in-memory databases for testing of external crates.
let storage = if inner.storage_dir == ":memory:" {
Storage::new(inner.storage_dir.clone())
} else {
Storage::new(inner.storage_dir.join(format!("{collection_name}.sql")))
};
let client = Arc::new(RemoteSettingsClient::new(
inner.base_url.clone(),
inner.bucket_name.clone(),
collection_name.clone(),
inner.app_context.clone(),
storage,
));
inner.clients.push(Arc::downgrade(&client));
client
}
/// Sync collections for all active clients
pub fn sync(&self) -> Result<Vec<String>> {
// Make sure we only sync each collection once, even if there are multiple clients
let mut synced_collections = HashSet::new();
let mut inner = self.inner.lock();
let changes = inner.fetch_changes()?;
let change_map: HashMap<_, _> = changes
.changes
.iter()
.map(|c| ((c.collection.as_str(), &c.bucket), c.last_modified))
.collect();
let bucket_name = inner.bucket_name.clone();
for client in inner.active_clients() {
let client = &client.internal;
let collection_name = client.collection_name();
if let Some(client_last_modified) = client.get_last_modified_timestamp()? {
if let Some(server_last_modified) = change_map.get(&(collection_name, &bucket_name))
{
if client_last_modified != *server_last_modified {
trace!("skipping up-to-date collection: {collection_name}");
continue;
}
}
}
if synced_collections.insert(collection_name.to_string()) {
trace!("syncing collection: {collection_name}");
client.sync()?;
}
}
Ok(synced_collections.into_iter().collect())
}
/// Update the remote settings config
///
/// This will cause all current and future clients to use new config and will delete any stored
/// records causing the clients to return new results from the new config.
pub fn update_config(&self, config: RemoteSettingsConfig2) -> Result<()> {
let base_url = config
.server
.unwrap_or(RemoteSettingsServer::Prod)
.get_base_url()?;
let bucket_name = config.bucket_name.unwrap_or_else(|| String::from("main"));
let mut inner = self.inner.lock();
for client in inner.active_clients() {
client.internal.update_config(
base_url.clone(),
bucket_name.clone(),
config.app_context.clone(),
)?;
}
inner.base_url = base_url;
inner.bucket_name = bucket_name;
inner.app_context = config.app_context;
Ok(())
}
}
impl RemoteSettingsServiceInner {
// Find live clients in self.clients
//
// Also, drop dead weakrefs from the vec
fn active_clients(&mut self) -> Vec<Arc<RemoteSettingsClient>> {
let mut active_clients = vec![];
self.clients.retain(|weak| {
if let Some(client) = weak.upgrade() {
active_clients.push(client);
true
} else {
false
}
});
active_clients
}
fn fetch_changes(&mut self) -> Result<Changes> {
let mut url = self.base_url.clone();
url.path_segments_mut()
.push("buckets")
.push("monitor")
.push("collections")
.push("changes")
.push("changeset");
// For now, always use `0` for the expected value. This means we'll get updates based on
// the default TTL of 1 hour.
//
// Eventually, we should add support for push notifications and use the timestamp from the
// notification.
url.query_pairs_mut().append_pair("_expected", "0");
let url = url.into_inner();
trace!("make_request: {url}");
self.remote_state.ensure_no_backoff()?;
let req = Request::get(url);
let resp = req.send()?;
self.remote_state.handle_backoff_hint(&resp)?;
if resp.is_success() {
Ok(resp.json()?)
} else {
Err(Error::ResponseError(format!(
"status code: {}",
resp.status
)))
}
}
}
/// Data from the changes endpoint
///
#[derive(Debug, Deserialize)]
struct Changes {
changes: Vec<ChangesCollection>,
}
#[derive(Debug, Deserialize)]
struct ChangesCollection {
collection: String,
bucket: String,
last_modified: u64,
}