Using BerkeleyDB to Store Serialized Objects in C++
I’ve been working on a C++ interface to Berkeley DB to support some ongoing projects. The goal is to create an interface that allows me to store serialized C++ objects in a database. Each object is flattened into a flat byte buffer, and used as either a key or a value in a Berkeley DB table. In addition, we want to use all the features of the database, including ACID guarantees and transaction processing.
We’re using Berkeley DB instead of a database server like MySQL because when implemented correctly it can be much faster. When you use a database server you have all the overhead of network operations. In addition, with Berkeley DB it is much simpler to put binary objects into the database, and still access the data elements in your structures as database keys. With a straight SQL implementation, anything you want to use as a key needs to be extracted as a separate database record field.
Our Database Interface Usage Example
First let’s look at how the interface works, then we can work through how something like this is implemented.
BDBConnector db( "testEnv", "test.db" );
db.createTable( "testTable" );

KeyObj key;
ValueObj val;
KxStreamObj keyStream;
KxStreamObj valStream;

keyStream << key;
valStream << val;

db.put( "testTable", keyStream, valStream );

db.get( "testTable", keyStream, valStream );
valStream >> val;

db.del( "testTable", keyStream );

// supposing there were multiple values associated with keyStream
KxVector vals;
db.get( "testTable", keyStream, vals );
You can see it works basically like a std::hash_map. So why do this? Because we want safe, persistent storage of objects. In addition, we want support for transactions, so that we can group multiple changes to our database and only have them go in if all of the operations succeed. We’re doing this to support online services, where partial transactions are common, and we don’t want our data corrupted.
So to support transaction processing in our example we want something that works like this:
BDBConnector db( "testEnv", "test.db" );
BDBTxn txn( db );

KeyObj key;
ValueObj val1;
ValueObj val2;
ValueObj val3;
KxStreamObj keyStream;
KxStreamObj valStream;

keyStream << key;

valStream << val1;
txn.put( "testTable", keyStream, valStream );

valStream.clear();
valStream << val2;
txn.put( "testTable", keyStream, valStream );

txn.commit();

valStream.clear();
valStream << val3;
txn.put( "testTable", keyStream, valStream );

txn.commit();
In this case none of the objects will enter the database until the call to commit(). Note that unlike in regular Berkeley DB, our own transaction object, BDBTxn, is reusable. Note also that both BDBConnector and BDBTxn support the same database API. BDBTxn simply adds the commit() method.
Implementing an object streaming system will be left for a future article. For now we are going to focus on just the database object implementation. All we need to know about the streaming object, KxStreamObj, is that it supports an interface something like this:
class KxStreamObj {
    // returns the data buffer containing the serialized object
    const s8* getBuffer() const;

    // returns the offset of the last byte written to the buffer, i.e. the size
    u32 getWriteCursor() const;
};

// streamable objects need to implement:
friend KxStreamObj& operator<<( KxStreamObj& os, const KeyObj& object );
friend KxStreamObj& operator>>( KxStreamObj& os, KeyObj& object );
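To make the contract concrete, here is a minimal, hypothetical sketch of an object with this shape. ToyStream, the fixed-width typedefs, and the KeyObj layout below are illustrative stand-ins, not the real KxStreamObj:

```cpp
#include <cassert>
#include <cstdint>
#include <cstring>
#include <vector>

typedef char     s8;
typedef uint32_t u32;

// Toy stand-in for KxStreamObj: a growable byte buffer with a write cursor.
class ToyStream {
public:
    const s8* getBuffer() const { return mBuf.empty() ? 0 : &mBuf[0]; }
    u32 getWriteCursor() const { return (u32)mBuf.size(); }

    // append raw bytes to the buffer
    void write( const s8* data, u32 len ) {
        mBuf.insert( mBuf.end(), data, data + len );
    }

private:
    std::vector<s8> mBuf;
};

// A streamable object flattens its fields into the buffer.
struct KeyObj {
    u32 id;
};

ToyStream& operator<<( ToyStream& os, const KeyObj& object ) {
    os.write( (const s8*)&object.id, sizeof( object.id ) );
    return os;
}
```

After streaming an object in, getBuffer() and getWriteCursor() together describe exactly the bytes that would become a Berkeley DB key or value.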
Our Database Interface
We start by defining an abstract base class that both BDBTxn and BDBConnector will use:
class BDBIface {
public:
    virtual bool put( KxSymbol table, KxSerialObj& key, KxSerialObj& val, bool update = true ) = 0;
    virtual bool del( KxSymbol table, KxSerialObj& key ) = 0;
    virtual bool get( KxSymbol table, KxSerialObj& key, KxSerialObj& val ) = 0;
    virtual bool get( KxSymbol table, KxSerialObj& key, KxVector& vals ) = 0;
    virtual bool createTable( KxSymbol table, bool allowDuplicates = false ) = 0;
};
These are just some of the methods, but they are enough for now. KxSymbol is just a string class. You could implement this all with const char* or with std::string; we are using our own string class in this example. Similarly, KxVector is our own basic templatized vector container, like std::vector.
Now we define our database object itself:
class BDBConnector : public BDBIface {
public:
    BDBConnector( KxSymbol env, KxSymbol dbFile );
    ~BDBConnector();

    bool put( KxSymbol table, KxSerialObj& key, KxSerialObj& val, bool update = true );
    bool del( KxSymbol table, KxSerialObj& key );
    bool get( KxSymbol table, KxSerialObj& key, KxSerialObj& val );
    bool get( KxSymbol table, KxSerialObj& key, KxVector& vals );
    bool createTable( KxSymbol table, bool allowDuplicates = false );
    bool createIndex( KxSymbol index, KxSymbol table, BDBExtractorFunc extractor );

protected:
    friend class BDBTxn;

    Db* getTable_i( KxSymbol table );
    Db* openTable_i( KxSymbol table, DbTxn* txn, bool allowDuplicates = false );
    bool closeTable_i( Db* db );
    DbTxn* startTxn_i();
    bool createTable_i( KxSymbol table, DbTxn* txn, bool allowDuplicates = false );
    bool createIndex_i( KxSymbol index, KxSymbol table, DbTxn* txn, BDBExtractorFunc extractor );
    bool put_i( KxSymbol table, DbTxn* txn, KxSerialObj& key, KxSerialObj& val, bool update = true );
    bool del_i( KxSymbol table, DbTxn* txn, KxSerialObj& key );
    bool get_i( KxSymbol table, DbTxn* txn, KxSerialObj& key, KxSerialObj& val );
    bool get_i( KxSymbol table, DbTxn* txn, KxSerialObj& key, KxVector& vals );

    KxSymbol mEnvName;
    KxSymbol mDbFile;
    DbEnv mEnv;
    KxDict mTables;
    KxDict mIndexes;
};

class BDBTxn : public BDBIface {
public:
    BDBTxn( BDBConnector& dbc );
    ~BDBTxn();

    void commit();
    void abort();

    bool put( KxSymbol table, KxSerialObj& key, KxSerialObj& val, bool update = true );
    bool del( KxSymbol table, KxSerialObj& key );
    bool get( KxSymbol table, KxSerialObj& key, KxSerialObj& val );
    bool get( KxSymbol table, KxSerialObj& key, KxVector& vals );
    bool createTable( KxSymbol table, bool allowDuplicates = false );
    bool createIndex( KxSymbol index, KxSymbol table, BDBExtractorFunc extractor );

protected:
    DbTxn* mTxn;
    BDBConnector* mDbc;
    bool mDirty;
};
Some things to notice: both objects derive from BDBIface, so they both support the same database interface. Their public methods like put and get are short passthrough methods to the protected methods like get_i and put_i.
The BDBTxn object is constructed with a reference to its related BDBConnector object, and contains a pointer to it, along with a Berkeley DB DbTxn handle. This is a lightweight class that delegates most of its work to BDBConnector and provides some syntactic sugar to make using transactions a lot easier.
The BDBConnector holds a Berkeley DB DbEnv object, which is a handle into a set of Berkeley DB tables, and a few containers that hold handles to the Berkeley DB tables themselves. KxDict is a lot like a std::hash_map. These containers simply map table names to table handles. We keep one for tables, and another for secondary indexes.
Initializing Databases and Tables
Starting the System
We’re going to open the database using the Berkeley DB environment object, DbEnv. This allows us to use multiple tables, and to have transactions. We’ve removed a lot of the error checking code for simplicity:
BDBConnector::BDBConnector( KxSymbol env, KxSymbol dbFile ) :
    mEnvName( env ),
    mDbFile( dbFile ),
    mEnv( DB_CXX_NO_EXCEPTIONS )
{
    mEnv.set_alloc( bdbcAlloc, bdbcRealloc, bdbcFree );

    // initialize the database environment
    if ( mEnv.open( mEnvName.string(),  // the name of the environment
                    DB_CREATE |         // create environment files if they don't exist
                    DB_INIT_LOCK |      // these four support transaction processing
                    DB_INIT_LOG |
                    DB_INIT_MPOOL |
                    DB_INIT_TXN |
                    DB_THREAD,          // needed for multi-thread support
                    0 )) {
        assert(0);
    }
}
By default Berkeley DB uses exception handling, which I find super annoying. We turn it off by passing the DB_CXX_NO_EXCEPTIONS flag to the DbEnv constructor.
Next we initialize our own memory management functions. bdbcAlloc and the other functions are pass-throughs to our own memory manager:
static void* bdbcAlloc( size_t size )
{
    return kxAlloc( size );
}

static void* bdbcRealloc( void* mem, size_t size )
{
    return kxRealloc( mem, size );
}

static void bdbcFree( void* mem )
{
    kxFree( mem );
}
This is required to work correctly under Windows. On Windows, Berkeley DB runs in a DLL that has its own heap. It is sometimes necessary to allocate memory in your application that gets passed to, and eventually freed by, Berkeley DB. Without this you would end up allocating memory on your local heap and having Berkeley DB free it on its DLL heap, which would crash your program. Defining your own entry points here makes the database use the application’s heap.
Non-trivial programs should probably use their own heap management functions anyway for efficiency.
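To see that every library-side free really lands on the application heap, the pass-throughs can be instrumented with counters. This is a self-contained sketch, with malloc/free and the counters standing in for the real kxAlloc family:

```cpp
#include <cassert>
#include <cstdlib>

// Counters let us verify that every allocation is released
// through the same entry points (i.e. on the same heap).
static int gAllocs = 0;
static int gFrees  = 0;

static void* kxAlloc( size_t size )              { ++gAllocs; return malloc( size ); }
static void* kxRealloc( void* mem, size_t size ) { return realloc( mem, size ); }
static void  kxFree( void* mem )                 { ++gFrees; free( mem ); }

// These are the functions handed to DbEnv::set_alloc(); everything the
// library allocates or frees then goes through the application's heap.
static void* bdbcAlloc( size_t size )              { return kxAlloc( size ); }
static void* bdbcRealloc( void* mem, size_t size ) { return kxRealloc( mem, size ); }
static void  bdbcFree( void* mem )                 { kxFree( mem ); }
```

In a real program the counters would be replaced by whatever bookkeeping your memory manager already does; the point is only that allocation and release are forced through one matched pair of functions.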
Next we initialize DbEnv. The first argument is the name of the environment. This is just the name of the directory where you want Berkeley DB to store all its files. This will include the database file itself, plus the lock files and the transaction log. Temporary data like locks and the transaction log are stored in separate files to avoid corrupting the database.
Creating Tables
This is how we create tables:
bool BDBConnector::createTable( KxSymbol table, bool allowDuplicates )
{
    return createTable_i( table, NULL, allowDuplicates );
}

bool BDBConnector::createTable_i( KxSymbol table, DbTxn* txn, bool allowDuplicates )
{
    Db* db = openTable_i( table, txn, allowDuplicates );
    if ( !db )
        return false;

    mTables.insert( table, db );
    return true;
}

Db* BDBConnector::openTable_i( KxSymbol table, DbTxn* txn, bool allowDuplicates )
{
    // open the database
    Db* db = new Db( &mEnv, 0 );

    if ( allowDuplicates )
        db->set_flags( DB_DUP | DB_DUPSORT );

    int flags = DB_CREATE | DB_THREAD; // | DB_AUTO_COMMIT;

    if ( db->open( txn, mDbFile.string(), table.string(), DB_BTREE, flags, 0 )) {
        delete db;
        return NULL;
    }

    return db;
}
Creating a table starts with a passthrough function to createTable_i. Note that for this version we pass in NULL for the transaction object. This will cause the operation to happen immediately, and without locking the database. In a multi-threaded system, it might cause records to be saved or retrieved in an inconsistent state. The database itself should not corrupt, but your records might not end up the way you expect. Note also that we can create tables that either accept duplicate keys or not.
createTable_i() calls openTable_i(), and saves the returned handle into our mapping of names to tables, so we can get opened table handles by name later. The separation between createTable_i() and openTable_i() will make more sense later when we introduce indexes.
openTable_i() does all the work of actually opening table handles. We start by creating the handle. Right afterwards you can set a number of options that control the size of the cache, the prefix string used by Berkeley DB’s error handlers, a desired error stream, etc. (e.g. set_errpfx(), set_pagesize()). We’ll skip these for now, but you may want to configure them for your purposes.
If we want this table to support duplicates, we add flags for that. DB_DUP enables duplicates. DB_DUPSORT causes duplicate entries to stay sorted. This may have performance implications if you end up with large numbers of values attached to the same key. DB_CREATE will cause the table to be created if it doesn’t exist already. DB_THREAD enables thread support for this table.
According to the Berkeley DB documentation, DB_AUTO_COMMIT is required to support transaction processing. But when I enabled it, I got an error from Berkeley DB saying that it is incompatible with transaction handles:

BDB0632 DB_AUTO_COMMIT may not be specified along with a transaction handle

Finally we do the actual open call. We’re using DB_BTREE for all our tables, but there are several access types with different attributes.
Shutdown
Now, to shut down the system, we put it all in the destructor. When BDBConnector goes out of scope, everything is shut down cleanly:
BDBConnector::~BDBConnector()
{
    for ( u32 i = 0; i < mIndexes.size(); i++ ) {
        Db* db = mIndexes.valueIdx( i );
        closeTable_i( db );
    }
    mIndexes.clear();

    for ( u32 i = 0; i < mTables.size(); i++ ) {
        Db* db = mTables.valueIdx( i );
        closeTable_i( db );
    }
    mTables.clear();

    mEnv.close( 0 );
}

bool BDBConnector::closeTable_i( Db* db )
{
    if ( db->close( 0 ) )
        return false;

    delete db;
    return true;
}
Basically we iterate over our two collections of tables, and close them, and then we close the environment object.
Normal Database Operations
Berkeley DB supports a large number of database operations. They can all be added to our BDBIface. We’ll just go through a few basic examples to get a sense of how this all works.
First adding records to the database:
Db* BDBConnector::getTable_i( KxSymbol table )
{
    if ( mTables.isDefined( table ))
        return mTables.valueKey( table );

    if ( mIndexes.isDefined( table ))
        return mIndexes.valueKey( table );

    return NULL;
}

bool BDBConnector::put( KxSymbol table, KxSerialObj& key, KxSerialObj& val, bool update )
{
    return put_i( table, NULL, key, val, update );
}

bool BDBConnector::put_i( KxSymbol table, DbTxn* txn, KxSerialObj& key, KxSerialObj& val, bool update )
{
    Db* db = getTable_i( table );
    if ( !db )
        return false;

    Dbt keyd( (void*)key.getBuffer(), key.getWriteCursor() );
    Dbt vald( (void*)val.getBuffer(), val.getWriteCursor() );

    int ret;
    int flags = 0;
    if ( !update )
        flags |= DB_NOOVERWRITE;

    if (( ret = db->put( txn, &keyd, &vald, flags ))) {
        if ( ret == DB_KEYEXIST )
            return false;

        assert(0);
        return false;
    }

    return true;
}
getTable_i() will retrieve an open table handle from the collection of tables or indexes. put() is just a passthrough that sets the DbTxn to NULL.
put_i() does the actual work. First we get the named table handle.
Keys and values in Berkeley DB are stored in Dbt objects, which are basically just flat buffers of data. Our KxSerialObj objects hold our serialized objects in flat data buffers as well, so the next step is simply to construct the Dbt objects from our buffers. The Dbt constructor will hold a pointer to our own buffers for now, so we don’t pay a cost for a copy or allocation just yet. New buffers will get allocated in db->put(), so until then it is important that the data in the serialized objects is not modified.
Finally we add the keys to the database. If an error occurs, db->put() returns non-zero. It will return DB_KEYEXIST on duplicate keys if the table does not accept duplicates. We consider this to be a soft error: we return false, but do not trigger any error handling. Other errors are more serious. Where I have the assert() you should implement some more robust error handling.
The following methods show some more database operations:
bool BDBConnector::del_i( KxSymbol table, DbTxn* txn, KxSerialObj& key )
{
    Db* db = getTable_i( table );
    if ( !db )
        return false;

    Dbt keyd( (void*)key.getBuffer(), key.getWriteCursor() );

    int ret;
    if (( ret = db->del( txn, &keyd, 0 ))) {
        if ( ret == DB_NOTFOUND )
            return false;

        assert(0);
        return false;
    }

    return true;
}

bool BDBConnector::get_i( KxSymbol table, DbTxn* txn, KxSerialObj& key, KxSerialObj& val )
{
    val.clear();

    Db* db = getTable_i( table );
    if ( !db )
        return false;

    Dbt keyd( (void*)key.getBuffer(), key.getWriteCursor() );
    Dbt vald;

    Dbc* dbc;
    db->cursor( txn, &dbc, 0 );

    int ret;
    if (( ret = dbc->get( &keyd, &vald, DB_SET ))) {
        if ( ret == DB_NOTFOUND ) {
            dbc->close();
            return false;
        }
        assert(0);
    }
    else {
        val.write( (s8*)vald.get_data(), vald.get_size() );
    }

    dbc->close();
    return true;
}

bool BDBConnector::get_i( KxSymbol table, DbTxn* txn, KxSerialObj& key, KxVector& vals )
{
    Db* db = getTable_i( table );
    if ( !db )
        return false;

    Dbt keyd( (void*)key.getBuffer(), key.getWriteCursor() );
    Dbt vald;

    Dbc* dbc;
    db->cursor( txn, &dbc, 0 );

    // get the first one
    if ( !dbc->get( &keyd, &vald, DB_SET )) {
        KxSerialObj* obj = new KxSerialObj;
        obj->write( (s8*)vald.get_data(), vald.get_size() );
        vals.insert( obj );
    }

    // get any additional duplicates
    while ( !dbc->get( &keyd, &vald, DB_NEXT_DUP )) {
        KxSerialObj* obj = new KxSerialObj;
        obj->write( (s8*)vald.get_data(), vald.get_size() );
        vals.insert( obj );
    }

    dbc->close();
    return true;
}
del_i() is self-explanatory.
The first get_i() method will return the first value that matches a key. We do this using a Berkeley DB cursor, Dbc. We create it, and then call dbc->get() on it. The DB_SET flag tells it to get the value for the named key. And finally we copy the data out of the returned Dbt into our KxSerialObj. If there are multiple values for this key, we will only return the first one.
The second get_i() method supports multiple values per key. We pass in a templatized array to hold them. In this case the KxSerialObj objects are allocated with new, so it is up to the caller to delete them. The function proceeds like the first get_i(), but we add a second loop that fetches duplicates. The while loop continues until dbc->get() returns non-zero; then we know there are no more duplicates.
Transaction Processing
Transactions allow Berkeley DB to group database operations and commit them to the database as a unit, in a way that is reversible if the operation does not complete. They also allow temporary changes to be visible in the context of the transaction but not to other transactions, and they keep other transactions from seeing any incomplete sets of changes.
Transactions are implemented in Berkeley DB by creating a DbTxn from the DbEnv object, and then passing the transaction object into each call to a database operation.
Our BDBTxn object creates DbTxn objects on the fly to make it appear like we have re-usable transactions, and also to simplify transaction management.
We’ll only go over a few methods to give you a sense of how this works. First construction and destruction:
class BDBTxn : public BDBIface {
public:
    BDBTxn( BDBConnector& dbc );
    ~BDBTxn();

    void commit();
    void abort();

    bool put( KxSymbol table, KxSerialObj& key, KxSerialObj& val, bool update = true );
    bool get( KxSymbol table, KxSerialObj& key, KxSerialObj& val );

protected:
    DbTxn* mTxn;
    BDBConnector* mDbc;
    bool mDirty;
};

BDBTxn::BDBTxn( BDBConnector& dbc ) :
    mTxn( NULL ),
    mDbc( &dbc ),
    mDirty( false )
{
}

BDBTxn::~BDBTxn()
{
    if ( mTxn )
        mTxn->abort();
}
We create the BDBTxn by simply storing a pointer to an associated BDBConnector object. We also store a “dirty” flag. This flag starts out false, and gets set to true if we make a database operation that actually changes the database. If the transaction ends up being used in an exclusively read-only way, we abort the transaction instead of committing it.
The actual DbTxn objects are only created as needed, so initially we set this pointer to NULL.
In the destructor, we abort any transaction which is still pending at the time of destruction. So if for some reason a BDBTxn object unexpectedly goes out of scope, any pending database operations on it are reversed.
Now we look at two example database operations: a read-only one, and one that modifies the database:
bool BDBTxn::put( KxSymbol table, KxSerialObj& key, KxSerialObj& val, bool update )
{
    if ( !mTxn )
        mTxn = mDbc->startTxn_i();

    mDirty = true;
    return mDbc->put_i( table, mTxn, key, val, update );
}

bool BDBTxn::get( KxSymbol table, KxSerialObj& key, KxSerialObj& val )
{
    if ( !mTxn )
        mTxn = mDbc->startTxn_i();

    return mDbc->get_i( table, mTxn, key, val );
}

DbTxn* BDBConnector::startTxn_i()
{
    DbTxn* txn;
    if ( mEnv.txn_begin( NULL, &txn, 0 ))
        return NULL;

    return txn;
}
In any database operation call, we first check to see if we have an active DbTxn. If not, we create one with startTxn_i(). This function calls the DbEnv in BDBConnector to create a new transaction with txn_begin().
For an operation that modifies the database, we set the dirty flag. For all database operations we call one of the protected database operation functions in BDBConnector, but this time setting the DbTxn argument to the active transaction.
And finally we implement commit() and abort():
void BDBTxn::commit()
{
    assert( mTxn );

    if ( mTxn ) {
        if ( mDirty )
            mTxn->commit( 0 );
        else
            mTxn->abort();

        mTxn = NULL;
    }
}

void BDBTxn::abort()
{
    assert( mTxn );

    if ( mTxn )
        mTxn->abort();

    mTxn = NULL;
}
In both cases we first check if there is an active DbTxn object. If not, we ignore the operation.
DbTxn objects are not re-usable after commit() or abort() has been called on them, so in both cases we clear the mTxn pointer. This way a new DbTxn will be created on the next database operation.
For commit() calls, we check whether the transaction has actually modified the database to decide if we must commit the transaction, or if it is enough to just abort it. I’m not sure if this is strictly necessary; it could be that Berkeley DB already does this.
Secondary Indexes
At its core Berkeley DB is very simple: it just provides fast access to a mapping of keys to values. But sometimes you want to be able to look up values by more than one key type. For example, you could have a database of user records that you want to look up by record number or by user name. For this you need to create additional indexes.
In Berkeley DB you do this by creating a new table and associating it with an existing table. The new index table uses the new key type as its key, and the primary table’s key as its value. Then you “associate” the secondary index with the primary table by calling Berkeley DB’s associate() function. From that point on, Berkeley DB will manage the secondary index automatically. Under normal operation, once created, the secondary index is something you only ever need to read, never write.
To support this you only need to create a “key extractor” function. This is a function that, for a given key or value in the primary table, returns a key to be used in the secondary index table. For example, if the primary table holds user records indexed by user record number, to create a secondary index by user name you need to implement a function that, given a user record, returns the user name.
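The association can be pictured with two plain maps: inserting into the primary table runs the extractor and updates the index automatically, which is essentially what associate() arranges inside Berkeley DB. This is a sketch with std::map standing in for the Db handles; UserDb and nameOf are illustrative, not part of the real interface:

```cpp
#include <cassert>
#include <map>
#include <string>

// Primary: record number -> user record.  Secondary: user name -> record number.
struct UserDb {
    typedef std::string (*Extractor)( const std::string& record );

    std::map<int, std::string> primary;
    std::map<std::string, int> byName;
    Extractor                  extract;

    explicit UserDb( Extractor e ) : extract( e ) {}

    // On every put, the extractor derives the secondary key and the
    // index is updated automatically -- the analogue of associate().
    void put( int recNo, const std::string& record )
    {
        primary[recNo] = record;
        byName[extract( record )] = recNo;
    }
};

// Trivial extractor: here the whole record is the name.
std::string nameOf( const std::string& record ) { return record; }
```

A secondary lookup is then a two-step read: index key to primary key, then primary key to record, which Berkeley DB performs for you transparently when you query the index table.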
We start by creating a typedef for the extractor function prototype:
typedef int (*BDBExtractorFunc)( Db*, const Dbt*, const Dbt*, Dbt* );
And now the function that creates secondary indexes:
bool BDBConnector::createIndex_i( KxSymbol index, KxSymbol table, DbTxn* txn, BDBExtractorFunc extractor )
{
    Db* tdb = getTable_i( table );
    if ( !tdb )
        return false;

    Db* idb = openTable_i( index, txn, true );
    if ( !idb )
        return false;

    if ( tdb->associate( txn, idb, extractor, 0 )) {
        closeTable_i( idb );
        return false;
    }

    mIndexes.insert( index, idb );
    return true;
}
We give it the name of an existing primary table, the name of the desired secondary index, and a pointer to the key extractor function. This function will create the secondary index table and associate it with the primary table.
Once created, you can retrieve records from the index table, just like you would from the primary table, except that queries into the secondary index use the secondary key, but return values from the primary table. For example:
db.createTable( "users" );
db.createIndex( "usersByName", "users", nameExtractor );

KxSerialObj userNumber;
KxSerialObj userRecord;

// insert a user by record number
db.put( "users", userNumber, userRecord );

// but retrieve it by name
KxSerialObj userName;
db.get( "usersByName", userName, userRecord );
Here is the prototype of our name extractor function:
int nameExtractor( Db* db, const Dbt* pkey, const Dbt* pdata, Dbt* skey );
pkey and pdata hold the key and value for a given record in the primary table that Berkeley DB needs to map into the index table. nameExtractor() needs to fill in skey with the user name.
This can be done in two ways. In some cases you can simply store a pointer to data that is already in the primary table value. When this happens you don’t have to allocate any new data. So if the user name was a 64 byte field that is 20 bytes into a user record you would implement the extractor something like this:
int nameExtractor( Db* db, const Dbt* pkey, const Dbt* pdata, Dbt* skey )
{
    skey->set_data( (char*)pdata->get_data() + 20 );
    skey->set_size( 64 );
    return 0;
}
In principle, we could do the same thing if we knew the offsets for different parts of our serialized objects. But this requires that we encode knowledge of the serializer’s format into all our extractors; if we ever changed the serialization format, it would break all of them. So in our case we will want to un-serialize the objects, find the data we need, and allocate new buffers into skey:
int nameExtractor( Db* db, const Dbt* pkey, const Dbt* pdata, Dbt* skey )
{
    // extract the user record from pdata
    UserRecord rec;
    KxSerialObj obj;
    obj.write( (s8*)pdata->get_data(), pdata->get_size() );
    obj >> rec;

    int nameLen = strlen( rec.name() );

    // allocate with the same allocator we registered via set_alloc(),
    // since Berkeley DB will release this buffer with our free function
    void* data = kxAlloc( nameLen + 1 );
    memcpy( data, rec.name(), nameLen + 1 );

    skey->set_data( data );
    skey->set_size( nameLen + 1 );
    skey->set_flags( DB_DBT_APPMALLOC );

    return 0;
}
Because we allocated the memory ourselves, we need to set the DB_DBT_APPMALLOC flag in skey. This tells Berkeley DB to free the memory when it is done with this object.
This is why we needed to set our own memory management functions back when we created the DbEnv object: the memory we allocate here is later freed by Berkeley DB, and without matching allocators the program will, at least on Windows, immediately crash. In addition, on a heavily loaded system these allocations happen very often, so if we insisted on naked calls to malloc() this could also become a performance issue.
Resources:

Berkeley DB C++ API Reference - Oracle’s official Berkeley DB API reference
Getting Started with Berkeley DB - A really complete book on working with Berkeley DB
Implementing Key Extractors - From the book above, the section on implementing key extractors
Getting Started with Transactional Processing for C++ - A whitepaper on using Berkeley DB’s transactional processing
Berkeley DB Reference Guide: Secondary Indices - A tutorial on implementing Berkeley DB secondary indices in C++
DB->set_malloc - Has an important note on memory allocation under Windows