- bug fixes for new thread manager when in game lobby

This commit is contained in:
Mark Vejvoda 2013-01-12 04:40:44 +00:00
parent 1a49588a45
commit 90ad8581d7
2 changed files with 126 additions and 112 deletions

View File

@ -877,11 +877,12 @@ bool ServerInterface::signalClientReceiveCommands(ConnectionSlot *connectionSlot
void ServerInterface::signalClientsToRecieveData(std::map<PLATFORM_SOCKET,bool> &socketTriggeredList,
std::map<int,ConnectionSlotEvent> &eventList,
std::map<int,bool> & mapSlotSignalledList) {
//printf("====================================In [%s::%s Line: %d]\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__);
const bool newThreadManager = Config::getInstance().getBool("EnableNewThreadManager","false");
if(newThreadManager == true) {
masterController.clearSlaves(true);
std::vector<SlaveThreadControllerInterface *> slaveThreadList;
for(unsigned int i= 0; exitServer == false && i < GameConstants::maxPlayers; ++i) {
for(int i= 0; exitServer == false && i < GameConstants::maxPlayers; ++i) {
MutexSafeWrapper safeMutexSlot(slotAccessorMutexes[i],CODE_AT_LINE_X(i));
ConnectionSlot* connectionSlot = slots[i];
@ -894,18 +895,19 @@ void ServerInterface::signalClientsToRecieveData(std::map<PLATFORM_SOCKET,bool>
}
}
ConnectionSlotEvent &event = eventList[i];
event.eventType = eReceiveSocketData;
event.networkMessage = NULL;
event.connectionSlot = connectionSlot;
event.socketTriggered = socketTriggered;
event.triggerId = i;
event.eventId = getNextEventId();
if(connectionSlot != NULL) {
if(socketTriggered == true || connectionSlot->isConnected() == false) {
ConnectionSlotEvent &event = eventList[i];
event.eventType = eReceiveSocketData;
event.networkMessage = NULL;
event.connectionSlot = connectionSlot;
event.socketTriggered = socketTriggered;
event.triggerId = i;
event.eventId = getNextEventId();
slaveThreadList.push_back(connectionSlot->getWorkerThread());
mapSlotSignalledList[i] = true;
if(connectionSlot->getWorkerThread() != NULL) {
slaveThreadList.push_back(connectionSlot->getWorkerThread());
mapSlotSignalledList[i] = true;
}
}
}
}
@ -937,9 +939,13 @@ void ServerInterface::checkForCompletedClients(std::map<int,bool> & mapSlotSigna
const bool newThreadManager = Config::getInstance().getBool("EnableNewThreadManager","false");
if(newThreadManager == true) {
bool slavesCompleted = masterController.waitTillSlavesTrigger(1000 * MAX_SLOT_THREAD_WAIT_TIME);
for(unsigned int i= 0; exitServer == false && i < GameConstants::maxPlayers; ++i) {
//printf("In [%s::%s Line: %d]\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__);
bool slavesCompleted = masterController.waitTillSlavesTrigger(1000 * MAX_SLOT_THREAD_WAIT_TIME);
masterController.clearSlaves(true);
//printf("In [%s::%s Line: %d]\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__);
for(int i= 0; exitServer == false && i < GameConstants::maxPlayers; ++i) {
MutexSafeWrapper safeMutexSlot(slotAccessorMutexes[i],CODE_AT_LINE_X(i));
ConnectionSlot* connectionSlot = slots[i];
@ -965,7 +971,7 @@ void ServerInterface::checkForCompletedClients(std::map<int,bool> & mapSlotSigna
}
}
}
masterController.clearSlaves(true);
//printf("In [%s::%s Line: %d]\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__);
}
else {
time_t waitForThreadElapsed = time(NULL);
@ -1030,121 +1036,122 @@ void ServerInterface::checkForLaggingClients(std::map<int,bool> &mapSlotSignalle
std::map<PLATFORM_SOCKET,bool> &socketTriggeredList,
std::vector <string> &errorMsgList) {
bool lastGlobalLagCheckTimeUpdate = false;
time_t waitForClientsElapsed = time(NULL);
time_t waitForThreadElapsed = time(NULL);
std::map<int,bool> slotsCompleted;
std::map<int,bool> slotsWarnedList;
for(bool threadsDone = false;
exitServer == false && threadsDone == false &&
difftime((long int)time(NULL),waitForThreadElapsed) < MAX_SLOT_THREAD_WAIT_TIME;) {
threadsDone = true;
// Examine all threads for completion of delegation
for(int i= 0; exitServer == false && i < GameConstants::maxPlayers; ++i) {
MutexSafeWrapper safeMutexSlot(slotAccessorMutexes[i],CODE_AT_LINE_X(i));
ConnectionSlot* connectionSlot = slots[i];
if(connectionSlot != NULL && mapSlotSignalledList[i] == true &&
slotsCompleted.find(i) == slotsCompleted.end()) {
try {
std::vector<std::string> errorList = connectionSlot->getThreadErrorList();
// Show any collected errors from threads
if(errorList.empty() == false) {
for(int iErrIdx = 0; iErrIdx < errorList.size(); ++iErrIdx) {
string &sErr = errorList[iErrIdx];
if(sErr != "") {
errorMsgList.push_back(sErr);
}
}
connectionSlot->clearThreadErrorList();
}
// Not done waiting for data yet
bool updateFinished = (connectionSlot != NULL ? connectionSlot->updateCompleted(&eventList[i]) : true);
if(updateFinished == false) {
threadsDone = false;
break;
}
else {
// New lag check
std::pair<bool,bool> clientLagExceededOrWarned = std::make_pair(false,false);
if( gameHasBeenInitiated == true && connectionSlot != NULL &&
connectionSlot->isConnected() == true) {
clientLagExceededOrWarned = clientLagCheck(connectionSlot,slotsWarnedList[i]);
if(SystemFlags::getSystemSettingType(SystemFlags::debugNetwork).enabled) SystemFlags::OutputDebug(SystemFlags::debugNetwork,"In [%s::%s Line: %d] clientLagExceededOrWarned.first = %d, clientLagExceededOrWarned.second = %d, gameSettings.getNetworkPauseGameForLaggedClients() = %d\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__,clientLagExceededOrWarned.first,clientLagExceededOrWarned.second,gameSettings.getNetworkPauseGameForLaggedClients());
if(clientLagExceededOrWarned.first == true) {
slotsWarnedList[i] = true;
}
}
// If the client has exceeded lag and the server wants
// to pause while they catch up, re-trigger the
// client reader thread
if((clientLagExceededOrWarned.first == true && gameSettings.getNetworkPauseGameForLaggedClients() == true)) { // ||
//(clientLagExceededOrWarned.second == true && slotsWarnedAndRetried[i] == false)) {
if(SystemFlags::getSystemSettingType(SystemFlags::debugNetwork).enabled) SystemFlags::OutputDebug(SystemFlags::debugNetwork,"In [%s::%s] Line: %d, clientLagExceededOrWarned.first = %d, clientLagExceededOrWarned.second = %d, difftime(time(NULL),waitForClientsElapsed) = %.2f, MAX_CLIENT_WAIT_SECONDS_FOR_PAUSE = %.2f\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__,clientLagExceededOrWarned.first,clientLagExceededOrWarned.second,difftime((long int)time(NULL),waitForClientsElapsed),MAX_CLIENT_WAIT_SECONDS_FOR_PAUSE);
if(difftime((long int)time(NULL),waitForClientsElapsed) < MAX_CLIENT_WAIT_SECONDS_FOR_PAUSE) {
if(connectionSlot != NULL) {
if(SystemFlags::getSystemSettingType(SystemFlags::debugNetwork).enabled) SystemFlags::OutputDebug(SystemFlags::debugNetwork,"In [%s::%s] Line: %d, clientLagExceededOrWarned.first = %d, clientLagExceededOrWarned.second = %d\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__,clientLagExceededOrWarned.first,clientLagExceededOrWarned.second);
bool socketTriggered = false;
PLATFORM_SOCKET clientSocket = connectionSlot->getSocketId();
if(clientSocket > 0) {
socketTriggered = socketTriggeredList[clientSocket];
}
ConnectionSlotEvent &event = eventList[i];
mapSlotSignalledList[i] = signalClientReceiveCommands(connectionSlot,i,socketTriggered,event);
threadsDone = false;
if(gameHasBeenInitiated == true) {
time_t waitForClientsElapsed = time(NULL);
time_t waitForThreadElapsed = time(NULL);
std::map<int,bool> slotsCompleted;
std::map<int,bool> slotsWarnedList;
for(bool threadsDone = false;
exitServer == false && threadsDone == false &&
difftime((long int)time(NULL),waitForThreadElapsed) < MAX_SLOT_THREAD_WAIT_TIME;) {
threadsDone = true;
// Examine all threads for completion of delegation
for(int i= 0; exitServer == false && i < GameConstants::maxPlayers; ++i) {
MutexSafeWrapper safeMutexSlot(slotAccessorMutexes[i],CODE_AT_LINE_X(i));
ConnectionSlot* connectionSlot = slots[i];
if(connectionSlot != NULL && mapSlotSignalledList[i] == true &&
slotsCompleted.find(i) == slotsCompleted.end()) {
try {
std::vector<std::string> errorList = connectionSlot->getThreadErrorList();
// Show any collected errors from threads
if(errorList.empty() == false) {
for(int iErrIdx = 0; iErrIdx < errorList.size(); ++iErrIdx) {
string &sErr = errorList[iErrIdx];
if(sErr != "") {
errorMsgList.push_back(sErr);
}
if(SystemFlags::getSystemSettingType(SystemFlags::debugNetwork).enabled) SystemFlags::OutputDebug(SystemFlags::debugNetwork,"In [%s::%s] Line: %d, clientLagExceededOrWarned.first = %d, clientLagExceededOrWarned.second = %d\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__,clientLagExceededOrWarned.first,clientLagExceededOrWarned.second);
}
connectionSlot->clearThreadErrorList();
}
// Not done waiting for data yet
bool updateFinished = (connectionSlot != NULL ? connectionSlot->updateCompleted(&eventList[i]) : true);
if(updateFinished == false) {
threadsDone = false;
break;
}
else {
slotsCompleted[i] = true;
}
}
}
catch(const exception &ex) {
SystemFlags::OutputDebug(SystemFlags::debugError,"In [%s::%s Line: %d] Error [%s]\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__,ex.what());
if(SystemFlags::getSystemSettingType(SystemFlags::debugNetwork).enabled) SystemFlags::OutputDebug(SystemFlags::debugNetwork,"In [%s::%s Line: %d] error detected [%s]\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__,ex.what());
errorMsgList.push_back(ex.what());
}
}
// New lag check
std::pair<bool,bool> clientLagExceededOrWarned = std::make_pair(false,false);
if( gameHasBeenInitiated == true && connectionSlot != NULL &&
connectionSlot->isConnected() == true) {
clientLagExceededOrWarned = clientLagCheck(connectionSlot,slotsWarnedList[i]);
if(connectionSlot != NULL && connectionSlot->isConnected() == true) {
try {
if(gameHasBeenInitiated == true &&
difftime((long int)time(NULL),lastGlobalLagCheckTime) >= LAG_CHECK_GRACE_PERIOD) {
if(SystemFlags::getSystemSettingType(SystemFlags::debugNetwork).enabled) SystemFlags::OutputDebug(SystemFlags::debugNetwork,"In [%s::%s Line: %d] clientLagExceededOrWarned.first = %d, clientLagExceededOrWarned.second = %d, gameSettings.getNetworkPauseGameForLaggedClients() = %d\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__,clientLagExceededOrWarned.first,clientLagExceededOrWarned.second,gameSettings.getNetworkPauseGameForLaggedClients());
//printf("\n\n\n^^^^^^^^^^^^^^ PART A\n\n\n");
if(clientLagExceededOrWarned.first == true) {
slotsWarnedList[i] = true;
}
}
// If the client has exceeded lag and the server wants
// to pause while they catch up, re-trigger the
// client reader thread
if((clientLagExceededOrWarned.first == true && gameSettings.getNetworkPauseGameForLaggedClients() == true)) { // ||
//(clientLagExceededOrWarned.second == true && slotsWarnedAndRetried[i] == false)) {
// New lag check
std::pair<bool,bool> clientLagExceededOrWarned = std::make_pair(false,false);
if( gameHasBeenInitiated == true && connectionSlot != NULL &&
connectionSlot->isConnected() == true) {
//printf("\n\n\n^^^^^^^^^^^^^^ PART B\n\n\n");
if(SystemFlags::getSystemSettingType(SystemFlags::debugNetwork).enabled) SystemFlags::OutputDebug(SystemFlags::debugNetwork,"In [%s::%s] Line: %d, clientLagExceededOrWarned.first = %d, clientLagExceededOrWarned.second = %d, difftime(time(NULL),waitForClientsElapsed) = %.2f, MAX_CLIENT_WAIT_SECONDS_FOR_PAUSE = %.2f\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__,clientLagExceededOrWarned.first,clientLagExceededOrWarned.second,difftime((long int)time(NULL),waitForClientsElapsed),MAX_CLIENT_WAIT_SECONDS_FOR_PAUSE);
lastGlobalLagCheckTimeUpdate = true;
clientLagExceededOrWarned = clientLagCheck(connectionSlot,slotsWarnedList[i]);
if(difftime((long int)time(NULL),waitForClientsElapsed) < MAX_CLIENT_WAIT_SECONDS_FOR_PAUSE) {
if(connectionSlot != NULL) {
if(SystemFlags::getSystemSettingType(SystemFlags::debugNetwork).enabled) SystemFlags::OutputDebug(SystemFlags::debugNetwork,"In [%s::%s] Line: %d, clientLagExceededOrWarned.first = %d, clientLagExceededOrWarned.second = %d\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__,clientLagExceededOrWarned.first,clientLagExceededOrWarned.second);
if(SystemFlags::getSystemSettingType(SystemFlags::debugNetwork).enabled) SystemFlags::OutputDebug(SystemFlags::debugNetwork,"In [%s::%s Line: %d] clientLagExceededOrWarned.first = %d, clientLagExceededOrWarned.second = %d, gameSettings.getNetworkPauseGameForLaggedClients() = %d\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__,clientLagExceededOrWarned.first,clientLagExceededOrWarned.second,gameSettings.getNetworkPauseGameForLaggedClients());
if(clientLagExceededOrWarned.first == true) {
slotsWarnedList[i] = true;
bool socketTriggered = false;
PLATFORM_SOCKET clientSocket = connectionSlot->getSocketId();
if(clientSocket > 0) {
socketTriggered = socketTriggeredList[clientSocket];
}
ConnectionSlotEvent &event = eventList[i];
mapSlotSignalledList[i] = signalClientReceiveCommands(connectionSlot,i,socketTriggered,event);
threadsDone = false;
}
if(SystemFlags::getSystemSettingType(SystemFlags::debugNetwork).enabled) SystemFlags::OutputDebug(SystemFlags::debugNetwork,"In [%s::%s] Line: %d, clientLagExceededOrWarned.first = %d, clientLagExceededOrWarned.second = %d\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__,clientLagExceededOrWarned.first,clientLagExceededOrWarned.second);
}
}
else {
slotsCompleted[i] = true;
}
}
}
catch(const exception &ex) {
SystemFlags::OutputDebug(SystemFlags::debugError,"In [%s::%s Line: %d] Error [%s]\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__,ex.what());
if(SystemFlags::getSystemSettingType(SystemFlags::debugNetwork).enabled) SystemFlags::OutputDebug(SystemFlags::debugNetwork,"In [%s::%s Line: %d] error detected [%s]\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__,ex.what());
errorMsgList.push_back(ex.what());
}
}
catch(const exception &ex) {
SystemFlags::OutputDebug(SystemFlags::debugError,"In [%s::%s Line: %d] Error [%s]\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__,ex.what());
if(SystemFlags::getSystemSettingType(SystemFlags::debugNetwork).enabled) SystemFlags::OutputDebug(SystemFlags::debugNetwork,"In [%s::%s Line: %d] error detected [%s]\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__,ex.what());
errorMsgList.push_back(ex.what());
if(connectionSlot != NULL && connectionSlot->isConnected() == true) {
try {
if(gameHasBeenInitiated == true &&
difftime((long int)time(NULL),lastGlobalLagCheckTime) >= LAG_CHECK_GRACE_PERIOD) {
//printf("\n\n\n^^^^^^^^^^^^^^ PART A\n\n\n");
// New lag check
std::pair<bool,bool> clientLagExceededOrWarned = std::make_pair(false,false);
if( gameHasBeenInitiated == true && connectionSlot != NULL &&
connectionSlot->isConnected() == true) {
//printf("\n\n\n^^^^^^^^^^^^^^ PART B\n\n\n");
lastGlobalLagCheckTimeUpdate = true;
clientLagExceededOrWarned = clientLagCheck(connectionSlot,slotsWarnedList[i]);
if(SystemFlags::getSystemSettingType(SystemFlags::debugNetwork).enabled) SystemFlags::OutputDebug(SystemFlags::debugNetwork,"In [%s::%s Line: %d] clientLagExceededOrWarned.first = %d, clientLagExceededOrWarned.second = %d, gameSettings.getNetworkPauseGameForLaggedClients() = %d\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__,clientLagExceededOrWarned.first,clientLagExceededOrWarned.second,gameSettings.getNetworkPauseGameForLaggedClients());
if(clientLagExceededOrWarned.first == true) {
slotsWarnedList[i] = true;
}
}
}
}
catch(const exception &ex) {
SystemFlags::OutputDebug(SystemFlags::debugError,"In [%s::%s Line: %d] Error [%s]\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__,ex.what());
if(SystemFlags::getSystemSettingType(SystemFlags::debugNetwork).enabled) SystemFlags::OutputDebug(SystemFlags::debugNetwork,"In [%s::%s Line: %d] error detected [%s]\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__,ex.what());
errorMsgList.push_back(ex.what());
}
}
}
}
}
if(lastGlobalLagCheckTimeUpdate == true) {
lastGlobalLagCheckTime = time(NULL);
}
@ -1386,18 +1393,21 @@ void ServerInterface::update() {
if(SystemFlags::getSystemSettingType(SystemFlags::debugPerformance).enabled && chrono.getMillis() > 0) SystemFlags::OutputDebug(SystemFlags::debugPerformance,"In [%s::%s Line: %d] took %lld msecs\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__,chrono.getMillis());
//printf("In [%s::%s Line: %d]\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__);
// Step #3 check clients for any lagging scenarios and try to deal with them
checkForLaggingClients(mapSlotSignalledList, eventList, socketTriggeredList,errorMsgList);
if(SystemFlags::getSystemSettingType(SystemFlags::debugNetwork).enabled) SystemFlags::OutputDebug(SystemFlags::debugNetwork,"In [%s::%s Line: %d] ============ Step #4\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__);
if(SystemFlags::getSystemSettingType(SystemFlags::debugPerformance).enabled && chrono.getMillis() > 0) SystemFlags::OutputDebug(SystemFlags::debugPerformance,"In [%s::%s Line: %d] took %lld msecs\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__,chrono.getMillis());
//printf("In [%s::%s Line: %d]\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__);
// Step #4 dispatch network commands to the pending list so that they are done in proper order
executeNetworkCommandsFromClients();
if(SystemFlags::getSystemSettingType(SystemFlags::debugNetwork).enabled) SystemFlags::OutputDebug(SystemFlags::debugNetwork,"In [%s::%s Line: %d] ============ Step #5\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__);
if(SystemFlags::getSystemSettingType(SystemFlags::debugPerformance).enabled && chrono.getMillis() > 0) SystemFlags::OutputDebug(SystemFlags::debugPerformance,"In [%s::%s Line: %d] took %lld msecs\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__,chrono.getMillis());
//printf("In [%s::%s Line: %d]\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__);
// Step #5 dispatch pending chat messages
dispatchPendingChatMessages(errorMsgList);
if(SystemFlags::getSystemSettingType(SystemFlags::debugNetwork).enabled) SystemFlags::OutputDebug(SystemFlags::debugNetwork,"In [%s::%s] Line: %d\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__);
@ -1483,6 +1493,7 @@ void ServerInterface::update() {
}
}
//printf("In [%s::%s Line: %d]\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__);
if(SystemFlags::getSystemSettingType(SystemFlags::debugPerformance).enabled && chrono.getMillis() > 0) SystemFlags::OutputDebug(SystemFlags::debugPerformance,"In [%s::%s Line: %d] took %lld msecs\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__,chrono.getMillis());
}

View File

@ -484,6 +484,7 @@ void MasterSlaveThreadController::triggerMaster(int waitMilliseconds) {
MutexSafeWrapper safeMutex(mutex);
if(debugMasterSlaveThreadController) printf("In [%s::%s Line: %d] semVal = %u\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__,slaveTriggerCounter);
//printf("In [%s::%s Line: %d] semVal = %u\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__,slaveTriggerCounter);
slaveTriggerCounter--;
int newCount = slaveTriggerCounter;
@ -492,6 +493,8 @@ void MasterSlaveThreadController::triggerMaster(int waitMilliseconds) {
safeMutex.ReleaseLock();
//printf("In [%s::%s Line: %d] semVal = %u\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__,slaveTriggerCounter);
if(newCount <= triggerBaseCount) {
slaveTriggerSem->signal();
}