Newer
Older
if config.options.replay_tracking:
# We only consider requests with piwik.php which don't need host to be imported
return self._resolve_when_replay_tracking(hit)
else:
return self._resolve_by_host(hit)
def check_format(self, format):
if config.options.replay_tracking:
pass
elif 'host' not in format.regex.groupindex and not config.options.log_hostname:
fatal_error(
"the selected log format doesn't include the hostname: you must "
"specify the Piwik site ID with the --idsite argument"
)
class Recorder(object):
"""
A Recorder fetches hits from the Queue and inserts them into Piwik using
the API.
"""
recorders = []
def __init__(self):
self.queue = Queue.Queue(maxsize=2)
benakamoorthi
a validé
# if bulk tracking disabled, make sure we can store hits outside of the Queue
if not config.options.use_bulk_tracking:
self.unrecorded_hits = []
@classmethod
def launch(cls, recorder_count):
"""
Launch a bunch of Recorder objects in a separate thread.
"""
for i in xrange(recorder_count):
recorder = Recorder()
benakamoorthi
a validé
run = recorder._run_bulk if config.options.use_bulk_tracking else recorder._run_single
t = threading.Thread(target=run)
t.daemon = True
t.start()
logging.debug('Launched recorder')
Add a set of hits to the recorders queue.
# Organize hits so that one client IP will always use the same queue.
# We have to do this so visits from the same IP will be added in the right order.
for hit in all_hits:
hits_by_client[abs(hash(hit.ip)) % len(cls.recorders)].append(hit)
recorder.queue.put(hits_by_client[i])
"""
Wait until all recorders have an empty queue.
"""
benakamoorthi
a validé
def _run_bulk(self):
hits = self.queue.get()
if len(hits) > 0:
try:
self._record_hits(hits)
except Piwik.Error, e:
fatal_error(e, hits[0].filename, hits[0].lineno) # approximate location of error
benakamoorthi
a validé
def _run_single(self):
while True:
if config.options.force_one_action_interval != False:
time.sleep(config.options.force_one_action_interval)
benakamoorthi
a validé
if len(self.unrecorded_hits) > 0:
hit = self.unrecorded_hits.pop(0)
benakamoorthi
a validé
try:
self._record_hits([hit])
except Piwik.Error, e:
fatal_error(e, hit.filename, hit.lineno)
else:
self.unrecorded_hits = self.queue.get()
self.queue.task_done()
def _wait_empty(self):
"""
Wait until the queue is empty.
"""
while True:
if self.queue.empty():
# We still have to wait for the last queue item being processed
# (queue.empty() returns True before queue.task_done() is
# called).
self.queue.join()
return
time.sleep(1)
def date_to_piwik(self, date):
date, time = date.isoformat(sep=' ').split()
return '%s %s' % (date, time.replace('-', ':'))
def _get_hit_args(self, hit):
Returns the args used in tracking a hit, without the token_auth.
"""
site_id, main_url = resolver.resolve(hit)
if site_id is None:
# This hit doesn't match any known Piwik site.
if config.options.replay_tracking:
stats.piwik_sites_ignored.add('unrecognized site ID %s' % hit.args.get('idsite'))
else:
stats.piwik_sites_ignored.add(hit.host)
stats.count_lines_no_site.increment()
return
stats.dates_recorded.add(hit.date.date())
Cyril Bay
a validé
path = hit.path
if hit.query_string and not config.options.strip_query_string:
path += config.options.query_string_delimiter + hit.query_string
Benaka Moorthi
a validé
# only prepend main url if it's a path
url = (main_url if path.startswith('/') else '') + path[:1024]
args = {
'rec': '1',
'apiv': '1',
Benaka Moorthi
a validé
'url': url.encode('utf8'),
'urlref': hit.referrer[:1024].encode('utf8'),
'cip': hit.ip,
'cdt': self.date_to_piwik(hit.date),
'idsite': site_id,
'dp': '0' if config.options.reverse_dns else '1',
'ua': hit.user_agent.encode('utf8'),
if config.options.replay_tracking:
# prevent request to be force recorded when option replay-tracking
args['rec'] = '0'
args.update(hit.args)
if hit.is_download:
args['download'] = args['url']
if hit.is_robot:
args['_cvar'] = '{"1":["Bot","%s"]}' % hit.user_agent
elif config.options.enable_bots:
args['_cvar'] = '{"1":["Not-Bot","%s"]}' % hit.user_agent
if hit.is_error or hit.is_redirect:
args['cvar'] = '{"1":["HTTP-code","%s"]}' % hit.status
args['action_name'] = '%s/URL = %s%s' % (
hit.status,
("/From = %s" % urllib.quote(args['urlref'], '') if args['urlref'] != '' else '')
Timo Besenreuther
a validé
if hit.generation_time_milli > 0:
mattab
a validé
args['gt_ms'] = hit.generation_time_milli
return args
def _record_hits(self, hits):
"""
Inserts several hits into Piwik.
"""
data = {
'token_auth': config.options.piwik_token_auth,
'requests': [self._get_hit_args(hit) for hit in hits]
}
if not config.options.dry_run:
piwik.call(
'/piwik.php', args={},
headers={'Content-type': 'application/json'},
data=data,
on_failure=self._on_tracking_failure
stats.count_lines_recorded.advance(len(hits))
def _on_tracking_failure(self, response, data):
"""
Removes the successfully tracked hits from the request payload so
they are not logged twice.
"""
try:
response = json.loads(response)
except:
# the response should be in JSON, but in case it can't be parsed just try another attempt
logging.debug("cannot parse tracker response, should be valid JSON")
return response
# remove the successfully tracked hits from payload
succeeded = response['statistics']['tracked']
data['requests'] = data['requests'][succeeded:]
@staticmethod
def invalidate_reports():
if config.options.dry_run or not stats.dates_recorded:
return
if config.options.invalidate_dates is not None:
dates = [date for date in config.options.invalidate_dates.split(',') if date]
else:
dates = [date.strftime('%Y-%m-%d') for date in stats.dates_recorded]
if dates:
print 'Purging Piwik archives for dates: ' + ' '.join(dates)
result = piwik.call_api(
'CoreAdminHome.invalidateArchivedReports',
dates=','.join(dates),
idSites=','.join(str(site_id) for site_id in stats.piwik_sites),
)
print('To re-process these reports with your new update data, execute the '
'piwik/misc/cron/archive.php script, or see: http://piwik.org/setup-auto-archiving/ '
'for more info.')
class Hit(object):
"""
It's a simple container.
"""
def __init__(self, **kwargs):
for key, value in kwargs.iteritems():
setattr(self, key, value)
super(Hit, self).__init__()
if config.options.force_lowercase_path:
self.full_path = self.full_path.lower()
class Parser(object):
"""
The Parser parses the lines in a specified file and inserts them into
a Queue.
"""
def __init__(self):
self.check_methods = [method for name, method
in inspect.getmembers(self, predicate=inspect.ismethod)
if name.startswith('check_')]
## All check_* methods are called for each hit and must return True if the
## hit can be imported, False otherwise.
def check_hostname(self, hit):
# Check against config.hostnames.
if not hasattr(hit, 'host') or not config.options.hostnames:
return True
# Accept the hostname only if it matches one pattern in the list.
result = any(
fnmatch.fnmatch(hit.host, pattern)
for pattern in config.options.hostnames
)
if not result:
stats.count_lines_hostname_skipped.increment()
return result
def check_static(self, hit):
extension = hit.path.rsplit('.')[-1].lower()
if extension in STATIC_EXTENSIONS:
if config.options.enable_static:
hit.is_download = True
return True
else:
stats.count_lines_static.increment()
return False
def check_download(self, hit):
extension = hit.path.rsplit('.')[-1].lower()
if extension in DOWNLOAD_EXTENSIONS:
stats.count_lines_downloads.increment()
hit.is_download = True
def check_user_agent(self, hit):
for s in itertools.chain(EXCLUDED_USER_AGENTS, config.options.excluded_useragents):
if s in user_agent:
if config.options.enable_bots:
hit.is_robot = True
return True
else:
stats.count_lines_skipped_user_agent.increment()
return False
def check_http_error(self, hit):
if config.options.enable_http_errors:
hit.is_error = True
return True
else:
stats.count_lines_skipped_http_errors.increment()
return False
return True
def check_http_redirect(self, hit):
if config.options.enable_http_redirects:
hit.is_redirect = True
return True
else:
stats.count_lines_skipped_http_redirects.increment()
return False
return True
for excluded_path in config.options.excluded_paths:
if fnmatch.fnmatch(hit.path, excluded_path):
return False
return True
Cyril Bay
a validé
@staticmethod
def detect_format(file):
Cyril Bay
a validé
"""
diosmosis
a validé
Return the best matching format for this file, or None if none was found.
Cyril Bay
a validé
"""
logging.debug('Detecting the log format')
diosmosis
a validé
format = None
format_groups = 0
for name, candidate_format in FORMATS.iteritems():
diosmosis
a validé
match = candidate_format.check_format(file)
if match:
Cyril Bay
a validé
logging.debug('Format %s matches', name)
diosmosis
a validé
# if there's more info in this match, use this format
match_groups = len(match.groups())
if format_groups < match_groups:
format = candidate_format
format_groups = match_groups
Cyril Bay
a validé
else:
logging.debug('Format %s does not match', name)
if format:
logging.debug('Format %s is the best match', format.name)
diosmosis
a validé
return format
"""
Parse the specified filename and insert hits in the queue.
"""
def invalid_line(line, reason):
stats.count_lines_invalid.increment()
Cyril Bay
a validé
if config.options.debug >= 2:
logging.debug('Invalid line detected (%s): %s' % (reason, line))
if filename == '-':
filename = '(stdin)'
file = sys.stdin
else:
if not os.path.exists(filename):
print >> sys.stderr, 'File %s does not exist' % filename
return
else:
if filename.endswith('.bz2'):
open_func = bz2.BZ2File
elif filename.endswith('.gz'):
open_func = gzip.open
else:
open_func = open
file = open_func(filename, 'r')
if config.options.show_progress:
print 'Parsing log %s...' % filename
if config.format:
# The format was explicitely specified.
format = config.format
else:
# If the file is empty, don't bother.
data = file.read(100)
if len(data.strip()) == 0:
return
file.seek(0)
format = self.detect_format(file)
if format is None:
return fatal_error(
'Cannot guess the logs format. Please give one using '
'either the --log-format-name or --log-format-regex option'
)
# Make sure the format is compatible with the resolver.
resolver.check_format(format)
hits = []
for lineno, line in enumerate(file):
try:
line = line.decode(config.options.encoding)
except UnicodeDecodeError:
invalid_line(line, 'invalid encoding')
continue
stats.count_lines_parsed.increment()
if stats.count_lines_parsed.value <= config.options.skip:
continue
match = format.regex.match(line)
invalid_line(line, 'line did not match')
continue
hit = Hit(
filename=filename,
lineno=lineno,
status=match.group('status'),
full_path=match.group('path'),
is_error=False,
is_redirect=False,
args={},
try:
hit.query_string = match.group('query_string')
hit.path = hit.full_path
except IndexError:
hit.path, _, hit.query_string = hit.full_path.partition(config.options.query_string_delimiter)
try:
hit.referrer = match.group('referrer')
except IndexError:
hit.referrer = ''
if hit.referrer == '-':
hit.referrer = ''
try:
hit.user_agent = match.group('user_agent')
except IndexError:
hit.user_agent = ''
hit.ip = match.group('ip')
try:
hit.length = int(match.group('length'))
except (ValueError, IndexError):
# Some lines or formats don't have a length (e.g. 304 redirects, IIS logs)
Timo Besenreuther
a validé
try:
hit.generation_time_milli = int(match.group('generation_time_milli'))
Timo Besenreuther
a validé
except IndexError:
try:
hit.generation_time_milli = int(match.group('generation_time_micro')) / 1000
except IndexError:
hit.generation_time_milli = 0
if config.options.log_hostname:
hit.host = config.options.log_hostname
else:
try:
hit.host = match.group('host').lower().strip('.')
except IndexError:
# Some formats have no host.
pass
# Check if the hit must be excluded.
if not all((method(hit) for method in self.check_methods)):
1479
1480
1481
1482
1483
1484
1485
1486
1487
1488
1489
1490
1491
1492
1493
1494
1495
1496
1497
1498
1499
1500
1501
continue
# Parse date.
# We parse it after calling check_methods as it's quite CPU hungry, and
# we want to avoid that cost for excluded hits.
date_string = match.group('date')
try:
hit.date = datetime.datetime.strptime(date_string, format.date_format)
except ValueError:
invalid_line(line, 'invalid date')
continue
# Parse timezone and substract its value from the date
try:
timezone = float(match.group('timezone'))
except IndexError:
timezone = 0
except ValueError:
invalid_line(line, 'invalid timezone')
continue
if timezone:
hit.date -= datetime.timedelta(hours=timezone/100)
if config.options.replay_tracking:
# we need a query string and we only consider requests with piwik.php
if not hit.query_string or not hit.path.lower().endswith('piwik.php'):
query_arguments = urlparse.parse_qs(hit.query_string)
if not "idsite" in query_arguments:
invalid_line(line, 'missing idsite')
continue
try:
hit.args.update((k, v.pop().encode('raw_unicode_escape').decode(config.options.encoding)) for k, v in query_arguments.iteritems())
except UnicodeDecodeError:
invalid_line(line, 'invalid encoding')
continue
# Check if the hit must be excluded.
if all((method(hit) for method in self.check_methods)):
hits.append(hit)
benakamoorthi
a validé
if len(hits) >= config.options.recorder_max_payload_size * len(Recorder.recorders):
Recorder.add_hits(hits)
hits = []
# add last chunk of hits
if len(hits) > 0:
Recorder.add_hits(hits)
def main():
"""
Start the importing process.
"""
benakamoorthi
a validé
stats.set_time_start()
if config.options.show_progress:
stats.start_monitor()
recorders = Recorder.launch(config.options.recorders)
try:
for filename in config.filenames:
parser.parse(filename)
Recorder.wait_empty()
except KeyboardInterrupt:
pass
1551
1552
1553
1554
1555
1556
1557
1558
1559
1560
1561
1562
1563
1564
1565
1566
1567
1568
1569
1570
1571
1572
1573
1574
1575
1576
1577
1578
1579
1580
1581
1582
stats.set_time_stop()
if config.options.show_progress:
stats.stop_monitor()
try:
Recorder.invalidate_reports()
except Piwik.Error, e:
pass
stats.print_summary()
def fatal_error(error, filename=None, lineno=None):
print >> sys.stderr, 'Fatal error: %s' % error
if filename and lineno is not None:
print >> sys.stderr, (
'You can restart the import of "%s" from the point it failed by '
'specifying --skip=%d on the command line.\n' % (filename, lineno)
)
os._exit(1)
if __name__ == '__main__':
try:
piwik = Piwik()
config = Configuration()
stats = Statistics()
resolver = config.get_resolver()
parser = Parser()
main()
except KeyboardInterrupt:
pass