1
Fork 0
mirror of https://github.com/SapphireServer/Sapphire.git synced 2025-04-27 22:57:45 +00:00

New mysql system implemented, strongly borrowed from TrinityCore

This commit is contained in:
Mordred 2017-09-30 23:51:01 +02:00
parent 1a749d820b
commit 3d113feb28
19 changed files with 1566 additions and 98 deletions

View file

@ -14,6 +14,8 @@
<Username>root</Username>
<Pass></Pass>
<Database>sapphire</Database>
<SyncThreads>2</SyncThreads>
<AsyncThreads>2</AsyncThreads>
</Mysql>
</General>

@ -1 +1 @@
Subproject commit 5512f4d48186cf0796449215a5b03d3229ec9b31
Subproject commit 5a9881aee513b1daed1598755225fb2a5b59221b

View file

@ -0,0 +1,24 @@
#include "CharaDbConnection.h"
#include <libraries/sapphire/mysqlConnector/MySqlConnector.h>
Core::Db::CharaDbConnection::CharaDbConnection( ConnectionInfo& connInfo ) : DbConnection( connInfo )
{
}
Core::Db::CharaDbConnection::CharaDbConnection( Core::LockedWaitQueue< Operation * >* q,
ConnectionInfo& connInfo) : DbConnection( q, connInfo )
{
}
Core::Db::CharaDbConnection::~CharaDbConnection()
{
}
void Core::Db::CharaDbConnection::doPrepareStatements()
{
if( !m_reconnecting )
m_stmts.resize( MAX_STATEMENTS );
prepareStatement( CHAR_INS_TEST, "INSERT INTO zoneservers ( id, ip, port ) VALUES ( ?, ?, ?);", CONNECTION_BOTH );
}

View file

@ -0,0 +1,36 @@
#ifndef SAPPHIRE_CHARACONNECTION_H
#define SAPPHIRE_CHARACONNECTION_H
#include "DbConnection.h"
namespace Core
{
namespace Db
{
class DbConnectionInfo;
enum CharaDbStatements : uint32_t
{
CHAR_INS_TEST,
MAX_STATEMENTS
};
class CharaDbConnection : public DbConnection
{
public:
typedef CharaDbStatements Statements;
CharaDbConnection( ConnectionInfo& connInfo );
CharaDbConnection( Core::LockedWaitQueue< Operation* >* q, ConnectionInfo &connInfo );
~CharaDbConnection();
void doPrepareStatements() override;
};
}
}
#endif //SAPPHIRE_CHARACONNECTION_H

View file

@ -0,0 +1,244 @@
#include "DbConnection.h"
#include "DbWorker.h"
#include "src/libraries/sapphire/mysqlConnector/MySqlConnector.h"
#include "src/servers/Server_Common/Logging/Logger.h"
#include "PreparedStatement.h"
extern Core::Logger g_log;
Core::Db::DbConnection::DbConnection( ConnectionInfo &connInfo ) :
m_reconnecting( false ),
m_prepareError( false ),
m_queue( nullptr ),
m_pConnection( nullptr ),
m_connectionInfo( connInfo ),
m_connectionFlags( CONNECTION_SYNCH )
{
}
Core::Db::DbConnection::DbConnection( Core::LockedWaitQueue<Operation *>* queue, Core::Db::ConnectionInfo& connInfo ) :
m_reconnecting( false ),
m_prepareError( false ),
m_queue( queue ),
m_pConnection( nullptr ),
m_connectionInfo( connInfo ),
m_connectionFlags( CONNECTION_ASYNC )
{
m_worker = std::unique_ptr< DbWorker >( new DbWorker( m_queue, this ) );
}
Core::Db::DbConnection::~DbConnection()
{
close();
}
void Core::Db::DbConnection::close()
{
m_worker.reset();
m_stmts.clear();
if( m_pConnection )
{
m_pConnection->close();
m_pConnection.reset();
}
}
uint32_t Core::Db::DbConnection::open()
{
Mysql::MySqlBase base;
Mysql::optionMap options;
options[ MYSQL_OPT_RECONNECT ] = "1";
options[ MYSQL_SET_CHARSET_NAME ] = "utf8";
try
{
m_pConnection = std::shared_ptr< Mysql::Connection >( base.connect( m_connectionInfo.host,
m_connectionInfo.user,
m_connectionInfo.password,
options,
m_connectionInfo.port ) );
m_pConnection->setSchema( m_connectionInfo.database );
}
catch( std::runtime_error& e )
{
g_log.error( e.what() );
return 1;
}
return 0;
}
uint32_t Core::Db::DbConnection::getLastError()
{
return m_pConnection->getErrorNo();
}
void Core::Db::DbConnection::ping()
{
m_pConnection->ping();
}
bool Core::Db::DbConnection::lockIfReady()
{
return m_mutex.try_lock();
}
void Core::Db::DbConnection::unlock()
{
m_mutex.unlock();
}
void Core::Db::DbConnection::beginTransaction()
{
m_pConnection->beginTransaction();
}
void Core::Db::DbConnection::rollbackTransaction()
{
m_pConnection->rollbackTransaction();
}
void Core::Db::DbConnection::commitTransaction()
{
m_pConnection->commitTransaction();
}
bool Core::Db::DbConnection::execute( const std::string& sql )
{
try
{
Mysql::Statement* stmt( m_pConnection->createStatement() );
bool result = stmt->execute( sql );
return result;
}
catch( std::runtime_error& e )
{
g_log.error( e.what() );
return false;
}
}
Mysql::ResultSet *Core::Db::DbConnection::query( const std::string& sql )
{
try
{
Mysql::Statement* stmt( m_pConnection->createStatement() );
Mysql::ResultSet* result = stmt->executeQuery( sql );
return result;
}
catch( std::runtime_error& e )
{
g_log.error( e.what() );
return nullptr;
}
}
Mysql::ResultSet* Core::Db::DbConnection::query( Core::Db::PreparedStatement* stmt )
{
Mysql::ResultSet* res = nullptr;
if( !stmt )
return nullptr;
uint32_t index = stmt->getIndex();
Mysql::PreparedStatement* pStmt = getPreparedStatement( index );
if( !pStmt )
return nullptr;
stmt->setMysqlPS( pStmt );
try
{
stmt->bindParameters();
return pStmt->executeQuery();
}
catch( std::runtime_error& e )
{
g_log.error( e.what() );
return nullptr;
}
}
bool Core::Db::DbConnection::execute( Core::Db::PreparedStatement* stmt )
{
if( !stmt )
return false;
uint32_t index = stmt->getIndex();
Mysql::PreparedStatement* pStmt = getPreparedStatement( index );
if( !pStmt )
return false;
stmt->setMysqlPS( pStmt );
try
{
stmt->bindParameters();
return pStmt->execute();
}
catch( std::runtime_error& e )
{
g_log.error( e.what() );
return nullptr;
}
}
Mysql::PreparedStatement* Core::Db::DbConnection::getPreparedStatement( uint32_t index )
{
assert( index < m_stmts.size() );
Mysql::PreparedStatement* ret = m_stmts[index].get();
if( !ret )
nullptr;
return ret;
}
void Core::Db::DbConnection::prepareStatement( uint32_t index, const std::string &sql, Core::Db::ConnectionFlags flags )
{
m_queries.insert( PreparedStatementMap::value_type( index, std::make_pair( sql, flags ) ) );
// Check if specified query should be prepared on this connection
// i.e. don't prepare async statements on synchronous connections
// to save memory that will not be used.
if( !( m_connectionFlags & flags ) )
{
m_stmts[index].reset();
return;
}
Mysql::PreparedStatement* pStmt = nullptr;
try
{
pStmt = m_pConnection->prepareStatement( sql );
}
catch( std::runtime_error& e )
{
g_log.error( e.what() );
m_prepareError = true;
}
m_stmts[index] = std::unique_ptr< Mysql::PreparedStatement >( pStmt );
}
bool Core::Db::DbConnection::prepareStatements()
{
doPrepareStatements();
return !m_prepareError;
}

