Expand the directory-tracking pre-coding logic
[fuse-music.git] / C / fuse-music.c
1 #define _GNU_SOURCE
2 #include <stdio.h>
3 #include <string.h>
4 #include <errno.h>
5 #include <fcntl.h>
6 #include <sys/types.h>
7 #include <stdbool.h>
8 #include <ctype.h>
9 #include <sys/stat.h>
10 #include <unistd.h>
11 #include <stdlib.h>
12 #include <dirent.h>
13 #include <pthread.h>
14 #include <stddef.h>
15 #include <libgen.h>
16 #include "fmdb.h"
17 #include "fmcache.h"
18 #include "misc.h"
19
20 #define FUSE_USE_VERSION 26 /* Latest version */
21 #include <fuse.h>
22 #include <fuse_opt.h>
23
24 FILE *logfile = NULL;
25
26 #define MAIN_DEBUG 0
27
28 #if MAIN_DEBUG >= 0
29 #   define MAINLOG0(x) LOGPRINT(x)
30 #else
31 #   define MAINLOG0(x)
32 #endif
33
34 #if MAIN_DEBUG >= 1
35 #   define MAINLOG1(x) LOGPRINT(x)
36 #else
37 #   define MAINLOG1(x)
38 #endif
39
40 #if MAIN_DEBUG >= 2
41 #   define MAINLOG2(x) LOGPRINT(x)
42 #else
43 #   define MAINLOG2(x)
44 #endif
45
46 typedef enum
47 {
48     OUTFMT_OGG = 0,
49     OUTFMT_MP3,
50     OUTFMT_INVALID
51 } fmt_e;
52
53 typedef struct
54 {
55     char *extension;
56     int   extension_len;
57     char *default_quality;
58     char *encode_command;
59 } format_t;
60
61 format_t formats[] =
62 {
63     { "ogg", 3, "7", "encode_ogg" },
64     { "mp3", 3, "4", "encode_mp3" },
65 };
66
67 #define KiB 1024
68 #define MiB (KiB * KiB)
69 #define GiB (KiB * KiB * KiB)
70
71 #define CACHE_MIN_SIZE (100ULL * MiB)
72 #define CACHE_DEFAULT_SIZE (10ULL * GiB)
73 #define NUM_LRU_DIRS 10
74 #define DIR_TRIGGER 2
75
76 static fmt_e format_index = OUTFMT_OGG;
77 static long long cachesize_value = CACHE_DEFAULT_SIZE;
78 static long num_encoder_threads_value = 1;
79 char *quality = NULL;
80 FMDB *db_state = NULL;
81
82 typedef enum
83 {
84     ENCODE_WAITING = 0,
85     ENCODE_IN_PROGRESS,
86     ENCODE_DONE,
87     ENCODE_INVALID
88 } encode_state_e;
89
90 typedef enum
91 {
92     ENCODE_DONT_KEEP = 0,
93     ENCODE_KEEP,
94 } encode_keep_e;
95
96 /* Linked list of things to work on / have been worked on */
97 typedef struct _work_list_t
98 {
99     struct _work_list_t *next;
100     int                  error;
101     char                 flac_path[PATH_MAX];
102     pthread_mutex_t      lock;
103     encode_state_e       state;
104     pthread_cond_t       cv;
105     int                  refcount;
106     int                  keep;
107 } work_list_t;
108
109 pthread_t *bg_thread;
110 struct thread_work
111 {
112     pthread_mutex_t lock;
113     int             num_todo;
114     pthread_cond_t  cv;
115     work_list_t    *work_list;
116 };
117 struct thread_work global_work;
118
119 #define MOUNT_OPT(n)      char *n
120 #define FUSE_MOUNT_OPT(n) {"--"#n"=%s", offsetof(struct mount_opts, n), 1}
121 #define CONFIG_OPT(n)     {#n, offsetof(struct mount_opts, n)}
122
123 struct mount_opts
124 {
125     MOUNT_OPT(config_file);    
126     MOUNT_OPT(basedir);    
127     MOUNT_OPT(cachedir);
128     MOUNT_OPT(db_file);
129     MOUNT_OPT(format);
130     MOUNT_OPT(quality);
131     MOUNT_OPT(logfile);
132     MOUNT_OPT(cachesize);
133     MOUNT_OPT(num_threads);
134 };
135 struct mount_opts mo;
136
137 static const struct fuse_opt fm_mount_opts[] = {    
138     FUSE_MOUNT_OPT(config_file),
139     FUSE_MOUNT_OPT(basedir),
140     FUSE_MOUNT_OPT(cachedir),
141     FUSE_MOUNT_OPT(db_file),
142     FUSE_MOUNT_OPT(format),
143     FUSE_MOUNT_OPT(quality),
144     FUSE_MOUNT_OPT(logfile),
145     FUSE_MOUNT_OPT(cachesize),
146     FUSE_MOUNT_OPT(num_threads),
147     FUSE_OPT_END
148 };
149
150 struct config_opt
151 {
152     char *match;
153     unsigned long offset;
154 };
155 static const struct config_opt config_opts[] = {
156     CONFIG_OPT(config_file),
157     CONFIG_OPT(basedir),
158     CONFIG_OPT(cachedir),
159     CONFIG_OPT(db_file),
160     CONFIG_OPT(format),
161     CONFIG_OPT(quality),
162     CONFIG_OPT(logfile),
163     CONFIG_OPT(cachesize),
164     CONFIG_OPT(num_threads),
165     {NULL, 0}
166 };
167
168 struct
169 {
170     pthread_mutex_t lock;
171     char *lru_dirs[NUM_LRU_DIRS];
172 } dir_state;
173 pthread_once_t once_control = PTHREAD_ONCE_INIT;
174 void *bg_handler(void *arg);
175 void *bg_db_cleanup(void *arg);
176
177 static void init_threads(void)
178 {
179     intptr_t i = 0;
180     int *threadcount;
181     pthread_mutexattr_t attr;
182     pthread_mutexattr_init(&attr);
183     pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_ERRORCHECK_NP);    
184
185     pthread_mutex_init(&dir_state.lock, &attr);
186     pthread_mutex_init(&global_work.lock, &attr);
187     pthread_cond_init(&global_work.cv, NULL);
188     /* +1 for the DB cleanup thread */
189     bg_thread = malloc((1 + num_encoder_threads_value) * sizeof(pthread_t));
190     if (!bg_thread)
191     {
192         MAINLOG0((logfile, "%s: Failed to malloc thread space for thread descriptors\n", __func__));
193         db_close(db_state);
194     }
195     threadcount = malloc((1 + num_encoder_threads_value) * sizeof(int));
196     if (!threadcount)
197     {
198         MAINLOG0((logfile, "%s: Failed to malloc thread space for thread numbers\n", __func__));
199         db_close(db_state);
200     }
201     for (i = 0; i < num_encoder_threads_value; i++)
202     {
203         threadcount[i] = i;
204         (void)pthread_create(&bg_thread[i], NULL, bg_handler, &threadcount[i]);
205     }
206     (void)pthread_create(&bg_thread[i], NULL, bg_db_cleanup, &threadcount[i]);
207     i++;
208 }
209
210 static char *str_encode_state(encode_state_e state)
211 {
212     switch (state)
213     {
214         case ENCODE_WAITING:
215             return "WAITING";
216         case ENCODE_IN_PROGRESS:
217             return "IN_PROGRESS";
218         case ENCODE_DONE:
219             return "DONE";
220         default:
221             return "BOGUS";
222     }
223 }
224
225 static void dump_global_queue(void)
226 {
227     work_list_t *entry = global_work.work_list;
228     int i = 0;
229     int states[ENCODE_INVALID] = {0};
230
231     log_lock();
232     fprintf(logfile, "%s:\n", __func__);
233     fprintf(logfile, " #  enc_state   keep ref flac\n");
234     while (entry)
235     {
236         fprintf(logfile, "% 3d %11s % 2d   % 2d  %s\n",
237                 i++, str_encode_state(entry->state),
238                 entry->keep, entry->refcount, entry->flac_path);
239         states[entry->state]++;
240         entry = entry->next;
241     }
242     fprintf(logfile, "found %d entries total:\n", i);
243     for (i = 0; i < ENCODE_INVALID; i++)
244         fprintf(logfile, "  %s: %d ", str_encode_state(i), states[i]);
245     fprintf(logfile, "\n");
246     log_unlock();
247 }
248
249 static char *convert_to_base_path(const char *path)
250 {
251     char *basename = strdup(path);
252     int length;
253     char *out = NULL;
254     int ret = 0;
255
256 //    fprintf(logfile, "%s: path %s, mo.basedir %s\n", __func__, path, mo.basedir);
257     
258     if (!basename)
259         return NULL;
260
261     length = strlen(basename);
262     length -= (formats[format_index].extension_len);
263     if ((length >= 1) &&
264         (basename[length-1] == '.') &&
265         (!strcmp(&basename[length], formats[format_index].extension)))
266     {
267         basename[length] = 0;
268         ret = asprintf(&out, "%s%sflac", mo.basedir, basename);
269     }
270     else
271     {
272         ret = asprintf(&out, "%s%s", mo.basedir, basename);
273     }
274
275     if (-1 == ret)
276     {
277         errno = ENOMEM;
278         out = NULL;
279     }
280     
281     free(basename);
282     return out;
283 }
284
285 static bool is_flac(const char *path)
286 {
287     int length = strlen(path);
288     if ( (length >= 5) && /* strlen(".flac") */
289          (!strcmp(&path[length-5], ".flac")))
290         return true;
291     /* else */
292     return false;
293 }
294
295 static size_t encoded_size(const char *flac_path, long long *size)
296 {
297     db_size_entry_t in, out;
298     int error = 0;
299     time_t mtime = file_mtime(flac_path);
300
301     strncpy(in.flac_path, strip_leading_path(flac_path, mo.basedir), sizeof(in.flac_path));
302     strncpy(in.format_choice, formats[format_index].extension, sizeof(in.format_choice));
303     strncpy(in.format_quality, quality, sizeof(in.format_quality));
304
305     error = db_lookup_size_entry(db_state, &in, &out);
306     if (error)
307     {
308         MAINLOG1((logfile, "%s: returned error %d for %s\n", __func__, error, flac_path));
309         return error;
310     }
311     else if (mtime > out.mtime)
312     {
313         MAINLOG1((logfile, "%s: file is newer than cached size entry (%ld > %ld) for %s; re-encode\n",
314                   __func__, mtime, out.mtime, flac_path));
315         return EAGAIN;
316     }
317     
318     *size = out.size;
319     return 0;
320 }
321
322 static char *convert_from_flac_name(const char *path)
323 {
324     int length = strlen(path);
325
326     /* length -5(.flac) +1(".") + 1(NULL) */
327     char *out = malloc(length - 3 + formats[format_index].extension_len);
328     if (!out)
329     {
330         errno = ENOMEM;
331         return NULL;
332     }
333     memcpy(out, path, length - 5);
334     sprintf(&out[length - 5], ".%s", formats[format_index].extension);
335     return out;
336 }
337
338 static int call_encoder(int threadnum, const char *flac_path, const char *cache_path)
339 {
340     int error = 0;
341     char *command = NULL;
342
343     error = asprintf(&command, "%s -t %d -q %s \"%s\" \"%s\"",
344                      formats[format_index].encode_command,
345                      threadnum,
346                      quality,
347                      flac_path,
348                      cache_path);
349     if (-1 == error)
350         return ENOMEM;
351
352     MAINLOG1((logfile, "%s: Ready to run command %s\n", __func__, command));
353     error = system(command);
354     MAINLOG1((logfile, "%s: system() returned %d\n", __func__, error));
355     return error;
356 }
357
358 static int encode_file(int threadnum, const char *flac_path, int keep)
359 {
360     db_size_entry_t size_entry;
361     int error = 0;
362     long long needed_size = file_size(flac_path);
363     time_t flac_mtime = file_mtime(flac_path);
364     char *cache_file_name = NULL;
365     char cache_path[PATH_MAX];
366
367     /* If we don't have a db entry for the file already, then the only
368      * guess that we can make about the encoded size is that it will
369      * be smaller than the flac size. Use that as a guide for the
370      * space we need. */
371     if (0 != encoded_size(flac_path, &needed_size))
372     {
373         needed_size = file_size(flac_path);
374         MAINLOG2((logfile, "%s: No DB entry yet for %s, so we'll have to guess at the flac size (%lld) for cache space\n",
375                   __func__, flac_path, needed_size));
376     }
377
378     cache_file_name = cache_allocate(mo.basedir, flac_path, needed_size);
379     if (!cache_file_name)
380     {
381         MAINLOG0((logfile, "%s: unable to allocate %lld bytes for a file in the cache, error %d\n",
382                   __func__, needed_size, errno));
383         return errno;
384     }
385
386     sprintf(cache_path, "%s/%s", mo.cachedir, cache_file_name);
387     error = call_encoder(threadnum, flac_path, cache_path);
388     if (error)
389     {
390         MAINLOG0((logfile, "%s: encoding failed, error %d\n", __func__, error));
391         cache_entry_failed(cache_file_name);
392         free(cache_file_name);
393         return error;
394     }
395     
396     strncpy(size_entry.flac_path, strip_leading_path(flac_path, mo.basedir), sizeof(size_entry.flac_path));
397     strncpy(size_entry.format_choice, formats[format_index].extension, sizeof(size_entry.format_choice));
398     strncpy(size_entry.format_quality, quality, sizeof(size_entry.format_quality));
399     size_entry.size = file_size(cache_path);
400     size_entry.mtime = flac_mtime;
401
402     error = db_store_size_entry(db_state, &size_entry);
403     if (error)
404     {
405         MAINLOG0((logfile, "%s: unable to store size entry for %s, error %d\n",
406                   __func__, flac_path, error));
407         cache_entry_failed(cache_file_name);
408         free(cache_file_name);
409         return error;
410     }
411
412     error = cache_entry_encode_complete(cache_file_name,
413                                         file_size(cache_path),
414                                         keep);
415     if (error)
416     {
417         MAINLOG0((logfile, "%s: unable to update cache DB entry for %s, error %d\n",
418                   __func__, flac_path, error));
419         cache_entry_failed(cache_file_name);
420         free(cache_file_name);
421         return error;
422     }
423
424     free(cache_file_name);
425     return 0;
426 }
427
428 static int add_todo_entry_file(const char *flac_path,
429                                work_list_t **enqueued,
430                                encode_keep_e keep)
431 {
432     work_list_t *entry;
433     char dbgname[64];
434     pthread_mutexattr_t attr;
435     pthread_mutexattr_init(&attr);
436     pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_ERRORCHECK_NP);
437
438     sprintf(dbgname, "%s(%ld):", __func__, (unsigned long)pthread_self());
439     
440     /* Check if we already have an entry for this file; if so, just
441      * add ourselves to the ref count. If not, add a new entry. */
442     entry = global_work.work_list;
443     while (entry)
444     {
445         pthread_mutex_lock(&entry->lock);
446         if (!strcmp(entry->flac_path, flac_path))
447         {
448             entry->refcount++;
449             *enqueued = entry;
450             if (keep)
451                 entry->keep++;
452             pthread_mutex_unlock(&entry->lock);
453             MAINLOG1((logfile, "%s: refcount++ on %s\n", dbgname, flac_path));
454             return 0;
455         }
456         pthread_mutex_unlock(&entry->lock);
457         entry = entry->next;
458     }
459
460     /* If we get here, we didn't find an existing entry */
461     entry = calloc(1, sizeof *entry);
462     if (!entry)
463         return ENOMEM;
464
465     entry->next = NULL;
466     entry->error = 0;
467     strcpy(entry->flac_path, flac_path);
468     pthread_mutex_init(&entry->lock, &attr);
469     pthread_cond_init(&entry->cv, NULL);    
470     entry->state = ENCODE_WAITING;
471     entry->refcount = 1;
472
473     MAINLOG1((logfile, "%s: adding file %s to encode list\n", dbgname, flac_path));
474
475     /* Empty list */
476     if (global_work.work_list == NULL)
477     {
478         global_work.work_list = entry;
479     }
480     else
481     {
482         /* Append to list */
483         work_list_t *current = NULL;
484         current = global_work.work_list;
485         while (current->next)
486             current = current->next;
487         current->next = entry;
488     }
489     global_work.num_todo++;
490     *enqueued = entry;
491     return 0;
492 }
493
494 static void free_todo_entry_file(work_list_t *entry)
495 {
496     char dbgname[64];
497     sprintf(dbgname, "%s(%ld):", __func__, (unsigned long)pthread_self());
498
499     entry->refcount--;
500     if (0 == entry->refcount)
501     {
502         MAINLOG1((logfile, "%s: entry %s is finished, free it now\n", dbgname, entry->flac_path));
503         pthread_cond_destroy(&entry->cv);
504         pthread_mutex_unlock(&entry->lock);
505         pthread_mutex_destroy(&entry->lock);
506         free(entry);
507     }
508     else
509     {
510         MAINLOG1((logfile, "%s: entry %s still in use %d time(s)\n", dbgname, entry->flac_path, entry->refcount));
511         pthread_mutex_unlock(&entry->lock);
512     }        
513 }
514
515 static void *entry_file_thread(void *arg, encode_keep_e keep)
516 {
517     char *flac_path = arg;
518     int error = 0;
519     work_list_t *entry;
520     char dbgname[64];
521     sprintf(dbgname, "%s(%ld):", __func__, (unsigned long)pthread_self());
522     
523     pthread_mutex_lock(&global_work.lock);
524     error = add_todo_entry_file(flac_path, &entry, keep);
525     free(flac_path);
526     if (error)
527     {
528         pthread_mutex_unlock(&global_work.lock);        
529         return NULL;
530     }
531     MAINLOG1((logfile, "%s: signal, %d files in queue\n", dbgname, global_work.num_todo));
532     /* Tell the background thread that we have work to do */
533     pthread_cond_broadcast(&global_work.cv);
534     pthread_mutex_unlock(&global_work.lock);    
535
536     pthread_mutex_lock(&entry->lock);
537     while (entry->state != ENCODE_DONE)
538     {
539         pthread_cond_wait(&entry->cv, &entry->lock);
540     }
541     MAINLOG1((logfile, "%s: wakeup, %s done, error %d\n",
542               dbgname, entry->flac_path, entry->error));
543     error = entry->error;
544     free_todo_entry_file(entry);
545     return NULL;
546 }
547
548 static void *entry_file_thread_keep(void *arg)
549 {
550     return entry_file_thread(arg, ENCODE_KEEP);
551 }
552
553 static void *entry_file_thread_nokeep(void *arg)
554 {
555     return entry_file_thread(arg, ENCODE_DONT_KEEP);
556 }
557
558 static int add_todo_entry_dir(const char *flac_dir, int keep)
559 {
560     char *flac_path;
561     struct dirent *de;        
562     int error = 0;
563     DIR *dp = opendir(flac_dir);
564     long long enc_size = 0;
565     if (dp == NULL)
566         return errno;
567     int should_encode;
568
569     MAINLOG1((logfile, "%s: adding contents of dir %s to encode list\n", __func__, flac_dir));
570
571     /* Start a new thread for each file we want to encode, to deal
572      * with cleanup etc. */
573     while ((de = readdir(dp)) != NULL)
574     {
575         struct stat st;
576         should_encode = 1;
577         
578         if (!asprintf(&flac_path, "%s/%s", flac_dir, de->d_name))
579         {
580             MAINLOG0((logfile, "%s: asprintf failure\n", __func__));
581             return -ENOMEM;
582         }
583         error = lstat(flac_path, &st);
584         if (error < 0)
585         {
586             MAINLOG0((logfile, "%s: lstat %s returned error %d\n", __func__, flac_path, error));
587             free(flac_path);
588             closedir(dp);
589             return errno;
590         }
591
592         if (should_encode && !S_ISREG(st.st_mode))
593         {
594             /* Not a file - don't encode it */
595             should_encode = 0;
596             MAINLOG1((logfile, "%s: dirent %s (%s) is not a file, ignore it\n", __func__, de->d_name, flac_path));
597         }
598         
599         if (should_encode && !is_flac(de->d_name))
600         {
601             /* Not a flac file - don't encode it */
602             should_encode = 0;
603             MAINLOG1((logfile, "%s: dirent %s (%s) is not a flac, ignore it\n", __func__, de->d_name, flac_path));
604         }
605         
606         if (should_encode && !keep && (0 == encoded_size(flac_path, &enc_size)))
607         {
608             /* We've been called for readdir(), so we have nokeep. The
609              * file already exists in the size cache validly at this
610              * point, so there's no need to re-encode it *again* just
611              * for a readdir lookahead at this point. */
612             should_encode = 0;
613             MAINLOG1((logfile, "%s: dirent %s (%s) does not need re-encoding for readdir, ignore it\n",
614                       __func__, de->d_name, flac_path));
615         }
616         
617         if (should_encode)
618         {
619             pthread_t thread;
620             pthread_attr_t attr;
621             pthread_attr_init(&attr);
622             pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
623             pthread_attr_setstacksize(&attr, 4 * 1024 * 1024);
624             
625             if (keep)
626                 error = pthread_create(&thread, &attr, entry_file_thread_keep, flac_path);
627             else
628                 error = pthread_create(&thread, &attr, entry_file_thread_nokeep, flac_path);
629             
630             pthread_attr_destroy(&attr);
631             if (error)
632             {
633                 MAINLOG0((logfile, "%s: pthread_create returned error %d\n", __func__, error));
634                 closedir(dp);
635                 free(flac_path);
636                 return error;
637             }
638         }
639         else
640             free(flac_path);
641     }
642     closedir(dp);
643     return 0;
644 }
645
646 /* Add the current directory to the LRU state at the beginning;
647  * shuffle all the other entries down one. Pass back the number of
648  * entries that match the new one (including that new one, so will
649  * always be 1 or more!). */
650 static int update_dir_state(char *current_dir, int *num_matches)
651 {
652     int i = 0;
653     int matches = 0;
654     char dbgname[64];
655     sprintf(dbgname, "%s(%ld):", __func__, (unsigned long)pthread_self());
656
657     pthread_mutex_lock(&dir_state.lock);
658
659     /* Free the last one on the current list */
660     free (dir_state.lru_dirs[NUM_LRU_DIRS - 1]);
661
662     /* Shuffle the others down, checking for matches as we go */
663     for (i = NUM_LRU_DIRS - 1; i >= 1; i--)
664     {
665         dir_state.lru_dirs[i] = dir_state.lru_dirs[i-1];
666         if ((NULL != dir_state.lru_dirs[i]) && 
667             (!strcmp(current_dir, dir_state.lru_dirs[i])))
668             matches++;
669     }
670
671     /* Copy the new one into place */
672     dir_state.lru_dirs[0] = strdup(current_dir);
673     if (NULL == dir_state.lru_dirs[0])
674     {
675         pthread_mutex_unlock(&dir_state.lock);
676         return ENOMEM;
677     }
678
679     /* Add 1 for the one we just added */
680     matches++;    
681
682     MAINLOG1((logfile, "%s: added saved dir %s, now called %d times in recent history (last %d)\n",
683               dbgname, current_dir, matches, NUM_LRU_DIRS));
684     *num_matches = matches;
685     pthread_mutex_unlock(&dir_state.lock);
686     return 0;
687 }    
688
689 static int enqueue_encode_file(const char *flac_path, encode_keep_e keep)
690 {
691     int error = 0;
692     char *tmp_path = NULL;
693     char *current_dir;
694     work_list_t *entry;
695     char dbgname[64];
696     sprintf(dbgname, "%s(%ld):", __func__, (unsigned long)pthread_self());
697
698     pthread_mutex_lock(&global_work.lock);
699     error = add_todo_entry_file(flac_path, &entry, keep);
700     if (error)
701     {
702         pthread_mutex_unlock(&global_work.lock);
703         return error;
704     }
705
706     pthread_mutex_unlock(&global_work.lock);
707
708     if (num_encoder_threads_value > 1)
709     {
710         int num_matches = 0;
711
712         tmp_path = strdup(flac_path);
713         current_dir = dirname(tmp_path);
714
715         error = update_dir_state(current_dir, &num_matches);
716         if (error)
717         {
718             free(tmp_path);
719             pthread_mutex_unlock(&global_work.lock);
720             MAINLOG0((logfile, "%s: update_dir_state failed, error %d\n",
721                       dbgname, error));
722             return error;
723         }
724
725         if (num_matches >= DIR_TRIGGER)
726         {
727             error = add_todo_entry_dir(tmp_path, keep);
728             if (error)
729             {
730                 free(tmp_path);
731                 pthread_mutex_unlock(&global_work.lock);
732                 MAINLOG0((logfile, "%s: add_todo_entry_dir failed, error %d\n",
733                           dbgname, error));
734                 return error;
735             }
736         }
737         free(tmp_path);
738     } /* num_encoder_threads_value > 1 */
739  
740     pthread_mutex_lock(&global_work.lock);
741     MAINLOG1((logfile, "%s: signal, %d files in queue\n", dbgname, global_work.num_todo));
742     /* Tell the background thread that we have work to do */
743     pthread_cond_broadcast(&global_work.cv);
744     pthread_mutex_unlock(&global_work.lock);
745
746     pthread_mutex_lock(&entry->lock);
747     while (entry->state != ENCODE_DONE)
748     {
749         pthread_cond_wait(&entry->cv, &entry->lock);
750     }
751     MAINLOG1((logfile, "%s: wakeup, %s done, error %d\n",
752               dbgname, entry->flac_path, entry->error));
753     error = entry->error;
754     free_todo_entry_file(entry);
755     return error;
756 }
757
758 static int fm_getattr(const char *path, struct stat *stbuf)
759 {
760     int res = 0;
761     char *flac_path = NULL;
762     int error = 0;
763
764     pthread_once(&once_control, init_threads);
765     MAINLOG1((logfile, "%s: %s\n", __func__, path));
766     memset(stbuf, 0, sizeof(struct stat));
767     if (strcmp(path, "/") == 0)
768     {
769         stbuf->st_mode = S_IFDIR | 0755;
770         stbuf->st_nlink = 2;
771         return 0;
772     }
773
774     flac_path = convert_to_base_path(path);
775     if (!flac_path)
776         return -errno;
777
778     res = lstat(flac_path, stbuf);
779     if (res < 0)
780     {
781         free(flac_path);
782         return -errno;
783     }
784
785     if (S_ISDIR(stbuf->st_mode))
786     {
787         free(flac_path);
788         return 0;
789     }
790
791     if (S_ISREG(stbuf->st_mode) && is_flac(flac_path))
792     {
793         long long size;
794         error = encoded_size(flac_path, &size);
795         if (error)
796         {
797             /* Need to (re-)encode the file. We're only looking for
798              * the size right now, so don't mark the file for
799              * keeping. */
800             error = enqueue_encode_file(flac_path, ENCODE_DONT_KEEP);
801             if (error)
802             {
803                 MAINLOG0((logfile, "%s: encoding %s failed with error %d\n",
804                           __func__, flac_path, error));
805                 free(flac_path);
806                 return -EIO;
807             }
808
809             /* Encoding should have updated the database, so look again */
810             error = encoded_size(flac_path, &size);
811             if (error)
812                 return -error;
813         }
814         stbuf->st_mode = S_IFREG | 0444;
815         stbuf->st_nlink = 1;
816         stbuf->st_size = size;
817         stbuf->st_blocks = (size+511)/512;
818     }
819     
820     free(flac_path);
821     return 0;
822 }
823
824 static int fm_readlink(const char *path, char *buf, size_t size)
825 {
826     int res = 0;
827
828     pthread_once(&once_control, init_threads);
829     MAINLOG1((logfile, "%s: %s\n", __func__, path));
830     res = readlink(path, buf, size - 1);
831     if (res == -1)
832         return -errno;
833
834     buf[res] = '\0';
835     return 0;
836 }
837
838 static int fm_readdir(const char *path, void *buf, fuse_fill_dir_t filler,
839                       off_t offset, struct fuse_file_info *fi)
840 {
841     DIR *dp;
842     struct dirent *de;
843     char *flac_path = NULL;
844
845     pthread_once(&once_control, init_threads);
846
847     (void) offset;
848     (void) fi;
849
850     MAINLOG1((logfile, "%s: %s\n", __func__, path));
851
852     flac_path = convert_to_base_path(path);
853     if (!flac_path)
854         return -errno;
855
856     dp = opendir(flac_path);
857     if (dp == NULL)
858     {
859         free(flac_path);
860         return -errno;
861     }
862
863     while ((de = readdir(dp)) != NULL)
864     {
865         struct stat st;        
866         memset(&st, 0, sizeof(st));
867         st.st_ino = de->d_ino;
868         st.st_mode = de->d_type << 12;
869         
870         /* Do we have a file, and does it end in .flac? */
871         if (S_ISREG(st.st_mode) && is_flac(de->d_name))
872         {
873             /* We'll have to convert the name */
874             int ret = 0;
875             char *out = convert_from_flac_name(de->d_name);
876             if (!out)
877             {
878                 free(flac_path);
879                 return -ENOMEM;
880             }
881             ret = filler(buf, out, &st, 0);
882             free(out);
883             if (ret)
884                 break;
885         }
886         else
887         {
888             /* Something else, just pass the name through directly */
889             if (filler(buf, de->d_name, &st, 0))
890                 break;
891         }
892     }
893     closedir(dp);
894     return 0;
895 }
896
897 static int fm_open(const char *path, struct fuse_file_info *fi)
898 {
899     int fd;
900     char *flac_path = NULL;
901     int error = 0;
902
903     pthread_once(&once_control, init_threads);
904
905     MAINLOG1((logfile, "%s: %s\n", __func__, path));
906
907     if (fi->flags & (O_RDWR|O_WRONLY))
908         return -EROFS;
909
910     flac_path = convert_to_base_path(path);
911     if (!flac_path)
912         return -errno;
913
914     /* if flac, do something special, otherwise... */
915     if (is_flac(flac_path))
916     {
917         char *cache_file_name = cache_lookup_for_read(mo.basedir, flac_path);
918         char cache_full_path[PATH_MAX];
919         if (NULL == cache_file_name)
920         {
921             /* Need to encode the file. We're looking to open the file
922              * straight afterwards, so mark the file for keeping in
923              * the cache. */
924             error = enqueue_encode_file(flac_path, ENCODE_KEEP);
925             if (error)
926             {
927                 MAINLOG0((logfile, "%s: encoding %s failed with error %d\n",
928                           __func__, flac_path, error));
929                 free(flac_path);
930                 return -EIO;
931             }
932             cache_file_name = cache_lookup_for_read(mo.basedir, flac_path);
933             if (NULL == cache_file_name)
934             {
935                 free(flac_path);
936                 return -EIO; /* Should never get here! */
937             }
938         }
939         sprintf(cache_full_path, "%s/%s", mo.cachedir, cache_file_name);
940         fd = open(cache_full_path, fi->flags);
941         if (fd == -1)
942         {
943             cache_read_finished(mo.basedir, cache_file_name);
944             free(cache_file_name);
945             free(flac_path);
946             return -errno;
947         }
948         free(cache_file_name);
949     }
950     else
951     {
952         fd = open(flac_path, fi->flags);
953         if (fd == -1)
954         {
955             free(flac_path);
956             return -errno;
957         }
958     }
959     fi->fh = fd;
960     free(flac_path);
961     return 0;
962 }
963
964 static int fm_release(const char *path, struct fuse_file_info *fi)
965 {
966     char *flac_path = NULL;
967
968     pthread_once(&once_control, init_threads);
969     MAINLOG1((logfile, "%s: %s\n", __func__, path));
970
971     close(fi->fh);
972
973     flac_path = convert_to_base_path(path);
974     if (!flac_path)
975         return -errno;
976
977     /* if flac, do something special, otherwise... */
978     if (is_flac(flac_path))
979         (void)cache_read_finished(mo.basedir, flac_path);
980
981     free(flac_path);
982     return 0;
983 }
984
985 static int fm_read(const char *path, char *buf, size_t size, off_t offset,
986                    struct fuse_file_info *fi)
987 {
988     int res;
989
990     pthread_once(&once_control, init_threads);
991     MAINLOG1((logfile, "%s: %s\n", __func__, path));
992
993     (void) path;
994     res = pread(fi->fh, buf, size, offset);
995     if (res == -1)
996     {
997         MAINLOG0((logfile, "%s: failed, error %d\n", __func__, errno));
998         res = -errno;
999     }
1000     return res;
1001 }
1002
1003 static int fm_statfs(const char *path, struct statvfs *stbuf)
1004 {
1005     int res;
1006
1007     pthread_once(&once_control, init_threads);
1008     MAINLOG1((logfile, "%s: %s\n", __func__, path));
1009
1010     res = statvfs(mo.basedir, stbuf);
1011     if (res == -1)
1012         return -errno;
1013
1014     stbuf->f_bfree = 0;
1015     stbuf->f_bavail = 0;
1016     stbuf->f_ffree = 0;
1017     stbuf->f_favail = 0;
1018
1019     return 0;
1020 }
1021
1022 static struct fuse_operations fm_oper = {
1023     .getattr        = fm_getattr,
1024     .readlink       = fm_readlink,
1025     .readdir        = fm_readdir,
1026     .open           = fm_open,
1027     .release        = fm_release,
1028     .read           = fm_read,
1029     .statfs         = fm_statfs,
1030 };
1031
1032 void *bg_handler(void *arg)
1033 {
1034     int *tmp = arg;
1035     int threadnum = *tmp;
1036     char dbgname[64];
1037     
1038     sprintf(dbgname, "%s:%d (%ld)", __func__, threadnum, (unsigned long)pthread_self());
1039     MAINLOG1((logfile, "%s: startup\n", dbgname));
1040     
1041     while(1)
1042     {
1043         work_list_t *current = NULL;
1044         int error = 0;
1045
1046         /* wait to be given some work to do */
1047         pthread_mutex_lock(&global_work.lock);
1048         while (0 == global_work.num_todo) {
1049             pthread_cond_wait(&global_work.cv, &global_work.lock);
1050         }
1051         
1052         MAINLOG1((logfile, "%s: wakeup, %d in queue\n", dbgname, global_work.num_todo));
1053         if (MAIN_DEBUG > 0)
1054             dump_global_queue();
1055         /* Yay! We've got something. Work on the first item on the
1056          * list */
1057         current = global_work.work_list;
1058         while (current)
1059         {
1060             pthread_mutex_lock(&current->lock);
1061             if (current->state == ENCODE_WAITING)
1062             {
1063                 global_work.num_todo--;
1064                 current->refcount++;
1065                 current->state = ENCODE_IN_PROGRESS;
1066                 pthread_mutex_unlock(&current->lock);
1067                 break;
1068             }
1069             pthread_mutex_unlock(&current->lock);
1070             current = current->next;
1071         }
1072         pthread_mutex_unlock(&global_work.lock);
1073
1074         if (!current)
1075         {
1076             MAINLOG0((logfile, "%s: WTF? didn't find an entry to work on\n", dbgname));
1077             continue;
1078         }
1079
1080         /* Drop the list lock and work directly on the entry we picked
1081          * up. */
1082         pthread_mutex_lock(&current->lock);
1083         MAINLOG1((logfile, "%s: asked to encode %s\n", dbgname, current->flac_path));
1084         pthread_mutex_unlock(&current->lock);
1085         
1086         error = encode_file(threadnum, current->flac_path, current->keep);
1087
1088         /* Time passes ... */
1089
1090         /* Finished. Now run through the global_work list again to
1091          * remove the current entry */
1092         pthread_mutex_lock(&global_work.lock);
1093         if (current == global_work.work_list)
1094         {
1095             global_work.work_list = current->next;
1096         }
1097         else
1098         {
1099             work_list_t *remove = global_work.work_list;
1100             work_list_t *prev = remove;
1101             while (remove)
1102             {
1103                 if (remove == current)
1104                 {
1105                     prev->next = current->next;
1106                     remove = global_work.work_list;
1107                     break;
1108                 }
1109                 prev = remove;
1110                 remove = remove->next;
1111             }
1112             if (!remove)
1113                 MAINLOG0((logfile, "%s:, failed to find global_work list entry for %s!\n",
1114                           dbgname, current->flac_path));
1115         }
1116         pthread_mutex_unlock(&global_work.lock);
1117
1118         /* Now report what happened back to any/all who are waiting on
1119          * this entry */
1120         pthread_mutex_lock(&current->lock);
1121         MAINLOG1((logfile, "%s: signal %s is done\n", dbgname, current->flac_path));
1122         current->error = error;
1123         current->state = ENCODE_DONE;
1124         pthread_cond_broadcast(&current->cv);
1125         free_todo_entry_file(current);
1126         pthread_mutex_unlock(&current->lock);
1127     }
1128 }
1129
1130 /* Background thread to periodically clean up old size database entries */
1131 void *bg_db_cleanup(void *arg)
1132 {
1133     int *tmp = arg;
1134     int threadnum = *tmp;
1135     char dbgname[64];
1136     
1137     sprintf(dbgname, "%s:%d (%ld)", __func__, threadnum, (unsigned long)pthread_self());
1138     MAINLOG1((logfile, "%s: startup\n", dbgname));
1139     
1140     while(1)
1141     {
1142         db_cleanup_sizes(db_state, mo.basedir);
1143         sleep(3600); /* Wait an hour */
1144     }
1145 }
1146
1147 /* Deal with:
1148  *
1149  * ( )*CONFIG_VAR( )*=( )*value( )*
1150  * 
1151  * and ignore 
1152  *
1153  * ( )*#( )*
1154  */
1155 static int parse_config_line(int lineno, char *ptr)
1156 {
1157     char *endptr;
1158     int len = strlen(ptr);
1159     int i = 0;
1160
1161     endptr = &ptr[len-1];
1162     /* strip leading whitespace, and dump comments */
1163     while(1)
1164     {
1165         if (isspace(*ptr))
1166             ptr++;
1167         else if ('#' == *ptr)
1168             return 0; /* ignore the rest of the line */
1169         else
1170             break;
1171     }
1172
1173     /* strip trailing whitespace */
1174     while(1)
1175     {
1176         if (endptr <= ptr)
1177             return 0; /* blank line */
1178         if (*endptr && !isspace(*endptr))
1179             break;
1180         *endptr-- = 0;
1181     }
1182
1183     for (i = 0; config_opts[i].match != NULL; i++)
1184     {
1185         char **valueptr = NULL;
1186
1187         if (strncmp(ptr, config_opts[i].match, strlen(config_opts[i].match)))
1188             continue;
1189         ptr += strlen(config_opts[i].match);
1190         while(isspace(*ptr))
1191             ptr++;
1192         if (*ptr != '=')
1193             return EINVAL;
1194         ptr++;
1195         while(isspace(*ptr))
1196             ptr++;
1197         valueptr = (void *)&mo + config_opts[i].offset;
1198         *valueptr = strdup(ptr);
1199         return 0;
1200     }
1201     return EINVAL; /* Didn't match anything */
1202 }
1203
1204 static int parse_config_file(void)
1205 {
1206     FILE *config = NULL;
1207     char *ptr = NULL;
1208     ssize_t size = 0;
1209     size_t linelen = 0;
1210     int error = 0;
1211     int lineno = 1;
1212
1213     if (!mo.config_file)
1214         return 0;
1215     
1216     config = fopen(mo.config_file, "rb");
1217     if (NULL == config)
1218     {
1219         fprintf(stderr, "Failed to open config file %s, error %d\n",
1220                 mo.config_file, errno);
1221         exit(errno);
1222     }
1223     
1224     while (1)
1225     {
1226         errno = 0;
1227         size = getline(&ptr, &linelen, config);
1228         if (-1 == size)
1229         {
1230             error = errno;            
1231             break;
1232         }
1233         error = parse_config_line(lineno++, ptr);
1234         if (error)
1235             break;
1236     }
1237     free (ptr);
1238     fclose(config);
1239     return error;
1240 }
1241
1242 int main(int argc, char *argv[])
1243 {
1244     intptr_t i = 0;
1245     int error = 0;
1246     struct fuse_args args = FUSE_ARGS_INIT(argc, argv);
1247     umask(0);
1248
1249     logfile = stderr;
1250
1251     fuse_opt_parse(&args, &mo, fm_mount_opts, NULL);
1252
1253     error = parse_config_file();
1254     if (error)
1255     {
1256         printf("Error in config file %s: %d (%s)\n",
1257                mo.config_file, error, strerror(error));
1258         return error;
1259     }
1260
1261     if (mo.logfile)
1262     {
1263         FILE *newlog;
1264         fclose(stderr);
1265         newlog = fopen(mo.logfile, "wb");
1266         if (!newlog)
1267         {
1268             printf("Can't open specified logfile %s for writing, error %d\n",
1269                     mo.logfile, errno);
1270             return errno;
1271         }
1272         logfile = newlog;
1273         setvbuf(logfile, (char *) NULL, _IONBF, 0);
1274     }
1275
1276     /* Validate options */
1277     if (!mo.basedir)
1278     {
1279         MAINLOG0((logfile, "Need to specify a basedir\n"));
1280         return EINVAL;
1281     }
1282     MAINLOG1((logfile, "mo.basedir %s\n", mo.basedir));
1283
1284     if (!mo.cachedir)
1285     {
1286         MAINLOG0((logfile, "Need to specify a cachedir\n"));
1287         return EINVAL;
1288     }
1289
1290     /* Convert strings into useful integers */
1291     if (mo.cachesize)
1292     {
1293         error = parse_string_to_long_long(mo.cachesize, &cachesize_value);
1294         if (error)
1295         {
1296             MAINLOG0((logfile, "Unable to parse cachesize value: %s\n", mo.cachesize));
1297             return EINVAL;
1298         }
1299     }
1300     
1301     if (mo.num_threads)
1302     {
1303         error = parse_string_to_long(mo.num_threads, &num_encoder_threads_value);
1304         if (error)
1305         {
1306             MAINLOG0((logfile, "Unable to parse num_threads value: %s\n", mo.num_threads));
1307             return EINVAL;
1308         }
1309     }
1310
1311     if (cachesize_value < CACHE_MIN_SIZE)
1312     {
1313         MAINLOG0((logfile, "Cache size %lld is too small; minimum allowed is %lld\n",
1314                   cachesize_value, CACHE_MIN_SIZE));
1315         return EINVAL;
1316     }
1317
1318     if (!mo.db_file)
1319     {
1320         MAINLOG0((logfile, "Need to specify a db file\n"));
1321         return EINVAL;
1322     }
1323
1324     if (mo.format)
1325     {
1326         for (i = 0; i < OUTFMT_INVALID; i++)
1327         {
1328             if (!strcmp(mo.format, formats[i].extension))
1329             {
1330                 format_index = i;
1331                 break;
1332             }
1333             if (i == OUTFMT_INVALID)
1334             {
1335                 MAINLOG0((logfile, "%s not recognised as a valid output format\n", mo.format));
1336                 return EINVAL;
1337             }
1338         }
1339     }
1340     
1341     if (mo.quality)
1342         quality = mo.quality;
1343     else
1344         quality = strdup(formats[format_index].default_quality);
1345
1346     db_state = db_open(mo.db_file);
1347     if (!db_state)
1348     {
1349         MAINLOG0((logfile, "Failed to open database %s\n", mo.db_file));
1350         return 1;
1351     }
1352
1353     error = cache_init(db_state, mo.cachedir, cachesize_value);
1354     if (error)
1355     {
1356         MAINLOG0((logfile, "Failed to init cache %s, error %d\n", mo.cachedir, error));
1357         db_close(db_state);
1358         return 1;
1359     }
1360
1361     for (i = 0; i < NUM_LRU_DIRS; i++)
1362         dir_state.lru_dirs[i] = NULL;
1363
1364     error = fuse_main(args.argc, args.argv, &fm_oper, NULL);
1365     if (error)
1366         MAINLOG0((logfile, "fuse_main failed, error %d\n", error));
1367
1368     error = cache_shutdown();
1369     if (error)
1370     {
1371         MAINLOG0((logfile, "Failed to shutdown cache %s, error %d\n", mo.cachedir, error));
1372         db_close(db_state);
1373     }
1374
1375     db_close(db_state);
1376     return error;
1377 }