Newer
Older
A Recorder fetches hits from the Queue and inserts them into Piwik using
the API.
"""
recorders = []
def __init__(self):
self.queue = Queue.Queue(maxsize=2)
benakamoorthi
a validé
# if bulk tracking disabled, make sure we can store hits outside of the Queue
if not config.options.use_bulk_tracking:
self.unrecorded_hits = []
@classmethod
def launch(cls, recorder_count):
"""
Launch a bunch of Recorder objects in a separate thread.
"""
for i in xrange(recorder_count):
recorder = Recorder()
benakamoorthi
a validé
run = recorder._run_bulk if config.options.use_bulk_tracking else recorder._run_single
t = threading.Thread(target=run)
t.daemon = True
t.start()
logging.debug('Launched recorder')
Add a set of hits to the recorders queue.
# Organize hits so that one client IP will always use the same queue.
# We have to do this so visits from the same IP will be added in the right order.
for hit in all_hits:
hits_by_client[abs(hash(hit.ip)) % len(cls.recorders)].append(hit)
recorder.queue.put(hits_by_client[i])
"""
Wait until all recorders have an empty queue.
"""
benakamoorthi
a validé
def _run_bulk(self):
hits = self.queue.get()
if len(hits) > 0:
try:
self._record_hits(hits)
except Piwik.Error, e:
fatal_error(e, hits[0].filename, hits[0].lineno) # approximate location of error
benakamoorthi
a validé
def _run_single(self):
while True:
if config.options.force_one_action_interval != False:
time.sleep(config.options.force_one_action_interval)
benakamoorthi
a validé
if len(self.unrecorded_hits) > 0:
hit = self.unrecorded_hits.pop(0)
benakamoorthi
a validé
try:
self._record_hits([hit])
except Piwik.Error, e:
fatal_error(e, hit.filename, hit.lineno)
else:
self.unrecorded_hits = self.queue.get()
self.queue.task_done()
def _wait_empty(self):
"""
Wait until the queue is empty.
"""
while True:
if self.queue.empty():
# We still have to wait for the last queue item being processed
# (queue.empty() returns True before queue.task_done() is
# called).
self.queue.join()
return
time.sleep(1)
def date_to_piwik(self, date):
date, time = date.isoformat(sep=' ').split()
return '%s %s' % (date, time.replace('-', ':'))
def _get_hit_args(self, hit):
Returns the args used in tracking a hit, without the token_auth.
"""
site_id, main_url = resolver.resolve(hit)
if site_id is None:
# This hit doesn't match any known Piwik site.
if config.options.replay_tracking:
stats.piwik_sites_ignored.add('unrecognized site ID %s' % hit.args.get('idsite'))
else:
stats.piwik_sites_ignored.add(hit.host)
stats.count_lines_no_site.increment()
return
stats.dates_recorded.add(hit.date.date())
Cyril Bay
a validé
path = hit.path
if hit.query_string and not config.options.strip_query_string:
path += config.options.query_string_delimiter + hit.query_string
args = {
'rec': '1',
'apiv': '1',
'url': (main_url + path[:1024]).encode('utf8'),
'urlref': hit.referrer[:1024].encode('utf8'),
'cip': hit.ip,
'cdt': self.date_to_piwik(hit.date),
'idsite': site_id,
'dp': '0' if config.options.reverse_dns else '1',
'ua': hit.user_agent.encode('utf8'),
if config.options.replay_tracking:
# prevent request to be force recorded when option replay-tracking
args['rec'] = '0'
args.update(hit.args)
if hit.is_download:
args['download'] = args['url']
if hit.is_robot:
args['_cvar'] = '{"1":["Bot","%s"]}' % hit.user_agent
elif config.options.enable_bots:
args['_cvar'] = '{"1":["Not-Bot","%s"]}' % hit.user_agent
if hit.is_error or hit.is_redirect:
args['action_name'] = '%s/URL = %s%s' % (
hit.status,
("/From = %s" % urllib.quote(args['urlref'], '') if args['urlref'] != '' else '')
return args
def _record_hits(self, hits):
"""
Inserts several hits into Piwik.
"""
data = {
'token_auth': config.options.piwik_token_auth,
'requests': [self._get_hit_args(hit) for hit in hits]
}
if not config.options.dry_run:
piwik.call(
'/piwik.php', args={},
expected_content=PIWIK_EXPECTED_IMAGE,
headers={'Content-type': 'application/json'},
data=data,
on_failure=self._on_tracking_failure
stats.count_lines_recorded.advance(len(hits))
def _on_tracking_failure(self, response, data):
"""
Removes the successfully tracked hits from the request payload so
they are not logged twice.
"""
try:
response = json.loads(response)
except:
# the response should be in JSON, but in case it can't be parsed just try another attempt
logging.debug("cannot parse tracker response, should be valid JSON")
return response
# remove the successfully tracked hits from payload
succeeded = response['succeeded']
data['requests'] = data['requests'][succeeded:]
return response['error']
@staticmethod
def invalidate_reports():
if config.options.dry_run or not stats.dates_recorded:
return
if config.options.invalidate_dates is not None:
dates = [date for date in config.options.invalidate_dates.split(',') if date]
else:
dates = [date.strftime('%Y-%m-%d') for date in stats.dates_recorded]
if dates:
print 'Purging Piwik archives for dates: ' + ' '.join(dates)
result = piwik.call_api(
'CoreAdminHome.invalidateArchivedReports',
dates=','.join(dates),
idSites=','.join(str(site_id) for site_id in stats.piwik_sites),
)
print('To re-process these reports with your new update data, execute the '
'piwik/misc/cron/archive.php script, or see: http://piwik.org/setup-auto-archiving/ '
'for more info.')
class Hit(object):
"""
It's a simple container.
"""
def __init__(self, **kwargs):
for key, value in kwargs.iteritems():
setattr(self, key, value)
super(Hit, self).__init__()
class Parser(object):
"""
The Parser parses the lines in a specified file and inserts them into
a Queue.
"""
def __init__(self):
self.check_methods = [method for name, method
in inspect.getmembers(self, predicate=inspect.ismethod)
if name.startswith('check_')]
## All check_* methods are called for each hit and must return True if the
## hit can be imported, False otherwise.
def check_hostname(self, hit):
# Check against config.hostnames.
if not hasattr(hit, 'host') or not config.options.hostnames:
return True
# Accept the hostname only if it matches one pattern in the list.
result = any(
fnmatch.fnmatch(hit.host, pattern)
for pattern in config.options.hostnames
)
if not result:
stats.count_lines_hostname_skipped.increment()
return result
def check_static(self, hit):
extension = hit.path.rsplit('.')[-1].lower()
if extension in STATIC_EXTENSIONS:
if config.options.enable_static:
hit.is_download = True
return True
else:
stats.count_lines_static.increment()
return False
def check_download(self, hit):
extension = hit.path.rsplit('.')[-1].lower()
if extension in DOWNLOAD_EXTENSIONS:
stats.count_lines_downloads.increment()
hit.is_download = True
def check_user_agent(self, hit):
for s in itertools.chain(EXCLUDED_USER_AGENTS, config.options.excluded_useragents):
if s in user_agent:
if config.options.enable_bots:
hit.is_robot = True
return True
else:
stats.count_lines_skipped_user_agent.increment()
return False
def check_http_error(self, hit):
if config.options.enable_http_errors:
hit.is_error = True
return True
else:
stats.count_lines_skipped_http_errors.increment()
return False
return True
def check_http_redirect(self, hit):
if config.options.enable_http_redirects:
hit.is_redirect = True
return True
else:
stats.count_lines_skipped_http_redirects.increment()
return False
return True
for excluded_path in config.options.excluded_paths:
if fnmatch.fnmatch(hit.path, excluded_path):
return False
return True
Cyril Bay
a validé
@staticmethod
def detect_format(file):
Cyril Bay
a validé
"""
Return the format matching this file, or None if none was found.
Cyril Bay
a validé
"""
logging.debug('Detecting the log format')
for name, candidate_format in FORMATS.iteritems():
format = candidate_format.check_format(file)
if format:
Cyril Bay
a validé
logging.debug('Format %s matches', name)
Cyril Bay
a validé
else:
logging.debug('Format %s does not match', name)
"""
Parse the specified filename and insert hits in the queue.
"""
def invalid_line(line, reason):
stats.count_lines_invalid.increment()
Cyril Bay
a validé
if config.options.debug >= 2:
logging.debug('Invalid line detected (%s): %s' % (reason, line))
if filename == '-':
filename = '(stdin)'
file = sys.stdin
else:
if not os.path.exists(filename):
print >> sys.stderr, 'File %s does not exist' % filename
return
else:
if filename.endswith('.bz2'):
open_func = bz2.BZ2File
elif filename.endswith('.gz'):
open_func = gzip.open
else:
open_func = open
file = open_func(filename, 'r')
if config.options.show_progress:
print 'Parsing log %s...' % filename
if config.format:
# The format was explicitely specified.
format = config.format
else:
# If the file is empty, don't bother.
data = file.read(100)
if len(data.strip()) == 0:
return
file.seek(0)
format = self.detect_format(file)
if format is None:
return fatal_error(
'Cannot guess the logs format. Please give one using '
'either the --log-format-name or --log-format-regex option'
)
# Make sure the format is compatible with the resolver.
resolver.check_format(format)
hits = []
for lineno, line in enumerate(file):
try:
line = line.decode(config.options.encoding)
except UnicodeDecodeError:
invalid_line(line, 'invalid encoding')
continue
stats.count_lines_parsed.increment()
if stats.count_lines_parsed.value <= config.options.skip:
continue
match = format.regex.match(line)
invalid_line(line, 'line did not match')
continue
hit = Hit(
filename=filename,
lineno=lineno,
status=match.group('status'),
full_path=match.group('path'),
is_error=False,
is_redirect=False,
args={},
try:
hit.query_string = match.group('query_string')
hit.path = hit.full_path
except IndexError:
hit.path, _, hit.query_string = hit.full_path.partition(config.options.query_string_delimiter)
try:
hit.referrer = match.group('referrer')
except IndexError:
hit.referrer = ''
if hit.referrer == '-':
hit.referrer = ''
try:
hit.user_agent = match.group('user_agent')
except IndexError:
hit.user_agent = ''
hit.ip = match.group('ip')
try:
hit.length = int(match.group('length'))
except (ValueError, IndexError):
# Some lines or formats don't have a length (e.g. 304 redirects, IIS logs)
if config.options.log_hostname:
hit.host = config.options.log_hostname
else:
try:
hit.host = match.group('host').lower().strip('.')
except IndexError:
# Some formats have no host.
pass
# Check if the hit must be excluded.
if not all((method(hit) for method in self.check_methods)):
1428
1429
1430
1431
1432
1433
1434
1435
1436
1437
1438
1439
1440
1441
1442
1443
1444
1445
1446
1447
1448
1449
1450
continue
# Parse date.
# We parse it after calling check_methods as it's quite CPU hungry, and
# we want to avoid that cost for excluded hits.
date_string = match.group('date')
try:
hit.date = datetime.datetime.strptime(date_string, format.date_format)
except ValueError:
invalid_line(line, 'invalid date')
continue
# Parse timezone and substract its value from the date
try:
timezone = float(match.group('timezone'))
except IndexError:
timezone = 0
except ValueError:
invalid_line(line, 'invalid timezone')
continue
if timezone:
hit.date -= datetime.timedelta(hours=timezone/100)
if config.options.replay_tracking:
# we need a query string and we only consider requests with piwik.php
if not hit.query_string or not hit.path.lower().endswith('piwik.php'):
query_arguments = urlparse.parse_qs(hit.query_string)
if not "idsite" in query_arguments:
invalid_line(line, 'missing idsite')
continue
try:
hit.args.update((k, v.pop().encode('raw_unicode_escape').decode(config.options.encoding)) for k, v in query_arguments.iteritems())
except UnicodeDecodeError:
invalid_line(line, 'invalid encoding')
continue
# Check if the hit must be excluded.
if all((method(hit) for method in self.check_methods)):
hits.append(hit)
benakamoorthi
a validé
if len(hits) >= config.options.recorder_max_payload_size * len(Recorder.recorders):
Recorder.add_hits(hits)
hits = []
# add last chunk of hits
if len(hits) > 0:
Recorder.add_hits(hits)
def main():
"""
Start the importing process.
"""
benakamoorthi
a validé
stats.set_time_start()
if config.options.show_progress:
stats.start_monitor()
recorders = Recorder.launch(config.options.recorders)
try:
for filename in config.filenames:
parser.parse(filename)
Recorder.wait_empty()
except KeyboardInterrupt:
pass
1500
1501
1502
1503
1504
1505
1506
1507
1508
1509
1510
1511
1512
1513
1514
1515
1516
1517
1518
1519
1520
1521
1522
1523
1524
1525
1526
1527
1528
1529
1530
1531
stats.set_time_stop()
if config.options.show_progress:
stats.stop_monitor()
try:
Recorder.invalidate_reports()
except Piwik.Error, e:
pass
stats.print_summary()
def fatal_error(error, filename=None, lineno=None):
print >> sys.stderr, 'Fatal error: %s' % error
if filename and lineno is not None:
print >> sys.stderr, (
'You can restart the import of "%s" from the point it failed by '
'specifying --skip=%d on the command line.\n' % (filename, lineno)
)
os._exit(1)
if __name__ == '__main__':
try:
piwik = Piwik()
config = Configuration()
stats = Statistics()
resolver = config.get_resolver()
parser = Parser()
main()
except KeyboardInterrupt:
pass