View file

@ -0,0 +1,107 @@
#ifndef _SAPPHIRE_DBCONNECTION_H
#define _SAPPHIRE_DBCONNECTION_H
#include <map>
#include <memory>
#include <mutex>
#include <string>
#include <vector>
#include "src/servers/Server_Common/Util/LockedWaitQueue.h"
#include <Server_Common/Util/LockedWaitQueue.h>
namespace Mysql
{
class Connection;
class ResultSet;
class PreparedResultSet;
class PreparedStatement;
}
namespace Core
{
namespace Db
{
class DatabaseWorker;
class PreparedStatement;
class Operation;
class DbWorker;
enum ConnectionFlags
{
CONNECTION_ASYNC = 0x1,
CONNECTION_SYNCH = 0x2,
CONNECTION_BOTH = CONNECTION_ASYNC | CONNECTION_SYNCH
};
struct ConnectionInfo
{
std::string user;
std::string password;
std::string database;
std::string host;
uint16_t port;
uint8_t syncThreads;
uint8_t asyncThreads;
};
typedef std::map< uint32_t, std::pair< std::string, ConnectionFlags > > PreparedStatementMap;
class DbConnection
{
public:
// Constructor for synchronous connections.
DbConnection( ConnectionInfo& connInfo );
// Constructor for asynchronous connections.
DbConnection( Core::LockedWaitQueue< Operation* >* queue, ConnectionInfo& connInfo );
virtual ~DbConnection();
virtual uint32_t open();
void close();
bool prepareStatements();
bool execute( const std::string& sql );
bool execute( PreparedStatement* stmt );
Mysql::ResultSet* query( const std::string& sql );
Mysql::ResultSet* query( PreparedStatement* stmt );
void beginTransaction();
void rollbackTransaction();
void commitTransaction();
void ping();
uint32_t getLastError();
bool lockIfReady();
void unlock();
std::shared_ptr< Mysql::Connection > getConnection() { return m_pConnection; }
Mysql::PreparedStatement* getPreparedStatement( uint32_t index );
void prepareStatement( uint32_t index, const std::string& sql, ConnectionFlags flags );
virtual void doPrepareStatements() = 0;
protected:
std::vector< std::unique_ptr< Mysql::PreparedStatement > > m_stmts;
PreparedStatementMap m_queries;
bool m_reconnecting;
bool m_prepareError;
private:
LockedWaitQueue< Operation* >* m_queue;
std::unique_ptr< DbWorker > m_worker;
std::shared_ptr< Mysql::Connection > m_pConnection;
ConnectionInfo& m_connectionInfo;
ConnectionFlags m_connectionFlags;
std::mutex m_mutex;
DbConnection( DbConnection const& right ) = delete;
DbConnection& operator=( DbConnection const& right ) = delete;
};
}
}
#endif

View file

@ -0,0 +1,108 @@
#include "DbLoader.h"
#include <mysqld_error.h>
#include "CharaDbConnection.h"
#include "DbWorkerPool.h"
#include "src/servers/Server_Common/Logging/Logger.h"
extern Core::Logger g_log;
Core::Db::DbLoader::DbLoader()
{
}
template <class T>
Core::Db::DbLoader& Core::Db::DbLoader::addDb( Core::Db::DbWorkerPool< T >& pool, const ConnectionInfo& info )
{
m_open.push([this, info, &pool]() -> bool
{
const uint8_t asyncThreads = info.asyncThreads;
const uint8_t synchThreads = info.syncThreads;
if( asyncThreads < 1 || asyncThreads > 32 )
{
g_log.error( "database: invalid number of worker threads specified. Please pick a value between 1 and 32." );
return false;
}
pool.setConnectionInfo( info, asyncThreads, synchThreads );
if( uint32_t error = pool.open() )
{
// Database does not exist
if( error == ER_BAD_DB_ERROR )
{
return false;
}
if( error )
{
g_log.error( "DatabasePool failed to open." );
return false;
}
}
m_close.push( [&pool] { pool.close(); } );
return true;
});
m_prepare.push([this, info, &pool]() -> bool
{
if( !pool.prepareStatements() )
{
g_log.error( "Could not prepare statements of the database, see log for details." );
return false;
}
return true;
});
return *this;
}
bool Core::Db::DbLoader::initDbs()
{
if( !openDatabases() )
return false;
if( !prepareStatements() )
return false;
return true;
}
bool Core::Db::DbLoader::openDatabases()
{
return process( m_open );
}
bool Core::Db::DbLoader::prepareStatements()
{
return process( m_prepare );
}
bool Core::Db::DbLoader::process( std::queue< Predicate >& queue )
{
while( !queue.empty() )
{
if( !queue.front()() )
{
// Close all open databases which have a registered close operation
while( !m_close.empty() )
{
m_close.top()();
m_close.pop();
}
return false;
}
queue.pop();
}
return true;
}
template
Core::Db::DbLoader&
Core::Db::DbLoader::addDb< Core::Db::CharaDbConnection >( Core::Db::DbWorkerPool< Core::Db::CharaDbConnection >&,
const ConnectionInfo& );

