276 lines
9.5 KiB
Plaintext
276 lines
9.5 KiB
Plaintext
|
|
/**
|
|||
|
|
@page tevent_queue Chapter 5: Tevent queue
|
|||
|
|
@section queue Tevent queue
|
|||
|
|
|
|||
|
|
There is a possibility that the dispatcher and its handlers may not be able to
|
|||
|
|
handle all the incoming events as quickly as they arrive. One way to deal with
|
|||
|
|
this situation is to buffer the received events by introducing an event queue
|
|||
|
|
into the events stream, between the events generator and the dispatcher. Events
|
|||
|
|
are added to the queue as they arrive, and the dispatcher pops them off the
|
|||
|
|
beginning of the queue as fast as possible. In tevent library it is
|
|||
|
|
similar, but the queue is not automatically set for any event. The queue has to
|
|||
|
|
be created on purpose, and events which should follow the order of the FIFO
|
|||
|
|
queue have to be explicitly pinpointed. Creating such a queue is crucial in
|
|||
|
|
situations when sequential processing is absolutely essential for the
|
|||
|
|
successful
|
|||
|
|
completion of a task, e.g. for a large quantity of data that are about to be
|
|||
|
|
written from a buffer into a socket. The tevent library has its own queue
|
|||
|
|
structure that is ready to use after it has been initialized and started up
|
|||
|
|
once.
|
|||
|
|
|
|||
|
|
@subsection cr_queue Creation of Queues
|
|||
|
|
|
|||
|
|
The first and most important step is the creation of the tevent queue
|
|||
|
|
(represented by struct tevent_queue), which will then be in running mode.
|
|||
|
|
|
|||
|
|
@code
|
|||
|
|
struct tevent_queue* tevent_queue_create (TALLOC_CTX *mem_ctx, const char *name)
|
|||
|
|
@endcode
|
|||
|
|
|
|||
|
|
When the program returns from this function, the allocated memory, set
|
|||
|
|
destructor and labeled queue as running has been done and the structure is
|
|||
|
|
ready to be filled with entries. Stopping and starting queues on the run. If
|
|||
|
|
you need to stop a queue from processing its entries, and then turn it on
|
|||
|
|
again, a couple of functions which serve this purpose are:
|
|||
|
|
|
|||
|
|
- bool tevent_queue_stop()
|
|||
|
|
- bool tevent_queue_start()
|
|||
|
|
|
|||
|
|
These functions actually only provide for the simple setting of a variable,
|
|||
|
|
which indicates that the queue has been stopped/started. Returned value
|
|||
|
|
indicates result.
|
|||
|
|
|
|||
|
|
@subsection add_queue Adding Requests to a Queue
|
|||
|
|
|
|||
|
|
Tevent in fact offers 3 possible ways of inserting a request into a queue.
|
|||
|
|
There are no vast differences between them, but still there might be situations
|
|||
|
|
where one of them is more suitable and desired than another.
|
|||
|
|
|
|||
|
|
@code
|
|||
|
|
bool tevent_queue_add(struct tevent_queue *queue,
|
|||
|
|
struct tevent_context *ev,
|
|||
|
|
struct tevent_req *req,
|
|||
|
|
tevent_queue_trigger_fn_t trigger,
|
|||
|
|
void *private_data)
|
|||
|
|
@endcode
|
|||
|
|
|
|||
|
|
This call is the simplest of all three. It offers only boolean verification of
|
|||
|
|
whether the operation of adding the request into a queue was successful or not.
|
|||
|
|
No additional deletion of an item from the queue is possible, i.e. it is only
|
|||
|
|
possible to deallocate the whole tevent request, which would cause triggering
|
|||
|
|
of destructor handling and also dropping the request from the queue.
|
|||
|
|
|
|||
|
|
<strong>Extended Options</strong>
|
|||
|
|
|
|||
|
|
Both of the following functions have a feature in common - they return tevent
|
|||
|
|
queue entry structure representing the item in a queue. There is no further
|
|||
|
|
possible handling with this structure except the use of the structure’s pointer
|
|||
|
|
for its deallocation (which leads also its removal from the queue). The
|
|||
|
|
difference lies in the possibility that with the following functions it is
|
|||
|
|
possible to remove the tevent request from a queue without its deallocation.
|
|||
|
|
The previous function can only deallocate the tevent request as it was from
|
|||
|
|
memory, and thereby logically cause its removal from the queue as well. There
|
|||
|
|
is no other utilization of this structure via API at this stage of tevent
|
|||
|
|
library. The possibility of easier debugging while developing with tevent could
|
|||
|
|
be considered to be an advantage of this returned pointer.
|
|||
|
|
|
|||
|
|
@code
|
|||
|
|
struct tevent_queue_entry *tevent_queue_add_entry(struct tevent_queue *queue,
|
|||
|
|
struct tevent_context *ev,
|
|||
|
|
struct tevent_req *req,
|
|||
|
|
tevent_queue_trigger_fn_t trigger,
|
|||
|
|
void *private_data)
|
|||
|
|
@endcode
|
|||
|
|
|
|||
|
|
The feature that allows for the optimized addition of entries to a queue is
|
|||
|
|
that a check for an empty queue with no items is first of all carried out. If
|
|||
|
|
it is found that the queue is empty, then the request for inserting the entry
|
|||
|
|
into a queue will be omitted and directly triggered.
|
|||
|
|
|
|||
|
|
@code
|
|||
|
|
struct tevent_queue_entry *tevent_queue_add_optimize_empty(struct tevent_queue *queue,
|
|||
|
|
struct tevent_context *ev,
|
|||
|
|
struct tevent_req *req,
|
|||
|
|
tevent_queue_trigger_fn_t trigger,
|
|||
|
|
void *private_data)
|
|||
|
|
@endcode
|
|||
|
|
|
|||
|
|
When calling any of the functions serving for inserting an item into a queue,
|
|||
|
|
it is possible to leave out the fourth argument (trigger) and instead of a
|
|||
|
|
function pass a NULL pointer. This usage sets so-called blocking entries.
|
|||
|
|
These entries, since they do not have any trigger operation to be activated,
|
|||
|
|
just sit in their position until they are labeled as a done by another
|
|||
|
|
function. Their purpose is to block other items in the queue from being
|
|||
|
|
triggered.
|
|||
|
|
|
|||
|
|
@subsection example_q Example of tevent queue
|
|||
|
|
|
|||
|
|
@code
|
|||
|
|
#include <stdio.h>
|
|||
|
|
#include <unistd.h>
|
|||
|
|
#include <tevent.h>
|
|||
|
|
|
|||
|
|
struct foo_state {
|
|||
|
|
int local_var;
|
|||
|
|
int x;
|
|||
|
|
};
|
|||
|
|
|
|||
|
|
struct juststruct {
|
|||
|
|
TALLOC_CTX * ctx;
|
|||
|
|
struct tevent_context *ev;
|
|||
|
|
int y;
|
|||
|
|
};
|
|||
|
|
|
|||
|
|
int created = 0;
|
|||
|
|
|
|||
|
|
static void timer_handler(struct tevent_context *ev, struct tevent_timer *te,
|
|||
|
|
struct timeval current_time, void *private_data)
|
|||
|
|
{
|
|||
|
|
// time event which after all sets request as done. Following item from
|
|||
|
|
// the queue may be invoked.
|
|||
|
|
struct tevent_req *req = private_data;
|
|||
|
|
struct foo_state *stateX = tevent_req_data(req, struct foo_state);
|
|||
|
|
|
|||
|
|
// processing some stuff
|
|||
|
|
|
|||
|
|
printf("time_handler\n");
|
|||
|
|
|
|||
|
|
tevent_req_done(req);
|
|||
|
|
talloc_free(req);
|
|||
|
|
|
|||
|
|
printf("Request #%d set as done.\n", stateX->x);
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
static void trigger(struct tevent_req *req, void *private_data)
|
|||
|
|
{
|
|||
|
|
struct juststruct *priv = tevent_req_callback_data (req, struct juststruct);
|
|||
|
|
struct foo_state *in = tevent_req_data(req, struct foo_state);
|
|||
|
|
struct timeval schedule;
|
|||
|
|
struct tevent_timer *tim;
|
|||
|
|
schedule = tevent_timeval_current_ofs(1, 0);
|
|||
|
|
printf("Processing request #%d\n", in->x);
|
|||
|
|
|
|||
|
|
if (in->x % 3 == 0) { // just example; third request does not contain
|
|||
|
|
// any further operation and will be finished right
|
|||
|
|
// away.
|
|||
|
|
tim = NULL;
|
|||
|
|
} else {
|
|||
|
|
tim = tevent_add_timer(priv->ev, req, schedule, timer_handler, req);
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
if (tim == NULL) {
|
|||
|
|
tevent_req_done(req);
|
|||
|
|
talloc_free(req);
|
|||
|
|
printf("Request #%d set as done.\n", in->x);
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
struct tevent_req *foo_send(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
|
|||
|
|
const char *name, int num)
|
|||
|
|
{
|
|||
|
|
struct tevent_req *req;
|
|||
|
|
struct foo_state *state;
|
|||
|
|
struct foo_state *in;
|
|||
|
|
struct tevent_timer *tim;
|
|||
|
|
|
|||
|
|
printf("foo_send\n");
|
|||
|
|
req = tevent_req_create(mem_ctx, &state, struct foo_state);
|
|||
|
|
if (req == NULL) { // check for appropriate allocation
|
|||
|
|
tevent_req_error(req, 1);
|
|||
|
|
return NULL;
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// exemplary filling of variables
|
|||
|
|
state->local_var = 1;
|
|||
|
|
state->x = num;
|
|||
|
|
|
|||
|
|
return req;
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
static void foo_done(struct tevent_req *req) {
|
|||
|
|
|
|||
|
|
enum tevent_req_state state;
|
|||
|
|
uint64_t err;
|
|||
|
|
|
|||
|
|
if (tevent_req_is_error(req, &state, &err)) {
|
|||
|
|
printf("ERROR WAS SET %d\n", state);
|
|||
|
|
return;
|
|||
|
|
} else {
|
|||
|
|
// processing some stuff
|
|||
|
|
printf("Callback is done...\n");
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
int main (int argc, char **argv)
|
|||
|
|
{
|
|||
|
|
TALLOC_CTX *mem_ctx;
|
|||
|
|
struct tevent_req* req[6];
|
|||
|
|
struct tevent_req* tmp;
|
|||
|
|
struct tevent_context *ev;
|
|||
|
|
struct tevent_queue *fronta = NULL;
|
|||
|
|
struct juststruct *data;
|
|||
|
|
int ret;
|
|||
|
|
int i = 0;
|
|||
|
|
|
|||
|
|
const char * const names[] = {
|
|||
|
|
"first", "second", "third", "fourth", "fifth"
|
|||
|
|
};
|
|||
|
|
|
|||
|
|
printf("INIT\n");
|
|||
|
|
|
|||
|
|
mem_ctx = talloc_new(NULL); //parent
|
|||
|
|
talloc_parent(mem_ctx);
|
|||
|
|
ev = tevent_context_init(mem_ctx);
|
|||
|
|
if (ev == NULL) {
|
|||
|
|
fprintf(stderr, "MEMORY ERROR\n");
|
|||
|
|
return EXIT_FAILURE;
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// setting up queue
|
|||
|
|
fronta = tevent_queue_create(mem_ctx, "test_queue");
|
|||
|
|
tevent_queue_stop(fronta);
|
|||
|
|
tevent_queue_start(fronta);
|
|||
|
|
if (tevent_queue_running(fronta)) {
|
|||
|
|
printf ("Queue is runnning (length: %d)\n", tevent_queue_length(fronta));
|
|||
|
|
} else {
|
|||
|
|
printf ("Queue is not runnning\n");
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
data = talloc(ev, struct juststruct);
|
|||
|
|
data->ctx = mem_ctx;
|
|||
|
|
data->ev = ev;
|
|||
|
|
|
|||
|
|
|
|||
|
|
// create 4 requests
|
|||
|
|
for (i = 1; i < 5; i++) {
|
|||
|
|
req[i] = foo_send(mem_ctx, ev, names[i], i);
|
|||
|
|
tmp = req[i];
|
|||
|
|
if (req[i] == NULL) {
|
|||
|
|
fprintf(stderr, "Request error! %d \n", ret);
|
|||
|
|
break;
|
|||
|
|
}
|
|||
|
|
tevent_req_set_callback(req[i], foo_done, data);
|
|||
|
|
created++;
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// add item to a queue
|
|||
|
|
tevent_queue_add(fronta, ev, req[1], trigger, data);
|
|||
|
|
tevent_queue_add(fronta, ev, req[2], trigger, data);
|
|||
|
|
tevent_queue_add(fronta, ev, req[3], trigger, data);
|
|||
|
|
tevent_queue_add(fronta, ev, req[4], trigger, data);
|
|||
|
|
|
|||
|
|
printf("Queue length: %d\n", tevent_queue_length(fronta));
|
|||
|
|
while(tevent_queue_length(fronta) > 0) {
|
|||
|
|
tevent_loop_once(ev);
|
|||
|
|
printf("Queue: %d items left\n", tevent_queue_length(fronta));
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
talloc_free(mem_ctx);
|
|||
|
|
printf("FINISH\n");
|
|||
|
|
|
|||
|
|
return EXIT_SUCCESS;
|
|||
|
|
}
|
|||
|
|
@endcode
|
|||
|
|
|
|||
|
|
*/
|