def _add_site(self, hit):
main_url = 'http://' + hit.host
DynamicResolver._add_site_lock.acquire()
try:
# After we obtain the lock, make sure the site hasn't already been created.
res = self._get_site_id_from_hit_host(hit)
if res:
return res[0]['idsite']
# The site doesn't exist.
logging.debug('No Piwik site found for the hostname: %s', hit.host)
if config.options.site_id_fallback is not None:
logging.debug('Using default site for hostname: %s', hit.host)
return config.options.site_id_fallback
elif config.options.add_sites_new_hosts:
if config.options.dry_run:
    # Let's just return a fake ID.
    return 0
logging.debug('Creating a Piwik site for hostname %s', hit.host)
result = piwik.call_api(
'SitesManager.addSite',
siteName=hit.host,
urls=[main_url],
)
if result.get('result') == 'error':
logging.error("Couldn't create a Piwik site for host %s: %s",
hit.host, result.get('message'),
)
return None
else:
site_id = result['value']
stats.piwik_sites_created.append((hit.host, site_id))
return site_id
else:
# The site doesn't exist, we don't want to create new sites and
# there's no default site ID. We thus have to ignore this hit.
return None
finally:
DynamicResolver._add_site_lock.release()
def _resolve(self, hit):
res = self._get_site_id_from_hit_host(hit)
if res:
# The site already exists.
site_id = res[0]['idsite']
else:
site_id = self._add_site(hit)
if site_id is not None:
    stats.piwik_sites.add(site_id)
return site_id
def _resolve_when_replay_tracking(self, hit):
"""
If the parsed site ID is found in _cache['sites'], return the site ID and
main_url; otherwise return a (None, None) tuple.
"""
site_id = hit.args['idsite']
if site_id in self._cache['sites']:
stats.piwik_sites.add(site_id)
return (site_id, self._cache['sites'][site_id]['main_url'])
else:
return (None, None)
def _resolve_by_host(self, hit):
"""
Returns the site ID and site URL for a hit based on the hostname.
"""
try:
site_id = self._cache[hit.host]
except KeyError:
logging.debug(
'Site ID for hostname %s not in cache', hit.host
)
site_id = self._resolve(hit)
logging.debug('Site ID for hostname %s: %s', hit.host, site_id)
self._cache[hit.host] = site_id
return (site_id, 'http://' + hit.host)
def resolve(self, hit):
"""
Return the site ID from the cache if found, otherwise call _resolve.
If replay_tracking option is enabled, call _resolve_when_replay_tracking.
"""
if config.options.replay_tracking:
# Replay tracking only handles piwik.php requests, which carry the site ID and don't need a hostname.
return self._resolve_when_replay_tracking(hit)
else:
return self._resolve_by_host(hit)
def check_format(self, format):
if config.options.replay_tracking:
pass
elif 'host' not in format.regex.groupindex and not config.options.log_hostname:
fatal_error(
"the selected log format doesn't include the hostname: you must "
"specify the Piwik site ID with the --idsite argument"
)
class Recorder(object):
"""
A Recorder fetches hits from the Queue and inserts them into Piwik using
the API.
"""
recorders = []
def __init__(self):
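# A small bounded queue: the parser blocks once it gets ahead of the
# recorders, which keeps memory usage flat on very large log files.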
self.queue = Queue.Queue(maxsize=2)
# if bulk tracking disabled, make sure we can store hits outside of the Queue
if not config.options.use_bulk_tracking:
self.unrecorded_hits = []
@classmethod
def launch(cls, recorder_count):
"""
Launch a bunch of Recorder objects in a separate thread.
"""
for i in xrange(recorder_count):
recorder = Recorder()
cls.recorders.append(recorder)

run = recorder._run_bulk if config.options.use_bulk_tracking else recorder._run_single
t = threading.Thread(target=run)
t.daemon = True
t.start()
logging.debug('Launched recorder')
@classmethod
def add_hits(cls, all_hits):
    """
    Add a set of hits to the recorders queue.
    """
    # Organize hits so that one client IP will always use the same queue.
    # We have to do this so visits from the same IP will be added in the right order.
    hits_by_client = [[] for r in cls.recorders]
    for hit in all_hits:
        hits_by_client[abs(hash(hit.ip)) % len(cls.recorders)].append(hit)
    for i, recorder in enumerate(cls.recorders):
        recorder.queue.put(hits_by_client[i])
"""
Wait until all recorders have an empty queue.
"""
benakamoorthi
a validé
def _run_bulk(self):
    while True:
        hits = self.queue.get()
        if len(hits) > 0:
            try:
                self._record_hits(hits)
            except Piwik.Error, e:
                fatal_error(e, hits[0].filename, hits[0].lineno) # approximate location of error
        self.queue.task_done()
def _run_single(self):
while True:
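# When --force-one-action-interval is set, pace the tracking requests
# by sleeping between hits.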
if config.options.force_one_action_interval != False:
time.sleep(config.options.force_one_action_interval)
if len(self.unrecorded_hits) > 0:
hit = self.unrecorded_hits.pop(0)
try:
self._record_hits([hit])
except Piwik.Error, e:
fatal_error(e, hit.filename, hit.lineno)
else:
self.unrecorded_hits = self.queue.get()
self.queue.task_done()
def _wait_empty(self):
"""
Wait until the queue is empty.
"""
while True:
if self.queue.empty():
# We still have to wait for the last queue item being processed
# (queue.empty() returns True before queue.task_done() is
# called).
self.queue.join()
return
time.sleep(1)
def date_to_piwik(self, date):
date, time = date.isoformat(sep=' ').split()
return '%s %s' % (date, time.replace('-', ':'))
def _get_hit_args(self, hit):
"""
Returns the args used in tracking a hit, without the token_auth.
"""
site_id, main_url = resolver.resolve(hit)
if site_id is None:
# This hit doesn't match any known Piwik site.
if config.options.replay_tracking:
stats.piwik_sites_ignored.add('unrecognized site ID %s' % hit.args.get('idsite'))
else:
stats.piwik_sites_ignored.add(hit.host)
stats.count_lines_no_site.increment()
return
stats.dates_recorded.add(hit.date.date())
path = hit.path
if hit.query_string and not config.options.strip_query_string:
path += config.options.query_string_delimiter + hit.query_string
# only prepend main url if it's a path
url = (main_url if path.startswith('/') else '') + path[:1024]
args = {
'rec': '1',
'apiv': '1',
'url': url.encode('utf8'),
'urlref': hit.referrer[:1024].encode('utf8'),
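# 'cip' (forced visitor IP) and 'cdt' (forced datetime) are only honoured
# by Piwik for authenticated requests; the token_auth is added later,
# in _record_hits.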
'cip': hit.ip,
'cdt': self.date_to_piwik(hit.date),
'idsite': site_id,
'dp': '0' if config.options.reverse_dns else '1',
'ua': hit.user_agent.encode('utf8'),
}
if config.options.replay_tracking:
# prevent request to be force recorded when option replay-tracking
args['rec'] = '0'
args.update(hit.args)
if hit.is_download:
args['download'] = args['url']
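# Tag bot traffic with a visit-scope custom variable so it can be
# segmented (or filtered out) in Piwik reports.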
if hit.is_robot:
args['_cvar'] = '{"1":["Bot","%s"]}' % hit.user_agent
elif config.options.enable_bots:
args['_cvar'] = '{"1":["Not-Bot","%s"]}' % hit.user_agent
if hit.is_error or hit.is_redirect:
args['cvar'] = '{"1":["HTTP-code","%s"]}' % hit.status
args['action_name'] = '%s/URL = %s%s' % (
    hit.status,
    urllib.quote(args['url'], ''),
    ("/From = %s" % urllib.quote(args['urlref'], '') if args['urlref'] != '' else '')
)
if hit.generation_time_milli > 0:
args['gt_ms'] = hit.generation_time_milli
return args
def _record_hits(self, hits):
"""
Inserts several hits into Piwik.
"""
data = {
'token_auth': config.options.piwik_token_auth,
'requests': [self._get_hit_args(hit) for hit in hits]
}
if not config.options.dry_run:
piwik.call(
'/piwik.php', args={},
headers={'Content-type': 'application/json'},
data=data,
on_failure=self._on_tracking_failure
)
stats.count_lines_recorded.advance(len(hits))
def _on_tracking_failure(self, response, data):
"""
Removes the successfully tracked hits from the request payload so
they are not logged twice.
"""
try:
response = json.loads(response)
except:
# The response should be JSON; if it can't be parsed, return it unchanged so another attempt can be made.
logging.debug("cannot parse tracker response, should be valid JSON")
return response
# remove the successfully tracked hits from payload
tracked = response['tracked']
data['requests'] = data['requests'][tracked:]
return response['message']
@staticmethod
def invalidate_reports():
if config.options.dry_run or not stats.dates_recorded:
return
if config.options.invalidate_dates is not None:
dates = [date for date in config.options.invalidate_dates.split(',') if date]
else:
dates = [date.strftime('%Y-%m-%d') for date in stats.dates_recorded]
if dates:
print 'Purging Piwik archives for dates: ' + ' '.join(dates)
result = piwik.call_api(
'CoreAdminHome.invalidateArchivedReports',
dates=','.join(dates),
idSites=','.join(str(site_id) for site_id in stats.piwik_sites),
)
print('To re-process these reports with your newly imported data, execute the '
'piwik/misc/cron/archive.php script, or see: http://piwik.org/setup-auto-archiving/ '
'for more info.')
class Hit(object):
"""
A simple container for the attributes of one parsed log line.
"""
def __init__(self, **kwargs):
for key, value in kwargs.iteritems():
setattr(self, key, value)
super(Hit, self).__init__()
if config.options.force_lowercase_path:
self.full_path = self.full_path.lower()
class Parser(object):
"""
The Parser parses the lines in a specified file and inserts them into
a Queue.
"""
def __init__(self):
self.check_methods = [method for name, method
in inspect.getmembers(self, predicate=inspect.ismethod)
if name.startswith('check_')]
## All check_* methods are called for each hit and must return True if the
## hit can be imported, False otherwise.
def check_hostname(self, hit):
# Check against config.hostnames.
if not hasattr(hit, 'host') or not config.options.hostnames:
return True
# Accept the hostname only if it matches one pattern in the list.
result = any(
fnmatch.fnmatch(hit.host, pattern)
for pattern in config.options.hostnames
)
if not result:
stats.count_lines_hostname_skipped.increment()
return result
def check_static(self, hit):
extension = hit.path.rsplit('.')[-1].lower()
if extension in STATIC_EXTENSIONS:
if config.options.enable_static:
hit.is_download = True
return True
else:
stats.count_lines_static.increment()
return False
def check_download(self, hit):
    extension = hit.path.rsplit('.')[-1].lower()
    if extension in DOWNLOAD_EXTENSIONS:
        stats.count_lines_downloads.increment()
        hit.is_download = True
    return True
def check_user_agent(self, hit):
    user_agent = hit.user_agent
    for s in itertools.chain(EXCLUDED_USER_AGENTS, config.options.excluded_useragents):
        if s in user_agent:
            if config.options.enable_bots:
                hit.is_robot = True
                return True
            else:
                stats.count_lines_skipped_user_agent.increment()
                return False
    return True
def check_http_error(self, hit):
    if hit.status[0] in ('4', '5'):
        if config.options.enable_http_errors:
            hit.is_error = True
            return True
        else:
            stats.count_lines_skipped_http_errors.increment()
            return False
    return True
def check_http_redirect(self, hit):
    if hit.status[0] == '3' and hit.status != '304':
        if config.options.enable_http_redirects:
            hit.is_redirect = True
            return True
        else:
            stats.count_lines_skipped_http_redirects.increment()
            return False
    return True
def check_path(self, hit):
    for excluded_path in config.options.excluded_paths:
        if fnmatch.fnmatch(hit.path, excluded_path):
            return False
    # By default, all paths are included.
    if config.options.included_paths:
        for included_path in config.options.included_paths:
            if fnmatch.fnmatch(hit.path, included_path):
                return True
        return False
    return True
@staticmethod
def check_format(lineOrFile):
format = False
format_groups = 0
for name, candidate_format in FORMATS.iteritems():
logging.debug("Check format %s", name)
match = None
try:
if isinstance(lineOrFile, basestring):
match = candidate_format.check_format_line(lineOrFile)
else:
match = candidate_format.check_format(lineOrFile)
except:
pass
if match:
logging.debug('Format %s matches', name)
# compare the number of capture groups when the match object supports groups()
try:
# if there's more info in this match, use this format
match_groups = len(match.groups())
if format_groups < match_groups:
format = candidate_format
format_groups = match_groups
except AttributeError:
format = candidate_format
else:
logging.debug('Format %s does not match', name)
return format
@staticmethod
def detect_format(file):
"""
Return the best matching format for this file, or None if none was found.
"""
logging.debug('Detecting the log format')
format = False
# check the format using the file (for formats like the IIS one)
format = Parser.check_format(file)
# check the format using the first N lines (to avoid irregular ones)
lineno = 0
limit = 100000
while not format and lineno < limit:
line = file.readline()
lineno = lineno + 1
logging.debug("Detecting format against line %i" % lineno)
format = Parser.check_format(line)
file.seek(0)
if not format:
    fatal_error("cannot automatically determine the log format using the first %d lines of the log file. " % limit +
        "Maybe try specifying the format with the --log-format-name command line argument.")
    return
logging.debug('Format %s is the best match', format.name)
return format
"""
Parse the specified filename and insert hits in the queue.
"""
def invalid_line(line, reason):
stats.count_lines_invalid.increment()
if config.options.debug >= 2:
logging.debug('Invalid line detected (%s): %s' % (reason, line))
if filename == '-':
filename = '(stdin)'
file = sys.stdin
else:
if not os.path.exists(filename):
print >> sys.stderr, 'File %s does not exist' % filename
return
else:
if filename.endswith('.bz2'):
open_func = bz2.BZ2File
elif filename.endswith('.gz'):
open_func = gzip.open
else:
open_func = open
file = open_func(filename, 'r')
if config.options.show_progress:
print 'Parsing log %s...' % filename
if config.format:
# The format was explicitly specified.
format = config.format
else:
# If the file is empty, don't bother.
data = file.read(100)
if len(data.strip()) == 0:
return
file.seek(0)
format = self.detect_format(file)
if format is None:
return fatal_error(
'Cannot guess the logs format. Please give one using '
'either the --log-format-name or --log-format-regex option'
)
# Make sure the format is compatible with the resolver.
resolver.check_format(format)
hits = []
for lineno, line in enumerate(file):
try:
line = line.decode(config.options.encoding)
except UnicodeDecodeError:
invalid_line(line, 'invalid encoding')
continue
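# Honour --skip: lines before the resume point are counted but not imported.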
stats.count_lines_parsed.increment()
if stats.count_lines_parsed.value <= config.options.skip:
continue
match = format.match(line)
if not match:
    invalid_line(line, 'line did not match')
    continue
hit = Hit(
    filename=filename,
    lineno=lineno,
    status=format.get('status'),
    full_path=format.get('path'),
    is_download=False,
    is_robot=False,
    is_error=False,
    is_redirect=False,
    args={},
)
try:
    hit.query_string = format.get('query_string')
    hit.path = hit.full_path
except BaseFormatException:
    hit.path, _, hit.query_string = hit.full_path.partition(config.options.query_string_delimiter)
try:
    hit.referrer = format.get('referrer')
except BaseFormatException:
hit.referrer = ''
if hit.referrer == '-':
hit.referrer = ''
try:
    hit.user_agent = format.get('user_agent')
except BaseFormatException:
    hit.user_agent = ''
try:
    hit.length = int(format.get('length'))
except (ValueError, BaseFormatException):
    # Some lines or formats don't have a length (e.g. 304 redirects, IIS logs)
    hit.length = 0
try:
    hit.generation_time_milli = int(format.get('generation_time_milli'))
except BaseFormatException:
    try:
        hit.generation_time_milli = int(format.get('generation_time_micro')) / 1000
    except BaseFormatException:
        hit.generation_time_milli = 0
if config.options.log_hostname:
hit.host = config.options.log_hostname
else:
try:
hit.host = format.get('host').lower().strip('.')
except BaseFormatException:
# Some formats have no host.
pass
# Check if the hit must be excluded.
if not all((method(hit) for method in self.check_methods)):
continue
# Parse date.
# We parse it after calling check_methods as it's quite CPU hungry, and
# we want to avoid that cost for excluded hits.
date_string = format.get('date')
try:
    hit.date = datetime.datetime.strptime(date_string, format.date_format)
except ValueError:
invalid_line(line, 'invalid date')
continue
# Parse the timezone and subtract its value from the date
try:
timezone = float(format.get('timezone'))
except BaseFormatException:
timezone = 0
except ValueError:
invalid_line(line, 'invalid timezone')
continue
if timezone:
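# Timezones are parsed as e.g. +0200, so dividing by 100 gives hours.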
hit.date -= datetime.timedelta(hours=timezone/100)
if config.options.replay_tracking:
# we need a query string and we only consider requests with piwik.php
if not hit.query_string or not hit.path.lower().endswith('piwik.php'):
    invalid_line(line, 'no query string, or "piwik.php" not in path')
    continue
query_arguments = urlparse.parse_qs(hit.query_string)
if not "idsite" in query_arguments:
invalid_line(line, 'missing idsite')
continue
try:
hit.args.update((k, v.pop().encode('raw_unicode_escape').decode(config.options.encoding)) for k, v in query_arguments.iteritems())
except UnicodeDecodeError:
invalid_line(line, 'invalid encoding')
continue
# Check if the hit must be excluded.
if all((method(hit) for method in self.check_methods)):
hits.append(hit)
if len(hits) >= config.options.recorder_max_payload_size * len(Recorder.recorders):
Recorder.add_hits(hits)
hits = []
# add last chunk of hits
if len(hits) > 0:
Recorder.add_hits(hits)
def main():
"""
Start the importing process.
"""
stats.set_time_start()
if config.options.show_progress:
stats.start_monitor()
recorders = Recorder.launch(config.options.recorders)
try:
for filename in config.filenames:
parser.parse(filename)
Recorder.wait_empty()
except KeyboardInterrupt:
pass
stats.set_time_stop()
if config.options.show_progress:
stats.stop_monitor()
try:
Recorder.invalidate_reports()
except Piwik.Error, e:
pass
stats.print_summary()
def fatal_error(error, filename=None, lineno=None):
print >> sys.stderr, 'Fatal error: %s' % error
if filename and lineno is not None:
print >> sys.stderr, (
'You can restart the import of "%s" from the point it failed by '
'specifying --skip=%d on the command line.\n' % (filename, lineno)
)
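# Use os._exit() rather than sys.exit() so that lingering recorder threads
# cannot block the shutdown.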
os._exit(1)
if __name__ == '__main__':
try:
piwik = Piwik()
config = Configuration()
stats = Statistics()
resolver = config.get_resolver()
parser = Parser()
main()
except KeyboardInterrupt:
pass