View file

@ -0,0 +1,52 @@
#ifndef SAPPHIRE_DBLOADER_H
#define SAPPHIRE_DBLOADER_H
#include <stdint.h>
#include <functional>
#include <queue>
#include <stack>
#include <string>
#include "DbConnection.h"
namespace Core
{
namespace Db
{
template< class T >
class DbWorkerPool;
class DbLoader
{
public:
DbLoader();
template< class T >
DbLoader& addDb( DbWorkerPool< T >& pool, const ConnectionInfo& info );
bool initDbs();
enum DbTypeFlags
{
DATABASE_NONE = 0,
DATABASE_CHARACTER = 1,
DATABASE_MASK_ALL = DATABASE_CHARACTER
};
private:
bool openDatabases();
bool prepareStatements();
using Predicate = std::function< bool() >;
using Closer = std::function< void() >;
bool process( std::queue< Predicate >& queue );
std::queue< Predicate > m_open;
std::queue< Predicate > m_prepare;
std::stack< Closer > m_close;
};
}
}
#endif //SAPPHIRE_DBLOADER_H

View file

@ -0,0 +1,39 @@
#include "DbWorker.h"
#include "Operation.h"
#include <Server_Common/Util/LockedWaitQueue.h>
Core::Db::DbWorker::DbWorker( Core::LockedWaitQueue< Operation* >* newQueue, DbConnection* pConn )
{
m_pConn = pConn;
m_queue = newQueue;
m_cancelationToken = false;
m_workerThread = std::thread( &DbWorker::workerThread, this );
}
Core::Db::DbWorker::~DbWorker()
{
m_cancelationToken = true;
m_queue->cancel();
m_workerThread.join();
}
void Core::Db::DbWorker::workerThread()
{
if( !m_queue )
return;
while( true )
{
Operation* operation = nullptr;
m_queue->waitAndPop( operation );
if( m_cancelationToken || !operation )
return;
operation->setConnection( m_pConn );
operation->call();
delete operation;
}
}

View file

@ -0,0 +1,38 @@
#ifndef SAPPHIRE_DBWORKER_H
#define SAPPHIRE_DBWORKER_H
#include <atomic>
#include <thread>
#include <Server_Common/Util/LockedWaitQueue.h>
namespace Core
{
namespace Db
{
class DbConnection;
class Operation;
class DbWorker
{
public:
DbWorker( LockedWaitQueue< Operation* >* newQueue, DbConnection* connection );
~DbWorker();
private:
LockedWaitQueue< Operation* >* m_queue;
DbConnection* m_pConn;
void workerThread();
std::thread m_workerThread;
std::atomic< bool > m_cancelationToken;
DbWorker( DbWorker const& right ) = delete;
DbWorker& operator=( DbWorker const& right ) = delete;
};
}
}
#endif //SAPPHIRE_DBWORKER_H

View file

