From 8a3f1e53421fe157fdecb3f0a1cf620277ea08e7 Mon Sep 17 00:00:00 2001 From: Peter Surda Date: Tue, 12 May 2026 10:48:44 +0800 Subject: [PATCH] Fix inventory flushing - it's a combo of handling a tuple and pickling of raw data, and is triggered under rare occasions --- src/helper_startup.py | 9 +- src/storage/sqlite.py | 7 +- src/tests/test_config_process.py | 4 +- src/tests/test_inventory_flush.py | 166 ++++++++++++++++++++++++++++++ 4 files changed, 183 insertions(+), 3 deletions(-) create mode 100644 src/tests/test_inventory_flush.py diff --git a/src/helper_startup.py b/src/helper_startup.py index 52e1bf7ae4..541dab7206 100644 --- a/src/helper_startup.py +++ b/src/helper_startup.py @@ -32,6 +32,11 @@ logger = logging.getLogger('default') +#: The latest version of the keys.dat settings schema. Bump this +#: when adding a new migration step in :func:`updateConfig` or +#: :class:`class_sqlThread.sqlThread`. +LATEST_SETTINGS_VERSION = 10 + # The user may de-select Portable Mode in the settings if they want # the config files to stay in the application data folder. StoreConfigFilesInSameDirectoryAsProgramByDefault = False @@ -72,7 +77,9 @@ def loadConfig(): # no config file (or it cannot be accessed). Create config file. # config.add_section('bitmessagesettings') config.read() - config.set('bitmessagesettings', 'settingsversion', '10') + config.set( + 'bitmessagesettings', 'settingsversion', + str(LATEST_SETTINGS_VERSION)) if 'linux' in sys.platform: config.set('bitmessagesettings', 'minimizetotray', 'false') # This isn't implimented yet and when True on diff --git a/src/storage/sqlite.py b/src/storage/sqlite.py index eb5df098d8..e6d2bba0c6 100644 --- a/src/storage/sqlite.py +++ b/src/storage/sqlite.py @@ -109,7 +109,12 @@ def flush(self): for objectHash, value in self._inventory.items(): sql.execute( 'INSERT INTO inventory VALUES (?, ?, ?, ?, ?, ?)', - sqlite3.Binary(objectHash), *value) + sqlite3.Binary(objectHash), + value.type, + value.stream, + sqlite3.Binary(bytes(value.payload)), + value.expires, + sqlite3.Binary(bytes(value.tag))) self._inventory.clear() def clean(self): diff --git a/src/tests/test_config_process.py b/src/tests/test_config_process.py index 9322a2f07f..daaf88bba9 100644 --- a/src/tests/test_config_process.py +++ b/src/tests/test_config_process.py @@ -5,6 +5,7 @@ import os import tempfile from pybitmessage.bmconfigparser import config +from pybitmessage.helper_startup import LATEST_SETTINGS_VERSION from .test_process import TestProcessProto from .common import skip_python3 @@ -22,7 +23,8 @@ def test_config_defaults(self): config.read(os.path.join(self.home, 'keys.dat')) self.assertEqual(config.safeGetInt( - 'bitmessagesettings', 'settingsversion'), 10) + 'bitmessagesettings', 'settingsversion'), + LATEST_SETTINGS_VERSION) self.assertEqual(config.safeGetInt( 'bitmessagesettings', 'port'), 8444) # don't connect diff --git a/src/tests/test_inventory_flush.py b/src/tests/test_inventory_flush.py new file mode 100644 index 0000000000..de96539d3c --- /dev/null +++ b/src/tests/test_inventory_flush.py @@ -0,0 +1,166 @@ +"""Tests for SqliteInventory.flush()""" +# pylint: disable=protected-access,wrong-import-order,wrong-import-position +# pylint: disable=import-outside-toplevel + +import os +import struct +import tempfile +import threading +import time + +from .common import skip_python3 +from .partial import TestPartialRun + +skip_python3() + + +class TestInventoryFlush(TestPartialRun): + """ + Integration test: exercises flush() end-to-end with the real sqlThread + consumer running, so that type errors in parameter binding surface here + rather than silently killing a production thread. + """ + + @classmethod + def setUpClass(cls): + os.environ['BITMESSAGE_HOME'] = tempfile.gettempdir() + super(TestInventoryFlush, cls).setUpClass() + + import helper_sql + from bmconfigparser import config, config_ready + from class_sqlThread import sqlThread + from helper_startup import LATEST_SETTINGS_VERSION + from storage.sqlite import SqliteInventory + + cls._sqlStoredProcedure = staticmethod(helper_sql.sqlStoredProcedure) + + # sqlThread.run() waits on config_ready and then reads + # settingsversion; normally helper_startup.loadConfig() handles + # both, but TestPartialRun only calls config.read() which loads + # default.ini (no settingsversion). Set the minimum the + # sqlThread needs so it can initialise the database. + if not config.has_option( + 'bitmessagesettings', 'settingsversion'): + config.set( + 'bitmessagesettings', 'settingsversion', + str(LATEST_SETTINGS_VERSION)) + config_ready.set() + + # test_api_thread replaces helper_sql.sql_ready with a mock + # that only has wait(); restore a real Event so sqlThread can + # call .set() on it. In Python 2 threading.Event is a factory + # function, not a class, so we duck-type the check. + cls._original_sql_ready = helper_sql.sql_ready + if not hasattr(helper_sql.sql_ready, 'set'): + helper_sql.sql_ready = threading.Event() + + sql_lookup = sqlThread() + sql_lookup.daemon = True + sql_lookup.start() + helper_sql.sql_ready.wait() + cls.inventory = SqliteInventory() + + @classmethod + def tearDownClass(cls): + import helper_sql + from bmconfigparser import config_ready + + cls._sqlStoredProcedure('exit') + for thread in threading.enumerate(): + if thread.name == "SQL": + thread.join(timeout=10) + helper_sql.sql_ready = cls._original_sql_ready + # Reset config to default.ini so added settingsversion does + # not leak into subsequent tests. Also clear config_ready + # since it is a one-shot event set by loadConfig(). + cls.config.read() + config_ready.clear() + super(TestInventoryFlush, cls).tearDownClass() + + # -- helpers ---------------------------------------------------------- + + @staticmethod + def _make_hash(seed): + """Return a 32-byte hash derived from *seed*.""" + return (b'\x00' * 31 + bytes([seed & 0xFF]))[-32:] + + def _flush_and_check(self, obj_hash): + """ + Flush the inventory to the database, clear the _objects lookup + cache so that __contains__ is forced to hit sqlite, then verify + the hash is found via the normal inventory API. + """ + self.inventory.flush() + self.inventory._objects.clear() + self.assertIn(obj_hash, self.inventory) + + # -- test cases ------------------------------------------------------- + + def test_flush_with_bytes_payload(self): + """Baseline: payload and tag are plain bytes.""" + h = self._make_hash(1) + self.inventory[h] = ( + 2, 1, b'\x80\x01' + os.urandom(64), + int(time.time()) + 3600, b'\xff' * 32) + self._flush_and_check(h) + + def test_flush_with_memoryview_payload(self): + """ + Reproduce the production crash: payload and tag as memoryview + cause 'Error binding parameter 3 - probably unsupported type.' + """ + h = self._make_hash(2) + self.inventory[h] = ( + 2, 1, memoryview(b'\x80\x02' + os.urandom(64)), + int(time.time()) + 3600, memoryview(b'\xee' * 32)) + self._flush_and_check(h) + + def test_flush_with_bytearray_payload(self): + """bytearray is another bytes-like type that could trip sqlite3.""" + h = self._make_hash(3) + self.inventory[h] = ( + 2, 1, bytearray(b'\x80\x03' + os.urandom(64)), + int(time.time()) + 3600, bytearray(b'\xdd' * 32)) + self._flush_and_check(h) + + def test_flush_with_empty_tag(self): + """Empty tag (b'') must not break the INSERT.""" + h = self._make_hash(4) + self.inventory[h] = ( + 2, 1, b'\x80\x04' + os.urandom(64), + int(time.time()) + 3600, b'') + self._flush_and_check(h) + + # pylint: disable=redefined-variable-type + def test_flush_multiple_mixed_types(self): + """Flush a batch of items with mixed blob types.""" + count = 20 + hashes = [self._make_hash(0x10 + i) for i in range(count)] + expires = int(time.time()) + 3600 + + for i, h in enumerate(hashes): + payload = struct.pack('>I', i) + os.urandom(60) + tag = struct.pack('>I', i) + b'\x00' * 28 + if i % 3 == 0: + payload = memoryview(payload) + tag = memoryview(tag) + elif i % 3 == 1: + payload = bytearray(payload) + tag = bytearray(tag) + self.inventory[h] = (2, 1, payload, expires, tag) + + self.inventory.flush() + self.inventory._objects.clear() + + for i, h in enumerate(hashes): + self.assertIn( + h, self.inventory, + "Item {} missing after batch flush".format(i)) + + def test_flush_clears_memory_cache(self): + """After flush the in-memory _inventory dict must be empty.""" + h = self._make_hash(0xF0) + self.inventory[h] = ( + 2, 1, b'\x00' * 32, int(time.time()) + 3600, b'') + self.inventory.flush() + self.assertEqual(len(self.inventory._inventory), 0)