    @classmethod
    def add_hits(cls, all_hits):
        # Keep all hits from one client IP on the same queue so that visits
        # from that IP are replayed in the right order.
        hits_by_client = [[] for r in cls.recorders]
        for hit in all_hits:
            hits_by_client[abs(hash(hit.ip)) % len(cls.recorders)].append(hit)
        for i, recorder in enumerate(cls.recorders):
            recorder.queue.put(hits_by_client[i])

    @classmethod
    def wait_empty(cls):
        """
        Wait until all recorders have an empty queue.
        """
        for recorder in cls.recorders:
            recorder._wait_empty()
    def _run_bulk(self):
        while True:
            hits = self.queue.get()
            if len(hits) > 0:
                try:
                    self._record_hits(hits)
                except Piwik.Error, e:
                    fatal_error(e, hits[0].filename, hits[0].lineno) # approximate location of error
            self.queue.task_done()
def _run_single(self):
while True:
if config.options.force_one_action_interval != False:
time.sleep(config.options.force_one_action_interval)
if len(self.unrecorded_hits) > 0:
hit = self.unrecorded_hits.pop(0)
try:
self._record_hits([hit])
except Piwik.Error, e:
fatal_error(e, hit.filename, hit.lineno)
else:
self.unrecorded_hits = self.queue.get()
self.queue.task_done()
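    # Note: a recorder runs exactly one of the two loops above: _run_bulk
    # posts whole batches, while _run_single replays one hit per request,
    # optionally sleeping config.options.force_one_action_interval between
    # requests.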
def _wait_empty(self):
"""
Wait until the queue is empty.
"""
while True:
if self.queue.empty():
# We still have to wait for the last queue item being processed
# (queue.empty() returns True before queue.task_done() is
# called).
self.queue.join()
return
time.sleep(1)
def date_to_piwik(self, date):
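        # For example (hypothetical value): datetime(2012, 2, 10, 16, 42, 7)
        # yields '2012-02-10 16:42:07'. The replace() below appears to guard
        # against formats whose time component was captured with '-'
        # separators.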
date, time = date.isoformat(sep=' ').split()
return '%s %s' % (date, time.replace('-', ':'))
    def _get_hit_args(self, hit):
        """
        Returns the args used in tracking a hit, without the token_auth.
        """
site_id, main_url = resolver.resolve(hit)
if site_id is None:
# This hit doesn't match any known Piwik site.
stats.piwik_sites_ignored.add(hit.host)
stats.count_lines_no_site.increment()
return
stats.dates_recorded.add(hit.date.date())
path = hit.path
if hit.query_string and not config.options.strip_query_string:
path += config.options.query_string_delimiter + hit.query_string
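        # 'cip' (visitor IP) and 'cdt' (request timestamp) override values the
        # tracker would otherwise derive itself; Piwik only honors them for
        # authenticated requests, hence the token_auth that _record_hits adds
        # at request level.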
        args = {
            'rec': '1',
            'apiv': '1',
            'url': (main_url + path[:1024]).encode('utf8'),
            'urlref': hit.referrer[:1024].encode('utf8'),
            'cip': hit.ip,
            'cdt': self.date_to_piwik(hit.date),
            'idsite': site_id,
            'dp': '0' if config.options.reverse_dns else '1',
            'ua': hit.user_agent.encode('utf8'),
        }
        args.update(hit.args)
if hit.is_download:
args['download'] = args['url']
if hit.is_robot:
args['_cvar'] = '{"1":["Bot","%s"]}' % hit.user_agent
elif config.options.enable_bots:
args['_cvar'] = '{"1":["Not-Bot","%s"]}' % hit.user_agent
        if hit.is_error or hit.is_redirect:
            args['action_name'] = '%s/URL = %s%s' % (
                hit.status,
                urllib.quote(args['url'], ''),
                ("/From = %s" % urllib.quote(args['urlref'], '') if args['urlref'] != '' else '')
            )
        return args
def _record_hits(self, hits):
"""
Inserts several hits into Piwik.
"""
data = {
'token_auth': config.options.piwik_token_auth,
'requests': [self._get_hit_args(hit) for hit in hits]
}
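        # Illustrative payload shape (values are hypothetical):
        # {'token_auth': '...', 'requests': [{'idsite': '1', 'rec': '1', ...}]}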
        if not config.options.dry_run:
            piwik.call(
                '/piwik.php', args={},
                expected_content=PIWIK_EXPECTED_IMAGE,
                headers={'Content-type': 'application/json'},
                data=data,
                on_failure=self._on_tracking_failure
            )
        stats.count_lines_recorded.advance(len(hits))
def _on_tracking_failure(self, response, data):
"""
Removes the successfully tracked hits from the request payload so
they are not logged twice.
"""
try:
response = json.loads(response)
        except:
            # The response should be JSON; if it can't be parsed, give up and
            # return the raw response.
            logging.debug("cannot parse tracker response, should be valid JSON")
return response
# remove the successfully tracked hits from payload
succeeded = response['succeeded']
data['requests'] = data['requests'][succeeded:]
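        # A retry then resends only the remaining data['requests'], so hits
        # the tracker already accepted are not imported twice.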
return response['error']
@staticmethod
def invalidate_reports():
if config.options.dry_run or not stats.dates_recorded:
return
if config.options.invalidate_dates is not None:
dates = [date for date in config.options.invalidate_dates.split(',') if date]
else:
dates = [date.strftime('%Y-%m-%d') for date in stats.dates_recorded]
if dates:
print 'Purging Piwik archives for dates: ' + ' '.join(dates)
result = piwik.call_api(
'CoreAdminHome.invalidateArchivedReports',
dates=','.join(dates),
idSites=','.join(str(site_id) for site_id in stats.piwik_sites),
)
        print('To re-process these reports with your newly imported data, execute the '
'piwik/misc/cron/archive.php script, or see: http://piwik.org/setup-auto-archiving/ '
'for more info.')
class Hit(object):
"""
It's a simple container.
"""
def __init__(self, **kwargs):
for key, value in kwargs.iteritems():
setattr(self, key, value)
super(Hit, self).__init__()
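# Example (hypothetical values): Hit(ip='1.2.3.4', status='200', path='/')
# simply stores each keyword argument as an attribute on the instance.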
class Parser(object):
"""
The Parser parses the lines in a specified file and inserts them into
a Queue.
"""
    ## All check_* methods are called for each hit and must return True if the
    ## hit can be imported, False otherwise.

    # Filled in at module load time with the names of all check_* methods.
    check_methods = []
@staticmethod
def check_hostname(hit):
# Check against config.hostnames.
if not hasattr(hit, 'host') or not config.options.hostnames:
return True
# Accept the hostname only if it matches one pattern in the list.
result = any(
fnmatch.fnmatch(hit.host, pattern)
for pattern in config.options.hostnames
)
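        # e.g. a pattern list like ['*.example.com'] (hypothetical) accepts
        # 'www.example.com' but rejects 'example.org'.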
if not result:
stats.count_lines_hostname_skipped.increment()
return result
@staticmethod
def check_static(hit):
extension = hit.path.rsplit('.')[-1].lower()
if extension in STATIC_EXTENSIONS:
if config.options.enable_static:
hit.is_download = True
return True
else:
stats.count_lines_static.increment()
return False
    @staticmethod
    def check_download(hit):
        extension = hit.path.rsplit('.')[-1].lower()
        if extension in DOWNLOAD_EXTENSIONS:
            stats.count_lines_downloads.increment()
            hit.is_download = True
        return True
    @staticmethod
    def check_user_agent(hit):
        user_agent = hit.user_agent.lower()
        for s in itertools.chain(EXCLUDED_USER_AGENTS, config.options.excluded_useragents):
            if s in user_agent:
                if config.options.enable_bots:
                    hit.is_robot = True
                    return True
                else:
                    stats.count_lines_skipped_user_agent.increment()
                    return False
        return True
    @staticmethod
    def check_http_error(hit):
        if hit.status[0] in ('4', '5'):
            if config.options.enable_http_errors:
                hit.is_error = True
                return True
            else:
                stats.count_lines_skipped_http_errors.increment()
                return False
        return True
    @staticmethod
    def check_http_redirect(hit):
        if hit.status in ('301', '302', '303'):
            if config.options.enable_http_redirects:
                hit.is_redirect = True
                return True
            else:
                stats.count_lines_skipped_http_redirects.increment()
                return False
        return True
@staticmethod
def check_path(hit):
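        # Globs from config.options.excluded_paths, e.g. '/forum/*'
        # (hypothetical), drop any matching request.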
for excluded_path in config.options.excluded_paths:
if fnmatch.fnmatch(hit.path, excluded_path):
return False
return True
    @staticmethod
    def detect_format(file):
        """
        Return the format matching this file, or None if none was found.
        """
        logging.debug('Detecting the log format')
        for name, candidate_format in FORMATS.iteritems():
            format = candidate_format.check_format(file)
            if format:
                logging.debug('Format %s matches', name)
                return format
            else:
                logging.debug('Format %s does not match', name)
        return None
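    # Each candidate's check_format(file) is expected to inspect the start of
    # the file and return a usable format object on a match (None otherwise),
    # so the first match encountered wins.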
@classmethod
def parse(cls, filename):
"""
Parse the specified filename and insert hits in the queue.
"""
def invalid_line(line, reason):
stats.count_lines_invalid.increment()
if config.options.debug >= 2:
logging.debug('Invalid line detected (%s): %s' % (reason, line))
if filename == '-':
filename = '(stdin)'
file = sys.stdin
else:
if not os.path.exists(filename):
print >> sys.stderr, 'File %s does not exist' % filename
return
else:
if filename.endswith('.bz2'):
open_func = bz2.BZ2File
elif filename.endswith('.gz'):
open_func = gzip.open
else:
open_func = open
file = open_func(filename, 'r')
if config.options.show_progress:
print 'Parsing log %s...' % filename
if config.format:
            # The format was explicitly specified.
format = config.format
else:
# If the file is empty, don't bother.
data = file.read(100)
if len(data.strip()) == 0:
return
file.seek(0)
format = cls.detect_format(file)
if format is None:
return fatal_error(
                'Cannot guess the log format. Please provide one using '
'either the --log-format-name or --log-format-regex option'
)
# Make sure the format is compatible with the resolver.
resolver.check_format(format)
hits = []
for lineno, line in enumerate(file):
try:
line = line.decode(config.options.encoding)
except UnicodeDecodeError:
invalid_line(line, 'invalid encoding')
continue
stats.count_lines_parsed.increment()
if stats.count_lines_parsed.value <= config.options.skip:
continue
            match = format.regex.match(line)
            if not match:
                invalid_line(line, 'line did not match')
                continue
            hit = Hit(
                filename=filename,
                lineno=lineno,
                status=match.group('status'),
                full_path=match.group('path'),
                is_download=False,
                is_robot=False,
                is_error=False,
                is_redirect=False,
                args={},
            )
try:
hit.query_string = match.group('query_string')
hit.path = hit.full_path
except IndexError:
hit.path, _, hit.query_string = hit.full_path.partition(config.options.query_string_delimiter)
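                # e.g. a full_path of '/docs/page?x=1' (hypothetical) splits
                # into path '/docs/page' and query_string 'x=1'.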
try:
hit.referrer = match.group('referrer')
except IndexError:
hit.referrer = ''
if hit.referrer == '-':
hit.referrer = ''
try:
hit.user_agent = match.group('user_agent')
except IndexError:
hit.user_agent = ''
hit.ip = match.group('ip')
            try:
                hit.length = int(match.group('length'))
            except (ValueError, IndexError):
                # Some lines or formats don't have a length (e.g. 304 redirects, IIS logs)
                hit.length = 0
if config.options.log_hostname:
hit.host = config.options.log_hostname
else:
try:
hit.host = match.group('host').lower().strip('.')
except IndexError:
# Some formats have no host.
pass
# Check if the hit must be excluded.
if not all((getattr(cls, name)(hit) for name in cls.check_methods)):
continue
# Parse date.
# We parse it after calling check_methods as it's quite CPU hungry, and
# we want to avoid that cost for excluded hits.
date_string = match.group('date')
try:
hit.date = datetime.datetime.strptime(date_string, format.date_format)
except ValueError:
invalid_line(line, 'invalid date')
continue
            # Parse the timezone and subtract its value from the date
try:
timezone = float(match.group('timezone'))
except IndexError:
timezone = 0
except ValueError:
invalid_line(line, 'invalid timezone')
continue
if timezone:
hit.date -= datetime.timedelta(hours=timezone/100)
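            # e.g. a '+0200' offset parsed as 200.0 shifts the date back two
            # hours, normalizing it to UTC (hypothetical value).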
            if config.options.replay_tracking:
                # we need a query string and we only consider requests with piwik.php
                if hit.query_string and hit.path.lower().endswith('piwik.php'):
                    query_arguments = urlparse.parse_qs(hit.query_string)
                    if "idsite" in query_arguments:
                        try:
                            hit.args.update((k, v.pop().encode('raw_unicode_escape').decode(config.options.encoding)) for k, v in query_arguments.iteritems())
                        except UnicodeDecodeError:
                            invalid_line(line, 'invalid encoding')
                            continue

            hits.append(hit)

            if len(hits) >= config.options.recorder_max_payload_size * len(Recorder.recorders):
                Recorder.add_hits(hits)
                hits = []

        # add last chunk of hits
        if len(hits) > 0:
            Recorder.add_hits(hits)
for name, method in inspect.getmembers(Parser, predicate=callable):
if name.startswith('check_'):
Parser.check_methods.append(name)
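# inspect.getmembers() returns members sorted by name, so the check_* methods
# run in alphabetical order when parse() calls them.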
def main():
"""
Start the importing process.
"""
stats.set_time_start()
if config.options.show_progress:
stats.start_monitor()
recorders = Recorder.launch(config.options.recorders)
try:
for filename in config.filenames:
parser.parse(filename)
Recorder.wait_empty()
except KeyboardInterrupt:
pass
stats.set_time_stop()
if config.options.show_progress:
stats.stop_monitor()
try:
Recorder.invalidate_reports()
except Piwik.Error, e:
pass
stats.print_summary()
def fatal_error(error, filename=None, lineno=None):
print >> sys.stderr, 'Fatal error: %s' % error
if filename and lineno is not None:
print >> sys.stderr, (
'You can restart the import of "%s" from the point it failed by '
'specifying --skip=%d on the command line.\n' % (filename, lineno)
)
os._exit(1)
if __name__ == '__main__':
try:
piwik = Piwik()
config = Configuration()
stats = Statistics()
resolver = config.get_resolver()
parser = Parser()
main()
except KeyboardInterrupt:
pass