@ -0,0 +1,284 @@
#include "DbWorkerPool.h"
#include "DbConnection.h"
#include "PreparedStatement.h"
#include <libraries/sapphire/mysqlConnector/MySqlConnector.h>
#include "StatementTask.h"
#include "Operation.h"
#include "CharaDbConnection.h"
#include <Server_Common/Logging/Logger.h>
extern Core::Logger g_log;
class PingOperation : public Core::Db::Operation
{
bool execute() override
{
m_pConn->ping();
return true;
}
};
template< class T >
Core::Db::DbWorkerPool<T>::DbWorkerPool()
: m_queue( new Core::LockedWaitQueue< Operation* >() ),
m_asyncThreads( 0 ),
m_synchThreads( 0 )
{
}
template< class T >
Core::Db::DbWorkerPool< T >::~DbWorkerPool()
{
m_queue->cancel();
}
template< class T >
void Core::Db::DbWorkerPool<T>::setConnectionInfo( const ConnectionInfo& info,
uint8_t asyncThreads,
uint8_t synchThreads)
{
m_connectionInfo = info;
m_asyncThreads = asyncThreads;
m_synchThreads = synchThreads;
}
template< class T >
uint32_t Core::Db::DbWorkerPool< T >::open()
{
g_log.info( "[DbPool] Opening DatabasePool " + getDatabaseName() +
" Asynchronous connections: " + std::to_string( m_asyncThreads ) +
" Synchronous connections: " + std::to_string( m_synchThreads ) );
uint32_t error = openConnections( IDX_ASYNC, m_asyncThreads );
if( error )
return error;
error = openConnections( IDX_SYNCH, m_synchThreads );
if( !error )
{
g_log.info( "[DbPool] DatabasePool " + getDatabaseName() + " opened successfully. " +
std::to_string( ( m_connections[IDX_SYNCH].size() + m_connections[IDX_ASYNC].size() ) ) +
" total connections running." );
}
return error;
}
template< class T >
void Core::Db::DbWorkerPool< T >::close()
{
g_log.info("[DbPool] Closing down DatabasePool " + getDatabaseName() );
m_connections[IDX_ASYNC].clear();
m_connections[IDX_SYNCH].clear();
g_log.info("[DbPool] All connections on DatabasePool " + getDatabaseName() + "closed." );
}
template< class T >
bool Core::Db::DbWorkerPool<T>::prepareStatements()
{
for( auto& connections : m_connections )
for( auto& connection : connections )
{
connection->lockIfReady();
if( !connection->prepareStatements() )
{
connection->unlock();
close();
return false;
}
else
connection->unlock();
}
return true;
}
template< class T >
Mysql::ResultSet* Core::Db::DbWorkerPool<T>::query( const std::string& sql, T* connection )
{
if( !connection )
connection = getFreeConnection();
Mysql::ResultSet* result = connection->query( sql );
connection->unlock();
return result;
}
template< class T >
Mysql::ResultSet* Core::Db::DbWorkerPool<T>::query( PreparedStatement* stmt )
{
auto connection = getFreeConnection();
Mysql::ResultSet* ret = connection->query( stmt );
connection->unlock();
return ret;
}
template< class T >
Core::Db::PreparedStatement* Core::Db::DbWorkerPool< T >::getPreparedStatement( PreparedStatementIndex index )
{
return new PreparedStatement( index );
}
template< class T >
void Core::Db::DbWorkerPool< T >::escapeString( std::string& str )
{
if( str.empty() )
return;
char* buf = new char[str.size() * 2 + 1];
escapeString( buf, str.c_str(), str.size() );
str = buf;
delete[] buf;
}
template< class T >
void Core::Db::DbWorkerPool< T >::keepAlive()
{
for( auto& connection : m_connections[IDX_SYNCH] )
{
if( connection->lockIfReady() )
{
connection->ping();
connection->unlock();
}
}
const auto count = m_connections[IDX_ASYNC].size();
for( uint8_t i = 0; i < count; ++i )
enqueue( new PingOperation );
}
template< class T >
uint32_t Core::Db::DbWorkerPool< T >::openConnections( InternalIndex type, uint8_t numConnections )
{
for( uint8_t i = 0; i < numConnections; ++i )
{
// Create the connection
auto connection = [&] {
switch (type)
{
case IDX_ASYNC:
return std::unique_ptr<T>( new T( m_queue.get(), m_connectionInfo ) );
case IDX_SYNCH:
return std::unique_ptr<T>( new T( m_connectionInfo ) );
default:
return std::unique_ptr<T>();
}
}();
if( uint32_t error = connection->open() )
{
// Failed to open a connection or invalid version, abort and cleanup
m_connections[type].clear();
return error;
}
else
{
m_connections[type].push_back( std::move( connection ) );
}
}
return 0;
}
template< class T >
unsigned long Core::Db::DbWorkerPool< T >::escapeString( char *to, const char *from, unsigned long length )
{
if (!to || !from || !length)
return 0;
return mysql_real_escape_string(
m_connections[IDX_SYNCH].front()->getConnection()->getRawCon(), to, from, length);
}
template< class T >
void Core::Db::DbWorkerPool< T >::enqueue( Operation* op )
{
m_queue->push( op );
}
template< class T >
T* Core::Db::DbWorkerPool< T >::getFreeConnection()
{
uint8_t i = 0;
const auto numCons = m_connections[IDX_SYNCH].size();
T* connection = nullptr;
while( true )
{
connection = m_connections[IDX_SYNCH][i++ % numCons].get();
if (connection->lockIfReady())
break;
}
return connection;
}
template< class T >
const std::string& Core::Db::DbWorkerPool< T >::getDatabaseName() const
{
return m_connectionInfo.database;
}
template< class T >
void Core::Db::DbWorkerPool< T >::execute( const std::string& sql )
{
StatementTask* task = new StatementTask( sql );
enqueue( task );
}
template< class T >
void Core::Db::DbWorkerPool< T >::execute( PreparedStatement* stmt )
{
PreparedStatementTask* task = new PreparedStatementTask(stmt);
enqueue( task );
}
template< class T >
void Core::Db::DbWorkerPool< T >::directExecute( const std::string& sql )
{
T* connection = getFreeConnection();
connection->execute( sql );
connection->unlock();
}
template< class T >
void Core::Db::DbWorkerPool< T >::directExecute( PreparedStatement* stmt )
{
T* connection = getFreeConnection();
connection->execute( stmt );
connection->unlock();
//! Delete proxy-class. Not needed anymore
delete stmt;
}
/*
template <class T>
void DatabaseWorkerPool<T>::ExecuteOrAppend(SQLTransaction& trans, const char* sql)
{
if (!trans)
Execute(sql);
else
trans->Append(sql);
}
template <class T>
void DatabaseWorkerPool<T>::ExecuteOrAppend(SQLTransaction& trans, PreparedStatement* stmt)
{
if (!trans)
Execute(stmt);
else
trans->Append(stmt);
}
*/
template class Core::Db::DbWorkerPool< Core::Db::CharaDbConnection >;
//template class TC_DATABASE_API DatabaseWorkerPool<LoginDatabaseConnection>;
//template class TC_DATABASE_API DatabaseWorkerPool<WorldDatabaseConnection>;
//template class TC_DATABASE_API DatabaseWorkerPool<CharacterDatabaseConnection>;

View file

