Commit 24f43d81 authored by Nigel Kukard's avatar Nigel Kukard
Browse files

* Updated the way packets are processed in prioritization, packets are now processed fairly

parent 95d5b4fa
......@@ -177,15 +177,17 @@ static inline int burst_credit_will_exceed(struct flow_t *flow, unsigned int pkt
// Function to process a flow queue, returns number of packets accepted
static int processPktQueue(struct runnerData_t *runnerData, struct pktQueue_t *pktQueue)
static unsigned int processPktQueue(struct runnerData_t *runnerData, struct pktQueue_t *pktQueue,
unsigned int pkts)
{
int status;
GList *packets;
int exceeded = 0;
int i;
// Differences in queue when we done
int acceptLen = 0, queuedLen = 0;
int acceptSize = 0, queuedSize = 0;
unsigned int acceptLen = 0, queuedLen = 0;
unsigned int acceptSize = 0, queuedSize = 0;
unsigned int processed;
// Lock, hijack packets, unlock
......@@ -194,8 +196,8 @@ static int processPktQueue(struct runnerData_t *runnerData, struct pktQueue_t *p
pktQueue->packets = NULL;
g_mutex_unlock(pktQueue->lock);
// Check that we within our boundaries
while (!exceeded)
// Check that we within our boundaries and will not accept too many packets
while (!exceeded && pkts > (acceptLen + queuedLen))
{
struct packet_t *packet;
GList *pktQueueItem;
......@@ -322,6 +324,7 @@ static int processPktQueue(struct runnerData_t *runnerData, struct pktQueue_t *p
{
float delta;
// Get the fraction of time passed since last update, predict below to 1 second
delta = flow->curThroughputAge / 1000000.0;
......@@ -454,7 +457,9 @@ static int processPktQueue(struct runnerData_t *runnerData, struct pktQueue_t *p
g_mutex_unlock(pktQueue->lock);
return acceptLen;
processed = acceptLen + queuedLen;
return processed;
}
......@@ -464,19 +469,6 @@ void *flowRunner(void *data)
{
struct runnerData_t *runnerData = (struct runnerData_t*) data;
// Our processing function
void processQueue(void *data, void *user_data)
{
struct pktQueue_t *pktQueue = (struct pktQueue_t*) data;
struct runnerData_t *aRunnerData = (struct runnerData_t*) user_data;
// Check if we found a flow with a queue
// NOTE: The function below returns the number of packets processed
processPktQueue(aRunnerData,pktQueue);
}
logMessage(LOG_DEBUG, "Flow runner started...\n");
......@@ -485,7 +477,7 @@ void *flowRunner(void *data)
{
GTimeVal mytime;
unsigned char i;
GList *queueChangeList = NULL;
GList *queueChangeList[NUM_PRIO_BANDS];
g_mutex_lock(runnerData->bandSignalLock);
......@@ -502,11 +494,13 @@ void *flowRunner(void *data)
// Hijack the queue change list items
for (i = 0; i < NUM_PRIO_BANDS; i++)
{
queueChangeList[i] = NULL;
// Zero runner data if it is non-NULL
if (runnerData->queueChangeList[i])
{
// Copy list item over
queueChangeList = g_list_concat(queueChangeList,runnerData->queueChangeList[i]);
queueChangeList[i] = g_list_concat(queueChangeList[i],runnerData->queueChangeList[i]);
// Blank used list... copy uses directly
runnerData->queueChangeList[i] = NULL;
}
......@@ -518,10 +512,31 @@ void *flowRunner(void *data)
// Process list if it is non-NULL
if (queueChangeList)
for (i = 0; i < NUM_PRIO_BANDS; i++)
{
g_list_foreach(queueChangeList,processQueue,runnerData);
g_list_free(queueChangeList);
// Loop while there are still items to be processed
while (queueChangeList[i])
{
GList *item = g_list_first(queueChangeList[i]);
// Loop with all queues that need items to be processed
while (item)
{
struct pktQueue_t *pktQueue;
unsigned int processed = 0;
// Just a harmless error check
if ((pktQueue = (struct pktQueue_t*) item->data))
processed = processPktQueue(runnerData,pktQueue,1); // Process 1 packet at a time
// Next queue
item = g_list_next(item);
// If nothing was processed remove ourselves from the change list
if (!processed)
queueChangeList[i] = g_list_remove(queueChangeList[i],(void *) pktQueue);
}
}
}
}
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment