/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
// The "incoming" part of syncing - handling the incoming rows, staging them,
// working out a plan for them, updating the local data and mirror, etc.
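//
// The flow is roughly (a sketch of how the pieces below fit together; the
// sync engine is what actually drives them, all within one transaction):
//
//     stage_incoming(&tx, &incoming_records, &signal)?;
//     let actions: Vec<(SyncGuid, IncomingAction)> = get_incoming(&tx)?
//         .into_iter()
//         .map(|(guid, state)| (guid, plan_incoming(state)))
//         .collect();
//     apply_actions(&tx, actions, &signal)?;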
use interrupt_support::Interruptee;
use rusqlite::{Connection, Row, Transaction};
use sql_support::ConnExt;
use sync15::bso::{IncomingContent, IncomingKind};
use sync_guid::Guid as SyncGuid;
use crate::api::{StorageChanges, StorageValueChange};
use crate::error::*;
use super::{merge, remove_matching_keys, JsonMap, WebextRecord};

/// The states the data can be in. This could be represented as
/// Option<JsonMap>, but this is clearer and independent of how the data
/// is stored.
#[derive(Debug, PartialEq, Eq)]
pub enum DataState {
/// The data was deleted.
Deleted,
/// Data exists, as stored in the map.
Exists(JsonMap),
}

// A little helper to create a StorageChanges object when we are taking
// an incoming value with multiple keys that doesn't exist locally - every
// key becomes a change with no old value.
fn changes_for_new_incoming(new: &JsonMap) -> StorageChanges {
let mut result = StorageChanges::with_capacity(new.len());
for (key, val) in new.iter() {
result.push(StorageValueChange {
key: key.clone(),
old_value: None,
new_value: Some(val.clone()),
});
}
result
}

// This module deals exclusively with the Map inside a JsonValue::Object().
// This helper reads such a Map from a SQL row, ignoring anything which is
// either invalid JSON or a different JSON type.
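// For example, a column holding `{"foo": "bar"}` is returned as
// DataState::Exists, while NULL, invalid JSON such as `{"foo"`, or a
// non-object such as `[1, 2]` are all returned as DataState::Deleted.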
fn json_map_from_row(row: &Row<'_>, col: &str) -> Result<DataState> {
let s = row.get::<_, Option<String>>(col)?;
Ok(match s {
None => DataState::Deleted,
Some(s) => match serde_json::from_str(&s) {
Ok(serde_json::Value::Object(m)) => DataState::Exists(m),
_ => {
// We don't want invalid JSON or wrong types to kill syncing.
// It should be impossible as we never write anything which
// could cause it, but we can't really log the bad data as there
// might be PII. Logging just a message without any additional
// clues is going to be unhelpfully noisy, so we silently treat
// it as deleted.
// XXX - Maybe record telemetry?
DataState::Deleted
}
},
})
}

/// The first thing we do with incoming items is to "stage" them in a temp table.
/// The actual processing is done via this table.
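/// For example (mirroring the tests below), an incoming record like
/// `{"id": "guidAAAAAAAA", "extId": "ext1@example.com", "data": "{\"foo\":\"bar\"}"}`
/// is staged as a `(guid, ext_id, data)` row, while a tombstone is staged
/// as `(guid, NULL, NULL)`.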
pub fn stage_incoming(
tx: &Transaction<'_>,
incoming_records: &[IncomingContent<WebextRecord>],
signal: &dyn Interruptee,
) -> Result<()> {
sql_support::each_sized_chunk(
incoming_records,
// We bind 3 params per record.
sql_support::default_max_variable_number() / 3,
|chunk, _| -> Result<()> {
let mut params = Vec::with_capacity(chunk.len() * 3);
for record in chunk {
signal.err_if_interrupted()?;
match &record.kind {
IncomingKind::Content(r) => {
params.push(Some(record.envelope.id.to_string()));
params.push(Some(r.ext_id.to_string()));
params.push(Some(r.data.clone()));
}
IncomingKind::Tombstone => {
params.push(Some(record.envelope.id.to_string()));
params.push(None);
params.push(None);
}
IncomingKind::Malformed => {
log::error!("Ignoring incoming malformed record: {}", record.envelope.id);
}
}
}
// We might have skipped some records, so count what we actually staged.
let actual_len = params.len() / 3;
if actual_len != 0 {
let sql = format!(
"INSERT OR REPLACE INTO temp.storage_sync_staging
(guid, ext_id, data)
VALUES {}",
sql_support::repeat_multi_values(actual_len, 3)
);
tx.execute(&sql, rusqlite::params_from_iter(params))?;
}
Ok(())
},
)?;
Ok(())
}
/// The "state" we find ourselves in when considering an incoming/staging
/// record. This "state" is the input to calculating the IncomingAction and
/// carries all the data we need to make the required local changes.
#[derive(Debug, PartialEq, Eq)]
pub enum IncomingState {
/// There's an incoming item, but data for that extension doesn't exist
/// either in our local data store or in the local mirror. IOW, this is the
/// very first time we've seen this extension.
IncomingOnlyData { ext_id: String, data: JsonMap },
/// An incoming tombstone that doesn't exist locally. Tombstones don't
/// carry the ext-id, and without a mirror record we have no way to map
/// the guid to one. We are just going to ignore it, but we track the
/// state for consistency.
IncomingOnlyTombstone,
/// There's an incoming item and we have data for the same extension in
/// our local store - but not in our mirror. This should be relatively
/// uncommon as it means:
/// * Some other profile has recently installed an extension and synced.
/// * This profile has recently installed the same extension.
/// * This is the first sync for this profile since both those events
/// happened.
HasLocal {
ext_id: String,
incoming: DataState,
local: DataState,
},
/// There's an incoming item and there's an item for the same extension in
/// the mirror. The addon probably doesn't exist locally, or if it does,
/// the last time we synced we synced the deletion of all data.
NotLocal {
ext_id: String,
incoming: DataState,
mirror: DataState,
},
/// This will be the most common "incoming" case - there's data incoming,
/// in the mirror and in the local store for an addon.
Everywhere {
ext_id: String,
incoming: DataState,
mirror: DataState,
local: DataState,
},
}

/// Get the items we need to process from the staging table. Return details about
/// the item and the state of that item, ready for processing.
pub fn get_incoming(conn: &Connection) -> Result<Vec<(SyncGuid, IncomingState)>> {
let sql = "
SELECT
s.guid as guid,
l.ext_id as l_ext_id,
m.ext_id as m_ext_id,
s.ext_id as s_ext_id,
s.data as s_data, m.data as m_data, l.data as l_data,
l.sync_change_counter
FROM temp.storage_sync_staging s
LEFT JOIN storage_sync_mirror m ON m.guid = s.guid
LEFT JOIN storage_sync_data l ON l.ext_id IN (m.ext_id, s.ext_id);";
fn from_row(row: &Row<'_>) -> Result<(SyncGuid, IncomingState)> {
let guid = row.get("guid")?;
// This is complicated because the staging row doesn't hold the ext_id.
// However, both the local table and the mirror do.
let mirror_ext_id: Option<String> = row.get("m_ext_id")?;
let local_ext_id: Option<String> = row.get("l_ext_id")?;
let staged_ext_id: Option<String> = row.get("s_ext_id")?;
let incoming = json_map_from_row(row, "s_data")?;
// We find the state by examining which tables the ext-id exists in,
// using whether that column is null as a proxy for that.
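// In summary:
//   (local, mirror)
//   (None,  None)  -> IncomingOnlyData / IncomingOnlyTombstone
//   (Some,  None)  -> HasLocal
//   (None,  Some)  -> NotLocal
//   (Some,  Some)  -> Everywhere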
let state = match (local_ext_id, mirror_ext_id) {
(None, None) => {
match staged_ext_id {
Some(ext_id) => {
let data = match incoming {
// An incoming record with missing data that's not a
// tombstone shouldn't happen, but we can cope by
// pretending it was an empty JSON map.
DataState::Deleted => JsonMap::new(),
DataState::Exists(data) => data,
};
IncomingState::IncomingOnlyData { ext_id, data }
}
None => IncomingState::IncomingOnlyTombstone,
}
}
(Some(ext_id), None) => IncomingState::HasLocal {
ext_id,
incoming,
local: json_map_from_row(row, "l_data")?,
},
(None, Some(ext_id)) => IncomingState::NotLocal {
ext_id,
incoming,
mirror: json_map_from_row(row, "m_data")?,
},
(Some(ext_id), Some(_)) => IncomingState::Everywhere {
ext_id,
incoming,
mirror: json_map_from_row(row, "m_data")?,
local: json_map_from_row(row, "l_data")?,
},
};
Ok((guid, state))
}
conn.conn().query_rows_and_then(sql, [], from_row)
}

/// This is the set of actions we know how to take *locally* for incoming
/// records. Which one depends on the IncomingState.
/// Every state which updates local data also records the set of changes
/// we should notify observers about.
#[derive(Debug, PartialEq, Eq)]
pub enum IncomingAction {
/// We should locally delete the data for this record
DeleteLocally {
ext_id: String,
changes: StorageChanges,
},
/// We will take the remote.
TakeRemote {
ext_id: String,
data: JsonMap,
changes: StorageChanges,
},
/// We merged this data - this is what we came up with.
Merge {
ext_id: String,
data: JsonMap,
changes: StorageChanges,
},
/// Entry exists locally and it's the same as the incoming record.
Same { ext_id: String },
/// Incoming tombstone for an item we've never seen.
Nothing,
}

/// Takes the state of an item and returns the action we should take for it.
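/// For example, data which exists incoming, locally and in the mirror gets
/// a full three-way merge, while incoming data over a local deletion simply
/// becomes `TakeRemote`.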
pub fn plan_incoming(s: IncomingState) -> IncomingAction {
match s {
IncomingState::Everywhere {
ext_id,
incoming,
local,
mirror,
} => {
// All records exist - but do they all have data?
match (incoming, local, mirror) {
(
DataState::Exists(incoming_data),
DataState::Exists(local_data),
DataState::Exists(mirror_data),
) => {
// all records have data - 3-way merge.
merge(ext_id, incoming_data, local_data, Some(mirror_data))
}
(
DataState::Exists(incoming_data),
DataState::Exists(local_data),
DataState::Deleted,
) => {
// No parent, so first time seeing this remotely - 2-way merge
merge(ext_id, incoming_data, local_data, None)
}
(DataState::Exists(incoming_data), DataState::Deleted, _) => {
// Incoming data, removed locally. Server wins.
IncomingAction::TakeRemote {
ext_id,
changes: changes_for_new_incoming(&incoming_data),
data: incoming_data,
}
}
(DataState::Deleted, DataState::Exists(local_data), DataState::Exists(mirror)) => {
// Deleted remotely.
// Treat this as a delete of every key that we
// know was present at the time.
let (result, changes) = remove_matching_keys(local_data, &mirror);
if result.is_empty() {
// If there were no more keys left, we can
// delete our version too.
IncomingAction::DeleteLocally { ext_id, changes }
} else {
IncomingAction::Merge {
ext_id,
data: result,
changes,
}
}
}
(DataState::Deleted, DataState::Exists(local_data), DataState::Deleted) => {
// Perhaps another client created and then deleted
// the whole object for this extension since the
// last time we synced.
// Treat this as a delete of every key that we
// knew was present. Unfortunately, we don't know
// any keys that were present, so we delete no keys.
IncomingAction::Merge {
ext_id,
data: local_data,
changes: StorageChanges::new(),
}
}
(DataState::Deleted, DataState::Deleted, _) => {
// We agree with the remote (regardless of what we
// have mirrored).
IncomingAction::Same { ext_id }
}
}
}
IncomingState::HasLocal {
ext_id,
incoming,
local,
} => {
// So we have a local record and an incoming/staging record, but *not* a
// mirror record. This means some other device has synced this for
// the first time and we are yet to do the same.
match (incoming, local) {
(DataState::Exists(incoming_data), DataState::Exists(local_data)) => {
// This means the extension exists locally and remotely
// but this is the first time we've synced it. That's no problem, it's
// just a 2-way merge...
merge(ext_id, incoming_data, local_data, None)
}
(DataState::Deleted, DataState::Exists(local_data)) => {
// We've data locally, but there's an incoming deletion.
// We would normally remove keys that we knew were
// present on the server, but we don't know what
// was on the server, so we don't remove anything.
IncomingAction::Merge {
ext_id,
data: local_data,
changes: StorageChanges::new(),
}
}
(DataState::Exists(incoming_data), DataState::Deleted) => {
// No data locally, but some is incoming - take it.
IncomingAction::TakeRemote {
ext_id,
changes: changes_for_new_incoming(&incoming_data),
data: incoming_data,
}
}
(DataState::Deleted, DataState::Deleted) => {
// Nothing anywhere - odd, but OK.
IncomingAction::Same { ext_id }
}
}
}
IncomingState::NotLocal {
ext_id, incoming, ..
} => {
// No local data, but there's a mirror record and an incoming record.
// This means the data was deleted locally; the incoming record either
// replaces that deletion with new data or agrees with it.
match incoming {
DataState::Exists(data) => IncomingAction::TakeRemote {
ext_id,
changes: changes_for_new_incoming(&data),
data,
},
DataState::Deleted => IncomingAction::Same { ext_id },
}
}
IncomingState::IncomingOnlyData { ext_id, data } => {
// Only the staging record exists and it's not a tombstone.
// This means it's the first time we've ever seen it. No
// conflict possible, just take the remote.
IncomingAction::TakeRemote {
ext_id,
changes: changes_for_new_incoming(&data),
data,
}
}
IncomingState::IncomingOnlyTombstone => {
// Only the staging record exists and it is a tombstone - nothing to do.
IncomingAction::Nothing
}
}
}
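
// Note the `changes` column ends up holding the JSON serialization of a
// StorageChanges object - roughly `{"key": {"oldValue": .., "newValue": ..}}`
// per change (get_applied_item_changes in the tests below parses this
// shape back out).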
fn insert_changes(tx: &Transaction<'_>, ext_id: &str, changes: &StorageChanges) -> Result<()> {
tx.execute_cached(
"INSERT INTO temp.storage_sync_applied (ext_id, changes)
VALUES (:ext_id, :changes)",
rusqlite::named_params! {
":ext_id": ext_id,
":changes": &serde_json::to_string(&changes)?,
},
)?;
Ok(())
}

/// Apply the actions necessary to fully process the incoming items.
pub fn apply_actions(
tx: &Transaction<'_>,
actions: Vec<(SyncGuid, IncomingAction)>,
signal: &dyn Interruptee,
) -> Result<()> {
for (item, action) in actions {
signal.err_if_interrupted()?;
log::trace!("action for '{:?}': {:?}", item, action);
match action {
IncomingAction::DeleteLocally { ext_id, changes } => {
// Can just nuke it entirely.
tx.execute_cached(
"DELETE FROM storage_sync_data WHERE ext_id = :ext_id",
&[(":ext_id", &ext_id)],
)?;
insert_changes(tx, &ext_id, &changes)?;
}
// We want to update the local record with 'data', and after this update
// the item is no longer considered dirty.
IncomingAction::TakeRemote {
ext_id,
data,
changes,
} => {
tx.execute_cached(
"INSERT OR REPLACE INTO storage_sync_data(ext_id, data, sync_change_counter)
VALUES (:ext_id, :data, 0)",
rusqlite::named_params! {
":ext_id": ext_id,
":data": serde_json::Value::Object(data),
},
)?;
insert_changes(tx, &ext_id, &changes)?;
}
// We merged this data, so need to update locally but still consider
// it dirty because the merged data must be uploaded.
IncomingAction::Merge {
ext_id,
data,
changes,
} => {
tx.execute_cached(
"UPDATE storage_sync_data SET data = :data, sync_change_counter = sync_change_counter + 1 WHERE ext_id = :ext_id",
rusqlite::named_params! {
":ext_id": ext_id,
":data": serde_json::Value::Object(data),
},
)?;
insert_changes(tx, &ext_id, &changes)?;
}
// Both local and remote ended up the same - only need to nuke the
// change counter.
IncomingAction::Same { ext_id } => {
tx.execute_cached(
"UPDATE storage_sync_data SET sync_change_counter = 0 WHERE ext_id = :ext_id",
&[(":ext_id", &ext_id)],
)?;
// no changes to write
}
// Literally nothing to do!
IncomingAction::Nothing => {}
}
}
Ok(())
}

#[cfg(test)]
mod tests {
use super::super::test::new_syncable_mem_db;
use super::*;
use crate::api;
use interrupt_support::NeverInterrupts;
use serde_json::{json, Value};
use sync15::bso::IncomingBso;

// "select simple int" - a tiny helper for the single-value queries below.
fn ssi(conn: &Connection, stmt: &str) -> u32 {
conn.try_query_one(stmt, [], true)
.expect("must work")
.unwrap_or_default()
}

fn array_to_incoming(mut array: Value) -> Vec<IncomingContent<WebextRecord>> {
let jv = array.as_array_mut().expect("you must pass a json array");
let mut result = Vec::with_capacity(jv.len());
for elt in jv {
result.push(IncomingBso::from_test_content(elt.take()).into_content());
}
result
}

// Can't find a way to import these from crate::sync::tests...
macro_rules! map {
($($map:tt)+) => {
json!($($map)+).as_object().unwrap().clone()
};
}

macro_rules! change {
($key:literal, None, None) => {
StorageValueChange {
key: $key.to_string(),
old_value: None,
new_value: None,
}
};
($key:literal, $old:tt, None) => {
StorageValueChange {
key: $key.to_string(),
old_value: Some(json!($old)),
new_value: None,
}
};
($key:literal, None, $new:tt) => {
StorageValueChange {
key: $key.to_string(),
old_value: None,
new_value: Some(json!($new)),
}
};
($key:literal, $old:tt, $new:tt) => {
StorageValueChange {
key: $key.to_string(),
old_value: Some(json!($old)),
new_value: Some(json!($new)),
}
};
}

macro_rules! changes {
( $( $change:expr ),* ) => {
{
let mut changes = StorageChanges::new();
$(
changes.push($change);
)*
changes
}
};
}

#[test]
fn test_incoming_populates_staging() -> Result<()> {
let db = new_syncable_mem_db();
let conn = db.get_connection()?;
let tx = conn.unchecked_transaction()?;
let incoming = json! {[
{
"id": "guidAAAAAAAA",
"extId": "ext1@example.com",
"data": json!({"foo": "bar"}).to_string(),
}
]};
stage_incoming(&tx, &array_to_incoming(incoming), &NeverInterrupts)?;
// check staging table
assert_eq!(
ssi(&tx, "SELECT count(*) FROM temp.storage_sync_staging"),
1
);
Ok(())
}

#[test]
fn test_fetch_incoming_state() -> Result<()> {
let db = new_syncable_mem_db();
let conn = db.get_connection()?;
let tx = conn.unchecked_transaction()?;
// Start with an item just in staging.
tx.execute(
r#"
INSERT INTO temp.storage_sync_staging (guid, ext_id, data)
VALUES ('guid', 'ext_id', '{"foo":"bar"}')
"#,
[],
)?;
let incoming = get_incoming(&tx)?;
assert_eq!(incoming.len(), 1);
assert_eq!(incoming[0].0, SyncGuid::new("guid"),);
assert_eq!(
incoming[0].1,
IncomingState::IncomingOnlyData {
ext_id: "ext_id".to_string(),
data: map!({"foo": "bar"}),
}
);
// Add the same item to the mirror.
tx.execute(
r#"
INSERT INTO storage_sync_mirror (guid, ext_id, data)
VALUES ('guid', 'ext_id', '{"foo":"new"}')
"#,
[],
)?;
let incoming = get_incoming(&tx)?;
assert_eq!(incoming.len(), 1);
assert_eq!(
incoming[0].1,
IncomingState::NotLocal {
ext_id: "ext_id".to_string(),
incoming: DataState::Exists(map!({"foo": "bar"})),
mirror: DataState::Exists(map!({"foo": "new"})),
}
);
// And finally the data itself - might as well use the API here!
api::set(&tx, "ext_id", json!({"foo": "local"}))?;
let incoming = get_incoming(&tx)?;
assert_eq!(incoming.len(), 1);
assert_eq!(
incoming[0].1,
IncomingState::Everywhere {
ext_id: "ext_id".to_string(),
incoming: DataState::Exists(map!({"foo": "bar"})),
local: DataState::Exists(map!({"foo": "local"})),
mirror: DataState::Exists(map!({"foo": "new"})),
}
);
Ok(())
}

// Like test_fetch_incoming_state, but check NULLs are handled correctly.
#[test]
fn test_fetch_incoming_state_nulls() -> Result<()> {
let db = new_syncable_mem_db();
let conn = db.get_connection()?;
let tx = conn.unchecked_transaction()?;
// Start with a tombstone just in staging.
tx.execute(
r#"
INSERT INTO temp.storage_sync_staging (guid, ext_id, data)
VALUES ('guid', NULL, NULL)
"#,
[],
)?;
let incoming = get_incoming(&tx)?;
assert_eq!(incoming.len(), 1);
assert_eq!(incoming[0].1, IncomingState::IncomingOnlyTombstone,);
// Add the same item to the mirror (can't store an ext_id for a
// tombstone in the mirror as incoming tombstones never have it)
tx.execute(
r#"
INSERT INTO storage_sync_mirror (guid, ext_id, data)
VALUES ('guid', NULL, NULL)
"#,
[],
)?;
let incoming = get_incoming(&tx)?;
assert_eq!(incoming.len(), 1);
assert_eq!(incoming[0].1, IncomingState::IncomingOnlyTombstone);
tx.execute(
r#"
INSERT INTO storage_sync_data (ext_id, data)
VALUES ('ext_id', NULL)
"#,
[],
)?;
let incoming = get_incoming(&tx)?;
assert_eq!(incoming.len(), 1);
assert_eq!(
incoming[0].1,
// IncomingOnly* seems a little odd, but it is because we can't
// tie the tombstones together due to the lack of any ext-id/guid
// mapping in this case.
IncomingState::IncomingOnlyTombstone
);
Ok(())
}
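
// A focused check of plan_incoming for the simplest state: data which
// only exists incoming is taken verbatim, with one change per key.
#[test]
fn test_plan_incoming_only_data() {
let data = map!({"foo": "bar"});
let action = plan_incoming(IncomingState::IncomingOnlyData {
ext_id: "ext_id".to_string(),
data: data.clone(),
});
assert_eq!(
action,
IncomingAction::TakeRemote {
ext_id: "ext_id".to_string(),
data,
changes: changes![change!("foo", None, "bar")],
}
);
}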

// apply_action tests.
#[derive(Debug, PartialEq)]
struct LocalItem {
data: DataState,
sync_change_counter: i32,
}

fn get_local_item(conn: &Connection) -> Option<LocalItem> {
conn.try_query_row::<_, Error, _, _>(
"SELECT data, sync_change_counter FROM storage_sync_data WHERE ext_id = 'ext_id'",
[],
|row| {
let data = json_map_from_row(row, "data")?;
let sync_change_counter = row.get::<_, i32>(1)?;
Ok(LocalItem {
data,
sync_change_counter,
})
},
true,
)
.expect("query should work")
}

fn get_applied_item_changes(conn: &Connection) -> Option<StorageChanges> {
// no custom deserialize for storagechanges and we only need it for
// tests, so do it manually.
conn.try_query_row::<_, Error, _, _>(
"SELECT changes FROM temp.storage_sync_applied WHERE ext_id = 'ext_id'",
[],
|row| Ok(serde_json::from_str(&row.get::<_, String>("changes")?)?),
true,
)
.expect("query should work")
.map(|val: serde_json::Value| {
let ob = val.as_object().expect("should be an object of items");
let mut result = StorageChanges::with_capacity(ob.len());
for (key, val) in ob.into_iter() {
let details = val.as_object().expect("elts should be objects");
result.push(StorageValueChange {
key: key.to_string(),
old_value: details.get("oldValue").cloned(),
new_value: details.get("newValue").cloned(),
});
}
result
})
}

fn do_apply_action(tx: &Transaction<'_>, action: IncomingAction) {
let guid = SyncGuid::new("guid");
apply_actions(tx, vec![(guid, action)], &NeverInterrupts).expect("should apply");
}

#[test]
fn test_apply_actions() -> Result<()> {
let db = new_syncable_mem_db();
let conn = db.get_connection().expect("connection should be retrieved");
// DeleteLocally - row should be entirely removed.
let tx = conn
.unchecked_transaction()
.expect("transaction should begin");
api::set(&tx, "ext_id", json!({"foo": "local"}))?;
assert_eq!(
api::get(&tx, "ext_id", json!(null))?,
json!({"foo": "local"})
);
let changes = changes![change!("foo", "local", None)];
do_apply_action(
&tx,
IncomingAction::DeleteLocally {
ext_id: "ext_id".to_string(),
changes: changes.clone(),
},
);
assert_eq!(api::get(&tx, "ext_id", json!(null))?, json!({}));
// and there should not be a local record at all.
assert!(get_local_item(&tx).is_none());
assert_eq!(get_applied_item_changes(&tx), Some(changes));
tx.rollback()?;

// TakeRemote - replace local data with remote, marking it as not dirty.
let tx = conn
.unchecked_transaction()
.expect("transaction should begin");
api::set(&tx, "ext_id", json!({"foo": "local"}))?;
assert_eq!(
api::get(&tx, "ext_id", json!(null))?,
json!({"foo": "local"})
);
// data should exist locally with a change recorded.
assert_eq!(
get_local_item(&tx),
Some(LocalItem {
data: DataState::Exists(map!({"foo": "local"})),
sync_change_counter: 1
})
);
let changes = changes![change!("foo", "local", "remote")];
do_apply_action(
&tx,
IncomingAction::TakeRemote {
ext_id: "ext_id".to_string(),
data: map!({"foo": "remote"}),
changes: changes.clone(),
},
);
// data should exist locally with the remote data and not be dirty.
assert_eq!(
get_local_item(&tx),
Some(LocalItem {
data: DataState::Exists(map!({"foo": "remote"})),
sync_change_counter: 0
})
);
assert_eq!(get_applied_item_changes(&tx), Some(changes));
tx.rollback()?;

// Merge - like ::TakeRemote, but data remains dirty.
let tx = conn
.unchecked_transaction()
.expect("transaction should begin");
api::set(&tx, "ext_id", json!({"foo": "local"}))?;
assert_eq!(
api::get(&tx, "ext_id", json!(null))?,
json!({"foo": "local"})
);
// data should exist locally with a change recorded.
assert_eq!(
get_local_item(&tx),
Some(LocalItem {
data: DataState::Exists(map!({"foo": "local"})),
sync_change_counter: 1
})
);
let changes = changes![change!("foo", "local", "remote")];
do_apply_action(
&tx,
IncomingAction::Merge {
ext_id: "ext_id".to_string(),
data: map!({"foo": "remote"}),
changes: changes.clone(),
},
);
assert_eq!(
get_local_item(&tx),
Some(LocalItem {
data: DataState::Exists(map!({"foo": "remote"})),
sync_change_counter: 2
})
);
assert_eq!(get_applied_item_changes(&tx), Some(changes));
tx.rollback()?;

// Same - data stays the same but is marked not dirty.
let tx = conn
.unchecked_transaction()
.expect("transaction should begin");
api::set(&tx, "ext_id", json!({"foo": "local"}))?;
assert_eq!(
api::get(&tx, "ext_id", json!(null))?,
json!({"foo": "local"})
);
// data should exist locally with a change recorded.
assert_eq!(
get_local_item(&tx),
Some(LocalItem {
data: DataState::Exists(map!({"foo": "local"})),
sync_change_counter: 1
})
);
do_apply_action(
&tx,
IncomingAction::Same {
ext_id: "ext_id".to_string(),
},
);
assert_eq!(
get_local_item(&tx),
Some(LocalItem {
data: DataState::Exists(map!({"foo": "local"})),
sync_change_counter: 0
})
);
assert_eq!(get_applied_item_changes(&tx), None);
tx.rollback()?;
Ok(())
}
}