@ -0,0 +1,89 @@
#ifndef SAPPHIRE_DBWORKERPOOL_H
#define SAPPHIRE_DBWORKERPOOL_H
#include <array>
#include <string>
#include <vector>
#include <ResultSet.h>
#include <Server_Common/Util/LockedWaitQueue.h>
#include "DbConnection.h"
namespace Core
{
namespace Db
{
template< typename T >
class LockedWaitQueue;
class Operation;
class PreparedStatement;
struct ConnectionInfo;
template< class T >
class DbWorkerPool
{
private:
enum InternalIndex
{
IDX_ASYNC,
IDX_SYNCH,
IDX_SIZE
};
public:
DbWorkerPool();
~DbWorkerPool();
void setConnectionInfo( const ConnectionInfo& info, uint8_t asyncThreads, uint8_t synchThreads);
uint32_t open();
void close();
bool prepareStatements();
inline ConnectionInfo getConnectionInfo() const
{
return m_connectionInfo;
}
// Async execution
void execute( const std::string& sql );
void execute( PreparedStatement* stmt );
// Sync execution
void directExecute( const std::string& sql );
void directExecute( PreparedStatement* stmt );
Mysql::ResultSet* query( const std::string& sql, T* connection = nullptr );
Mysql::ResultSet* query( PreparedStatement* stmt );
typedef typename T::Statements PreparedStatementIndex;
PreparedStatement* getPreparedStatement( PreparedStatementIndex index );
void escapeString( std::string& str );
void keepAlive();
private:
uint32_t openConnections( InternalIndex type, uint8_t numConnections );
unsigned long escapeString( char *to, const char *from, unsigned long length );
void enqueue( Operation* op );
T* getFreeConnection();
const std::string& getDatabaseName() const;
std::unique_ptr< Core::LockedWaitQueue< Operation* > > m_queue;
std::array< std::vector< std::unique_ptr< T > >, IDX_SIZE > m_connections;
ConnectionInfo m_connectionInfo;
uint8_t m_asyncThreads;
uint8_t m_synchThreads;
};
}
}
#endif //SAPPHIRE_DBWORKERPOOL_H

View file

@ -0,0 +1,41 @@
#ifndef SAPPHIRE_OPERATION_H
#define SAPPHIRE_OPERATION_H
namespace Mysql
{
class Connection;
}
namespace Core
{
namespace Db
{
class DbConnection;
class PreparedStatement;
class Operation
{
public:
Operation() : m_pConn( nullptr ) { }
virtual ~Operation() { }
virtual int call()
{
execute();
return 0;
}
virtual bool execute() = 0;
virtual void setConnection( DbConnection* pCon ) { m_pConn = pCon; }
DbConnection* m_pConn;
private:
Operation( Operation const& right ) = delete;
Operation& operator=( Operation const& right ) = delete;
};
}
}
#endif //SAPPHIRE_OPERATION_H

View file

@ -0,0 +1,149 @@
#include "PreparedStatement.h"
#include "DbConnection.h"
#include <MySqlConnector.h>
#include <sstream>
Core::Db::PreparedStatement::PreparedStatement( uint32_t index ) :
m_stmt( nullptr ),
m_index( index ) { }
Core::Db::PreparedStatement::~PreparedStatement() { }
void Core::Db::PreparedStatement::bindParameters()
{
assert( m_stmt );
uint8_t i = 1;
for( ; i < m_statementData.size(); i++ )
{
switch( m_statementData[i].type)
{
case TYPE_BOOL:
m_stmt->setBoolean( i, m_statementData[i].data.boolean );
break;
case TYPE_UI:
m_stmt->setUInt( i, m_statementData[i].data.ui32 );
break;
case TYPE_I:
m_stmt->setInt( i, m_statementData[i].data.i32 );
break;
case TYPE_UI64:
m_stmt->setUInt64( i, m_statementData[i].data.ui64 );
break;
case TYPE_I64:
m_stmt->setInt64( i, m_statementData[i].data.i64 );
break;
case TYPE_DOUBLE:
m_stmt->setDouble( i, m_statementData[i].data.d );
break;
case TYPE_STRING:
m_stmt->setString( i, std::string( reinterpret_cast< char* >( m_statementData[i].binary.data() ) ) );
break;
case TYPE_BINARY:
{
std::stringstream is;
is.rdbuf()->pubsetbuf( reinterpret_cast< char* >( &m_statementData[i].binary[0] ),
m_statementData[i].binary.size() );
m_stmt->setBlob( i, &is );
}
break;
case TYPE_NULL:
m_stmt->setNull( i, 0 );
break;
}
}
}
//- Bind to buffer
void Core::Db::PreparedStatement::setBool( uint8_t index, const bool value )
{
if( index >= m_statementData.size() )
m_statementData.resize( index + 1 );
m_statementData[index].data.boolean = value;
m_statementData[index].type = TYPE_BOOL;
}
void Core::Db::PreparedStatement::setUInt( uint8_t index, uint32_t value )
{
if( index >= m_statementData.size() )
m_statementData.resize(index+1);
m_statementData[index].data.ui32 = value;
m_statementData[index].type = TYPE_UI;
}
void Core::Db::PreparedStatement::setUInt64( uint8_t index, uint64_t value )
{
if( index >= m_statementData.size() )
m_statementData.resize( index + 1 );
m_statementData[index].data.ui64 = value;
m_statementData[index].type = TYPE_UI64;
}
void Core::Db::PreparedStatement::setInt( uint8_t index, int32_t value )
{
if( index >= m_statementData.size() )
m_statementData.resize( index + 1 );
m_statementData[index].data.i32 = value;
m_statementData[index].type = TYPE_I;
}
void Core::Db::PreparedStatement::setInt64( uint8_t index, int64_t value )
{
if( index >= m_statementData.size() )
m_statementData.resize(index+1);
m_statementData[index].data.i64 = value;
m_statementData[index].type = TYPE_I64;
}
void Core::Db::PreparedStatement::setDouble( uint8_t index, double value )
{
if( index >= m_statementData.size() )
m_statementData.resize( index + 1 );
m_statementData[index].data.d = value;
m_statementData[index].type = TYPE_DOUBLE;
}
void Core::Db::PreparedStatement::setString( uint8_t index, const std::string& value )
{
if( index >= m_statementData.size() )
m_statementData.resize( index + 1 );
m_statementData[index].binary.resize( value.length() + 1 );
memcpy( m_statementData[index].binary.data(), value.c_str(), value.length() + 1 );
m_statementData[index].type = TYPE_STRING;
}
void Core::Db::PreparedStatement::setBinary( uint8_t index, const std::vector< uint8_t >& value )
{
if( index >= m_statementData.size() )
m_statementData.resize( index + 1 );
m_statementData[index].binary = value;
m_statementData[index].type = TYPE_BINARY;
}
void Core::Db::PreparedStatement::setNull( uint8_t index )
{
if( index >= m_statementData.size() )
m_statementData.resize( index + 1 );
m_statementData[index].type = TYPE_NULL;
}
uint32_t Core::Db::PreparedStatement::getIndex() const
{
return m_index;
}
void Core::Db::PreparedStatement::setMysqlPS( Mysql::PreparedStatement* pStmt )
{
m_stmt = pStmt;
}

