root/indytube/trunk/indytube/indytube.py

Revision 446, 19.9 kB (checked in by andy, 2 weeks ago)

add in svn:ignore

  • Property svn:executable set to *
Line 
1#!/usr/bin/python2.4
2
3#standard library imports
4import ConfigParser
5import os
6import logging
7import time
8import traceback
9import sys
10import signal
11import xmlrpclib
12from urlparse import urlparse
13import urllib
14import subprocess
15
16#3rd party libraries
17#templating system
18from Cheetah.Template import Template
19#
20# Define the indytube processor class
21# Uses the queueing server API to get jobs to perform, and does the downloading/torrenting/transcoding etc.
22#
23#
24
class IndyTubeProcessor(object):
    """Worker that processes jobs handed out by the IndyTube queue server.

    Three independent passes are offered, each of which asks one XML-RPC
    queue for its waiting job items and reports every item back as finished
    or failed:

    * do_downloading_loop -- fetch remote media into VIDEO_FILE_DIRECTORY and
      queue each downloaded file for transcoding.
    * do_transcoding_loop -- encode media to FLV and Ogg/Theora, write the
      HTML player snippet and queue each converted file for torrenting.
    * do_torrent_loop     -- build a .torrent file and start seeding it.

    parse_config() must be called before any pass: it creates all of the
    upper-case configuration attributes the passes read.
    """

    # Statistics for the most recent pass (reset at the start of each pass).
    checked = 0
    converted = 0
    available = 0
    downloaded = 0
    # Path of the configuration file last given to parse_config().
    conf_file = ""

    # Permission bits (0600) for lock/skip marker files; spelled in decimal
    # so the literal parses on every Python version.
    _MARKER_MODE = 384

    def __init__(self):
        """Create an unconfigured processor; call parse_config() next."""

    def rereadConfig(self, signal, frame):
        """Signal-handler entry point: re-read the current config file.

        The arguments are the (signal number, stack frame) pair passed to
        signal handlers; neither is used.
        """
        self.parse_config(self.conf_file)

    def parse_config(self, conf_file):
        """Load every setting from the ini-style file *conf_file*.

        Remembers *conf_file* for rereadConfig(), fills in the upper-case
        configuration attributes and configures the root logger.  A missing
        section or option raises the corresponding ConfigParser error.
        """
        config = ConfigParser.RawConfigParser()
        config.read(conf_file)
        self.conf_file = conf_file

        base_dir = config.get('paths', 'BASE_DIRECTORY')

        self.MENCODER_LOCATION = config.get('mencoder', 'MENCODER_LOCATION')
        self.MENCODER_OPTIONS = config.get('mencoder', 'MENCODER_OPTIONS')

        self.FFMPEG2THEORA_COMMAND = config.get('ffmpeg2theora', 'FFMPEG2THEORA_COMMAND')
        self.CORTADO_LOCATION = config.get('ffmpeg2theora', 'CORTADO_LOCATION')

        self.FLVTOOL_LOCATION = config.get('flvtool2', 'FLVTOOL_LOCATION')

        self.BE_HOW_NICE = config.get('encoder', 'BE_HOW_NICE')
        # NOTE(security): eval() of a config value executes arbitrary code.
        # Kept for compatibility with existing config files; the config file
        # must only be writable by trusted users.
        self.CONVERT_THESE = eval(config.get('encoder', 'CONVERT_THESE'))
        self.DO_ENCODING = config.getboolean('encoder', 'DO_ENCODING')
        self.NUMBER_OF_PARALLEL_ENCODERS = config.getint('encoder', 'NUMBER_OF_PARALLEL_ENCODERS')
        self.ENCODER_LOCKFILE_BASE = base_dir + config.get('encoder', 'ENCODER_LOCKFILE_BASE')
        self.POLLTIME = config.getint('encoder', 'POLLTIME')

        # All directories are plain string concatenations onto BASE_DIRECTORY,
        # so the configured values must carry their own path separators.
        self.VIDEO_FILE_DIRECTORY = base_dir + config.get('paths', 'VIDEO_FILE_DIRECTORY')
        self.FLV_FILE_DIRECTORY = base_dir + config.get('paths', 'FLV_FILE_DIRECTORY')
        self.INCLUDE_FILE_DIRECTORY = base_dir + config.get('paths', 'INCLUDE_FILE_DIRECTORY')
        self.TORRENT_DIRECTORY = base_dir + config.get('paths', 'TORRENT_DIRECTORY')

        self.INCLUDE_FILE_SUFFIX = config.get('paths', 'INCLUDE_FILE_SUFFIX')
        self.INCLUDE_TEMPLATE = config.get('paths', 'INCLUDE_TEMPLATE')

        self.FLOWPLAYER_LOCATION = config.get('urls', 'FLOWPLAYER_LOCATION')
        self.VIDEO_SERVER_URL = config.get('urls', 'VIDEO_SERVER_URL')
        self.SPLASH_IMAGE_BASE = config.get('urls', 'SPLASH_IMAGE_BASE')
        self.SPLASH_IMAGE_FILE = config.get('urls', 'SPLASH_IMAGE_FILE')

        self.LOG_FILE = base_dir + config.get('logging', 'LOG_FILE')
        # NOTE(security): same eval() caveat as CONVERT_THESE above.
        self.LOG_LEVEL = eval(config.get('logging', 'LOG_LEVEL'))

        self.DOWNLOAD_QUEUE = config.get('zopetube-server', 'DOWNLOAD_QUEUE')
        self.BITTORRENT_QUEUE = config.get('zopetube-server', 'BITTORRENT_QUEUE')
        self.TRANSCODING_QUEUE = config.get('zopetube-server', 'TRANSCODING_QUEUE')

        # bittorrent
        self.TRACKER_URL = config.get('bittorrent', 'TRACKER_URL')

        self.ENCODER_LOCKFILE = ""

        logging.basicConfig(level=self.LOG_LEVEL,
                            format='%(asctime)s %(levelname)s %(message)s',
                            filename=self.LOG_FILE, filemode='a')
        # basicConfig() does nothing once the root logger has handlers, so a
        # re-read would otherwise ignore a changed LOG_LEVEL.  (A changed
        # LOG_FILE still only takes effect on restart.)
        logging.getLogger().setLevel(self.LOG_LEVEL)

        logging.info("finished parse_config function at %s, using file %s " % (time.strftime("%D %H:%M:%S"), self.conf_file))

    # ------------------------------------------------------------------
    # small private helpers
    # ------------------------------------------------------------------

    def _create_exclusive(self, path):
        """Atomically create the empty file *path*.

        Returns True when this call created the file and False when it
        already existed.  Any other OSError propagates.
        """
        import errno
        try:
            fd = os.open(path, os.O_CREAT | os.O_EXCL | os.O_WRONLY, self._MARKER_MODE)
        except OSError:
            if sys.exc_info()[1].errno == errno.EEXIST:
                return False
            raise
        os.close(fd)
        return True

    def _ensure_parent_dir(self, path):
        """Create the directory that will contain *path* if it is missing."""
        parent = os.path.dirname(path)
        if parent and not os.path.isdir(parent):
            try:
                os.makedirs(parent)
            except OSError:
                # Another worker may have created it in the meantime; only a
                # directory that is still missing is a real error.
                if not os.path.isdir(parent):
                    raise

    def _shell_quote(self, value):
        """Return *value* single-quoted for safe use in a /bin/sh command."""
        return "'" + value.replace("'", "'\"'\"'") + "'"

    def _run_shell(self, command, label):
        """Run *command* through the shell, log its output, return its status."""
        proc = subprocess.Popen(command, shell=True,
                                stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        output, error = proc.communicate()
        logging.debug("%s output: %s ; error: %s" % (label, output, error))
        return proc.returncode

    def _fetch_jobs(self, queue_url, component):
        """Connect to *queue_url* and load its waiting job items.

        Stores the proxy in self.xmlrpc_ref and the items in self.jobs.
        Returns True on success; on any error logs it (naming *component*)
        and returns False.
        """
        try:
            self.xmlrpc_ref = xmlrpclib.Server(queue_url)
            self.jobs = self.xmlrpc_ref.listofWaitingJobItems()
        except Exception:
            logging.error("Ending indytube %s component because of exception!\n Exception %s" % (component, sys.exc_info()[1]))
            return False
        return True

    # ------------------------------------------------------------------
    # encoder slot locking
    # ------------------------------------------------------------------

    def check_lock_file(self):
        """Try to claim one of the NUMBER_OF_PARALLEL_ENCODERS encoder slots.

        Slot n is represented by the file "<ENCODER_LOCKFILE_BASE>.<n>".
        Returns True after creating the first free slot file, whose name is
        left in self.ENCODER_LOCKFILE.  Returns False when every slot is
        taken (or no slots are configured); self.ENCODER_LOCKFILE then names
        the last slot examined, which belongs to another process.
        """
        self.ENCODER_LOCKFILE = ""  # this is dynamic, generated off the base

        for n in range(0, self.NUMBER_OF_PARALLEL_ENCODERS):
            self.ENCODER_LOCKFILE = "%s.%s" % (self.ENCODER_LOCKFILE_BASE, n)
            # Creation is atomic, so two workers can never claim one slot.
            if self._create_exclusive(self.ENCODER_LOCKFILE):
                return True

        logging.info("Max encoders reached(%s), exiting." % self.NUMBER_OF_PARALLEL_ENCODERS)
        return False

    # ------------------------------------------------------------------
    # torrent pass
    # ------------------------------------------------------------------

    def do_torrent_loop(self):
        """Run one pass over the bittorrent queue.

        Every waiting item is given to torrent_and_seed_file() and marked
        finished or failed on the queue.  self.available and self.downloaded
        hold the number of items offered and successfully processed.
        """
        logging.info("Starting indytube torrent component... in %s " % self.TORRENT_DIRECTORY)
        self.available = 0
        self.downloaded = 0

        if not self._fetch_jobs(self.BITTORRENT_QUEUE, "torrent"):
            return

        self.available = len(self.jobs)
        logging.info("We have %s files to make torrent files for, and then seed." % self.available)
        for job in self.jobs:
            ###XXX put this block of work per job into a callback, and fire async.
            ok, rel_path = self.torrent_and_seed_file(job)
            if ok:
                self.downloaded = self.downloaded + 1
                self.xmlrpc_ref.markAsFinished(job['id'])
            else:
                self.xmlrpc_ref.markAsFailed(job['id'])

        logging.info("Ending indytube torrent component... We got a list of %s eligble files, torrent-ed %s files " % (self.available, self.downloaded))

    def torrent_and_seed_file(self, jobitem_dict):
        """Create a .torrent for one job item and start seeding it.

        jobitem_dict['video_uri'] is the media path relative to
        VIDEO_FILE_DIRECTORY.  maketorrent-console is run to completion;
        bittorrent-console is then started and left running in the
        background.

        Returns [True, relative_path] on success, or [False, None] when a
        tool cannot be started or maketorrent-console exits non-zero.
        """
        rel_fp = jobitem_dict['video_uri']
        video_file = self.VIDEO_FILE_DIRECTORY + rel_fp
        target_torrent = self.TORRENT_DIRECTORY + rel_fp + ".torrent"

        logging.info("torrent and seed: media file from location %s into location %s " % (video_file, target_torrent))
        output = ""
        error = ""
        try:
            self._ensure_parent_dir(target_torrent)

            # Argument lists (no shell) so that file names containing spaces
            # or shell metacharacters are passed through literally.
            # maketorrent-console --target .torrent-file tracker_url video_file
            make_torrent_args = ["maketorrent-console", "--target",
                                 target_torrent, self.TRACKER_URL, video_file]
            logging.debug("maketorrent-console input: %s ; cwd : %s" % (" ".join(make_torrent_args), os.getcwd()))
            maker = subprocess.Popen(make_torrent_args,
                                     stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            output, error = maker.communicate()
            logging.debug("maketorrent-console output and error : %s %s" % (output, error))
            if maker.returncode != 0:
                logging.error("maketorrent-console exited with status %s" % maker.returncode)
                return [False, None]

            # bittorrent-console --save_as video_file torrent_file
            bittorrent_args = ["bittorrent-console", "--save_as",
                               video_file, target_torrent]
            logging.debug("bittorrent-console input: %s ; cwd : %s" % (" ".join(bittorrent_args), os.getcwd()))
            # Seeding runs indefinitely, so the process is deliberately not
            # waited for.
            seeder = subprocess.Popen(bittorrent_args)
            logging.debug("bittorrent-console started with pid %s" % seeder.pid)
        except Exception:
            logging.error(sys.exc_info()[1])
            logging.debug("maketorrent-console output and error : %s %s" % (output, error))
            return [False, None]
        #XXX make md5sum and compare it
        return [True, rel_fp]

    # ------------------------------------------------------------------
    # download pass
    # ------------------------------------------------------------------

    def do_downloading_loop(self):
        """Run one pass over the download queue.

        Every waiting item is given to download_file().  Successful items
        are marked finished and added to the transcoding queue; the rest are
        marked failed.  self.available and self.downloaded hold the number
        of items offered and successfully downloaded.
        """
        logging.info("Starting indytube downloading component... in %s " % self.VIDEO_FILE_DIRECTORY)
        self.available = 0
        self.downloaded = 0

        if not self._fetch_jobs(self.DOWNLOAD_QUEUE, "downloading"):
            return

        self.available = len(self.jobs)
        logging.info("We have %s files to download." % self.available)
        transcoding_queue = None  # proxy is created once, on first success
        for job in self.jobs:
            ###XXX put this block of work per job into a callback, and fire async.
            ok, rel_path = self.download_file(job)
            if ok:
                self.downloaded = self.downloaded + 1
                self.xmlrpc_ref.markAsFinished(job['id'])
                # add in new job item on transcoding queue
                if transcoding_queue is None:
                    transcoding_queue = xmlrpclib.Server(self.TRANSCODING_QUEUE)
                    self.xmlrpc_ref_t = transcoding_queue
                transcoding_queue.addJobItem(rel_path, '')
            else:
                self.xmlrpc_ref.markAsFailed(job['id'])

        logging.info("Ending indytube downloading component... We got a list of %s eligble files, downloaded %s files " % (self.available, self.downloaded))

    def download_file(self, jobitem_dict):
        """Download the URL in jobitem_dict['video_uri'].

        The file is stored under VIDEO_FILE_DIRECTORY at "<host><url path>".

        Returns [True, relative_path] on success and [False, None] when the
        download fails or the URL path contains a ".." component (which
        could place the file outside VIDEO_FILE_DIRECTORY).
        """
        download_url = jobitem_dict['video_uri']
        url_parts = urlparse(download_url)
        rel_fp = url_parts[1] + url_parts[2]
        incoming_filename = self.VIDEO_FILE_DIRECTORY + rel_fp
        logging.debug("url parts are %s %s %s %s %s %s" % tuple(url_parts))

        # The URL comes from the queue, i.e. from outside this process.
        if '..' in rel_fp.split('/'):
            logging.error("refusing to download %s: path escapes the video directory" % download_url)
            return [False, None]

        logging.info("downloading media file from url %s, into location %s " % (download_url, incoming_filename))
        try:
            self._ensure_parent_dir(incoming_filename)
            urllib.urlretrieve(download_url, incoming_filename)
        except Exception:
            logging.info("downloading media file from url %s, into location %s in error! " % (download_url, incoming_filename))
            logging.debug(traceback.format_exc())
            return [False, None]
        #XXX make md5sum and compare it
        return [True, rel_fp]

    # ------------------------------------------------------------------
    # transcoding pass
    # ------------------------------------------------------------------

    def do_transcoding_loop(self):
        """Run one pass over the transcoding queue.

        Every waiting item is given to attempt_transcode_file().  Successful
        items are marked finished and added to the bittorrent queue; the
        rest are marked failed.  self.checked and self.converted hold the
        number of files examined and successfully converted.
        """
        logging.info("Starting indytube transcoding loop ... ")
        self.checked = 0
        self.converted = 0

        if not self._fetch_jobs(self.TRANSCODING_QUEUE, "transcoding"):
            return

        logging.info("We have %s files to transcode." % len(self.jobs))
        bittorrent_queue = None  # proxy is created once, on first success
        for job in self.jobs:
            ###XXX put this block of work per job, into a callback, and fire async.
            # assumes video_uri is the relative filepath from the downloading server.
            ok, rel_path = self.attempt_transcode_file(job)
            if ok:
                self.converted = self.converted + 1
                self.xmlrpc_ref.markAsFinished(job['id'])
                # lets mark it as ready for torrent-ing and seeding
                if bittorrent_queue is None:
                    bittorrent_queue = xmlrpclib.Server(self.BITTORRENT_QUEUE)
                    self.xmlrpc_ref_bt = bittorrent_queue
                bittorrent_queue.addJobItem(rel_path, '')
            else:
                self.xmlrpc_ref.markAsFailed(job['id'])

        logging.info("Ending indytube transcoding loop ... We checked %s eligble files, converted %s files " % (self.checked, self.converted))

    def _encode_flv(self, videofile, flvfile):
        """Encode *videofile* to *flvfile* with mencoder, then run flvtool2 on it."""
        nice_prefix = 'nice -n ' + self.BE_HOW_NICE + ' '
        quoted_flv = self._shell_quote(flvfile)
        start_time = time.time()
        # The option strings come from the config file and are left for the
        # shell to split; only the file names are quoted.
        encoder_command_string = (nice_prefix + self.MENCODER_LOCATION + " -quiet "
                                  + self._shell_quote(videofile) + " -o " + quoted_flv
                                  + " " + self.MENCODER_OPTIONS)
        self._run_shell(encoder_command_string, "FLV transcoder")
        flvtool_command_string = ("cat " + quoted_flv + " | " + nice_prefix
                                  + self.FLVTOOL_LOCATION + " -U stdin " + quoted_flv)
        self._run_shell(flvtool_command_string, "FLV tool")
        logging.info("Encoded %s in %.2f seconds, using cmd -- %s" % (videofile, time.time() - start_time, encoder_command_string))

    def _encode_theora(self, videofile, theorafile):
        """Encode *videofile* to Ogg/Theora at *theorafile* with ffmpeg2theora."""
        start_time = time.time()
        theora_cmd_string = ('nice -n ' + self.BE_HOW_NICE + ' '
                             + self.FFMPEG2THEORA_COMMAND + ' '
                             + self._shell_quote(videofile) + " -o "
                             + self._shell_quote(theorafile))
        self._run_shell(theora_cmd_string, "Theora")
        logging.info("Encoded %s in %.2f seconds, using cmd -- %s" % (videofile, time.time() - start_time, theora_cmd_string))

    def _write_include_file(self, videofile, flvfile, theorafile, includefile, stem):
        """Render INCLUDE_TEMPLATE for one video and write it to *includefile*."""
        self._ensure_parent_dir(includefile)
        logging.info("Making html template - original size of %s: %.1fMB, Encoded size: %.1fMB" % (videofile, os.path.getsize(videofile) / 1000000.0, os.path.getsize(flvfile) / 1000000.0))
        data_map = {
            'flowplayer_location': self.FLOWPLAYER_LOCATION,
            'videofile': flvfile,
            'videobaseurl': self.VIDEO_SERVER_URL,
            'splashbaseurl': self.SPLASH_IMAGE_BASE,
            'splashfile': self.SPLASH_IMAGE_FILE,
            'cortado_location': self.CORTADO_LOCATION,
            'oggfile': theorafile,
            'mirid': stem,
        }
        template = Template(file=self.INCLUDE_TEMPLATE, searchList=[data_map])
        # Render before opening the target so that a template error cannot
        # truncate an include file that already exists.
        rendered = template.respond()
        file_template = open(includefile, 'w')
        try:
            file_template.write(rendered)
        finally:
            file_template.close()

    def _remove_lock(self, lockfile):
        """Remove the per-file lock, logging (not raising) if that fails."""
        try:
            os.remove(lockfile)
        except OSError:
            logging.info("could not remove lock file %s: %s" % (lockfile, sys.exc_info()[1]))

    def attempt_transcode_file(self, jobitem_dict):
        """Transcode the media file named by jobitem_dict['video_uri'].

        The path is relative to VIDEO_FILE_DIRECTORY.  Files whose extension
        is not in CONVERT_THESE are ignored.  Otherwise the file is encoded
        to FLV and Ogg/Theora (when DO_ENCODING is set and the FLV or the
        include file is missing) and an HTML player snippet is rendered from
        INCLUDE_TEMPLATE.

        Two marker files next to the video coordinate parallel workers:
        "<video>.wetube_lock" while a worker is busy with the file, and
        "<video>.wetube_skip" once no usable FLV could be produced.

        Always returns the two-item list [worked, video_uri]; worked is True
        only when the include file was written.
        """
        f = jobitem_dict['video_uri']

        (stem, extension) = os.path.splitext(f)
        if extension.lower() not in self.CONVERT_THESE:
            return [False, f]

        videofile = self.VIDEO_FILE_DIRECTORY + f
        lockfile = videofile + ".wetube_lock"  # we are encoding already
        skipfile = videofile + ".wetube_skip"  # we tried and failed, don't bother again
        flvfile = self.FLV_FILE_DIRECTORY + stem + ".flv"
        theorafile = self.FLV_FILE_DIRECTORY + stem + ".ogg"
        includefile = self.INCLUDE_FILE_DIRECTORY + stem + self.INCLUDE_FILE_SUFFIX
        logging.info("check for %s, %s, %s " % (lockfile, skipfile, flvfile))

        if os.path.exists(skipfile):
            # lets let the transcoding server know we are in error
            # (do_transcoding_loop reports the failure again on return)
            self.xmlrpc_ref.markAsFailed(jobitem_dict['id'])
            logging.info(' lock file or skip file present for file %s ' % videofile)
            return [False, f]

        # Taking the lock is a single atomic create, so two encoders can
        # never both decide that the file is free.
        try:
            acquired = self._create_exclusive(lockfile)
        except OSError:
            self.checked = self.checked + 1
            logging.info("lock file creation failed! ABORTING. %s " % lockfile)
            return [False, f]
        if not acquired:
            logging.info(' lock file or skip file present for file %s ' % videofile)
            return [False, f]

        # OK, valid video file ready to try to transcode
        logging.info("Checking file %s, using extension %s " % (videofile, extension))
        self.checked = self.checked + 1
        worked = False
        try:
            try:
                # if the flv file (autogenerated) or html snippet is not there, then reencode!
                if not (os.path.exists(flvfile) and os.path.exists(includefile)):
                    logging.info('OK to try encoding: ' + videofile)
                    self._ensure_parent_dir(flvfile)
                    if self.DO_ENCODING:  # maybe we just want to regenerate the include file!
                        self._encode_flv(videofile, flvfile)
                        self._encode_theora(videofile, theorafile)
                    else:
                        logging.debug("skipped encoding, will just do html template generation, if flv exists as non-zero size")
                else:
                    logging.debug("flv file and html file already exists, not doing transcoding")

                # Now, make the flash HTML snippet if the flv got created correctly.
                #XXX todo, separate out the flv and ogg theora (java applet) html snippet
                #XXX expand to OGG format checking
                if os.path.exists(flvfile) and os.path.getsize(flvfile) > 0:
                    self._write_include_file(videofile, flvfile, theorafile, includefile, stem)
                    # to get here, everything must be working.
                    worked = True
                else:
                    logging.info("FLV file size is zero - assuming encoding failed! Permanently skipping file!")
                    self._create_exclusive(skipfile)
            except Exception:
                # Anything that went wrong means the job did not complete.
                worked = False
                logging.info("Error while processing %s: %s" % (videofile, traceback.format_exc()))
        finally:
            # finished transcoding block, remove lock file
            self._remove_lock(lockfile)

        # return a status value
        return [worked, f]
401
402# Copyright John Duda, 2006
403# Copyright Andy Nicholson, 2007,2008
404
405# This program is free software; you can redistribute it and/or modify
406# it under the terms of the GNU General Public License as published
407# by the Free Software Foundation; either version 2 of the License,
408#  or (at your option) any later version.
409
410# This program is distributed in the hope that it will be useful,
411# but WITHOUT ANY WARRANTY; without even the implied warranty of
412# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
413# GNU General Public License for more details.
414
415# You should have received a copy of the GNU General Public License
416# along with this program; if not, write to the Free Software
417# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
418                     
Note: See TracBrowser for help on using the browser.