libubox - uloop runqueue ustream
40600 ワード
イベント処理サイクル(uloop.c/h)
インタフェースの説明
メインフレーム
ディスクリプタイベント
タイマイベント
プロセスイベント
データ構造
ディスクリプタ
タイマ
プロセス
イベントコールバック関数
ディスクリプタ
タイマ
プロセス
イベントフラグ
タスクキュー(runqueue.c/h)
タスクキューはuloopタイマにより実現され,タイマタイムアウト時間を1に設定し,uloopイベントループによりタイマを処理するとタスクキュー内のtaskを処理する.プロセスタスクはタスクキューで基本的に実現され、サブプロセスに参加して監視を終了します.
データ構造
インタフェースの説明
タスクキュー
タスクアクション
プロセスタスク
ストリームバッファ管理(ustream.c/h/ustream-fd.c)
データ構造
きおくこうぞう
インタフェースの説明
初期/破棄
read bufferへの書き込み
リードバッファー
一般的にnotify_read()コールバックインタフェースの使用
操作write buffer
最大限の能力を尽くしてwrite()コールバック用インタフェース書き込みを呼び出し、能力を超えた場合はwrite bufferに書き込まれていないデータを格納する
write bufferでのデータを実際の場所に書き込み、write()コールバックインタフェースとnotify_を呼び出すwrite()コールバックインタフェース.一般に、記述子のpoll操作で呼び出され、記述子が書き込み可能になったときに、前回書き込みされなかったコンテンツを直ちに書き込み操作することを示す.
例
プロセスタスクキュー
初期化:
タスクの定義:
タスクの追加:
ノートに全選コピー
インタフェースの説明
メインフレーム
/** * */
int uloop_init(void) /** * */ void uloop_run(void) /** * */ void uloop_done(void)
ディスクリプタイベント
/** * */
int uloop_fd_add(struct uloop_fd *sock, unsigned int flags)
/** * */
int uloop_fd_delete(struct uloop_fd *sock)
タイマイベント
/** * */
int uloop_timeout_add(struct uloop_timeout *timeout) /** * ( ), */ int uloop_timeout_set(struct uloop_timeout *timeout, int msecs) /** * */ int uloop_timeout_cancel(struct uloop_timeout *timeout) /** * */ int uloop_timeout_remaining(struct uloop_timeout *timeout)
プロセスイベント
/** * */
int uloop_process_add(struct uloop_process *p) /** * */ int uloop_process_delete(struct uloop_process *p)
データ構造
ディスクリプタ
struct uloop_fd {
uloop_fd_handler cb; /** , */
int fd; /** , */
bool eof;
bool error;
bool registered; /** uloop */
uint8_t flags;
};
タイマ
struct uloop_timeout {
struct list_head list;
bool pending;
uloop_timeout_handler cb; /** , */
struct timeval time; /** , */
};
プロセス
struct uloop_process {
struct list_head list;
bool pending;
uloop_process_handler cb; /** , */
pid_t pid; /** , */
};
イベントコールバック関数
ディスクリプタ
typedef void (*uloop_fd_handler)(struct uloop_fd *u, unsigned int events)
タイマ
typedef void (*uloop_timeout_handler)(struct uloop_timeout *t)
プロセス
typedef void (*uloop_process_handler)(struct uloop_process *c, int ret)
イベントフラグ
#define ULOOP_READ (1 << 0)
#define ULOOP_WRITE (1 << 1)
#define ULOOP_EDGE_TRIGGER (1 << 2)
#define ULOOP_BLOCKING (1 << 3)
#define ULOOP_EVENT_MASK (ULOOP_READ | ULOOP_WRITE)
タスクキュー(runqueue.c/h)
タスクキューはuloopタイマにより実現され,タイマタイムアウト時間を1に設定し,uloopイベントループによりタイマを処理するとタスクキュー内のtaskを処理する.プロセスタスクはタスクキューで基本的に実現され、サブプロセスに参加して監視を終了します.
データ構造
struct runqueue {
struct safe_list tasks_active; /** */
struct safe_list tasks_inactive; /** */
struct uloop_timeout timeout;
int running_tasks; /** */
int max_running_tasks; /** */
bool stopped; /** */
bool empty; /** ( ) */
/* called when the runqueue is emptied */
void (*empty_cb)(struct runqueue *q);
};
struct runqueue_task_type {
const char *name;
/* * called when a task is requested to run * * The task is removed from the list before this callback is run. It * can re-arm itself using runqueue_task_add. */
void (*run)(struct runqueue *q, struct runqueue_task *t);
/* * called to request cancelling a task * * int type is used as an optional hint for the method to be used when * cancelling the task, e.g. a signal number for processes. Calls * runqueue_task_complete when done. */
void (*cancel)(struct runqueue *q, struct runqueue_task *t, int type);
/* * called to kill a task. must not make any calls to runqueue_task_complete, * it has already been removed from the list. */
void (*kill)(struct runqueue *q, struct runqueue_task *t);
};
struct runqueue_task {
struct safe_list list;
const struct runqueue_task_type *type;
struct runqueue *q;
void (*complete)(struct runqueue *q, struct runqueue_task *t);
struct uloop_timeout timeout;
int run_timeout; /** >0 run_timeout */
int cancel_timeout; /** >0 run_timeout */
int cancel_type;
bool queued; /** */
bool running; /** , */
bool cancelled; /** */
};
struct runqueue_process {
struct runqueue_task task;
struct uloop_process proc;
};
インタフェースの説明
タスクキュー
/** * */
void runqueue_init(struct runqueue *q) /** * */ void runqueue_cancel(struct runqueue *q);
/** * */
void runqueue_cancel_active(struct runqueue *q);
/** * */
void runqueue_cancel_pending(struct runqueue *q);
/** * */
void runqueue_kill(struct runqueue *q);
/** * */
void runqueue_stop(struct runqueue *q);
/** * */
void runqueue_resume(struct runqueue *q);
タスクアクション
/** * * * @running true- ;false- */
void runqueue_task_add(struct runqueue *q, struct runqueue_task *t, bool running);
/** * * * @running true- ;false- */
void runqueue_task_add_first(struct runqueue *q, struct runqueue_task *t, bool running);
/** * */
void runqueue_task_complete(struct runqueue_task *t);
/** * */
void runqueue_task_cancel(struct runqueue_task *t, int type);
/** * */
void runqueue_task_kill(struct runqueue_task *t);
プロセスタスク
void runqueue_process_add(struct runqueue *q, struct runqueue_process *p, pid_t pid);
/** * to be used only from runqueue_process callbacks */
void runqueue_process_cancel_cb(struct runqueue *q, struct runqueue_task *t, int type);
void runqueue_process_kill_cb(struct runqueue *q, struct runqueue_task *t);
ストリームバッファ管理(ustream.c/h/ustream-fd.c)
データ構造
struct ustream_buf {
struct ustream_buf *next;
char *data; /** buff */
char *tail; /** buff */
char *end; /** buf */
char head[]; /** buf */
};
struct ustream_buf_list {
struct ustream_buf *head; /** 1 ustream_buf */
struct ustream_buf *data_tail; /** ustream_buf */
struct ustream_buf *tail; /** ustream_buf */
int (*alloc)(struct ustream *s, struct ustream_buf_list *l);
int data_bytes; /** */
int min_buffers; /** ustream_buf */
int max_buffers; /** ustream_buf */
int buffer_len; /** ustream_buf */
int buffers; /** ustream_buf */
};
struct ustream {
struct ustream_buf_list r, w;
struct uloop_timeout state_change;
struct ustream *next;
/* * notify_read: (optional) * called by the ustream core to notify that new data is available * for reading. * must not free the ustream from this callback */
void (*notify_read)(struct ustream *s, int bytes_new);
/* * notify_write: (optional) * called by the ustream core to notify that some buffered data has * been written to the stream. * must not free the ustream from this callback */
void (*notify_write)(struct ustream *s, int bytes);
/* * notify_state: (optional) * called by the ustream implementation to notify that the read * side of the stream is closed (eof is set) or there was a write * error (write_error is set). * will be called again after the write buffer has been emptied when * the read side has hit EOF. */
void (*notify_state)(struct ustream *s);
/* * write: * must be defined by ustream implementation, accepts new write data. * 'more' is used to indicate that a subsequent call will provide more * data (useful for aggregating writes) * returns the number of bytes accepted, or -1 if no more writes can * be accepted (link error) */
int (*write)(struct ustream *s, const char *buf, int len, bool more);
/* * free: (optional) * defined by ustream implementation, tears down the ustream and frees data */
void (*free)(struct ustream *s);
/* * set_read_blocked: (optional) * defined by ustream implementation, called when the read_blocked flag * changes */
void (*set_read_blocked)(struct ustream *s);
/* * poll: (optional) * defined by the upstream implementation, called to request polling for * available data. * returns true if data was fetched. */
bool (*poll)(struct ustream *s);
/* * ustream user should set this if the input stream is expected * to contain string data. the core will keep all data 0-terminated. */
bool string_data; /** ustream ,true- ;false- */
bool write_error; /** ,true- ;false- */
bool eof, eof_write_done;
enum read_blocked_reason read_blocked;
};
struct ustream_fd {
struct ustream stream;
struct uloop_fd fd;
};
きおくこうぞう
インタフェースの説明
初期/破棄
/** * ustream_fd_init: create a file descriptor ustream (uses uloop) */
void ustream_fd_init(struct ustream_fd *s, int fd) /** * ustream_init_defaults: fill default callbacks and options */ void ustream_init_defaults(struct ustream *s) /** * ustream_free: free all buffers and data associated with a ustream */ void ustream_free(struct ustream *s)
read bufferへの書き込み
/* * ustream_reserve: allocate rx buffer space * len read buffer , ustream_fill_read() * * len: hint for how much space is needed (not guaranteed to be met) * maxlen: pointer to where the actual buffer size is going to be stored */
char *ustream_reserve(struct ustream *s, int len, int *maxlen)
/** * ustream_fill_read: mark rx buffer space as filled * ustream_reseve() read buffer , * notify_read() , */
void ustream_fill_read(struct ustream *s, int len)
リードバッファー
一般的にnotify_read()コールバックインタフェースの使用
/* * ustream_get_read_buf: get a pointer to the next read buffer data * , ustream_consume() */
char *ustream_get_read_buf(struct ustream *s, int *buflen)
/** * ustream_consume: remove data from the head of the read buffer */
void ustream_consume(struct ustream *s, int len)
操作write buffer
最大限の能力を尽くしてwrite()コールバック用インタフェース書き込みを呼び出し、能力を超えた場合はwrite bufferに書き込まれていないデータを格納する
/* * ustream_write: add data to the write buffer */
int ustream_write(struct ustream *s, const char *buf, int len, bool more) int ustream_printf(struct ustream *s, const char *format, ...) int ustream_vprintf(struct ustream *s, const char *format, va_list arg)
write bufferでのデータを実際の場所に書き込み、write()コールバックインタフェースとnotify_を呼び出すwrite()コールバックインタフェース.一般に、記述子のpoll操作で呼び出され、記述子が書き込み可能になったときに、前回書き込みされなかったコンテンツを直ちに書き込み操作することを示す.
/*
* ustream_write_pending: attempt to write more data from write buffers
* returns true if all write buffers have been emptied.
*/ bool ustream_write_pending(struct ustream *s)
例
プロセスタスクキュー
初期化:
static struct runqueue q;
static void q_empty(struct runnqueue *q) {
}
static void task_init(void) {
runqueue_init(&q);
q.empty_cb = q_empty;
q.max_running_tasks = 1; /** */
}
タスクの定義:
struct task {
struct runqueue_process proc;
char arg[128];
};
static void task_run(struct runqueue *q, struct runqueue_task *t) {
struct task *tk = container_of(t, struct task, proc.task);
pid_t pid;
pid = fork();
if (pid < 0)
return;
if (pid) {
/** * task , task * uloop , task , * 「runqueue_start_next」 */
runqueue_process_add(q, &tk->proc, pid);
return;
}
/** sleep */
execlp("sleep", "sleep", tk->arg, NULL);
exit(1);
}
struct const struct runqueue_task_type task_type = {
.run = task_run, /** */
.cancel = runqueue_process_cancel_cb, /** */
.kill = runqueue_process_kill_cb, /** */
};
タスクの追加:
ノートに全選コピー
static void task_complete(struct runqueue *q, struct runqueue_task *t) {
struct task *tk = container_of(t, struct task, proc.task);
free(tk);
}
void task_add(char *arg) {
struct task *tk;
tk = calloc(1, sizeof(struct task));
tk->proc.task.type = &task_type; /** */
tk->proc.task.complete = task_complete; /** hook */
tk->proc.task.run_timeout = timeout; /** */
strcpy(tk->arg, arg);
/** */
runqueue_task_add(&q, &tk->proc.task, false);
}