View file

@ -0,0 +1,83 @@
#ifndef SAPPHIRE_PREPAREDSTATEMENT_H
#define SAPPHIRE_PREPAREDSTATEMENT_H
#include <stdint.h>
#include <vector>
#include <string>
#include "Operation.h"
namespace Mysql
{
class PreparedStatement;
}
namespace Core
{
namespace Db
{
union PreparedStatementDataUnion
{
bool boolean;
uint32_t ui32;
int32_t i32;
uint64_t ui64;
int64_t i64;
double d;
};
enum PreparedStatementValueType
{
TYPE_BOOL,
TYPE_UI,
TYPE_UI64,
TYPE_I,
TYPE_I64,
TYPE_DOUBLE,
TYPE_STRING,
TYPE_BINARY,
TYPE_NULL
};
struct PreparedStatementData
{
PreparedStatementDataUnion data;
PreparedStatementValueType type;
std::vector< uint8_t > binary;
};
class PreparedStatement
{
public:
explicit PreparedStatement( uint32_t index );
~PreparedStatement();
void setBool( uint8_t index, bool value );
void setUInt( uint8_t index, uint32_t value );
void setUInt64( uint8_t index, uint64_t value );
void setInt( uint8_t index, int32_t value );
void setInt64( uint8_t index, int64_t value );
void setDouble( uint8_t index, double value );
void setString( uint8_t index, const std::string& value );
void setBinary( uint8_t index, const std::vector< uint8_t >& value );
void setNull( uint8_t index );
uint32_t getIndex() const;
void setMysqlPS( Mysql::PreparedStatement* pStmt );
void bindParameters();
protected:
Mysql::PreparedStatement* m_stmt;
uint32_t m_index;
std::vector< PreparedStatementData > m_statementData;
PreparedStatement( PreparedStatement const& right ) = delete;
PreparedStatement& operator=( PreparedStatement const& right ) = delete;
};
}
}
#endif //SAPPHIRE_PREPAREDSTATEMENT_H

View file

@ -0,0 +1,72 @@
#include "StatementTask.h"
#include <string.h>
#include "Operation.h"
#include "DbConnection.h"
#include "PreparedStatement.h"
Core::Db::StatementTask::StatementTask( const std::string &sql, bool async )
{
m_sql = sql;
m_hasResult = async; // If the operation is async, then there's a result
//if (async)
// m_result = new QueryResultPromise();
}
Core::Db::StatementTask::~StatementTask()
{
//if( m_hasResult && m_result != nullptr)
// delete m_result;
}
bool Core::Db::StatementTask::execute()
{
if( m_hasResult )
{
/*ResultSet* result = m_conn->Query(m_sql);
if (!result || !result->GetRowCount() || !result->NextRow())
{
delete result;
m_result->set_value(QueryResult(NULL));
return false;
}
m_result->set_value(QueryResult(result));
return true;*/
}
return m_pConn->execute( m_sql );
}
Core::Db::PreparedStatementTask::PreparedStatementTask( Core::Db::PreparedStatement *stmt, bool async ) :
m_stmt(stmt)
//, m_result(nullptr)
{
m_hasResult = async; // If the operation is async, then there's a result
}
Core::Db::PreparedStatementTask::~PreparedStatementTask()
{
delete m_stmt;
//if (m_has_result && m_result != nullptr)
// delete m_result;
}
bool Core::Db::PreparedStatementTask::execute()
{
//if (m_has_result)
//{
// PreparedResultSet* result = m_conn->Query(m_stmt);
// if (!result || !result->GetRowCount())
// {
// delete result;
// m_result->set_value(PreparedQueryResult(NULL));
// return false;
// }
// m_result->set_value(PreparedQueryResult(result));
// return true;
//}
return m_pConn->execute( m_stmt );
}

View file

@ -0,0 +1,54 @@
#ifndef SAPPHIRE_STATEMENTTASK_H
#define SAPPHIRE_STATEMENTTASK_H
#include <string>
#include "Operation.h"
namespace Core
{
namespace Db
{
class PreparedStatement;
class StatementTask : public Operation
{
public:
StatementTask( const std::string& sql, bool async = false );
~StatementTask();
bool execute() override;
// QueryResultFuture getFuture() const
// {
// return m_result->get_future();
// }
private:
std::string m_sql;
bool m_hasResult;
// QueryResultPromise *m_result;
};
//- Lower-level class, enqueuable operation
class PreparedStatementTask : public Operation
{
public:
PreparedStatementTask( PreparedStatement* stmt, bool async = false);
~PreparedStatementTask();
bool execute() override;
//PreparedQueryResultFuture getFuture() { return m_result->get_future(); }
protected:
PreparedStatement* m_stmt;
bool m_hasResult;
//PreparedQueryResultPromise* m_result;
};
}
}
#endif //SAPPHIRE_STATEMENTTASK_H

View file

@ -6,6 +6,7 @@
#include <queue>
#include <atomic>
#include <type_traits>
#include <utility>
namespace Core
{
@ -35,7 +36,7 @@ public:
{
std::lock_guard< std::mutex > lock( m_queueLock );
return _queue.empty();
return m_queue.empty();
}
bool pop( T& value )
@ -80,17 +81,17 @@ public:
m_queue.pop();
}
_shutdown = true;
m_shutdown = true;
_condition.notify_all();
m_condition.notify_all();
}
private:
template< typename E = T >
typename std::enable_if<std::is_pointer<E>::value>::type deleteQueuedObject( E& obj ) { delete obj; }
typename std::enable_if< std::is_pointer< E >::value >::type deleteQueuedObject( E& obj ) { delete obj; }
template< typename E = T >
typename std::enable_if<!std::is_pointer<E>::value>::type deleteQueuedObject( E const& ) { }
typename std::enable_if< !std::is_pointer< E >::value >::type deleteQueuedObject( E const& ) { }
};
}

