Start working on multi-threaded backend
authorSteve McIntyre <steve@einval.com>
Wed, 19 Jan 2011 21:48:22 +0000 (21:48 +0000)
committerSteve McIntyre <steve@einval.com>
Wed, 19 Jan 2011 21:48:22 +0000 (21:48 +0000)
Add a background thread, started at mount time. Send encode requests
to this background thread instead of doing them directly.

Split out the code in encoded_size(); it now simply reports a size, or
a failure instead of triggering encoding itself. Code is cleaner this
way, and may allow for easier re-use.

For now, the background thread does the encoding directly. But it will
be much easier to have intelligence here so we can detect a sequence
of encodes and trigger encoding a complete directory in the
background.

C/fuse-music.c

index 7aa11ee..ed6cb2f 100644 (file)
@@ -11,6 +11,7 @@
 #include <dirent.h>
 #include <pthread.h>
 #include <stddef.h>
+#include <libgen.h>
 #include "fmdb.h"
 #include "fmcache.h"
 #include "misc.h"
@@ -51,9 +52,27 @@ format_t formats[] =
 
 static fmt_e format_index = OUTFMT_OGG;
 char *quality = NULL;
-
 FMDB *db_state = NULL;
 
+/* Linked list of things to work on / have been worked on */
+typedef struct _work_list_t
+{
+    struct _work_list_t *next;
+    int error;
+    char flac_path[PATH_MAX];
+} work_list_t;
+
+pthread_t bg_thread;
+struct thread_work
+{
+    pthread_mutex_t lock;
+    pthread_cond_t  cond_todo;
+    work_list_t    *todo;
+    pthread_cond_t  cond_done;
+    work_list_t    *done;
+};
+struct thread_work global_work;
+
 struct mount_opts
 {
     char *basedir;
@@ -63,8 +82,8 @@ struct mount_opts
     char *format;
     char *quality;
     char *logfile;
+    int   num_threads;
 };
-
 struct mount_opts mo;
 
 #define FUSE_MOUNT_OPT(t, p) { t, offsetof(struct mount_opts, p), 1 }
@@ -77,6 +96,7 @@ static const struct fuse_opt fm_mount_opts[] = {
     FUSE_MOUNT_OPT("--format=%s",        format),
     FUSE_MOUNT_OPT("--quality=%s",       quality),
     FUSE_MOUNT_OPT("--logfile=%s",       logfile),
+    FUSE_MOUNT_OPT("--num_threads=%d",   num_threads),
     FUSE_OPT_END
 };
 
@@ -221,11 +241,86 @@ static int encode_file(const char *flac_path)
     return 0;
 }
 
