# This workflow will install Python dependencies, run tests and lint with a variety of Python versions
# For more information see: https://help.github.com/actions/language-and-framework-guides/using-python-with-github-actions
name: Python package
on:
push:
branches: [ main ]
pull_request:
branches: [ main ]
jobs:
build:
runs-on: ubuntu-latest
strategy:
matrix:
python-version: [3.6, 3.7, 3.8, 3.9]
steps:
- uses: actions/checkout@v2
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v2
with:
python-version: ${{ matrix.python-version }}
- name: Install dependencies
run: |
python -m pip install --upgrade pip
python -m pip install flake8 pytest
if [ -f requirements.txt ]; then pip install -r requirements.txt; fi
- name: Test with pytest
run: |
pytest
# This workflow will upload a Python Package using Twine when a release is created
# For more information see: https://help.github.com/en/actions/language-and-framework-guides/using-python-with-github-actions#publishing-to-package-registries
name: Upload Python Package
on:
release:
types: [created]
jobs:
deploy:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- name: Set up Python
uses: actions/setup-python@v2
with:
python-version: '3.x'
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install setuptools wheel twine
- name: Build and publish
env:
TWINE_USERNAME: ${{ secrets.PYPI_USERNAME }}
TWINE_PASSWORD: ${{ secrets.PYPI_PASSWORD }}
run: |
python setup.py sdist bdist_wheel
twine upload dist/*
@@ -27,3 +27,4 @@ tests/.excluded_sites
# MacOS Folder Metadata File
.DS_Store
/reports/
# Changelog
## [Unreleased]
## [0.1.11] - 2021-01-16
* tags and custom data checks bugfixes
* added parsing activation logic
## [0.1.10] - 2021-01-13
* added report static resources into package
## [0.1.9] - 2021-01-11
* added HTML and PDF report export
* fixed support of Python 3.6
* fixed tags filtering and ranking
* more than 2000 sites supported
* refactored sites and engines logic
* added tests
## [0.1.8] - 2020-12-31
* added XMind export
* more than 1500 sites supported
* parallel processing of requests
## [0.1.7] - 2020-12-11
* fixed proxies support
* fixed aiohttp stuff to prevent python 3.7 bugs
* fixed self-checking database saving error
## [0.1.6] - 2020-12-05
* fixed Dockerfile and README
## [0.1.5] - 2020-12-05 [YANKED]
## [0.1.4] - 2020-12-05 [YANKED]
## [0.1.3] - 2020-12-05 [YANKED]
## [0.1.2] - 2020-12-05 [YANKED]
## [0.1.1] - 2020-12-05 [YANKED]
## [0.1.0] - 2020-12-05
* initial release
include LICENSE
include README.md
include requirements.txt
include maigret/resources/data.json
include maigret/resources/*
# Maigret
![PyPI](https://img.shields.io/pypi/v/maigret?style=flat-square)
![PyPI - Downloads](https://img.shields.io/pypi/dw/maigret?style=flat-square)
[![Chat - Gitter](./static/chat_gitter.svg)](https://gitter.im/maigret-osint/community)
<p align="center">
<img src="./static/maigret.png" />
</p>
@@ -13,7 +17,7 @@ Purpose of Maigret - **collect a dossier on a person by username only**, checkin
This is a [sherlock](https://github.com/sherlock-project/) fork with cool features under heavy development.
*Don't forget to update the source code from the repo regularly*.
More than 2000 sites are currently supported ([full list](./sites.md)); by default, the search is launched against the 500 most popular sites in descending order of popularity.
## Main features
@@ -25,7 +29,7 @@ Currently supported >1300 sites ([full list](./sites.md)).
## Installation
**NOTE**: Python 3.6 or higher and pip is required.
**Python 3.8 is recommended.**
@@ -43,9 +47,20 @@ $ pip3 install .
```bash
maigret user
maigret user1 user2 user3 --print-not-found
# make HTML and PDF reports
maigret user --html --pdf
# search on sites marked with tags photo & dating
maigret user --tags photo,dating
# search for three usernames on all available sites
maigret user1 user2 user3 -a
```
Run `maigret --help` to get a description of the arguments. The options are also documented in [the Maigret Wiki](https://github.com/soxoj/maigret/wiki/Command-line-options).
With Docker:
```
docker build -t maigret .
@@ -55,13 +70,20 @@ docker run maigret user
## Demo with page parsing and recursive username search
[PDF report](./static/report_alexaimephotographycars.pdf), [HTML report](https://htmlpreview.github.io/?https://raw.githubusercontent.com/soxoj/maigret/main/static/report_alexaimephotographycars.html)
```bash
maigret alexaimephotographycars
```
![animation of recursive search](./static/recursive_search.svg)
![HTML report screenshot](./static/report_alexaimephotography_html_screenshot.png)
![XMind report screenshot](./static/report_alexaimephotography_xmind_screenshot.png)
[Full console output](./static/recursive_search.md)
## License
#! /usr/bin/env python3
import asyncio
import sys
from maigret.maigret import main
def run():
try:
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
except KeyboardInterrupt:
print('Maigret is interrupted.')
sys.exit(1)
if __name__ == "__main__":
run()
@@ -5,8 +5,8 @@ Maigret entrypoint
"""
import asyncio
import maigret
if __name__ == "__main__":
asyncio.run(maigret.main())
import requests
class ParsingActivator:
@staticmethod
def twitter(site, logger):
headers = dict(site.headers)
del headers['x-guest-token']
r = requests.post(site.activation['url'], headers=headers)
logger.info(r)
j = r.json()
guest_token = j[site.activation['src']]
site.headers['x-guest-token'] = guest_token
@staticmethod
def vimeo(site, logger):
headers = dict(site.headers)
if 'Authorization' in headers:
del headers['Authorization']
r = requests.get(site.activation['url'], headers=headers)
jwt_token = r.json()['jwt']
site.headers['Authorization'] = 'jwt ' + jwt_token
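The activation methods above are looked up by name at runtime (the main module calls `getattr(ParsingActivator(), method)` with the method name taken from the site's `activation` config). A minimal sketch of that dispatch pattern, using a hypothetical `DummyActivator` in place of `ParsingActivator` so no network requests are made:

```python
class DummyActivator:
    """Stand-in for ParsingActivator: each method refreshes one site's auth data."""
    @staticmethod
    def twitter(site, logger):
        # The real activator POSTs to site.activation['url'] and stores
        # the refreshed guest token back into site.headers.
        site['headers'] = {'x-guest-token': 'fresh-token'}

def activate(site, method, logger=None):
    """Dispatch an activation method by name; unknown names are ignored."""
    try:
        activate_fun = getattr(DummyActivator(), method)
    except AttributeError:
        # Unknown activation method: log a warning and continue, as maigret does.
        return False
    activate_fun(site, logger)
    return True

site = {}
assert activate(site, 'twitter') is True
assert site['headers']['x-guest-token'] == 'fresh-token'
assert activate(site, 'no_such_method') is False
```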
#! /usr/bin/env python3
"""
Maigret main module
"""
import asyncio
import csv
import http.cookiejar as cookielib
@@ -12,22 +11,26 @@ import logging
import os
import platform
import re
import ssl
import sys
import tqdm.asyncio
import xmind
from aiohttp_socks import ProxyConnector
from argparse import ArgumentParser, RawDescriptionHelpFormatter
from http.cookies import SimpleCookie
import aiohttp
import requests
from mock import Mock
from python_socks import _errors as proxy_errors
from socid_extractor import parse, extract, __version__ as socid_version
from .activation import ParsingActivator
from .notify import QueryNotifyPrint
from .report import save_csv_report, save_xmind_report, save_html_report, save_pdf_report, \
generate_report_context, save_txt_report
from .result import QueryResult, QueryStatus
from .sites import MaigretDatabase, MaigretSite
__version__ = '0.1.11'
supported_recursive_search_ids = (
'yandex_public_id',
@@ -39,10 +42,13 @@ supported_recursive_search_ids = (
common_errors = {
'<title>Attention Required! | Cloudflare</title>': 'Cloudflare captcha',
'Please stand by, while we are checking your browser': 'Cloudflare captcha',
'<title>Доступ ограничен</title>': 'Rostelecom censorship',
'document.getElementById(\'validate_form_submit\').disabled=true': 'Mail.ru captcha',
'Verifying your browser, please wait...<br>DDoS Protection by</font> Blazingfast.io': 'Blazingfast protection',
'404</h1><p class="error-card__description">Мы&nbsp;не&nbsp;нашли страницу': 'MegaFon 404 page',
'Доступ к информационному ресурсу ограничен на основании Федерального закона': 'MGTS censorship',
'Incapsula incident ID': 'Incapsula antibot protection',
}
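The `common_errors` mapping is consulted while classifying responses (the body of `detect_error_page` is elided in this hunk). A plausible sketch of that scan, under the assumption that the first matching marker determines the cause:

```python
def find_common_error(html_text, common_errors):
    """Return a human-readable cause if a known error-page marker is present."""
    if not html_text:
        return None
    for flag, cause in common_errors.items():
        if flag in html_text:
            return cause
    return None

sample_errors = {
    '<title>Attention Required! | Cloudflare</title>': 'Cloudflare captcha',
    'Incapsula incident ID': 'Incapsula antibot protection',
}
assert find_common_error('... Incapsula incident ID: 42 ...', sample_errors) == 'Incapsula antibot protection'
assert find_common_error('<html>profile page</html>', sample_errors) is None
```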
unsupported_characters = '#'
@@ -50,7 +56,7 @@ unsupported_characters = '#'
cookies_file = 'cookies.txt'
async def get_response(request_future, site_name, logger):
html_text = None
status_code = 0
@@ -83,8 +89,11 @@ async def get_response(request_future, error_type, social_network, logger):
except aiohttp.http_exceptions.BadHttpMessage as err:
error_text = "HTTP Error"
expection_text = str(err)
except proxy_errors.ProxyError as err:
error_text = "Proxy Error"
expection_text = str(err)
except Exception as err:
logger.warning(f'Unhandled error while requesting {site_name}: {err}')
logger.debug(err, exc_info=True)
error_text = "Some Error"
expection_text = str(err)
@@ -93,19 +102,19 @@ return html_text, status_code, error_text, expection_text
return html_text, status_code, error_text, expection_text
async def update_site_dict_from_response(sitename, site_dict, results_info, semaphore, logger, query_notify):
async with semaphore:
future = site_info.get('request_future')
site_obj = site_dict[sitename]
future = site_obj.request_future
if not future:
# ignore: search by incompatible id type
return
response = await get_response(request_future=future,
site_name=sitename,
logger=logger)
site_dict[sitename] = process_site_result(response, query_notify, logger, results_info, site_obj)
# TODO: move info separate module
def detect_error_page(html_text, status_code, fail_flags, ignore_403):
@@ -128,10 +137,180 @@ def detect_error_page(html_text, status_code, fail_flags, ignore_403):
return None, None
def process_site_result(response, query_notify, logger, results_info, site: MaigretSite):
if not response:
return results_info
fulltags = site.tags
# Retrieve other site information again
username = results_info['username']
is_parsing_enabled = results_info['parsing_enabled']
url = results_info.get("url_user")
logger.debug(url)
status = results_info.get("status")
if status is not None:
# We have already determined the user doesn't exist here
return results_info
# Get the expected check type
check_type = site.check_type
# Get the failure messages and comments
failure_errors = site.errors
# TODO: refactor
if not response:
logger.error(f'No response for {site.name}')
return results_info
html_text, status_code, error_text, expection_text = response
site_error_text = '?'
# TODO: add elapsed request time counting
response_time = None
if logger.level == logging.DEBUG:
with open('debug.txt', 'a') as f:
status = status_code or 'No response'
f.write(f'url: {url}\nerror: {str(error_text)}\nr: {status}\n')
if html_text:
f.write(f'code: {status}\nresponse: {str(html_text)}\n')
if status_code and not error_text:
error_text, site_error_text = detect_error_page(html_text, status_code, failure_errors,
site.ignore_403)
if site.activation and html_text:
is_need_activation = any(s in html_text for s in site.activation['marks'])
if is_need_activation:
method = site.activation['method']
try:
activate_fun = getattr(ParsingActivator(), method)
# TODO: async call
activate_fun(site, logger)
except AttributeError:
logger.warning(f'Activation method {method} for site {site.name} not found!')
# presense flags
# True by default
presense_flags = site.presense_strs
is_presense_detected = False
if html_text:
if not presense_flags:
is_presense_detected = True
site.stats['presense_flag'] = None
else:
for presense_flag in presense_flags:
if presense_flag in html_text:
is_presense_detected = True
site.stats['presense_flag'] = presense_flag
logger.info(presense_flag)
break
if error_text is not None:
logger.debug(error_text)
result = QueryResult(username,
site.name,
url,
QueryStatus.UNKNOWN,
query_time=response_time,
context=f'{error_text}: {site_error_text}', tags=fulltags)
elif check_type == "message":
absence_flags = site.absence_strs
is_absence_flags_list = isinstance(absence_flags, list)
absence_flags_set = set(absence_flags) if is_absence_flags_list else {absence_flags}
# Checks if the error message is in the HTML
is_absence_detected = any([(absence_flag in html_text) for absence_flag in absence_flags_set])
if not is_absence_detected and is_presense_detected:
result = QueryResult(username,
site.name,
url,
QueryStatus.CLAIMED,
query_time=response_time, tags=fulltags)
else:
result = QueryResult(username,
site.name,
url,
QueryStatus.AVAILABLE,
query_time=response_time, tags=fulltags)
elif check_type == "status_code":
# Checks if the status code of the response is 2XX
if 200 <= status_code < 300 and is_presense_detected:
result = QueryResult(username,
site.name,
url,
QueryStatus.CLAIMED,
query_time=response_time, tags=fulltags)
else:
result = QueryResult(username,
site.name,
url,
QueryStatus.AVAILABLE,
query_time=response_time, tags=fulltags)
elif check_type == "response_url":
# For this detection method, we have turned off the redirect.
# So, there is no need to check the response URL: it will always
# match the request. Instead, we will ensure that the response
# code indicates that the request was successful (i.e. no 404, or
# forward to some odd redirect).
if 200 <= status_code < 300 and is_presense_detected:
result = QueryResult(username,
site.name,
url,
QueryStatus.CLAIMED,
query_time=response_time, tags=fulltags)
else:
result = QueryResult(username,
site.name,
url,
QueryStatus.AVAILABLE,
query_time=response_time, tags=fulltags)
else:
# It should be impossible to ever get here...
raise ValueError(f"Unknown check type '{check_type}' for "
f"site '{site.name}'")
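The three check types above reduce to a small pure decision. A simplified sketch (an illustrative helper, ignoring error handling and the flag-gathering code) that mirrors the branch logic — `message` looks for absence markers in the HTML, while `status_code` and `response_url` both test for a 2XX response:

```python
def decide_claimed(check_type, *, html_text='', status_code=0,
                   absence_flags=(), presence_detected=True):
    """Return True if the account is considered claimed for the given check type."""
    if check_type == 'message':
        # Claimed when no "not found" marker is present and presence flags matched.
        absent = any(flag in html_text for flag in absence_flags)
        return not absent and presence_detected
    if check_type in ('status_code', 'response_url'):
        # Claimed when the response indicates success (2XX).
        return 200 <= status_code < 300 and presence_detected
    raise ValueError(f"Unknown check type '{check_type}'")

assert decide_claimed('message', html_text='Welcome back!', absence_flags=['Not found'])
assert not decide_claimed('message', html_text='User Not found', absence_flags=['Not found'])
assert decide_claimed('status_code', status_code=200)
assert not decide_claimed('response_url', status_code=404)
```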
extracted_ids_data = {}
if is_parsing_enabled and result.status == QueryStatus.CLAIMED:
try:
extracted_ids_data = extract(html_text)
except Exception as e:
logger.warning(f'Error while parsing {site.name}: {e}', exc_info=True)
if extracted_ids_data:
new_usernames = {}
for k, v in extracted_ids_data.items():
if 'username' in k:
new_usernames[v] = 'username'
if k in supported_recursive_search_ids:
new_usernames[v] = k
results_info['ids_usernames'] = new_usernames
result.ids_data = extracted_ids_data
# Notify caller about results of query.
query_notify.update(result, site.similar_search)
# Save status of request
results_info['status'] = result
# Save results from request
results_info['http_status'] = status_code
results_info['is_similar'] = site.similar_search
# results_site['response_text'] = html_text
results_info['rank'] = site.alexa_rank
return results_info
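The mapping from extracted IDs to recursive-search usernames can be exercised in isolation. A sketch using a fabricated `extracted_ids_data` dict (real data would come from `socid_extractor.extract`, and the supported ID tuple is truncated in the snippet above):

```python
SUPPORTED_IDS = ('yandex_public_id',)  # illustrative; the real tuple has more entries

def ids_to_usernames(extracted_ids_data, supported_ids=SUPPORTED_IDS):
    """Map extracted account data to {value: id_type} pairs for recursive search."""
    new_usernames = {}
    for k, v in extracted_ids_data.items():
        if 'username' in k:
            new_usernames[v] = 'username'
        if k in supported_ids:
            new_usernames[v] = k
    return new_usernames

data = {'twitter_username': 'alice', 'yandex_public_id': 'abc123', 'uid': '42'}
assert ids_to_usernames(data) == {'alice': 'username', 'abc123': 'yandex_public_id'}
```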
async def maigret(username, site_dict, query_notify, logger,
proxy=None, timeout=None, recursive_search=False,
id_type='username', debug=False, forced=False,
max_connections=100, no_progressbar=False):
"""Main search func
Checks for existence of username on various social media sites.
@@ -139,7 +318,7 @@ async def maigret(username, site_data, query_notify, logger,
Keyword Arguments:
username -- String indicating username that report
should be created against.
site_dict -- Dictionary containing all of the site data.
query_notify -- Object with base type of QueryNotify().
This will be used to notify the caller about
query results.
@@ -163,53 +342,66 @@
"""
# Notify caller that we are starting the query.
query_notify.start(username, id_type)
# TODO: connector
connector = ProxyConnector.from_url(proxy) if proxy else aiohttp.TCPConnector(ssl=False)
connector.verify_ssl = False
session = aiohttp.ClientSession(connector=connector, trust_env=True)
if logger.level == logging.DEBUG:
future = session.get(url='https://icanhazip.com')
ip, status, error, expection = await get_response(future, None, logger)
if ip:
logger.debug(f'My IP is: {ip.strip()}')
else:
logger.debug(f'IP requesting {error}: {expection}')
# Results from analysis of all sites
results_total = {}
# First create futures for all requests. This allows for the requests to run in parallel
for site_name, site in site_dict.items():
if site.type != id_type:
continue
if site.disabled and not forced:
logger.debug(f'Site {site.name} is disabled, skipping...')
continue
# Results from analysis of this specific site
results_site = {}
# Record URL of main site and username
results_site['username'] = username
results_site['parsing_enabled'] = recursive_search
results_site['url_main'] = site.url_main
headers = {
'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 11.1; rv:55.0) Gecko/20100101 Firefox/55.0',
}
# Override/append any extra headers required by a given site.
headers.update(site.headers)
if 'url' not in site.__dict__:
logger.error('No URL for site %s', site.name)
# URL of user on site (if it exists)
url = site.url.format(
urlMain=site.url_main,
urlSubpath=site.url_subpath,
username=username
)
# workaround to prevent slash errors
url = re.sub('(?<!:)/+', '/', url)
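The template expansion plus the slash workaround can be checked on their own: the `(?<!:)/+` pattern collapses runs of slashes not preceded by a colon, so the `://` of the scheme survives. A small sketch (the `build_url` helper is illustrative):

```python
import re

def build_url(template, url_main, url_subpath, username):
    """Expand a site URL template, then collapse accidental duplicate slashes."""
    url = template.format(urlMain=url_main, urlSubpath=url_subpath, username=username)
    # '(?<!:)' keeps the '//' of 'https://' intact while deduplicating the rest.
    return re.sub('(?<!:)/+', '/', url)

url = build_url('{urlMain}{urlSubpath}//{username}', 'https://example.com', '/u', 'alice')
assert url == 'https://example.com/u/alice'
```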
# Don't make request if username is invalid for the site
if site.regex_check and re.search(site.regex_check, username) is None:
# No need to do the check at the site: this user name is not allowed.
results_site['status'] = QueryResult(username,
site_name,
url,
QueryStatus.ILLEGAL)
results_site["url_user"] = ""
@@ -219,16 +411,21 @@ else:
else:
# URL of user on site (if it exists)
results_site["url_user"] = url
url_probe = site.url_probe
if url_probe is None:
# Probe URL is normal one seen by people out on the web.
url_probe = url
else:
# There is a special URL for probing existence separate
# from where the user profile normally can be found.
url_probe = url_probe.format(
urlMain=site.url_main,
urlSubpath=site.url_subpath,
username=username,
)
if site.check_type == 'status_code' and site.request_head_only:
# In most cases when we are detecting by status code,
# it is not necessary to get the entire body: we can
# detect fine with just the HEAD response.
@@ -239,7 +436,7 @@
# not respond properly unless we request the whole page.
request_method = session.get
if site.check_type == "response_url":
# Site forwards request to a different URL if username not
# found. Disallow the redirect so we can capture the
# http status from the original URL request.
@@ -250,196 +447,44 @@
allow_redirects = True
# TODO: cookies using
# def parse_cookies(cookies_str):
# cookies = SimpleCookie()
# cookies.load(cookies_str)
# return {key: morsel.value for key, morsel in cookies.items()}
#
# if os.path.exists(cookies_file):
# cookies_obj = cookielib.MozillaCookieJar(cookies_file)
# cookies_obj.load(ignore_discard=True, ignore_expires=True)
future = request_method(url=url_probe, headers=headers,
allow_redirects=allow_redirects,
timeout=timeout,
)
# Store future in data for access later
# TODO: move to separate obj
site.request_future = future
# Add this site's results into final dictionary with all of the other results.
results_total[site_name] = results_site
# TODO: move into top-level function
sem = asyncio.Semaphore(max_connections)
tasks = []
for sitename, result_obj in results_total.items():
update_site_coro = update_site_dict_from_response(sitename, site_dict, result_obj, sem, logger, query_notify)
future = asyncio.ensure_future(update_site_coro)
tasks.append(future)
if no_progressbar:
await asyncio.gather(*tasks)
else:
for f in tqdm.asyncio.tqdm.as_completed(tasks):
await f
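The request fan-out above follows a common asyncio pattern: a semaphore caps concurrency while `tqdm.asyncio` can drive a progress bar over completed tasks. A self-contained sketch of the same pattern (without the progress bar, so it runs anywhere; the names are illustrative):

```python
import asyncio

async def check_one(name, sem, results):
    """One bounded unit of work; the semaphore limits how many run at once."""
    async with sem:
        await asyncio.sleep(0)  # placeholder for the real HTTP request
        results[name] = 'done'

async def run_checks(names, max_connections=2):
    sem = asyncio.Semaphore(max_connections)
    results = {}
    tasks = [asyncio.ensure_future(check_one(n, sem, results)) for n in names]
    await asyncio.gather(*tasks)
    return results

results = asyncio.run(run_checks(['a', 'b', 'c']))
assert results == {'a': 'done', 'b': 'done', 'c': 'done'}
```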
await session.close()
# Notify caller that all queries are finished.
query_notify.finish()
@@ -472,102 +517,126 @@ def timeout_check(value):
return timeout
async def site_self_check(site_name, site_data, logger):
async def site_self_check(site, logger, semaphore, db: MaigretDatabase, silent=False):
query_notify = Mock()
changes = {
'disabled': False,
}
try:
check_data = [
(site.username_claimed, QueryStatus.CLAIMED),
(site.username_unclaimed, QueryStatus.AVAILABLE),
]
except AttributeError:
logger.error(f'Missing username_claimed/username_unclaimed for site {site.name}: {site.__dict__}')
return changes
logger.info(f'Checking {site.name}...')
for username, status in check_data:
async with semaphore:
results_dict = await maigret(
username,
{site.name: site},
query_notify,
logger,
timeout=30,
id_type=site.type,
forced=True,
no_progressbar=True,
)
# don't disable entries with other ids types
# TODO: make normal checking
if site.name not in results_dict:
logger.info(results_dict)
changes['disabled'] = True
continue
result = results_dict[site.name]['status']
site_status = result.status
if site_status != status:
if site_status == QueryStatus.UNKNOWN:
msgs = site.absence_strs
etype = site.check_type
logger.warning(f'Error while searching {username} in {site.name}: {result.context}, {msgs}, type {etype}')
# don't disable in case of available username
if status == QueryStatus.CLAIMED:
changes['disabled'] = True
elif status == QueryStatus.CLAIMED:
logger.warning(f'Not found `{username}` in {site.name}, must be claimed')
logger.info(results_dict[site.name])
changes['disabled'] = True
else:
logger.warning(f'Found `{username}` in {site.name}, must be available')
logger.info(results_dict[site.name])
changes['disabled'] = True
logger.info(f'Site {site.name} checking is finished')
if changes['disabled'] != site.disabled:
site.disabled = changes['disabled']
db.update_site(site)
if not silent:
action = 'Disabled' if site.disabled else 'Enabled'
print(f'{action} site {site.name}...')
return changes
async def self_check(db: MaigretDatabase, site_data: dict, logger, silent=False) -> bool:
sem = asyncio.Semaphore(10)
tasks = []
all_sites = site_data
def disabled_count(lst):
return len(list(filter(lambda x: x.disabled, lst)))
disabled_old_count = disabled_count(all_sites.values())
for _, site in all_sites.items():
check_coro = site_self_check(site, logger, sem, db, silent)
future = asyncio.ensure_future(check_coro)
tasks.append(future)
for f in tqdm.asyncio.tqdm.as_completed(tasks):
await f
disabled_new_count = disabled_count(all_sites.values())
total_disabled = disabled_new_count - disabled_old_count
if total_disabled >= 0:
message = 'Disabled'
else:
message = 'Enabled'
total_disabled *= -1
if not silent:
print(f'{message} {total_disabled} ({disabled_old_count} => {disabled_new_count}) checked sites. Run with `--info` flag to get more information')
return total_disabled != 0
async def main():
version_string = '\n'.join([
f'%(prog)s {__version__}',
f'Socid-extractor: {socid_version}',
f'Aiohttp: {aiohttp.__version__}',
f'Requests: {requests.__version__}',
f'Python: {platform.python_version()}',
])
parser = ArgumentParser(formatter_class=RawDescriptionHelpFormatter,
description=f"Maigret v{__version__}"
)
parser.add_argument("--version",
action="version", version=version_string,
help="Display version information and dependencies."
)
parser.add_argument("--info", "-vv",
action="store_true", dest="info", default=False,
help="Display service information."
)
@@ -575,26 +644,13 @@ async def main():
action="store_true", dest="verbose", default=False,
help="Display extra information and metrics."
)
parser.add_argument("-d", "--debug", "-vvv",
action="store_true", dest="debug", default=False,
help="Saving debugging information and sites responses in debug.txt."
)
parser.add_argument("--site",
action="append", metavar='SITE_NAME',
dest="site_list", default=[],
help="Limit analysis to just the listed sites (use several times to specify more than one)"
)
parser.add_argument("--proxy", "-p", metavar='PROXY_URL',
"A longer timeout will be more likely to get results from slow sites. "
"On the other hand, this may cause a long delay to gather all results."
)
parser.add_argument("-n", "--max-connections",
action="store", type=int,
dest="connections", default=100,
help="Allowed number of concurrent connections."
)
parser.add_argument("-a", "--all-sites",
action="store_true", dest="all_sites", default=False,
help="Use all sites for scan."
)
parser.add_argument("--top-sites",
action="store", default=500, type=int,
help="Number of sites to scan, ranked by Alexa Top (default: 500)."
)
parser.add_argument("--print-not-found",
action="store_true", dest="print_not_found", default=False,
help="Print sites where the username was not found."
dest="parse_url", default='',
help="Parse page by URL and extract username and IDs to use for search."
)
parser.add_argument("--id-type",
dest="id_type", default='username',
help="Specify identifier(s) type (default: username)."
)
parser.add_argument("username",
nargs='+', metavar='USERNAMES',
action="store",
dest="tags", default='',
help="Specify tags of sites."
)
# reports options
parser.add_argument("--folderoutput", "-fo", dest="folderoutput", default="reports",
help="If using multiple usernames, the output of the results will be saved to this folder."
)
parser.add_argument("-T", "--txt",
action="store_true", dest="txt", default=False,
help="Create a TXT report (one report per username)."
)
parser.add_argument("-C", "--csv",
action="store_true", dest="csv", default=False,
help="Create a CSV report (one report per username)."
)
parser.add_argument("-H", "--html",
action="store_true", dest="html", default=False,
help="Create an HTML report file (general report on all usernames)."
)
parser.add_argument("-X","--xmind",
action="store_true",
dest="xmind", default=False,
help="Generate an XMind 8 mindmap report (one report per username)."
)
parser.add_argument("-P", "--pdf",
action="store_true",
dest="pdf", default=False,
help="Generate a PDF report (general report on all usernames)."
)
args = parser.parse_args()
# Logging
log_level = logging.ERROR
logging.basicConfig(
format='[%(filename)s:%(lineno)d] %(levelname)-3s %(asctime)s %(message)s',
datefmt='%H:%M:%S',
level=log_level
)
if args.debug:
# Usernames initial list
usernames = {
u: args.id_type
for u in args.username
if u not in ['-']
}
if args.proxy is not None:
print("Using the proxy: " + args.proxy)
if args.parse_url:
page, _ = parse(args.parse_url, cookies_str='')
info = extract(page)
usernames[v] = k
if args.tags:
args.tags = list(set(str(args.tags).split(',')))
if args.json_file is None:
args.json_file = \
"resources/data.json"
)
if args.top_sites == 0 or args.all_sites:
args.top_sites = sys.maxsize
# Create object with all information about sites we are aware of.
try:
db = MaigretDatabase().load_from_file(args.json_file)
site_data = db.ranked_sites_dict(top=args.top_sites, tags=args.tags, names=args.site_list)
except Exception as error:
print(f"ERROR: {error}")
sys.exit(1)
# Database self-checking
if args.self_check:
print('Maigret sites database self-checking...')
is_need_update = await self_check(db, site_data, logger)
if is_need_update:
if input('Do you want to save changes permanently? [yYnN]\n').lower() == 'y':
db.save_to_file(args.json_file)
print('Database was successfully updated.')
else:
print('Updates will be applied only for current search session.')
print(db.get_stats(site_data))
# Create the reports folder if it does not exist
os.makedirs(args.folderoutput, exist_ok=True)
report_path = args.folderoutput
# Define one report filename template
report_filepath_tpl = os.path.join(args.folderoutput, 'report_{username}{postfix}')
# Database consistency
enabled_count = len(list(filter(lambda x: not x.disabled, site_data.values())))
print(f'Sites in database, enabled/total: {enabled_count}/{len(site_data)}')
if not enabled_count:
print('No sites to check, exiting!')
sys.exit(2)
if not usernames:
# magic params to exit after init
print('No usernames to check, exiting.')
sys.exit(0)
# Create notify object for query results.
query_notify = QueryNotifyPrint(result=None,
verbose=args.verbose,
already_checked = set()
general_results = []
while usernames:
username, id_type = list(usernames.items())[0]
del usernames[username]
continue
results = await maigret(username,
dict(site_data),
query_notify,
proxy=args.proxy,
timeout=args.timeout,
recursive_search=recursive_search_enabled,
id_type=id_type,
tags=args.tags,
debug=args.verbose,
logger=logger,
forced=args.use_disabled_sites,
max_connections=args.connections,
)
general_results.append((username, id_type, results))
# TODO: tests
for website_name in results:
dictionary = results[website_name]
# TODO: fix no site data issue
if not dictionary:
continue
new_usernames = dictionary.get('ids_usernames')
if new_usernames:
for u, utype in new_usernames.items():
usernames[u] = utype
# reporting for a single username
if args.xmind:
filename = report_filepath_tpl.format(username=username, postfix='.xmind')
save_xmind_report(filename, username, results)
print(f'XMind report for {username} saved in {filename}')
if args.csv:
filename = report_filepath_tpl.format(username=username, postfix='.csv')
save_csv_report(filename, username, results)
print(f'CSV report for {username} saved in {filename}')
if args.txt:
filename = report_filepath_tpl.format(username=username, postfix='.txt')
save_txt_report(filename, username, results)
print(f'TXT report for {username} saved in {filename}')
# reporting on all results
if general_results:
if args.html or args.pdf:
print('Generating report info...')
report_context = generate_report_context(general_results)
# determine main username
username = report_context['username']
if args.html:
filename = report_filepath_tpl.format(username=username, postfix='.html')
save_html_report(filename, report_context)
print(f'HTML report on all usernames saved in {filename}')
if args.pdf:
filename = report_filepath_tpl.format(username=username, postfix='.pdf')
save_pdf_report(filename, report_context)
print(f'PDF report on all usernames saved in {filename}')
# update database
db.save_to_file(args.json_file)
def run():
try:
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
except KeyboardInterrupt:
print('Maigret is interrupted.')
sys.exit(1)
if __name__ == "__main__":
run()
This module defines the objects for notifying the caller about the
results of queries.
"""
import sys
from colorama import Fore, Style, init
from .result import QueryStatus
f"site '{self.result.site_name}'")
if notify:
sys.stdout.write('\x1b[1K\r')
print(notify)
return
import csv
import io
import logging
import os
import pycountry
import xmind
from datetime import datetime
from jinja2 import Template
from xhtml2pdf import pisa
from dateutil.parser import parse as parse_datetime_str
from .result import QueryStatus
from .utils import is_country_tag, CaseConverter, enrich_link_str
'''
UTILS
'''
def filter_supposed_data(data):
### interesting fields
allowed_fields = ['fullname', 'gender', 'location', 'age']
filtered_supposed_data = {CaseConverter.snake_to_title(k): v[0]
for k, v in data.items()
if k in allowed_fields}
return filtered_supposed_data
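A quick illustration of what `filter_supposed_data` produces. This is a minimal self-contained sketch: `snake_to_title` is a stand-in for `CaseConverter.snake_to_title` (assumed to convert snake_case keys to Title Case), and the sample input dict is invented.

```python
# Stand-in for CaseConverter.snake_to_title (assumption about its behavior).
def snake_to_title(s: str) -> str:
    return s.replace('_', ' ').title()

ALLOWED_FIELDS = ['fullname', 'gender', 'location', 'age']

def filter_supposed_data(data: dict) -> dict:
    # keep only the interesting fields and the first collected value of each
    return {snake_to_title(k): v[0]
            for k, v in data.items()
            if k in ALLOWED_FIELDS}

sample = {'fullname': ['John Smith', 'J. Smith'], 'uid': ['123']}
print(filter_supposed_data(sample))  # {'Fullname': 'John Smith'}
```

Note that only the first accumulated value per field survives; later candidates (like 'J. Smith' above) are discarded.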
'''
REPORTS SAVING
'''
def save_csv_report(filename: str, username: str, results: dict):
with open(filename, 'w', newline='', encoding='utf-8') as f:
generate_csv_report(username, results, f)
def save_txt_report(filename: str, username: str, results: dict):
with open(filename, 'w', encoding='utf-8') as f:
generate_txt_report(username, results, f)
def save_html_report(filename: str, context: dict):
template, _ = generate_report_template(is_pdf=False)
filled_template = template.render(**context)
with open(filename, 'w') as f:
f.write(filled_template)
def save_pdf_report(filename: str, context: dict):
template, css = generate_report_template(is_pdf=True)
filled_template = template.render(**context)
with open(filename, 'w+b') as f:
pisa.pisaDocument(io.StringIO(filled_template), dest=f, default_css=css)
'''
REPORTS GENERATING
'''
def generate_report_template(is_pdf: bool):
"""
HTML/PDF template generation
"""
def get_resource_content(filename):
return open(os.path.join(maigret_path, 'resources', filename)).read()
maigret_path = os.path.dirname(os.path.realpath(__file__))
if is_pdf:
template_content = get_resource_content('simple_report_pdf.tpl')
css_content = get_resource_content('simple_report_pdf.css')
else:
template_content = get_resource_content('simple_report.tpl')
css_content = None
template = Template(template_content)
template.globals['title'] = CaseConverter.snake_to_title
template.globals['detect_link'] = enrich_link_str
return template, css_content
def generate_report_context(username_results: list):
brief_text = []
usernames = {}
extended_info_count = 0
tags = {}
supposed_data = {}
first_seen = None
for username, id_type, results in username_results:
found_accounts = 0
new_ids = []
usernames[username] = {'type': id_type}
for website_name in results:
dictionary = results[website_name]
# TODO: fix no site data issue
if not dictionary:
continue
if dictionary.get('is_similar'):
continue
status = dictionary.get('status')
if status.ids_data:
dictionary['ids_data'] = status.ids_data
extended_info_count += 1
# detect first seen
created_at = status.ids_data.get('created_at')
if created_at:
if first_seen is None:
first_seen = created_at
else:
try:
known_time = parse_datetime_str(first_seen)
new_time = parse_datetime_str(created_at)
if new_time < known_time:
first_seen = created_at
except Exception:
logging.debug('Problems with converting datetime %s/%s', first_seen, created_at)
for k, v in status.ids_data.items():
# suppose target data
field = 'fullname' if k == 'name' else k
if not field in supposed_data:
supposed_data[field] = []
supposed_data[field].append(v)
# suppose country
if k in ['country', 'locale']:
try:
if is_country_tag(k):
tag = pycountry.countries.get(alpha_2=v).alpha_2.lower()
else:
tag = pycountry.countries.search_fuzzy(v)[0].alpha_2.lower()
# TODO: move countries to another struct
tags[tag] = tags.get(tag, 0) + 1
except Exception as e:
logging.debug('pycountry exception', exc_info=True)
new_usernames = dictionary.get('ids_usernames')
if new_usernames:
for u, utype in new_usernames.items():
if not u in usernames:
new_ids.append((u, utype))
usernames[u] = {'type': utype}
if status.status == QueryStatus.CLAIMED:
found_accounts += 1
dictionary['found'] = True
else:
continue
# ignore non-exact search results
if status.tags:
for t in status.tags:
tags[t] = tags.get(t, 0) + 1
brief_text.append(f'Search by {id_type} {username} returned {found_accounts} accounts.')
if new_ids:
ids_list = []
for u, t in new_ids:
ids_list.append(f'{u} ({t})' if t != 'username' else u)
brief_text.append(f'Found target\'s other IDs: ' + ', '.join(ids_list) + '.')
brief_text.append(f'Extended info extracted from {extended_info_count} accounts.')
brief = ' '.join(brief_text).strip()
tuple_sort = lambda d: sorted(d, key=lambda x: x[1], reverse=True)
if 'global' in tags:
# remove the 'global' tag, useless for country detection
del tags['global']
first_username = username_results[0][0]
countries_lists = list(filter(lambda x: is_country_tag(x[0]), tags.items()))
interests_list = list(filter(lambda x: not is_country_tag(x[0]), tags.items()))
filtered_supposed_data = filter_supposed_data(supposed_data)
return {
'username': first_username,
'brief': brief,
'results': username_results,
'first_seen': first_seen,
'interests_tuple_list': tuple_sort(interests_list),
'countries_tuple_list': tuple_sort(countries_lists),
'supposed_data': filtered_supposed_data,
'generated_at': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
}
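The "first seen" logic in `generate_report_context` keeps the earliest `created_at` across all found accounts. A stdlib-only sketch of that step, with `datetime.fromisoformat` standing in for `dateutil`'s more forgiving parser and made-up timestamps:

```python
from datetime import datetime

# made-up created_at values from three hypothetical accounts
timestamps = ['2019-05-01T10:00:00', '2014-02-11T08:30:00', '2021-01-01T00:00:00']

first_seen = None
for created_at in timestamps:
    if first_seen is None:
        first_seen = created_at
    elif datetime.fromisoformat(created_at) < datetime.fromisoformat(first_seen):
        # an earlier registration date wins
        first_seen = created_at

print(first_seen)  # 2014-02-11T08:30:00
```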
def generate_csv_report(username: str, results: dict, csvfile):
writer = csv.writer(csvfile)
writer.writerow(['username',
'name',
'url_main',
'url_user',
'exists',
'http_status'
]
)
for site in results:
writer.writerow([username,
site,
results[site]['url_main'],
results[site]['url_user'],
str(results[site]['status'].status),
results[site]['http_status'],
])
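`generate_csv_report` writes to any file-like object, so it can be exercised with an in-memory buffer. Below is a self-contained copy of the function with `SimpleNamespace` standing in for the real `QueryResult` status object and an invented result entry:

```python
import csv
import io
from types import SimpleNamespace

def generate_csv_report(username: str, results: dict, csvfile):
    writer = csv.writer(csvfile)
    writer.writerow(['username', 'name', 'url_main', 'url_user', 'exists', 'http_status'])
    for site in results:
        writer.writerow([username,
                         site,
                         results[site]['url_main'],
                         results[site]['url_user'],
                         str(results[site]['status'].status),
                         results[site]['http_status']])

# invented result entry; the real 'status' value is a QueryResult
results = {'GitHub': {'url_main': 'https://github.com',
                      'url_user': 'https://github.com/alice',
                      'status': SimpleNamespace(status='Claimed'),
                      'http_status': 200}}
buf = io.StringIO()
generate_csv_report('alice', results, buf)
print(buf.getvalue().splitlines()[1])
```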
def generate_txt_report(username: str, results: dict, file):
exists_counter = 0
for website_name in results:
dictionary = results[website_name]
# TODO: fix no site data issue
if not dictionary:
continue
if dictionary.get("status").status == QueryStatus.CLAIMED:
exists_counter += 1
file.write(dictionary["url_user"] + "\n")
file.write(f'Total Websites Username Detected On : {exists_counter}')
'''
XMIND 8 Functions
'''
def save_xmind_report(filename, username, results):
if os.path.exists(filename):
os.remove(filename)
workbook = xmind.load(filename)
sheet = workbook.getPrimarySheet()
design_sheet(sheet, username, results)
xmind.save(workbook, path=filename)
def design_sheet(sheet, username, results):
##all tag list
alltags = {}
supposed_data = {}
sheet.setTitle("%s Analysis"%(username))
root_topic1 = sheet.getRootTopic()
root_topic1.setTitle("%s"%(username))
undefinedsection = root_topic1.addSubTopic()
undefinedsection.setTitle("Undefined")
alltags["undefined"] = undefinedsection
for website_name in results:
dictionary = results[website_name]
if dictionary.get("status").status == QueryStatus.CLAIMED:
## first time we see this entry
for tag in dictionary.get("status").tags:
if tag.strip() == "":
continue
if tag not in alltags.keys():
if not is_country_tag(tag):
tagsection = root_topic1.addSubTopic()
tagsection.setTitle(tag)
alltags[tag] = tagsection
category = None
for tag in dictionary.get("status").tags:
if tag.strip() == "":
continue
if not is_country_tag(tag):
category = tag
if category is None:
userlink = undefinedsection.addSubTopic()
userlink.addLabel(dictionary.get("status").site_url_user)
else:
userlink = alltags[category].addSubTopic()
userlink.addLabel(dictionary.get("status").site_url_user)
if dictionary.get("status").ids_data:
for k, v in dictionary.get("status").ids_data.items():
# suppose target data
if not isinstance(v, list):
currentsublabel = userlink.addSubTopic()
field = 'fullname' if k == 'name' else k
if not field in supposed_data:
supposed_data[field] = []
supposed_data[field].append(v)
currentsublabel.setTitle("%s: %s" % (k, v))
else:
for currentval in v:
currentsublabel = userlink.addSubTopic()
field = 'fullname' if k == 'name' else k
if not field in supposed_data:
supposed_data[field] = []
supposed_data[field].append(currentval)
currentsublabel.setTitle("%s: %s" % (k, currentval))
### Add Supposed DATA
filtered_supposed_data = filter_supposed_data(supposed_data)
if filtered_supposed_data:
    undefinedsection = root_topic1.addSubTopic()
    undefinedsection.setTitle("SUPPOSED DATA")
    for k, v in filtered_supposed_data.items():
currentsublabel = undefinedsection.addSubTopic()
currentsublabel.setTitle("%s: %s" % (k, v))
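The grouping rule in `design_sheet` boils down to: a claimed account lands under the last non-empty, non-country tag it carries, or under "Undefined" when it has none. A minimal sketch, where the country-tag set is an invented stand-in for `is_country_tag`:

```python
COUNTRY_TAGS = {'us', 'ru', 'de'}  # stand-in for is_country_tag

def pick_category(tags):
    # mirror design_sheet: the last non-empty, non-country tag wins
    category = None
    for tag in tags:
        if tag.strip() == '':
            continue
        if tag not in COUNTRY_TAGS:
            category = tag
    return category or 'Undefined'

print(pick_category(['ru', 'photo', 'forum']))  # forum
print(pick_category(['us', '']))                # Undefined
```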
<html>
<head>
<meta charset="utf-8" />
<meta name="viewport" content="width=device-width, initial-scale=1.0, shrink-to-fit=no" />
<title>{{ username }} -- Maigret username search report</title>
<link rel="stylesheet" href="https://stackpath.bootstrapcdn.com/bootstrap/4.3.1/css/bootstrap.min.css" integrity="sha384-ggOyR0iXCbMQv3Xipma34MD+dH/1fQ784/j6cY/iJTQUOhcWr7x9JvoRxT2MZw1T" crossorigin="anonymous">
<style>
.table td, .table th {
padding: .4rem;
}
@media print {
.pagebreak { page-break-before: always; }
}
</style>
</head>
<body>
<div class="container">
<div class="row-mb">
<div class="col-12 card-body" style="padding-bottom: 0.5rem;">
<h4 class="mb-0">
<a class="blog-header-logo text-dark" href="#">Username search report for {{ username }}</a>
</h4>
<small class="text-muted">Generated by <a href="https://github.com/soxoj/maigret">Maigret</a> at {{ generated_at }}</small>
</div>
</div>
<div class="row-mb">
<div class="col-md">
<div class="card flex-md-row mb-4 box-shadow h-md-250">
<div class="card-body d-flex flex-column align-items-start">
<h5>Supposed personal data</h5>
{% for k, v in supposed_data.items() %}
<span>
{{ k }}: {{ v }}
</span>
{% endfor %}
{% if countries_tuple_list %}
<span>
Geo: {% for k, v in countries_tuple_list %}{{ k }} <span class="text-muted">({{ v }})</span>{{ ", " if not loop.last }}{% endfor %}
</span>
{% endif %}{% if interests_tuple_list %}
<span>
Interests: {% for k, v in interests_tuple_list %}{{ k }} <span class="text-muted">({{ v }})</span>{{ ", " if not loop.last }}{% endfor %}
</span>
{% endif %}{% if first_seen %}
<span>
First seen: {{ first_seen }}
</span>
{% endif %}
</div>
</div>
</div>
</div>
<div class="row-mb">
<div class="col-md">
<div class="card flex-md-row mb-4 box-shadow h-md-250">
<div class="card-body d-flex flex-column align-items-start">
<h5>Brief</h5>
<span>
{{ brief }}
</span>
</div>
</div>
</div>
</div>
{% for u, t, data in results %}
{% for k, v in data.items() %}
{% if v.found and not v.is_similar %}
<div class="row-mb">
<div class="col-md">
<div class="card flex-md-row mb-4 box-shadow h-md-250">
<img class="card-img-right flex-auto d-none d-md-block" alt="Photo" style="width: 200px; height: 200px; object-fit: scale-down;" src="{{ v.status.ids_data.image or 'https://i.imgur.com/040fmbw.png' }}" data-holder-rendered="true">
<div class="card-body d-flex flex-column align-items-start" style="padding-top: 0;">
<h3 class="mb-0" style="padding-top: 1rem;">
<a class="text-dark" href="{{ v.url_main }}" target="_blank">{{ k }}</a>
</h3>
{% if v.status.tags %}
<div class="mb-1 text-muted">Tags: {{ v.status.tags | join(', ') }}</div>
{% endif %}
<p class="card-text">
<a href="{{ v.url_user }}" target="_blank">{{ v.url_user }}</a>
</p>
{% if v.ids_data %}
<table class="table table-striped">
<tbody>
{% for k1, v1 in v.ids_data.items() %}
{% if k1 != 'image' %}
<tr>
<th>{{ title(k1) }}</th>
<td>{% if v1 is iterable and (v1 is not string and v1 is not mapping) %}{{ v1 | join(', ') }}{% else %}{{ detect_link(v1) }}{% endif %}
</td>
</tr>
{% endif %}
{% endfor %}
</tbody>
</table>
{% endif %}
</div>
</div>
</div>
</div>
{% endif %}
{% endfor %}
{% endfor %}
</div>
<script src="https://code.jquery.com/jquery-3.3.1.slim.min.js" integrity="sha384-q8i/X+965DzO0rT7abK41JStQIAqVgRVzpbzo5smXKp4YfRvH+8abtTE1Pi6jizo" crossorigin="anonymous"></script>
<script src="https://cdnjs.cloudflare.com/ajax/libs/popper.js/1.14.7/umd/popper.min.js" integrity="sha384-UO2eT0CpHqdSJQ6hJty5KVphtPhzWj9WO1clHTMGa3JDZwrnQq4sF86dIHNDz0W1" crossorigin="anonymous"></script>
<script src="https://stackpath.bootstrapcdn.com/bootstrap/4.3.1/js/bootstrap.min.js" integrity="sha384-JjSmVgyd0p3pXB1rRibZUAYoIIy6OrQ6VrjIEaFf/nJGzIxFDsf4x0xIM+B07jRM" crossorigin="anonymous"></script>
</body>
</html>
h2 {
font-size: 30px;
width: 100%;
display:block;
}
h3 {
font-size: 25px;
width: 100%;
display:block;
}
h4 {
font-size: 20px;
width: 100%;
display:block;
}
p {
margin: 0 0 5px;
display: block;
}
table {
margin-bottom: 10px;
width:100%;
}
th {
font-weight: bold;
}
th,td,caption {
padding: 4px 10px 4px 5px;
}
table tr:nth-child(even) td,
table tr.even td {
background-color: #e5ecf9;
}
div {
border-bottom-color: #3e3e3e;
border-bottom-width: 1px;
border-bottom-style: solid;
}
<html>
<head>
<meta charset="utf-8" />
<meta name="viewport" content="width=device-width, initial-scale=1.0, shrink-to-fit=no" />
<title>{{ username }} -- Maigret username search report</title>
</head>
<body>
<div class="container">
<div class="row-mb">
<div class="col-12 card-body" style="padding-bottom: 0.5rem; width:100%">
<h2 class="mb-0">
Username search report for {{ username }}
</h2>
<small>Generated by <a href="https://github.com/soxoj/maigret">Maigret</a> at {{ generated_at }}</small>
</div>
</div>
<br/><br/>
<div>
<div>
<div>
<div>
<h3>Supposed personal data</h3>
{% for k, v in supposed_data.items() %}
<p>
{{ k }}: {{ v }}
</p>
{% endfor %}
{% if countries_tuple_list %}
<p>
Geo: {% for k, v in countries_tuple_list %}{{ k }} <span class="text-muted">({{ v }})</span>{{ ", " if not loop.last }}{% endfor %}
</p>
{% endif %}{% if interests_tuple_list %}
<p>
Interests: {% for k, v in interests_tuple_list %}{{ k }} <span class="text-muted">({{ v }})</span>{{ ", " if not loop.last }}{% endfor %}
</p>
{% endif %}{% if first_seen %}
<p>
First seen: {{ first_seen }}
</p>
{% endif %}
</div>
</div>
</div>
</div>
<br/>
<div>
<div>
<div>
<div>
<h3>Brief</h3>
<p>
{{ brief }}
</p>
</div>
</div>
</div>
</div>
{% for u, t, data in results %}
{% for k, v in data.items() %}
{% if v.found and not v.is_similar %}
<split></split>
<hr>
<br/>
<div class="sitebox" style="margin-top: 20px;" >
<div>
<div>
<table>
<tr>
<td valign="top">
<div class="textbox" style="padding-top: 10px;" >
<h3>
<a class="text-dark" href="{{ v.url_main }}" target="_blank">{{ k }}</a>
</h3>
{% if v.status.tags %}
<div class="mb-1 text-muted">Tags: {{ v.status.tags | join(', ') }}</div>
{% endif %}
<p class="card-text">
<a href="{{ v.url_user }}" target="_blank">{{ v.url_user }}</a>
</p>
</div>
{% if v.ids_data %}
<div style="clear:both;"></div>
<div style="width:100%">
<br/>
<h4>Details</h4>
<table class="table table-striped;" style="margin-top:5px;">
<tbody>
{% for k1, v1 in v.ids_data.items() %}
{% if k1 != 'image' %}
<tr>
<th style="width:200px;">{{ title(k1) }}</th>
<td>{% if v1 is iterable and (v1 is not string and v1 is not mapping) %}{{ v1 | join(', ') }}{% else %}{{ detect_link(v1) }}{% endif %}</td>
</tr>
{% endif %}
{% endfor %}
</tbody>
</table>
</div>
{% endif %}
</td>
<td style="width:201px; position: relative;" valign="top">
<img alt="Photo" style="width: 200px; height: 200px; object-fit: scale-down;" src="{{ v.status.ids_data.image or 'https://i.imgur.com/040fmbw.png' }}" data-holder-rendered="true">
</td>
</tr>
</table>
</div>
</div>
</div>
{% endif %}
{% endfor %}
{% endfor %}
</div>
</body>
</html>
"""
def __init__(self, username, site_name, site_url_user, status, ids_data=None,
query_time=None, context=None, tags=[]):
"""Create Query Result Object.
Contains information about a specific method of detecting usernames on
self.query_time = query_time
self.context = context
self.ids_data = ids_data
self.tags = tags
return
def __str__(self):
"""Convert Object To String.
# -*- coding: future_annotations -*-
"""Maigret Sites Information"""
import copy
import json
import operator
import requests
import sys
from .utils import CaseConverter
class MaigretEngine:
def __init__(self, name, data):
self.name = name
self.site = {}
self.__dict__.update(data)
@property
def json(self):
return self.__dict__
class MaigretSite:
def __init__(self, name, information):
self.name = name
self.disabled = False
self.similar_search = False
self.ignore_403 = False
self.tags = []
self.type = 'username'
self.headers = {}
self.errors = {}
self.activation = {}
self.url_subpath = ''
self.regex_check = None
self.url_probe = None
self.check_type = ''
self.request_head_only = ''
self.presense_strs = []
self.absence_strs = []
self.stats = {}
self.engine = None
self.engine_data = {}
self.engine_obj = None
self.request_future = None
self.alexa_rank = None
for k, v in information.items():
self.__dict__[CaseConverter.camel_to_snake(k)] = v
if (self.alexa_rank is None) or (self.alexa_rank == 0):
# We do not know the popularity, so make site go to bottom of list.
self.alexa_rank = sys.maxsize
def __str__(self):
    """Convert object to string."""
    return f"{self.name} ({self.url_main})"
@property
def json(self):
result = {}
for k, v in self.__dict__.items():
# convert to camelCase
field = CaseConverter.snake_to_camel(k)
# strip empty elements
if v in (False, '', [], {}, None, sys.maxsize, 'username'):
continue
if field in ['name', 'engineData', 'requestFuture', 'detectedEngine', 'engineObj', 'stats']:
continue
result[field] = v
return result
def update(self, updates: dict) -> MaigretSite:
self.__dict__.update(updates)
return self
def update_from_engine(self, engine: MaigretEngine) -> MaigretSite:
engine_data = engine.site
for k, v in engine_data.items():
field = CaseConverter.camel_to_snake(k)
if isinstance(v, dict):
# TODO: assertion of intersecting keys
# update dicts like errors
self.__dict__.get(field, {}).update(v)
elif isinstance(v, list):
self.__dict__[field] = self.__dict__.get(field, []) + v
else:
self.__dict__[field] = v
self.engine_obj = engine
return self
def strip_engine_data(self) -> MaigretSite:
if not self.engine_obj:
return self
self.request_future = None
self_copy = copy.deepcopy(self)
engine_data = self_copy.engine_obj.site
site_data_keys = list(self_copy.__dict__.keys())
for k in engine_data.keys():
field = CaseConverter.camel_to_snake(k)
is_exists = field in site_data_keys
# remove dict keys
if isinstance(engine_data[k], dict) and is_exists:
for f in engine_data[k].keys():
del self_copy.__dict__[field][f]
continue
# remove list items
if isinstance(engine_data[k], list) and is_exists:
for f in engine_data[k]:
self_copy.__dict__[field].remove(f)
continue
if is_exists:
del self_copy.__dict__[field]
return self_copy
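The intent of `strip_engine_data` above is to serialize only what a site overrides on top of its engine. A simplified dict-level sketch of the same subtraction (field names and sample data are invented for illustration):

```python
def strip_engine_fields(site: dict, engine: dict) -> dict:
    # drop every key/item the engine already provides,
    # keeping only site-specific overrides
    result = {}
    for k, v in site.items():
        if k not in engine:
            result[k] = v
        elif isinstance(v, dict):
            kept = {f: fv for f, fv in v.items() if f not in engine[k]}
            if kept:
                result[k] = kept
        elif isinstance(v, list):
            kept = [x for x in v if x not in engine[k]]
            if kept:
                result[k] = kept
        # scalar values present in the engine are dropped entirely
    return result

engine = {'presense_strs': ['uCoz'], 'errors': {'404': 'not found'}}
site = {'name': 'example',
        'url_home': 'https://example.com',
        'presense_strs': ['uCoz', 'profile'],
        'errors': {'404': 'not found'}}
print(strip_engine_fields(site, engine))
```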
class MaigretDatabase:
def __init__(self):
self._sites = []
self._engines = []
@property
def sites(self):
return self._sites
@property
def sites_dict(self):
return {site.name: site for site in self._sites}
def ranked_sites_dict(self, reverse=False, top=sys.maxsize, tags=[], names=[]):
normalized_names = list(map(str.lower, names))
normalized_tags = list(map(str.lower, tags))
is_tags_ok = lambda x: set(x.tags).intersection(set(normalized_tags))
is_name_ok = lambda x: x.name.lower() in normalized_names
is_engine_ok = lambda x: isinstance(x.engine, str) and x.engine.lower() in normalized_tags
if not tags and not names:
filtered_list = self.sites
else:
filtered_list = [s for s in self.sites if is_tags_ok(s) or is_name_ok(s) or is_engine_ok(s)]
sorted_list = sorted(filtered_list, key=lambda x: x.alexa_rank, reverse=reverse)[:top]
return {site.name: site for site in sorted_list}
return f"{self.name} ({self.url_home})"
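The filter-then-rank logic of `ranked_sites_dict` can be sketched over plain tuples instead of the real site objects (the data below is hypothetical):

```python
# (name, tags, alexa_rank) triples standing in for MaigretSite objects
sites = [
    ("500px", ["photo", "global"], 2981),
    ("Reddit", ["news", "us"], 17),
    ("Twitter", ["global"], 55),
]

def filter_and_rank(sites, tags=(), names=(), top=None, reverse=False):
    tags = {t.lower() for t in tags}
    names = {n.lower() for n in names}
    if tags or names:
        # keep a site if any of its tags match, or its name matches
        sites = [s for s in sites
                 if set(s[1]) & tags or s[0].lower() in names]
    # lower Alexa rank means more popular, so ascending order unless reversed
    ranked = sorted(sites, key=lambda s: s[2], reverse=reverse)
    return ranked[:top]

result = [s[0] for s in filter_and_rank(sites, tags=["photo", "news"], top=2)]
print(result)  # ['Reddit', '500px']
```

Twitter is filtered out (no matching tag or name), and the survivors come back most-popular first.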
    @property
    def engines(self):
        return self._engines

    @property
    def engines_dict(self):
        return {engine.name: engine for engine in self._engines}
    def update_site(self, site: MaigretSite) -> MaigretDatabase:
        for i, s in enumerate(self._sites):
            if s.name == site.name:
                # replace the existing entry in place
                self._sites[i] = site
                return self

        self._sites.append(site)
        return self

    def save_to_file(self, filename: str) -> MaigretDatabase:
        db_data = {
            'sites': {site.name: site.strip_engine_data().json for site in self._sites},
            'engines': {engine.name: engine.json for engine in self._engines},
        }

        json_data = json.dumps(db_data, indent=4)

        with open(filename, 'w') as f:
            f.write(json_data)

        return self
    def load_from_json(self, json_data: dict) -> MaigretDatabase:
        site_data = json_data.get("sites", {})
        engines_data = json_data.get("engines", {})

        for engine_name in engines_data:
            self._engines.append(MaigretEngine(engine_name, engines_data[engine_name]))

        # Add all of the site information from the json data to the internal site list.
        for site_name in site_data:
            try:
                maigret_site = MaigretSite(site_name, site_data[site_name])

                engine = site_data[site_name].get('engine')
                if engine:
                    maigret_site.update_from_engine(self.engines_dict[engine])

                self._sites.append(maigret_site)
            except KeyError as error:
                raise ValueError(f"Problem parsing json content for site {site_name}: "
                                 f"Missing attribute {str(error)}."
                                 )

        return self
    def load_from_str(self, db_str: str) -> MaigretDatabase:
        try:
            data = json.loads(db_str)
        except Exception as error:
            raise ValueError(f"Problem parsing json contents from str "
                             f"'{db_str[:50]}'...: {str(error)}."
                             )

        return self.load_from_json(data)
    def load_from_url(self, url: str) -> MaigretDatabase:
        is_url_valid = url.startswith('http://') or url.startswith('https://')

        if not is_url_valid:
            raise FileNotFoundError(f"Invalid data file URL '{url}'.")

        try:
            response = requests.get(url=url)
        except Exception as error:
            raise FileNotFoundError(f"Problem while attempting to access "
                                    f"data file URL '{url}': "
                                    f"{str(error)}"
                                    )

        if response.status_code == 200:
            try:
                data = response.json()
            except Exception as error:
                raise ValueError(f"Problem parsing json contents at "
                                 f"'{url}': {str(error)}."
                                 )
        else:
            raise FileNotFoundError(f"Bad response while accessing "
                                    f"data file URL '{url}'."
                                    )

        return self.load_from_json(data)
    def load_from_file(self, filename: str) -> MaigretDatabase:
        try:
            with open(filename, 'r', encoding='utf-8') as file:
                try:
                    data = json.load(file)
                except Exception as error:
                    raise ValueError(f"Problem parsing json contents from "
                                     f"file '{filename}': {str(error)}."
                                     )
        except FileNotFoundError:
            raise FileNotFoundError(f"Problem while attempting to access "
                                    f"data file '{filename}'."
                                    )

        return self.load_from_json(data)

    def get_stats(self, sites_dict):
        sites = sites_dict or self.sites_dict

        found_flags = {}
        for _, s in sites.items():
            if 'presense_flag' in s.stats:
                flag = s.stats['presense_flag']
                found_flags[flag] = found_flags.get(flag, 0) + 1

        return found_flags
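`get_stats` is a plain frequency count of `presense_flag` values across site stats. The same aggregation can be written with `collections.Counter` (the sample stats below are made up for illustration):

```python
from collections import Counter

# hypothetical per-site stats dicts, as collected during self-checks
stats = [
    {"presense_flag": "totalKarma"},
    {"presense_flag": "totalKarma"},
    {},                      # site without a detected flag
    {"presense_flag": "uid"},
]

found_flags = Counter(s["presense_flag"] for s in stats if "presense_flag" in s)
print(found_flags)  # Counter({'totalKarma': 2, 'uid': 1})
```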
import re


class CaseConverter:
    @staticmethod
    def camel_to_snake(camelcased_string: str) -> str:
        return re.sub(r'(?<!^)(?=[A-Z])', '_', camelcased_string).lower()

    @staticmethod
    def snake_to_camel(snakecased_string: str) -> str:
        formatted = ''.join(word.title() for word in snakecased_string.split('_'))
        result = formatted[0].lower() + formatted[1:]
        return result

    @staticmethod
    def snake_to_title(snakecased_string: str) -> str:
        words = snakecased_string.split('_')
        words[0] = words[0].title()
        return ' '.join(words)
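A quick stand-alone check of the converters above (the functions are reproduced verbatim so the snippet runs on its own):

```python
import re

def camel_to_snake(camelcased_string: str) -> str:
    # insert "_" before every uppercase letter that is not at the start
    return re.sub(r'(?<!^)(?=[A-Z])', '_', camelcased_string).lower()

def snake_to_camel(snakecased_string: str) -> str:
    formatted = ''.join(word.title() for word in snakecased_string.split('_'))
    return formatted[0].lower() + formatted[1:]

def snake_to_title(snakecased_string: str) -> str:
    words = snakecased_string.split('_')
    words[0] = words[0].title()
    return ' '.join(words)

print(camel_to_snake("checkType"))        # check_type
print(snake_to_camel("username_claimed")) # usernameClaimed
print(snake_to_title("check_type"))       # Check type
```

This mapping is what lets engine JSON keys (camelCase) land on the snake_case attributes of `MaigretSite`.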
def is_country_tag(tag: str) -> bool:
    """Detect if a tag represents a country"""
    return bool(re.match("^([a-zA-Z]){2}$", tag)) or tag == 'global'


def enrich_link_str(link: str) -> str:
    link = link.strip()
    if link.startswith('www.') or (link.startswith('http') and '//' in link):
        return f'<a class="auto-link" href="{link}">{link}</a>'
    return link
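Likewise, a stand-alone sanity check of the two helpers above (reproduced verbatim): country tags are two-letter codes or the special `global` tag, and bare URLs get wrapped into anchors for the HTML report.

```python
import re

def is_country_tag(tag: str) -> bool:
    return bool(re.match("^([a-zA-Z]){2}$", tag)) or tag == 'global'

def enrich_link_str(link: str) -> str:
    link = link.strip()
    if link.startswith('www.') or (link.startswith('http') and '//' in link):
        return f'<a class="auto-link" href="{link}">{link}</a>'
    return link

print(is_country_tag('fr'))                     # True
print(is_country_tag('photo'))                  # False
print(enrich_link_str('plain text'))            # plain text
print(enrich_link_str('www.flickr.com/x'))      # <a class="auto-link" ...>
```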
# pytest.ini
[pytest]
filterwarnings =
error
ignore::UserWarning
aiohttp==3.7.3
aiohttp-socks==0.5.5
arabic-reshaper==2.1.1
async-timeout==3.0.1
attrs==20.3.0
beautifulsoup4==4.9.3
bs4==0.0.1
certifi==2020.12.5
chardet==3.0.4
colorama==0.4.4
python-dateutil==2.8.1
future==0.18.2
future-annotations==1.0.0
html5lib==1.1
idna==2.10
Jinja2==2.11.2
lxml==4.6.2
MarkupSafe==1.1.1
mock==4.0.2
multidict==5.1.0
Pillow==8.1.0
pycountry==20.7.3
PyPDF2==1.26.0
PySocks==1.7.1
python-bidi==0.4.2
python-socks==1.1.2
reportlab==3.5.59
requests==2.25.1
requests-futures==1.0.0
six==1.15.0
socid-extractor>=0.0.4
soupsieve==2.1
stem==1.8.0
torrequest==0.1.0
tqdm==4.55.0
typing-extensions==3.7.4.3
urllib3==1.26.2
webencodings==0.5.1
xhtml2pdf==0.2.5
XMind==1.2.0
yarl==1.6.3
with open('requirements.txt') as rf:
    requires = rf.read().splitlines()

setup(name='maigret',
      version='0.1.11',
      description='Collect a dossier on a person by username from a huge number of sites',
      long_description=long_description,
      long_description_content_type="text/markdown",
<svg xmlns="http://www.w3.org/2000/svg" width="92" height="20"><linearGradient id="b" x2="0" y2="100%"><stop offset="0" stop-color="#bbb" stop-opacity=".1"/><stop offset="1" stop-opacity=".1"/></linearGradient><mask id="a"><rect width="92" height="20" fill="#fff"/></mask><g mask="url(#a)"><path fill="#555" d="M0 0h34v20H0z"/><path fill="#46BC99" d="M34 0h58v20H34z"/></g><g fill="#fff" text-anchor="middle" font-family="DejaVu Sans,Verdana,Geneva,sans-serif" font-size="11"><text x="17" y="14">chat</text><text x="62" y="14">on gitter</text></g></svg>
<html>
<head>
<meta charset="utf-8" />
<meta name="viewport" content="width=device-width, initial-scale=1.0, shrink-to-fit=no" />
<title>alexaimephotographycars -- Maigret username search report</title>
<link rel="stylesheet" href="https://stackpath.bootstrapcdn.com/bootstrap/4.3.1/css/bootstrap.min.css" integrity="sha384-ggOyR0iXCbMQv3Xipma34MD+dH/1fQ784/j6cY/iJTQUOhcWr7x9JvoRxT2MZw1T" crossorigin="anonymous">
<style>
.table td, .table th {
padding: .4rem;
}
@media print {
.pagebreak { page-break-before: always; }
}
</style>
</head>
<body>
<div class="container">
<div class="row-mb">
<div class="col-12 card-body" style="padding-bottom: 0.5rem;">
<h4 class="mb-0">
<a class="blog-header-logo text-dark" href="#">Username search report for alexaimephotographycars</a>
</h4>
<small class="text-muted">Generated by <a href="https://github.com/soxoj/maigret">Maigret</a> at 2021-01-16 17:06:15</small>
</div>
</div>
<div class="row-mb">
<div class="col-md">
<div class="card flex-md-row mb-4 box-shadow h-md-250">
<div class="card-body d-flex flex-column align-items-start">
<h5>Supposed personal data</h5>
<span>
Fullname: Alex Aimé
</span>
<span>
Gender: m
</span>
<span>
Location: France
</span>
<span>
Geo: fr <span class="text-muted">(3)</span>, ru <span class="text-muted">(1)</span>
</span>
<span>
Interests: photo <span class="text-muted">(6)</span>, blogs <span class="text-muted">(2)</span>, art <span class="text-muted">(2)</span>, news <span class="text-muted">(1)</span>, discussions <span class="text-muted">(1)</span>, video <span class="text-muted">(1)</span>, instagram <span class="text-muted">(1)</span>
</span>
<span>
First seen: 2017-12-06T11:49:28+00:00
</span>
</div>
</div>
</div>
</div>
<div class="row-mb">
<div class="col-md">
<div class="card flex-md-row mb-4 box-shadow h-md-250">
<div class="card-body d-flex flex-column align-items-start">
<h5>Brief</h5>
<span>
Search by username alexaimephotographycars returned 1 accounts. Found target's other IDs: alexaimephotography, Alexaimephotogr. Search by username alexaimephotography returned 10 accounts. Found target's other IDs: AlexAimePhotography, Alexaimephotography, alexaimephotogr. Search by username Alexaimephotogr returned 1 accounts. Extended info extracted from 6 accounts.
</span>
</div>
</div>
</div>
</div>
<div class="row-mb">
<div class="col-md">
<div class="card flex-md-row mb-4 box-shadow h-md-250">
<img class="card-img-right flex-auto d-none d-md-block" alt="Photo" style="width: 200px; height: 200px; object-fit: scale-down;" src="https://drscdn.500px.org/user_avatar/26403415/q%3D85_w%3D300_h%3D300/v2?webp=true&v=2&sig=0235678a4f7b65e007e864033ebfaf5ef6d87fad34f80a8639d985320c20fe3b" data-holder-rendered="true">
<div class="card-body d-flex flex-column align-items-start" style="padding-top: 0;">
<h3 class="mb-0" style="padding-top: 1rem;">
<a class="text-dark" href="https://500px.com/" target="_blank">500px</a>
</h3>
<div class="mb-1 text-muted">Tags: global, photo</div>
<p class="card-text">
<a href="https://500px.com/p/alexaimephotographycars" target="_blank">https://500px.com/p/alexaimephotographycars</a>
</p>
<table class="table table-striped">
<tbody>
<tr>
<th>Uid</th>
<td>dXJpOm5vZGU6VXNlcjoyNjQwMzQxNQ==
</td>
</tr>
<tr>
<th>Legacy id</th>
<td>26403415
</td>
</tr>
<tr>
<th>Username</th>
<td>alexaimephotographycars
</td>
</tr>
<tr>
<th>Name</th>
<td>Alex Aimé
</td>
</tr>
<tr>
<th>Created at</th>
<td>2018-05-04T10:17:01.000+0000
</td>
</tr>
<tr>
<th>Image bg</th>
<td><a class="auto-link" href="https://drscdn.500px.org/user_cover/26403415/q%3D65_m%3D2048/v2?webp=true&v=1&sig=bea411fb158391a4fdad498874ff17088f91257e59dfb376ff67e3a44c3a4201">https://drscdn.500px.org/user_cover/26403415/q%3D65_m%3D2048/v2?webp=true&v=1&sig=bea411fb158391a4fdad498874ff17088f91257e59dfb376ff67e3a44c3a4201</a>
</td>
</tr>
<tr>
<th>Website</th>
<td><a class="auto-link" href="www.flickr.com/photos/alexaimephotography/">www.flickr.com/photos/alexaimephotography/</a>
</td>
</tr>
<tr>
<th>Facebook link</th>
<td><a class="auto-link" href="www.instagram.com/street.reality.photography/">www.instagram.com/street.reality.photography/</a>
</td>
</tr>
<tr>
<th>Instagram username</th>
<td>alexaimephotography
</td>
</tr>
<tr>
<th>Twitter username</th>
<td>Alexaimephotogr
</td>
</tr>
</tbody>
</table>
</div>
</div>
</div>
</div>
<div class="row-mb">
<div class="col-md">
<div class="card flex-md-row mb-4 box-shadow h-md-250">
<img class="card-img-right flex-auto d-none d-md-block" alt="Photo" style="width: 200px; height: 200px; object-fit: scale-down;" src="https://i.imgur.com/040fmbw.png" data-holder-rendered="true">
<div class="card-body d-flex flex-column align-items-start" style="padding-top: 0;">
<h3 class="mb-0" style="padding-top: 1rem;">
<a class="text-dark" href="https://www.facebook.com/" target="_blank">Facebook</a>
</h3>
<div class="mb-1 text-muted">Tags: global</div>
<p class="card-text">
<a href="https://www.facebook.com/alexaimephotography" target="_blank">https://www.facebook.com/alexaimephotography</a>
</p>
</div>
</div>
</div>
</div>
<div class="row-mb">
<div class="col-md">
<div class="card flex-md-row mb-4 box-shadow h-md-250">
<img class="card-img-right flex-auto d-none d-md-block" alt="Photo" style="width: 200px; height: 200px; object-fit: scale-down;" src="https://styles.redditmedia.com/t5_1nytpy/styles/profileIcon_7vmhdwzd3g931.jpg?width=256&height=256&crop=256:256,smart&frame=1&s=4f355f16b4920844a3f4eacd4237a7bf76b2e97e" data-holder-rendered="true">
<div class="card-body d-flex flex-column align-items-start" style="padding-top: 0;">
<h3 class="mb-0" style="padding-top: 1rem;">
<a class="text-dark" href="https://www.reddit.com/" target="_blank">Reddit</a>
</h3>
<div class="mb-1 text-muted">Tags: news, discussions</div>
<p class="card-text">
<a href="https://www.reddit.com/user/alexaimephotography" target="_blank">https://www.reddit.com/user/alexaimephotography</a>
</p>
<table class="table table-striped">
<tbody>
<tr>
<th>Reddit id</th>
<td>t5_1nytpy
</td>
</tr>
<tr>
<th>Reddit username</th>
<td>alexaimephotography
</td>
</tr>
<tr>
<th>Fullname</th>
<td>alexaimephotography
</td>
</tr>
<tr>
<th>Is employee</th>
<td>False
</td>
</tr>
<tr>
<th>Is nsfw</th>
<td>False
</td>
</tr>
<tr>
<th>Is mod</th>
<td>True
</td>
</tr>
<tr>
<th>Is following</th>
<td>True
</td>
</tr>
<tr>
<th>Has user profile</th>
<td>True
</td>
</tr>
<tr>
<th>Hide from robots</th>
<td>False
</td>
</tr>
<tr>
<th>Created at</th>
<td>2019-07-10 12:20:03
</td>
</tr>
<tr>
<th>Total karma</th>
<td>54632
</td>
</tr>
<tr>
<th>Post karma</th>
<td>53376
</td>
</tr>
</tbody>
</table>
</div>
</div>
</div>
</div>
<div class="row-mb">
<div class="col-md">
<div class="card flex-md-row mb-4 box-shadow h-md-250">
<img class="card-img-right flex-auto d-none d-md-block" alt="Photo" style="width: 200px; height: 200px; object-fit: scale-down;" src="https://i.imgur.com/040fmbw.png" data-holder-rendered="true">
<div class="card-body d-flex flex-column align-items-start" style="padding-top: 0;">
<h3 class="mb-0" style="padding-top: 1rem;">
<a class="text-dark" href="https://vk.com/" target="_blank">VK</a>
</h3>
<div class="mb-1 text-muted">Tags: global, ru</div>
<p class="card-text">
<a href="https://vk.com/alexaimephotography" target="_blank">https://vk.com/alexaimephotography</a>
</p>
</div>
</div>
</div>
</div>
<div class="row-mb">
<div class="col-md">
<div class="card flex-md-row mb-4 box-shadow h-md-250">
<img class="card-img-right flex-auto d-none d-md-block" alt="Photo" style="width: 200px; height: 200px; object-fit: scale-down;" src="https://i.imgur.com/040fmbw.png" data-holder-rendered="true">
<div class="card-body d-flex flex-column align-items-start" style="padding-top: 0;">
<h3 class="mb-0" style="padding-top: 1rem;">
<a class="text-dark" href="https://tumblr.com/" target="_blank">Tumblr</a>
</h3>
<div class="mb-1 text-muted">Tags: blogs, global</div>
<p class="card-text">
<a href="https://alexaimephotography.tumblr.com/" target="_blank">https://alexaimephotography.tumblr.com/</a>
</p>
</div>
</div>
</div>
</div>
<div class="row-mb">
<div class="col-md">
<div class="card flex-md-row mb-4 box-shadow h-md-250">
<img class="card-img-right flex-auto d-none d-md-block" alt="Photo" style="width: 200px; height: 200px; object-fit: scale-down;" src="https://s.pinimg.com/images/user/default_280.png" data-holder-rendered="true">
<div class="card-body d-flex flex-column align-items-start" style="padding-top: 0;">
<h3 class="mb-0" style="padding-top: 1rem;">
<a class="text-dark" href="https://www.pinterest.com/" target="_blank">Pinterest</a>
</h3>
<div class="mb-1 text-muted">Tags: art, photo</div>
<p class="card-text">
<a href="https://www.pinterest.com/alexaimephotography/" target="_blank">https://www.pinterest.com/alexaimephotography/</a>
</p>
<table class="table table-striped">
<tbody>
<tr>
<th>Pinterest username</th>
<td>alexaimephotography
</td>
</tr>
<tr>
<th>Fullname</th>
<td>alexaimephotography
</td>
</tr>
<tr>
<th>Board count</th>
<td>3
</td>
</tr>
<tr>
<th>Pin count</th>
<td>4
</td>
</tr>
<tr>
<th>Country</th>
<td>FR
</td>
</tr>
<tr>
<th>Follower count</th>
<td>0
</td>
</tr>
<tr>
<th>Following count</th>
<td>1
</td>
</tr>
<tr>
<th>Is website verified</th>
<td>False
</td>
</tr>
<tr>
<th>Is indexed</th>
<td>True
</td>
</tr>
<tr>
<th>Is verified merchant</th>
<td>False
</td>
</tr>
<tr>
<th>Locale</th>
<td>fr
</td>
</tr>
</tbody>
</table>
</div>
</div>
</div>
</div>
<div class="row-mb">
<div class="col-md">
<div class="card flex-md-row mb-4 box-shadow h-md-250">
<img class="card-img-right flex-auto d-none d-md-block" alt="Photo" style="width: 200px; height: 200px; object-fit: scale-down;" src="https://i.vimeocdn.com/portrait/22443952_360x360" data-holder-rendered="true">
<div class="card-body d-flex flex-column align-items-start" style="padding-top: 0;">
<h3 class="mb-0" style="padding-top: 1rem;">
<a class="text-dark" href="https://vimeo.com/" target="_blank">Vimeo</a>
</h3>
<div class="mb-1 text-muted">Tags: global, video</div>
<p class="card-text">
<a href="https://vimeo.com/alexaimephotography" target="_blank">https://vimeo.com/alexaimephotography</a>
</p>
<table class="table table-striped">
<tbody>
<tr>
<th>Uid</th>
<td>75857717
</td>
</tr>
<tr>
<th>Gender</th>
<td>m
</td>
</tr>
<tr>
<th>Bio</th>
<td>Hello
Passionate about photography for several years. I set the video recently.
I use my Nikon d7200 and Nikkor 50mm 1.8d . Premiere Pro software.
Follow me on :
https://www.instagram.com/alexaimephotography/
https://500px.com/alexaimephotography
Bonjour
Passionné par la photographie depuis quelques années . Je me suis mis à la video depuis peu.
J'utilise mon Nikon d7200 et l'objectif Nikkor 50mm 1.8d .Comme logiciel Premiere pro cc.
Suivez moi sur :
https://www.instagram.com/alexaimephotography/
https://500px.com/alexaimephotography
</td>
</tr>
<tr>
<th>Location</th>
<td>France
</td>
</tr>
<tr>
<th>Username</th>
<td>AlexAimePhotography
</td>
</tr>
<tr>
<th>Is verified</th>
<td>True
</td>
</tr>
<tr>
<th>Created at</th>
<td>2017-12-06T11:49:28+00:00
</td>
</tr>
<tr>
<th>Videos</th>
<td>14
</td>
</tr>
<tr>
<th>Is looking for job</th>
<td>False
</td>
</tr>
<tr>
<th>Is working remotely</th>
<td>False
</td>
</tr>
</tbody>
</table>
</div>
</div>
</div>
</div>
<div class="row-mb">
<div class="col-md">
<div class="card flex-md-row mb-4 box-shadow h-md-250">
<img class="card-img-right flex-auto d-none d-md-block" alt="Photo" style="width: 200px; height: 200px; object-fit: scale-down;" src="https://i.imgur.com/040fmbw.png" data-holder-rendered="true">
<div class="card-body d-flex flex-column align-items-start" style="padding-top: 0;">
<h3 class="mb-0" style="padding-top: 1rem;">
<a class="text-dark" href="https://deviantart.com" target="_blank">DeviantART</a>
</h3>
<div class="mb-1 text-muted">Tags: global, photo, art</div>
<p class="card-text">
<a href="https://alexaimephotography.deviantart.com" target="_blank">https://alexaimephotography.deviantart.com</a>
</p>
<table class="table table-striped">
<tbody>
<tr>
<th>Country</th>
<td>France
</td>
</tr>
<tr>
<th>Created at</th>
<td>2018-12-09 16:02:10
</td>
</tr>
<tr>
<th>Gender</th>
<td>male
</td>
</tr>
<tr>
<th>Username</th>
<td>Alexaimephotography
</td>
</tr>
<tr>
<th>Twitter username</th>
<td>alexaimephotogr
</td>
</tr>
<tr>
<th>Website</th>
<td><a class="auto-link" href="www.instagram.com/alexaimephotography/">www.instagram.com/alexaimephotography/</a>
</td>
</tr>
<tr>
<th>Links</th>
<td>['https://www.instagram.com/alexaimephotography/']
</td>
</tr>
</tbody>
</table>
</div>
</div>
</div>
</div>
<div class="row-mb">
<div class="col-md">
<div class="card flex-md-row mb-4 box-shadow h-md-250">
<img class="card-img-right flex-auto d-none d-md-block" alt="Photo" style="width: 200px; height: 200px; object-fit: scale-down;" src="https://i.imgur.com/040fmbw.png" data-holder-rendered="true">
<div class="card-body d-flex flex-column align-items-start" style="padding-top: 0;">
<h3 class="mb-0" style="padding-top: 1rem;">
<a class="text-dark" href="https://www.picuki.com/" target="_blank">Picuki</a>
</h3>
<div class="mb-1 text-muted">Tags: global, instagram, photo</div>
<p class="card-text">
<a href="https://www.picuki.com/profile/alexaimephotography" target="_blank">https://www.picuki.com/profile/alexaimephotography</a>
</p>
</div>
</div>
</div>
</div>
<div class="row-mb">
<div class="col-md">
<div class="card flex-md-row mb-4 box-shadow h-md-250">
<img class="card-img-right flex-auto d-none d-md-block" alt="Photo" style="width: 200px; height: 200px; object-fit: scale-down;" src="https://i.imgur.com/040fmbw.png" data-holder-rendered="true">
<div class="card-body d-flex flex-column align-items-start" style="padding-top: 0;">
<h3 class="mb-0" style="padding-top: 1rem;">
<a class="text-dark" href="https://weheartit.com/" target="_blank">We Heart It</a>
</h3>
<div class="mb-1 text-muted">Tags: photo, global, blogs</div>
<p class="card-text">
<a href="https://weheartit.com/alexaimephotography" target="_blank">https://weheartit.com/alexaimephotography</a>
</p>
</div>
</div>
</div>
</div>
<div class="row-mb">
<div class="col-md">
<div class="card flex-md-row mb-4 box-shadow h-md-250">
<img class="card-img-right flex-auto d-none d-md-block" alt="Photo" style="width: 200px; height: 200px; object-fit: scale-down;" src="https://i.imgur.com/040fmbw.png" data-holder-rendered="true">
<div class="card-body d-flex flex-column align-items-start" style="padding-top: 0;">
<h3 class="mb-0" style="padding-top: 1rem;">
<a class="text-dark" href="https://www.eyeem.com/" target="_blank">EyeEm</a>
</h3>
<div class="mb-1 text-muted">Tags: global, photo</div>
<p class="card-text">
<a href="https://www.eyeem.com/u/alexaimephotography" target="_blank">https://www.eyeem.com/u/alexaimephotography</a>
</p>
</div>
</div>
</div>
</div>
<div class="row-mb">
<div class="col-md">
<div class="card flex-md-row mb-4 box-shadow h-md-250">
<img class="card-img-right flex-auto d-none d-md-block" alt="Photo" style="width: 200px; height: 200px; object-fit: scale-down;" src="https://pbs.twimg.com/profile_images/1089860309895049218/5DucgDw1.jpg" data-holder-rendered="true">
<div class="card-body d-flex flex-column align-items-start" style="padding-top: 0;">
<h3 class="mb-0" style="padding-top: 1rem;">
<a class="text-dark" href="https://www.twitter.com/" target="_blank">Twitter</a>
</h3>
<div class="mb-1 text-muted">Tags: global</div>
<p class="card-text">
<a href="https://twitter.com/Alexaimephotogr" target="_blank">https://twitter.com/Alexaimephotogr</a>
</p>
<table class="table table-striped">
<tbody>
<tr>
<th>Uid</th>
<td>VXNlcjo5NDYzODMzNTA3ODAxMDQ3MDQ=
</td>
</tr>
<tr>
<th>Fullname</th>
<td>AlexAimephotography
</td>
</tr>
<tr>
<th>Bio</th>
<td>Photographe amateur
New gear :
Sony A7 ii
Sony FE 85mm 1.8
</td>
</tr>
<tr>
<th>Created at</th>
<td>2017-12-28 14:12:28+00:00
</td>
</tr>
<tr>
<th>Image bg</th>
<td><a class="auto-link" href="https://pbs.twimg.com/profile_banners/946383350780104704/1548759346">https://pbs.twimg.com/profile_banners/946383350780104704/1548759346</a>
</td>
</tr>
<tr>
<th>Is protected</th>
<td>False
</td>
</tr>
<tr>
<th>Follower count</th>
<td>300
</td>
</tr>
<tr>
<th>Following count</th>
<td>76
</td>
</tr>
<tr>
<th>Location</th>
<td>France
</td>
</tr>
<tr>
<th>Favourites count</th>
<td>6408
</td>
</tr>
</tbody>
</table>
</div>
</div>
</div>
</div>
</div>
<script src="https://code.jquery.com/jquery-3.3.1.slim.min.js" integrity="sha384-q8i/X+965DzO0rT7abK41JStQIAqVgRVzpbzo5smXKp4YfRvH+8abtTE1Pi6jizo" crossorigin="anonymous"></script>
<script src="https://cdnjs.cloudflare.com/ajax/libs/popper.js/1.14.7/umd/popper.min.js" integrity="sha384-UO2eT0CpHqdSJQ6hJty5KVphtPhzWj9WO1clHTMGa3JDZwrnQq4sF86dIHNDz0W1" crossorigin="anonymous"></script>
<script src="https://stackpath.bootstrapcdn.com/bootstrap/4.3.1/js/bootstrap.min.js" integrity="sha384-JjSmVgyd0p3pXB1rRibZUAYoIIy6OrQ6VrjIEaFf/nJGzIxFDsf4x0xIM+B07jRM" crossorigin="anonymous"></script>
</body>
</html>
import glob
import logging
import os

import pytest
from _pytest.mark import Mark

from maigret.sites import MaigretDatabase

CUR_PATH = os.path.dirname(os.path.realpath(__file__))
JSON_FILE = os.path.join(CUR_PATH, '../maigret/resources/data.json')

empty_mark = Mark('', [], {})


def by_slow_marker(item):
    return item.get_closest_marker('slow', default=empty_mark)


def pytest_collection_modifyitems(items):
    items.sort(key=by_slow_marker, reverse=False)


def get_test_reports_filenames():
    return glob.glob('report_*', recursive=False)


def remove_test_reports():
    reports_list = get_test_reports_filenames()
    for filename in reports_list:
        os.remove(filename)
    logging.error(f'Removed test reports {reports_list}')


@pytest.fixture(scope='session')
def default_db():
    return MaigretDatabase().load_from_file(JSON_FILE)


@pytest.fixture(autouse=True)
def reports_autoclean():
    remove_test_reports()
    yield
    remove_test_reports()
"""Maigret activation test functions"""
import pytest
from mock import Mock
from maigret.activation import ParsingActivator
@pytest.mark.slow
def test_twitter_activation(default_db):
twitter_site = default_db.sites_dict['Twitter']
token1 = twitter_site.headers['x-guest-token']
ParsingActivator.twitter(twitter_site, Mock())
token2 = twitter_site.headers['x-guest-token']
assert token1 != token2
"""Maigret main module test functions"""
import asyncio
import pytest
from mock import Mock
from maigret.maigret import self_check
from maigret.sites import MaigretDatabase, MaigretSite
EXAMPLE_DB = {
'engines': {
},
'sites': {
"GooglePlayStore": {
"tags": [
"global",
"us"
],
"disabled": False,
"checkType": "status_code",
"alexaRank": 1,
"url": "https://play.google.com/store/apps/developer?id={username}",
"urlMain": "https://play.google.com/store",
"usernameClaimed": "Facebook_nosuchname",
"usernameUnclaimed": "noonewouldeverusethis7"
},
"Reddit": {
"tags": [
"news",
"social",
"us"
],
"checkType": "status_code",
"presenseStrs": [
"totalKarma"
],
"disabled": True,
"alexaRank": 17,
"url": "https://www.reddit.com/user/{username}",
"urlMain": "https://www.reddit.com/",
"usernameClaimed": "blue",
"usernameUnclaimed": "noonewouldeverusethis7"
},
}
}
@pytest.mark.slow
def test_self_check_db_positive_disable():
    logger = Mock()
    db = MaigretDatabase()
    db.load_from_json(EXAMPLE_DB)

    assert db.sites[0].disabled is False

    loop = asyncio.get_event_loop()
    loop.run_until_complete(self_check(db, db.sites_dict, logger, silent=True))

    assert db.sites[0].disabled is True


@pytest.mark.slow
def test_self_check_db_positive_enable():
    logger = Mock()
    db = MaigretDatabase()
    db.load_from_json(EXAMPLE_DB)
    db.sites[0].disabled = True
    db.sites[0].username_claimed = 'Facebook'

    assert db.sites[0].disabled is True

    loop = asyncio.get_event_loop()
    loop.run_until_complete(self_check(db, db.sites_dict, logger, silent=True))

    assert db.sites[0].disabled is False


@pytest.mark.slow
def test_self_check_db_negative_disabled():
    logger = Mock()
    db = MaigretDatabase()
    db.load_from_json(EXAMPLE_DB)
    db.sites[0].disabled = True

    assert db.sites[0].disabled is True

    loop = asyncio.get_event_loop()
    loop.run_until_complete(self_check(db, db.sites_dict, logger, silent=True))

    assert db.sites[0].disabled is True


@pytest.mark.slow
def test_self_check_db_negative_enabled():
    logger = Mock()
    db = MaigretDatabase()
    db.load_from_json(EXAMPLE_DB)
    db.sites[0].disabled = False
    db.sites[0].username_claimed = 'Facebook'

    assert db.sites[0].disabled is False

    loop = asyncio.get_event_loop()
    loop.run_until_complete(self_check(db, db.sites_dict, logger, silent=True))

    assert db.sites[0].disabled is False
"""Maigret reports test functions"""
import copy
import os
from io import StringIO
import xmind
from jinja2 import Template
from maigret.report import generate_csv_report, generate_txt_report, save_xmind_report, save_html_report, \
save_pdf_report, generate_report_template, generate_report_context
from maigret.result import QueryResult, QueryStatus
EXAMPLE_RESULTS = {
'GitHub': {
'username': 'test',
'parsing_enabled': True,
'url_main': 'https://www.github.com/',
'url_user': 'https://www.github.com/test',
'status': QueryResult('test',
'GitHub',
'https://www.github.com/test',
QueryStatus.CLAIMED,
tags=['test_tag']),
'http_status': 200,
'is_similar': False,
'rank': 78
}
}
GOOD_RESULT = QueryResult('', '', '', QueryStatus.CLAIMED)
BAD_RESULT = QueryResult('', '', '', QueryStatus.AVAILABLE)
GOOD_500PX_RESULT = copy.deepcopy(GOOD_RESULT)
GOOD_500PX_RESULT.tags = ['photo', 'us', 'global']
GOOD_500PX_RESULT.ids_data = {"uid": "dXJpOm5vZGU6VXNlcjoyNjQwMzQxNQ==", "legacy_id": "26403415",
"username": "alexaimephotographycars", "name": "Alex Aim\u00e9",
"website": "www.flickr.com/photos/alexaimephotography/",
"facebook_link": " www.instagram.com/street.reality.photography/",
"instagram_username": "alexaimephotography", "twitter_username": "Alexaimephotogr"}
GOOD_REDDIT_RESULT = copy.deepcopy(GOOD_RESULT)
GOOD_REDDIT_RESULT.tags = ['news', 'us']
GOOD_REDDIT_RESULT.ids_data = {"reddit_id": "t5_1nytpy", "reddit_username": "alexaimephotography",
"fullname": "alexaimephotography",
"image": "https://styles.redditmedia.com/t5_1nytpy/styles/profileIcon_7vmhdwzd3g931.jpg?width=256&height=256&crop=256:256,smart&frame=1&s=4f355f16b4920844a3f4eacd4237a7bf76b2e97e",
"is_employee": "False", "is_nsfw": "False", "is_mod": "True", "is_following": "True",
"has_user_profile": "True", "hide_from_robots": "False",
"created_at": "2019-07-10 12:20:03", "total_karma": "53959", "post_karma": "52738"}
GOOD_IG_RESULT = copy.deepcopy(GOOD_RESULT)
GOOD_IG_RESULT.tags = ['photo', 'global']
GOOD_IG_RESULT.ids_data = {"instagram_username": "alexaimephotography", "fullname": "Alexaimephotography",
"id": "6828488620",
"image": "https://scontent-hel3-1.cdninstagram.com/v/t51.2885-19/s320x320/95420076_1169632876707608_8741505804647006208_n.jpg?_nc_ht=scontent-hel3-1.cdninstagram.com&_nc_ohc=jd87OUGsX4MAX_Ym5GX&tp=1&oh=0f42badd68307ba97ec7fb1ef7b4bfd4&oe=601E5E6F",
"bio": "Photographer \nChild of fine street arts",
"external_url": "https://www.flickr.com/photos/alexaimephotography2020/"}
GOOD_TWITTER_RESULT = copy.deepcopy(GOOD_RESULT)
GOOD_TWITTER_RESULT.tags = ['social', 'us']
TEST = [('alexaimephotographycars', 'username', {
'500px': {'username': 'alexaimephotographycars', 'parsing_enabled': True, 'url_main': 'https://500px.com/',
'url_user': 'https://500px.com/p/alexaimephotographycars',
'ids_usernames': {'alexaimephotographycars': 'username', 'alexaimephotography': 'username',
'Alexaimephotogr': 'username'}, 'status': GOOD_500PX_RESULT, 'http_status': 200,
'is_similar': False, 'rank': 2981},
'Reddit': {'username': 'alexaimephotographycars', 'parsing_enabled': True, 'url_main': 'https://www.reddit.com/',
'url_user': 'https://www.reddit.com/user/alexaimephotographycars', 'status': BAD_RESULT,
'http_status': 404, 'is_similar': False, 'rank': 17},
'Twitter': {'username': 'alexaimephotographycars', 'parsing_enabled': True, 'url_main': 'https://www.twitter.com/',
'url_user': 'https://twitter.com/alexaimephotographycars', 'status': BAD_RESULT, 'http_status': 400,
'is_similar': False, 'rank': 55},
'Instagram': {'username': 'alexaimephotographycars', 'parsing_enabled': True,
'url_main': 'https://www.instagram.com/',
'url_user': 'https://www.instagram.com/alexaimephotographycars', 'status': BAD_RESULT,
'http_status': 404, 'is_similar': False, 'rank': 29}}), ('alexaimephotography', 'username', {
'500px': {'username': 'alexaimephotography', 'parsing_enabled': True, 'url_main': 'https://500px.com/',
'url_user': 'https://500px.com/p/alexaimephotography', 'status': BAD_RESULT, 'http_status': 200,
'is_similar': False, 'rank': 2981},
'Reddit': {'username': 'alexaimephotography', 'parsing_enabled': True, 'url_main': 'https://www.reddit.com/',
'url_user': 'https://www.reddit.com/user/alexaimephotography',
'ids_usernames': {'alexaimephotography': 'username'}, 'status': GOOD_REDDIT_RESULT, 'http_status': 200,
'is_similar': False, 'rank': 17},
'Twitter': {'username': 'alexaimephotography', 'parsing_enabled': True, 'url_main': 'https://www.twitter.com/',
'url_user': 'https://twitter.com/alexaimephotography', 'status': BAD_RESULT, 'http_status': 400,
'is_similar': False, 'rank': 55},
'Instagram': {'username': 'alexaimephotography', 'parsing_enabled': True, 'url_main': 'https://www.instagram.com/',
'url_user': 'https://www.instagram.com/alexaimephotography',
'ids_usernames': {'alexaimephotography': 'username'}, 'status': GOOD_IG_RESULT, 'http_status': 200,
'is_similar': False, 'rank': 29}}), ('Alexaimephotogr', 'username', {
'500px': {'username': 'Alexaimephotogr', 'parsing_enabled': True, 'url_main': 'https://500px.com/',
'url_user': 'https://500px.com/p/Alexaimephotogr', 'status': BAD_RESULT, 'http_status': 200,
'is_similar': False, 'rank': 2981},
'Reddit': {'username': 'Alexaimephotogr', 'parsing_enabled': True, 'url_main': 'https://www.reddit.com/',
'url_user': 'https://www.reddit.com/user/Alexaimephotogr', 'status': BAD_RESULT, 'http_status': 404,
'is_similar': False, 'rank': 17},
'Twitter': {'username': 'Alexaimephotogr', 'parsing_enabled': True, 'url_main': 'https://www.twitter.com/',
'url_user': 'https://twitter.com/Alexaimephotogr', 'status': GOOD_TWITTER_RESULT, 'http_status': 400,
'is_similar': False, 'rank': 55},
'Instagram': {'username': 'Alexaimephotogr', 'parsing_enabled': True, 'url_main': 'https://www.instagram.com/',
'url_user': 'https://www.instagram.com/Alexaimephotogr', 'status': BAD_RESULT, 'http_status': 404,
'is_similar': False, 'rank': 29}})]
SUPPOSED_BRIEF = """Search by username alexaimephotographycars returned 1 accounts. Found target's other IDs: alexaimephotography, Alexaimephotogr. Search by username alexaimephotography returned 2 accounts. Search by username Alexaimephotogr returned 1 accounts. Extended info extracted from 3 accounts."""
SUPPOSED_INTERESTS = "Interests: photo <span class=\"text-muted\">(2)</span>, news <span class=\"text-muted\">(1)</span>, social <span class=\"text-muted\">(1)</span>"
SUPPOSED_GEO = "Geo: us <span class=\"text-muted\">(3)</span>"
def test_generate_report_template():
    report_template, css = generate_report_template(is_pdf=True)
    assert isinstance(report_template, Template)
    assert isinstance(css, str)

    report_template, css = generate_report_template(is_pdf=False)
    assert isinstance(report_template, Template)
    assert css is None
def test_generate_csv_report():
    csvfile = StringIO()
    generate_csv_report('test', EXAMPLE_RESULTS, csvfile)
    csvfile.seek(0)
    data = csvfile.readlines()
    assert data == [
        'username,name,url_main,url_user,exists,http_status\r\n',
        'test,GitHub,https://www.github.com/,https://www.github.com/test,Claimed,200\r\n',
    ]
def test_generate_txt_report():
    txtfile = StringIO()
    generate_txt_report('test', EXAMPLE_RESULTS, txtfile)
    txtfile.seek(0)
    data = txtfile.readlines()
    assert data == [
        'https://www.github.com/test\n',
        'Total Websites Username Detected On : 1',
    ]
def test_save_xmind_report():
    filename = 'report_test.xmind'
    save_xmind_report(filename, 'test', EXAMPLE_RESULTS)
    workbook = xmind.load(filename)
    sheet = workbook.getPrimarySheet()
    data = sheet.getData()
    assert data['title'] == 'test Analysis'
    assert data['topic']['title'] == 'test'
    assert len(data['topic']['topics']) == 2
    assert data['topic']['topics'][0]['title'] == 'Undefined'
    assert data['topic']['topics'][1]['title'] == 'test_tag'
    assert len(data['topic']['topics'][1]['topics']) == 1
    assert data['topic']['topics'][1]['topics'][0]['label'] == 'https://www.github.com/test'
def test_html_report():
    report_name = 'report_test.html'
    context = generate_report_context(TEST)
    save_html_report(report_name, context)
    report_text = open(report_name).read()
    assert SUPPOSED_BRIEF in report_text
    assert SUPPOSED_GEO in report_text
    assert SUPPOSED_INTERESTS in report_text
def test_pdf_report():
    report_name = 'report_test.pdf'
    context = generate_report_context(TEST)
    save_pdf_report(report_name, context)
    assert os.path.exists(report_name)
"""Maigret Database test functions"""
from maigret.sites import MaigretDatabase, MaigretSite
EXAMPLE_DB = {
    'engines': {
        "XenForo": {
            "presenseStrs": ["XenForo"],
            "site": {
                "absenceStrs": [
                    "The specified member cannot be found. Please enter a member's entire name.",
                ],
                "checkType": "message",
                "errors": {
                    "You must be logged-in to do that.": "Login required"
                },
                "url": "{urlMain}{urlSubpath}/members/?username={username}"
            }
        },
    },
    'sites': {
        "Amperka": {
            "engine": "XenForo",
            "rank": 121613,
            "tags": [
                "ru"
            ],
            "urlMain": "http://forum.amperka.ru",
            "usernameClaimed": "adam",
            "usernameUnclaimed": "noonewouldeverusethis7"
        },
    }
}
def test_load_empty_db_from_str():
    db = MaigretDatabase()
    db.load_from_str('{"engines": {}, "sites": {}}')
    assert db.sites == []
    assert db.engines == []
def test_load_valid_db():
    db = MaigretDatabase()
    db.load_from_json(EXAMPLE_DB)
    assert len(db.sites) == 1
    assert len(db.engines) == 1
    assert db.sites[0].name == 'Amperka'
    assert db.engines[0].name == 'XenForo'
def test_site_json_dump():
    db = MaigretDatabase()
    db.load_from_json(EXAMPLE_DB)
    init_keys = EXAMPLE_DB['sites']['Amperka'].keys()
    # contains engine data
    obj_keys = db.sites[0].json.keys()
    assert set(init_keys).issubset(set(obj_keys))
def test_site_correct_initialization():
    db = MaigretDatabase()
    db.load_from_json(EXAMPLE_DB)

    xenforo = db.engines[0]
    assert xenforo.name == 'XenForo'
    assert xenforo.site['checkType'] == 'message'

    amperka = db.sites[0]
    assert amperka.name == 'Amperka'
    assert amperka.check_type == 'message'
def test_site_strip_engine_data():
    db = MaigretDatabase()
    db.load_from_json(EXAMPLE_DB)
    amperka = db.sites[0]
    amperka_stripped = amperka.strip_engine_data()
    assert amperka_stripped.json == EXAMPLE_DB['sites']['Amperka']
def test_site_strip_engine_data_with_site_prior_updates():
    db = MaigretDatabase()
    UPDATED_EXAMPLE_DB = dict(EXAMPLE_DB)
    UPDATED_EXAMPLE_DB['sites']['Amperka']['absenceStrs'] = ["test"]
    db.load_from_json(UPDATED_EXAMPLE_DB)
    amperka = db.sites[0]
    amperka_stripped = amperka.strip_engine_data()
    assert amperka_stripped.json == UPDATED_EXAMPLE_DB['sites']['Amperka']
def test_saving_site_error():
    db = MaigretDatabase()
    DB = dict(EXAMPLE_DB)
    DB['sites']['Amperka']['errors'] = {'error1': 'text1'}
    db.load_from_json(DB)
    amperka = db.sites[0]
    assert len(amperka.errors) == 2
    assert amperka.strip_engine_data().errors == {'error1': 'text1'}
    assert amperka.strip_engine_data().json['errors'] == {'error1': 'text1'}
def test_ranked_sites_dict():
    db = MaigretDatabase()
    db.update_site(MaigretSite('3', {'alexaRank': 1000, 'engine': 'ucoz'}))
    db.update_site(MaigretSite('1', {'alexaRank': 2, 'tags': ['forum']}))
    db.update_site(MaigretSite('2', {'alexaRank': 10, 'tags': ['ru', 'forum']}))

    # sorting
    assert list(db.ranked_sites_dict().keys()) == ['1', '2', '3']
    assert list(db.ranked_sites_dict(top=2).keys()) == ['1', '2']
    assert list(db.ranked_sites_dict(reverse=True, top=2).keys()) == ['3', '2']

    # filtering by tags
    assert list(db.ranked_sites_dict(tags=['ru'], top=2).keys()) == ['2']
    assert list(db.ranked_sites_dict(tags=['forum']).keys()) == ['1', '2']

    # filtering by engine
    assert list(db.ranked_sites_dict(tags=['ucoz']).keys()) == ['3']

    # filtering by names
    assert list(db.ranked_sites_dict(names=['1', '2']).keys()) == ['1', '2']
    assert list(db.ranked_sites_dict(names=['2', '3']).keys()) == ['2', '3']

    # disjunction
    assert list(db.ranked_sites_dict(names=['2'], tags=['forum']).keys()) == ['1', '2']
    assert list(db.ranked_sites_dict(names=['2'], tags=['forum'], reverse=True).keys()) == ['2', '1']
    assert list(db.ranked_sites_dict(names=['2'], tags=['ucoz']).keys()) == ['2', '3']
    assert list(db.ranked_sites_dict(names=['4'], tags=['ru']).keys()) == ['2']
    assert list(db.ranked_sites_dict(names=['4'], tags=['nosuchtag']).keys()) == []
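# The selection semantics exercised above (ascending Alexa-rank sort, `top`
# truncation, and the name/tag/engine disjunction) can be sketched as a
# standalone function. This is a hypothetical re-implementation for
# illustration only; `MaigretDatabase.ranked_sites_dict` is the real one and
# may differ in details.

```python
# Hypothetical sketch of ranked_sites_dict over plain dicts, not MaigretSite
# objects; illustrates the filter/sort contract the test above checks.
def ranked_sites_dict(sites, tags=None, names=None, top=None, reverse=False):
    def matches(site):
        # With no filters, every site qualifies.
        if not tags and not names:
            return True
        # Disjunction: a requested name OR a requested tag/engine matches.
        by_name = names and site['name'] in names
        by_tag = tags and set(tags) & set(site.get('tags', []) + [site.get('engine')])
        return bool(by_name or by_tag)

    selected = [s for s in sites if matches(s)]
    # Sort by Alexa rank, ascending by default (rank 1 is most popular).
    selected.sort(key=lambda s: s['rank'], reverse=reverse)
    return {s['name']: s for s in selected[:top]}

SITES = [
    {'name': '3', 'rank': 1000, 'engine': 'ucoz'},
    {'name': '1', 'rank': 2, 'tags': ['forum']},
    {'name': '2', 'rank': 10, 'tags': ['ru', 'forum']},
]
```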
"""Maigret utils test functions"""
from maigret.utils import CaseConverter, is_country_tag, enrich_link_str
def test_case_convert_camel_to_snake():
    a = 'SnakeCasedString'
    b = CaseConverter.camel_to_snake(a)
    assert b == 'snake_cased_string'
def test_case_convert_snake_to_camel():
    a = 'camel_cased_string'
    b = CaseConverter.snake_to_camel(a)
    assert b == 'camelCasedString'
def test_case_convert_snake_to_title():
    a = 'camel_cased_string'
    b = CaseConverter.snake_to_title(a)
    assert b == 'Camel cased string'
def test_is_country_tag():
    assert is_country_tag('ru')
    assert is_country_tag('FR')
    assert not is_country_tag('a1')
    assert not is_country_tag('dating')
    assert is_country_tag('global')
def test_enrich_link_str():
    assert enrich_link_str('test') == 'test'
    assert enrich_link_str(' www.flickr.com/photos/alexaimephotography/') == \
        '<a class="auto-link" href="www.flickr.com/photos/alexaimephotography/">www.flickr.com/photos/alexaimephotography/</a>'
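# For reference, the three case conversions exercised above can be written in
# a few lines each. This is a hypothetical sketch, not Maigret's actual
# CaseConverter implementation, which may handle more edge cases.

```python
import re

# Hypothetical re-implementations of the conversions the tests above expect.
def camel_to_snake(s):
    # Insert "_" before each uppercase letter (except at the start), then lowercase.
    return re.sub(r'(?<!^)(?=[A-Z])', '_', s).lower()

def snake_to_camel(s):
    # First word stays lowercase; the rest are capitalized and joined.
    head, *tail = s.split('_')
    return head + ''.join(word.title() for word in tail)

def snake_to_title(s):
    # Replace underscores with spaces and capitalize only the first letter.
    return s.replace('_', ' ').capitalize()
```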
@@ -12,6 +12,8 @@ import xml.etree.ElementTree as ET
 from datetime import datetime
 from argparse import ArgumentParser, RawDescriptionHelpFormatter
 
+from maigret.maigret import MaigretDatabase
+
 RANKS = {str(i):str(i) for i in [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 50, 100, 500]}
 RANKS.update({
     '1000': '1K',
@@ -22,7 +24,7 @@ RANKS.update({
     '50000000': '10M',
 })
 
-def get_rank(domain_to_query, dest, print_errors=True):
+def get_rank(domain_to_query, site, print_errors=True):
     #Retrieve ranking data via alexa API
     url = f"http://data.alexa.com/data?cli=10&url={domain_to_query}"
     xml_data = requests.get(url).text
@@ -30,16 +32,16 @@ def get_rank(domain_to_query, dest, print_errors=True):
     try:
         #Get ranking for this site.
-        dest['rank'] = int(root.find('.//REACH').attrib['RANK'])
+        site.alexa_rank = int(root.find('.//REACH').attrib['RANK'])
 
         country = root.find('.//COUNTRY')
         if not country is None and country.attrib:
             country_code = country.attrib['CODE']
-            tags = set(dest.get('tags', []))
+            tags = set(site.tags)
             if country_code:
                 tags.add(country_code.lower())
-            dest['tags'] = sorted(list(tags))
-            if 'type' in dest and dest['type'] != 'username':
-                dest['disabled'] = False
+            site.tags = sorted(list(tags))
+            if site.type != 'username':
+                site.disabled = False
     except Exception as e:
         if print_errors:
             logging.error(e)
@@ -67,38 +69,40 @@ if __name__ == '__main__':
                         dest="base_file", default="maigret/resources/data.json",
                         help="JSON file with sites data to update.")
     parser.add_argument('--empty-only', help='update only sites without rating', action='store_true')
 
     pool = list()
 
     args = parser.parse_args()
 
-    with open(args.base_file, "r", encoding="utf-8") as data_file:
-        sites_info = json.load(data_file)
-        data = sites_info['sites']
-        engines = sites_info['engines']
+    db = MaigretDatabase()
+    sites_subset = db.load_from_file(args.base_file).sites
 
     with open("sites.md", "w") as site_file:
-        data_length = len(data)
         site_file.write(f"""
-## List of supported sites: total {data_length}\n
+## List of supported sites: total {len(sites_subset)}\n
 Rank data fetched from Alexa by domains.
 """)
 
-        for social_network in data:
-            url_main = data.get(social_network).get("urlMain")
-            data.get(social_network)["rank"] = 0
-            th = threading.Thread(target=get_rank, args=(url_main, data.get(social_network)))
-            pool.append((social_network, url_main, th))
+        for site in sites_subset:
+            url_main = site.url_main
+            if site.alexa_rank < sys.maxsize and args.empty_only:
+                continue
+            site.alexa_rank = 0
+            th = threading.Thread(target=get_rank, args=(url_main, site))
+            pool.append((site.name, url_main, th))
             th.start()
 
         index = 1
-        for social_network, url_main, th in pool:
+        for site_name, url_main, th in pool:
             th.join()
-            sys.stdout.write("\r{0}".format(f"Updated {index} out of {data_length} entries"))
+            sys.stdout.write("\r{0}".format(f"Updated {index} out of {len(sites_subset)} entries"))
             sys.stdout.flush()
             index = index + 1
 
-        sites_full_list = [(site, site_data['rank']) for site, site_data in data.items()]
+        sites_full_list = [(s, s.alexa_rank) for s in sites_subset]
         sites_full_list.sort(reverse=False, key=lambda x: x[1])
 
        while sites_full_list[0][1] == 0:
@@ -107,20 +111,17 @@ Rank data fetched from Alexa by domains.
         for num, site_tuple in enumerate(sites_full_list):
             site, rank = site_tuple
-            url_main = data[site]['urlMain']
+            url_main = site.url_main
             valid_rank = get_step_rank(rank)
-            all_tags = data[site].get('tags', [])
+            all_tags = site.tags
             tags = ', ' + ', '.join(all_tags) if all_tags else ''
             note = ''
-            if data[site].get('disabled'):
+            if site.disabled:
                 note = ', search is disabled'
             site_file.write(f'1. [{site}]({url_main})*: top {valid_rank}{tags}*{note}\n')
+            db.update_site(site)
 
     site_file.write(f'\nAlexa.com rank data fetched at ({datetime.utcnow()} UTC)\n')
 
-    sorted_json_data = json.dumps({'sites': data, 'engines': engines}, indent=2, sort_keys=True)
-
-    with open(args.base_file, "w") as data_file:
-        data_file.write(sorted_json_data)
+    db.save_to_file(args.base_file)
 
     print("\nFinished updating supported site listing!")