View file

@ -7,6 +7,7 @@
#include <src/servers/Server_Common/Logging/Logger.h>
#include <src/servers/Server_Common/Config/XMLConfig.h>
#include <src/servers/Server_Common/Database/Database.h>
#include <src/servers/Server_Common/Version.h>
#include <MySqlBase.h>
#include <Connection.h>
@ -37,6 +38,11 @@
#include <boost/make_shared.hpp>
#include <boost/algorithm/string.hpp>
#include <Server_Common/Database/DbLoader.h>
#include <Server_Common/Database/CharaDbConnection.h>
#include <Server_Common/Database/DbWorkerPool.h>
#include <Server_Common/Database/PreparedStatement.h>
Core::Logger g_log;
Core::Db::Database g_database;
@ -45,7 +51,7 @@ Core::Scripting::ScriptManager g_scriptMgr;
Core::Data::ExdData g_exdData;
Core::ZoneMgr g_zoneMgr;
Core::LinkshellMgr g_linkshellMgr;
Core::Db::DbWorkerPool< Core::Db::CharaDbConnection > CharacterDatabase;
Core::ServerZone::ServerZone( const std::string& configPath, uint16_t serverId )
: m_configPath( configPath )
@ -167,99 +173,137 @@ bool Core::ServerZone::loadSettings( int32_t argc, char* argv[] )
return false;
}
try
Core::Db::DbLoader loader;
Core::Db::ConnectionInfo info;
info.password = m_pConfig->getValue< std::string >( "Settings.General.Mysql.Pass", "" );
info.host = m_pConfig->getValue< std::string >( "Settings.General.Mysql.Host", "127.0.0.1" );
info.database = m_pConfig->getValue< std::string >( "Settings.General.Mysql.Database", "sapphire" );
info.port = m_pConfig->getValue< uint16_t >( "Settings.General.Mysql.Port", 3306 );
info.user = m_pConfig->getValue< std::string >( "Settings.General.Mysql.Username", "root" );
info.syncThreads = m_pConfig->getValue< uint8_t >( "Settings.General.Mysql.SyncThreads", 2 );
info.asyncThreads = m_pConfig->getValue< uint8_t >( "Settings.General.Mysql.AsyncThreads", 2 );
loader.addDb( CharacterDatabase, info );
if( !loader.initDbs() )
return false;
// execute() runs asynchronous
CharacterDatabase.execute( "INSERT INTO zoneservers ( id, ip, port ) VALUES ( 101, '127.0.0.1', 54555);" );
CharacterDatabase.execute( "DELETE FROM zoneservers WHERE id = 101" );
// query runs synchronous
boost::scoped_ptr< Mysql::ResultSet > res( CharacterDatabase.query( "SELECT id,ip,port FROM zoneservers" ) );
while( res->next() )
{
// bunch of test cases for db wrapper
Mysql::MySqlBase base;
g_log.info( base.getVersionInfo() );
Mysql::optionMap options;
options[ MYSQL_OPT_RECONNECT ] = "1";
boost::scoped_ptr< Mysql::Connection > con( base.connect( "127.0.0.1", "root", "", options ) );
if( con->getAutoCommit() )
g_log.info( "autocommit active" );
con->setAutoCommit( false );
if( !con->getAutoCommit() )
g_log.info( "autocommit inactive" );
con->setAutoCommit( true );
if( con->getAutoCommit() )
g_log.info( "autocommit active" );
con->setSchema( "sapphire" );
boost::scoped_ptr< Mysql::Statement > stmt( con->createStatement() );
bool t1 = stmt->execute( "DELETE FROM zoneservers WHERE id = 101" );
t1 = stmt->execute( "INSERT INTO zoneservers ( id, ip, port ) VALUES ( 101, '127.0.0.1', 54555);" );
// t1 = stmt->execute( "INSERT INTO zoneservers ( id, ip, port ) VALUES ( 101, '127.0.0.1', 54555);" ); // throws duplicate entry
t1 = stmt->execute( "DELETE FROM zoneservers WHERE id = 101" );
t1 = stmt->execute( "INSERT INTO zoneservers ( id, ip, port ) VALUES ( 101, '127.0.0.1', 54555);" );
//t1 = stmt->execute( "DELETE FROM zoneservers WHERE id = 101" );
//boost::scoped_ptr< Mysql::Statement > stmt1( con->createStatement() );
//bool t2 = stmt1->execute( "INSERT INTO BLARGH!" ); // throws error
boost::scoped_ptr< Mysql::Statement > stmt2( con->createStatement() );
boost::scoped_ptr< Mysql::ResultSet > res( stmt2->executeQuery( "SELECT id,ip,port FROM zoneservers" ) );
while( res->next() )
{
g_log.info( "id: " + std::to_string( res->getUInt( "id" ) ) );
g_log.info( "ip: " + res->getString( "ip" ) );
g_log.info( "port: " + std::to_string( res->getUInt( "port" ) ) );
// alternatively ( slightly faster )
// g_log.info( "id: " + std::to_string( res->getUInt( 1 ) ) );
// g_log.info( "ip: " + res->getString( 2 ) );
// g_log.info( "port: " + std::to_string( res->getUInt( 3 ) ) );
}
// binary data test
boost::scoped_ptr< Mysql::Statement > stmt3( con->createStatement() );
boost::scoped_ptr< Mysql::ResultSet > res1( stmt3->executeQuery( "SELECT * FROM charabase" ) );
while( res1->next() )
{
auto blob = res1->getBlobVector( "Customize" );
}
boost::scoped_ptr< Mysql::PreparedStatement > pstmt2( con->prepareStatement( "DELETE FROM zoneservers WHERE id = ?" ) );
pstmt2->setInt( 1, 1021 );
pstmt2->execute();
pstmt2->setInt( 1, 1001 );
pstmt2->execute();
boost::scoped_ptr< Mysql::PreparedStatement > pstmt( con->prepareStatement( "INSERT INTO zoneservers ( id, ip, port ) VALUES ( ?, ?, ?);" ) );
pstmt->setInt( 1, 1001 );
pstmt->setString( 2, "123.123.123.123" );
pstmt->setInt( 3, 5454 );
pstmt->execute();
pstmt->setInt( 1, 1021 );
pstmt->setString( 2, "173.173.173.173" );
pstmt->setInt( 3, 5151 );
pstmt->execute();
boost::scoped_ptr< Mysql::PreparedStatement > pstmt1( con->prepareStatement( "DELETE FROM zoneservers WHERE id = ?" ) );
pstmt->setInt( 1, 1021 );
pstmt->execute();
pstmt->setInt( 1, 1001 );
pstmt->execute();
}
catch( std::runtime_error e )
{
g_log.error( e.what() );
g_log.info( "id: " + std::to_string( res->getUInt( "id" ) ) );
g_log.info( "ip: " + res->getString( "ip" ) );
g_log.info( "port: " + std::to_string( res->getUInt( "port" ) ) );
}
auto stmt = CharacterDatabase.getPreparedStatement( Core::Db::CharaDbStatements::CHAR_INS_TEST );
stmt->setUInt( 1, 2345 );
stmt->setString( 2, "123.123.123.123" );
stmt->setUInt( 3, 3306 );
CharacterDatabase.execute( stmt );
//stmt->setUInt( 1, 245 );
//stmt->setString( 2, "12.12.12.12" );
//stmt->setUInt( 3, 3306 );
//CharacterDatabase.execute( stmt );
//try
//{
// // bunch of test cases for db wrapper
// Mysql::MySqlBase base;
// g_log.info( base.getVersionInfo() );
// Mysql::optionMap options;
// options[ MYSQL_OPT_RECONNECT ] = "1";
// boost::scoped_ptr< Mysql::Connection > con( base.connect( "127.0.0.1", "root", "", options, 3306 ) );
// if( con->getAutoCommit() )
// g_log.info( "autocommit active" );
// con->setAutoCommit( false );
// if( !con->getAutoCommit() )
// g_log.info( "autocommit inactive" );
// con->setAutoCommit( true );
// if( con->getAutoCommit() )
// g_log.info( "autocommit active" );
// con->setSchema( "sapphire" );
// boost::scoped_ptr< Mysql::Statement > stmt( con->createStatement() );
// bool t1 = stmt->execute( "DELETE FROM zoneservers WHERE id = 101" );
// t1 = stmt->execute( "INSERT INTO zoneservers ( id, ip, port ) VALUES ( 101, '127.0.0.1', 54555);" );
// // t1 = stmt->execute( "INSERT INTO zoneservers ( id, ip, port ) VALUES ( 101, '127.0.0.1', 54555);" ); // throws duplicate entry
// t1 = stmt->execute( "DELETE FROM zoneservers WHERE id = 101" );
// t1 = stmt->execute( "INSERT INTO zoneservers ( id, ip, port ) VALUES ( 101, '127.0.0.1', 54555);" );
// //t1 = stmt->execute( "DELETE FROM zoneservers WHERE id = 101" );
// //boost::scoped_ptr< Mysql::Statement > stmt1( con->createStatement() );
// //bool t2 = stmt1->execute( "INSERT INTO BLARGH!" ); // throws error
// boost::scoped_ptr< Mysql::Statement > stmt2( con->createStatement() );
// boost::scoped_ptr< Mysql::ResultSet > res( stmt2->executeQuery( "SELECT id,ip,port FROM zoneservers" ) );
// while( res->next() )
// {
// g_log.info( "id: " + std::to_string( res->getUInt( "id" ) ) );
// g_log.info( "ip: " + res->getString( "ip" ) );
// g_log.info( "port: " + std::to_string( res->getUInt( "port" ) ) );
// // alternatively ( slightly faster )
// // g_log.info( "id: " + std::to_string( res->getUInt( 1 ) ) );
// // g_log.info( "ip: " + res->getString( 2 ) );
// // g_log.info( "port: " + std::to_string( res->getUInt( 3 ) ) );
// }
// // binary data test
// boost::scoped_ptr< Mysql::Statement > stmt3( con->createStatement() );
// boost::scoped_ptr< Mysql::ResultSet > res1( stmt3->executeQuery( "SELECT * FROM charabase" ) );
// while( res1->next() )
// {
// auto blob = res1->getBlobVector( "Customize" );
// }
// boost::scoped_ptr< Mysql::PreparedStatement > pstmt2( con->prepareStatement( "DELETE FROM zoneservers WHERE id = ?" ) );
// pstmt2->setInt( 1, 1021 );
// pstmt2->execute();
// pstmt2->setInt( 1, 1001 );
// pstmt2->execute();
// boost::scoped_ptr< Mysql::PreparedStatement > pstmt( con->prepareStatement( "INSERT INTO zoneservers ( id, ip, port ) VALUES ( ?, ?, ?);" ) );
// pstmt->setInt( 1, 1001 );
// pstmt->setString( 2, "123.123.123.123" );
// pstmt->setInt( 3, 5454 );
// pstmt->execute();
// pstmt->setInt( 1, 1021 );
// pstmt->setString( 2, "173.173.173.173" );
// pstmt->setInt( 3, 5151 );
// pstmt->execute();
// boost::scoped_ptr< Mysql::PreparedStatement > pstmt1( con->prepareStatement( "DELETE FROM zoneservers WHERE id = ?" ) );
// pstmt->setInt( 1, 1021 );
// pstmt->execute();
// pstmt->setInt( 1, 1001 );
// pstmt->execute();
//}
//catch( std::runtime_error e )
//{
// g_log.error( e.what() );
//}
Db::DatabaseParams params;
params.bufferSize = 16384;
@ -290,7 +334,8 @@ void Core::ServerZone::run( int32_t argc, char* argv[] )
g_log.info( "===========================================================" );
g_log.info( "Sapphire Server Project " );
g_log.info( "Version: x.y.z" );
g_log.info( "Version: " + Core::Version::VERSION );
g_log.info( "GitHash: " + Core::Version::GIT_HASH );
g_log.info( "Compiled: " __DATE__ " " __TIME__ );
g_log.info( "===========================================================" );