-static size_t encoded_size(const char *flac_path)
+static int enqueue_encode_file(const char *flac_path)
+{
+    int error = 0;
+    int done = 0;
+    work_list_t *entry = calloc(1, sizeof *entry);
+
+    entry->next = NULL;
+    entry->error = 0;
+    strcpy(entry->flac_path, flac_path);
+
+    pthread_mutex_lock(&global_work.lock);
+    /* Empty list */
+    if (global_work.todo == NULL)
+    {
+        global_work.todo = entry;
+    }
+    else
+    {
+        /* Append to list */
+        work_list_t *current = NULL;
+        current = global_work.todo;
+        while (current->next)
+            current = current->next;
+        current->next = entry;
+    }
+
+    fprintf(logfile, "%s: signal\n", __func__);
+    /* Tell the background thread that we have work to do */
+    pthread_cond_signal(&global_work.cond_todo);
+    pthread_mutex_unlock(&global_work.lock);
+
+    /* Now wait for it to respond, either to say it's done the encode
+     * we wanted or it failed. */
+    while (!done)
+    {
+        work_list_t *current = NULL;
+        work_list_t *prev = NULL;
+
+        pthread_mutex_lock(&global_work.lock);
+        while (NULL == global_work.done) {
+            pthread_cond_wait(&global_work.cond_done, &global_work.lock);
+        }
+
+        fprintf(logfile, "%s: wakeup\n", __func__);
+
+        /* Looks like something was encoded; was it what we asked
+         * for? */
+        current = global_work.done;
+        prev = current;
+        while (current)
+        {
+            if (!strcmp(current->flac_path, flac_path))
+            {
+                fprintf(logfile, "%s: our task is done (%s), error %d\n",
+                        __func__, current->flac_path, current->error);
+                /* it's us; dequeue and return the error */
+                done = 1;
+                error = current->error;
+                free(current);
+                if (current == global_work.done)
+                {
+                    global_work.done = NULL;
+                }
+                else
+                {
+                    prev->next = NULL;
+                }
+                break;
+            }
+            current = current->next;
+        }
+        pthread_mutex_unlock(&global_work.lock);
+    }
+    return error;
+}
+
+static size_t encoded_size(const char *flac_path, size_t *size)
 {
     db_size_entry_t in, out;
     int error = 0;
-    int need_to_encode = 0;
     time_t mtime = file_mtime(flac_path);
 
     strncpy(in.flac_path, strip_leading_path(flac_path, mo.basedir), sizeof(in.flac_path));
@@ -236,40 +331,24 @@ static size_t encoded_size(const char *flac_path)
     if (error)
     {
         fprintf(logfile, "%s: returned error %d for %s\n", __func__, error, flac_path);
-        need_to_encode = 1;
+        return error;
     }
     else if (mtime > out.mtime)
     {
         fprintf(logfile, "%s: file is newer than cached size entry (%ld > %ld) for %s; re-encode\n",
                 __func__, mtime, out.mtime, flac_path);
-        need_to_encode = 1;
+        return EAGAIN;
     }
-
-    if (!need_to_encode)
-        return out.size;
     
-    /* else */
-
-    error = encode_file(flac_path);
-    if (error)
-        return -1;
-
-    /* Encoding should have updated the database, so look again */
-    error = db_lookup_size_entry(db_state, &in, &out);
-    if (error)
-    {
-        fprintf(logfile, "%s: returned error %d for %s after an encode attempt!\n",
-                __func__, error, flac_path);
-        return -1;
-    }
-    
-    return out.size;
+    *size = out.size;
+    return 0;
 }
 
 static int fm_getattr(const char *path, struct stat *stbuf)
 {
     int res = 0;
     char *flac_path = NULL;
+    int error = 0;
 
     memset(stbuf, 0, sizeof(struct stat));
     if (strcmp(path, "/") == 0)
@@ -298,9 +377,26 @@ static int fm_getattr(const char *path, struct stat *stbuf)
 
     if (S_ISREG(stbuf->st_mode) && is_flac(flac_path))
     {
+        size_t size;
+        error = encoded_size(flac_path, &size);
+        if (error)
+        {
+            /* Need to (re-)encode the file */
+            error = enqueue_encode_file(flac_path);
+            if (error)
+            {
+                free(flac_path);
+                return -error;
+            }
+
+            /* Encoding should have updated the database, so look again */
+            error = encoded_size(flac_path, &size);
+            if (error)
+                return -error;
+        }
         stbuf->st_mode = S_IFREG | 0444;
         stbuf->st_nlink = 1;
-        stbuf->st_size = encoded_size(flac_path);
+        stbuf->st_size = size;
         stbuf->st_blocks = 0;
     }
     
@@ -395,7 +491,7 @@ static int fm_open(const char *path, struct fuse_file_info *fi)
         char cache_full_path[PATH_MAX];
         if (NULL == cache_file_name)
         {
-            error = encode_file(flac_path);
+            error = enqueue_encode_file(flac_path);
             if (error)
             {
                 free(flac_path);
@@ -489,6 +585,68 @@ static struct fuse_operations fm_oper = {
     .statfs         = fm_statfs,
 };
 
+void *bg_handler(void *arg)
+{
+    struct thread_work *bg_work = arg;
+    struct
+    {
+        char last[PATH_MAX];
+        int  num;
+    } dir_state;
+
+    strcpy(dir_state.last, "");
+    dir_state.num = 0;
+    
+    while(1)
+    {
+        work_list_t *current = NULL;
+        char *tmp_path = NULL;
+        char *current_dir;
+
+        /* wait to be given some work to do */
+        pthread_mutex_lock(&bg_work->lock);
+        while (NULL == bg_work->todo) {
+            pthread_cond_wait(&bg_work->cond_todo, &bg_work->lock);
+        }
+        
+        fprintf(logfile, "%s: wakeup\n", __func__);
+        /* Yay! We've got something. Take the first item off the
+         * list. */
+        current = bg_work->todo;
+        bg_work->todo = current->next;
+        fprintf(logfile, "%s: asked to encode %s\n", __func__, current->flac_path);
+        pthread_mutex_unlock(&bg_work->lock);
+
+        tmp_path = strdup(current->flac_path);
+        current_dir = dirname(tmp_path);
+        if (!strcmp(current_dir, dir_state.last))
+        {
+            dir_state.num++;
+            fprintf(logfile, "%s: triggered again on dir %s, now called %d times in succession\n",
+                    __func__, dir_state.last, dir_state.num);
+        }
+        else
+        {
+            strcpy(dir_state.last, current_dir);
+            dir_state.num = 1;
+            fprintf(logfile, "%s: new dir %s\n", __func__, dir_state.last);
+        }
+        free(tmp_path);
+        current->error = encode_file(current->flac_path);
+
+        /* And put the same entry back at the start of the "done"
+         * queue. */
+        pthread_mutex_lock(&bg_work->lock);
+        current->next = bg_work->done;
+        bg_work->done = current;
+
+        fprintf(logfile, "%s: signal\n", __func__);
+        /* And wake up the other end */
+        pthread_cond_signal(&bg_work->cond_done);
+        pthread_mutex_unlock(&bg_work->lock);
+    }
+}
+
 int main(int argc, char *argv[])
 {
     int i = 0;
@@ -500,6 +658,7 @@ int main(int argc, char *argv[])
     logfile = stderr;
 
     mo.cachesize = CACHE_DEFAULT_SIZE;
+    mo.num_threads = 1;
     fuse_opt_parse(&args, &mo, fm_mount_opts, NULL);
 
     if (mo.logfile)
@@ -578,6 +737,11 @@ int main(int argc, char *argv[])
         return 1;
     }
 
+    pthread_mutex_init(&global_work.lock, NULL);
+    pthread_cond_init(&global_work.cond_todo, NULL);
+    pthread_cond_init(&global_work.cond_done, NULL);
+    error = pthread_create(&bg_thread, NULL, bg_handler, &global_work);
+
     error = fuse_main(args.argc, args.argv, &fm_oper, NULL);
     if (error)
         fprintf(logfile, "fuse_main failed, error %d\n", error);