MongoDB and researching the IT job market

Have you ever analyzed job vacancies?
 
 
Have you wondered which technologies the labor market demands most right now? A month ago? A year ago?
 
 
How often do new Java developer vacancies open in a particular area of your city, and how quickly do they close?
 
 
In this article I will show how to achieve the desired result and build a reporting system on the topic that interests us. Let's go!
 
 
 
Headhunter.ru
 
Many of you are probably familiar with, and may even have used, the Headhunter.ru resource. Thousands of new vacancies in various fields are posted on this site every day. HeadHunter also has an API that allows a developer to interact with the data of this resource.
 
 

Toolkit


 
Using a simple example, let's build the data collection process for a reporting system based on the Headhunter.ru API. We will use the embedded SQLite DBMS as intermediate storage, store the processed data in the MongoDB NoSQL database, and use Python 3.4 as the main language.
 
 
HH API
The capabilities of the HeadHunter API are quite extensive and are well described in the official documentation on GitHub. First of all, it allows sending anonymous requests, which do not require authorization, to obtain information about vacancies in JSON format. Recently a number of methods (the employer methods) became paid, but they are not needed for this task.
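
As a minimal sketch of such an anonymous request (the User-Agent value and the search parameters here are arbitrary examples, not part of the original code), fetching a page of vacancies might look like this:

import http.client
import json

# any descriptive User-Agent will do; HH API expects one to be present
headers = {'User-Agent': 'hh-research-demo'}
conn = http.client.HTTPSConnection("api.hh.ru")
conn.request("GET", "/vacancies?text=Python&area=2&per_page=20", headers=headers)
response = conn.getresponse()
data = json.loads(response.read())
conn.close()
print(data['found'], 'vacancies matched the query')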
 
 
Each vacancy stays on the site for 30 days; after that, if it is not renewed, it goes to the archive. If a vacancy ends up in the archive before the 30 days are up, it means the employer closed it.
 
 
The HeadHunter API (hereafter HH API) allows retrieving the array of vacancies published on any date within the last 30 days, and that is exactly what we will use: we will collect the published vacancies on a daily basis.
 

Implementation


 
 
Connecting the SQLite database
 
 
import sqlite3
conn_db = sqlite3.connect('hr.db', timeout=10)
c = conn_db.cursor()

 
Table for storing job status changes
 
For convenience, we will store the history of each vacancy's status (its availability on a given date) in a dedicated SQLite table. Thanks to the vacancy_history table we will know whether a vacancy was present on the site on a given extraction date, i.e. on which dates it was active.
 
 
c.execute('''
    create table if not exists vacancy_history
    (
        id_vacancy integer,
        date_load text,
        date_from text,
        date_to text
    )''')

 
Filtration of the sample of vacancies
 
One request cannot return more than 2000 items, and since far more vacancies can be published on the site in a single day, we will add a filter to the request body, for example: only vacancies in St. Petersburg (area=2) in the IT specialization (specialization=1).
 
 
path = ("/vacancies?area=2&specialization=1&page={}&per_page={}&date_from={}&date_to={}".format(page, per_page, date_from, date_to))
 
Additional selection conditions
 
The labor market is growing actively, and even with this filter the number of vacancies may exceed 2000, so we will add a further restriction in the form of a separate run for each half of the day: vacancies published in the first half of the day and vacancies published in the second half of the day.
 
 
import json
from datetime import timedelta as td

# conn (HTTP connection), cur_date, page, per_page, count, total and
# collection_for_ins are initialized outside this excerpt
def get_vacancy_history():
    count_days = 30
    hours = 0
    while count_days >= 0:
        while hours < 24:
            date_from = (cur_date.replace(hour=hours, minute=0, second=0) -
                         td(days=count_days)).strftime('%Y-%m-%dT%H:%M:%S')
            date_to = (cur_date.replace(hour=hours + 11, minute=59, second=59) -
                       td(days=count_days)).strftime('%Y-%m-%dT%H:%M:%S')
            while count == per_page:
                path = ("/vacancies?area=2&specialization=1&page={}&per_page={}&date_from={}&date_to={}"
                        .format(page, per_page, date_from, date_to))
                conn.request("GET", path, headers=headers)
                response = conn.getresponse()
                vacancies = response.read()
                conn.close()
                count = len(json.loads(vacancies)['items'])
                # collection_for_ins is filled from the parsed 'items' (that code is not shown here)
                # insert the collected rows into the database
                try:
                    c.executemany('INSERT INTO vacancy_history VALUES (?,?,?,?)', collection_for_ins)
                except sqlite3.DatabaseError as err:
                    print("Error:", err)
                else:
                    conn_db.commit()
                if collection_for_ins:
                    page = page + 1
                    total = total + count
                # reset the buffer
                del collection_for_ins[:]
            hours = hours + 12
        count_days = count_days - 1
        hours = 0

 
 
 
The first example of use
Suppose we are faced with the task of determining which vacancies were closed during a certain time interval, for example in July 2018. This is solved as follows: a simple SQL query against the vacancy_history table returns the data we need, which can then be passed to a DataFrame for further analysis:
 
 
c.execute ("" "
select
a.id_vacancy,
date (a.date_load) as date_last_load,
date (a.date_from) as date_publish,
ifnull (a.date_next, date (a. date_load, '+1 day')) as date_close
from (
select
vh1.id_vacancy,
vh1.date_load,
vh1.date_from,
min (vh2.date_load) as date_next
from vacancy_history vh1
left join vacancy_history vh2
on vh1.id_vacancy = vh2.id_vacancy
and vh1.date_load < vh2.date_load
where date (vh1.date_load) between: date_in and: date_out
.group by
.vh1.id_vacancy,
vh1.date_load,
vh1.date_from
) as a
where a.date_next is null
"" ",
{" Date_in ": date_in," date_out ": date_out})
.
date_in = dt.datetime (201? ? 1)
date_out = dt.datetime (201? ? 31)
.
.
Closed_vacancies = get_closed_by_period (date_in, date_out)
.
df = pd.DataFrame (closed_vacancies,
polumn =['id_vacancy', 'date_last_load', 'date_publish', 'date_close'])
df.head ()

 
We get a result like this:
 
 
 
 
   id_vacancy  date_last_load  date_publish  date_close
0  18126697    2018-07-09      2018-07-09    2018-07-10
1  18155121    2018-07-09      2018-06-19    2018-07-10
2  18881605    2018-07-09      2018-07-02    2018-07-10
3  19620783    2018-07-09      2018-06-27    2018-07-10
4  19696188    2018-07-09      2018-06-15    2018-07-10
 
 
If we want to perform the analysis in Excel or a third-party BI tool, we can export the vacancy_history table to a CSV file:
 
 
# Export the full table from the database to CSV
import csv

data = c.execute('select * from vacancy_history')
with open('vacancy_history.csv', 'w', newline='') as out_csv_file:
    csv_out = csv.writer(out_csv_file)
    csv_out.writerow([d[0] for d in data.description])
    csv_out.writerows(data.fetchall())
conn_db.close()

 

Heavy artillery


 
What if we need a more sophisticated analysis of the data? Here the document-oriented NoSQL database MongoDB comes to the rescue, as it allows us to store data in JSON format.
 
 
 
A demo copy of my MongoDB database is deployed in the mLab cloud service, which lets you create a free database of up to 500 MB; that is enough for the task at hand. The hr_db database contains the Vacancy collection, to which we will connect:
 
 
# Connect to the cloud-hosted MongoDB database (credentials omitted)
from pymongo import MongoClient
from pymongo import ASCENDING
from pymongo import errors

client = MongoClient('mongodb://:@ds115219.mlab.com:15219/hr_db')
db = client.hr_db
VacancyMongo = db.Vacancy

 
It should be noted that salaries are not always listed in rubles, so for the analysis all values need to be brought to their ruble equivalent. To do this, we use the HH API to fetch the dictionaries collection, which contains exchange-rate information for the current date:
 
 
# Fetch the HH API dictionaries (they include the current currency rates)
def get_dictionaries():
    conn = http.client.HTTPSConnection("api.hh.ru")
    conn.request("GET", "https://api.hh.ru/dictionaries", headers=headers)
    response = conn.getresponse()
    if response.status != 200:
        conn.close()
        conn = http.client.HTTPSConnection("api.hh.ru")
        conn.request("GET", "https://api.hh.ru/dictionaries", headers=headers)
        response = conn.getresponse()
    dictionaries = response.read()
    dictionaries_json = json.loads(dictionaries)
    return dictionaries_json

 
Filling a dictionary with the current currency exchange rates:
 
 
hh_dictionary = get_dictionaries()
currencies = hh_dictionary['currency']
currency_rates = {}
for currency in currencies:
    currency_rates[currency['code']] = currency['rate']
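
As a small illustration of how these rates can be applied (just a sketch; the to_rub helper is not part of the original code), note that in the HH dictionaries the rate is the amount of a currency per one ruble, so we divide by it:

# Hypothetical helper: convert a salary value to rubles using the rates above
def to_rub(value, currency_code):
    if value is None:
        return None
    return int(value / currency_rates[currency_code])

# e.g. to_rub(3000, 'USD') gives the ruble equivalent of 3000 USD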

 
 
The vacancy collection described above runs daily, so there is no need to go through every vacancy each time and request its detailed information. We will take only those that arrived within the last five days.
 
Getting the array of vacancies for the last 5 days from the SQLite database:
 
 
def get_list_of_vacancies_sql():
    conn_db = sqlite3.connect('hr.db', timeout=10)
    conn_db.row_factory = lambda cursor, row: row[0]
    c = conn_db.cursor()
    items = c.execute("""
        select distinct id_vacancy
        from vacancy_history
        where date(date_load) >= date('now', '-5 day')
        """).fetchall()
    conn_db.close()
    return items

 
Getting the array of vacancies for the last five days from MongoDB:
 
 
def get_list_of_vacancies_nosql():
    date_load = (dt.datetime.now() - td(days=5)).strftime('%Y-%m-%d')
    vacancies_from_mongo = []
    for item in VacancyMongo.find({"date_load": {"$gte": date_load}}, {"id": 1, "_id": 0}):
        vacancies_from_mongo.append(int(item['id']))
    return vacancies_from_mongo

 
It remains to find the difference between the two arrays: for the vacancies that are not yet in MongoDB, we will request detailed information and write it to the database:
 
 
sql_list = get_list_of_vacancies_sql()
mongo_list = get_list_of_vacancies_nosql()
s = set(mongo_list)
vac_for_proc = [x for x in sql_list if x not in s]
# split the new vacancy ids into chunks of 500
vac_id_chunks = [vac_for_proc[x:x + 500] for x in range(0, len(vac_for_proc), 500)]

 
So, we have an array of new vacancies that are not yet in MongoDB. For each of them we will obtain detailed information via a request to the HH API and, before writing to MongoDB, process each document:


convert the salary to its ruble equivalent;

add to each vacancy a gradation of the specialist level (Junior/Middle/Senior, etc.).


All of this is implemented in the vacancies_processing function:
 
 
import re
from nltk.stem.snowball import SnowballStemmer

stemmer = SnowballStemmer("russian")

def vacancies_processing(vacancies_list):
    cur_date = dt.datetime.now().strftime('%Y-%m-%d')
    for vacancy_id in vacancies_list:
        conn = http.client.HTTPSConnection("api.hh.ru")
        conn.request("GET", "/vacancies/{}".format(vacancy_id), headers=headers)
        response = conn.getresponse()
        if response.status != 404:
            vacancy_txt = response.read()
            conn.close()
            vacancy = json.loads(vacancy_txt)
            # salary: bring the value to its ruble equivalent and cap it at max_salary
            # (picking the 'from'/'to' bound and applying the rate reconstructs an elided fragment)
            salary = None
            if 'salary' in vacancy:
                if vacancy['salary'] is not None:
                    max_salary = 500000
                    salary = vacancy['salary']['from'] or vacancy['salary']['to']
                    if salary is not None:
                        salary = int(salary / currency_rates[vacancy['salary']['currency']])
                        if salary >= max_salary:
                            salary = max_salary
            # grade: derive the specialist level from the stemmed words of the title
            grade = None
            if 'name' in vacancy:
                p_grade = ''
                title = re.sub(u'[^a-zа-я]+', ' ', vacancy['name'].lower(), flags=re.UNICODE)
                words = re.split(r'\s+', title.strip())
                for title_word in words:
                    title_word = stemmer.stem(title_word)
                    if len(title_word.strip()) > 1:
                        p_grade = p_grade + " " + title_word.strip()
                # in the source the patterns match stemmed Russian titles;
                # the English keywords below follow the translation
                if re.search('(main)|(princip)', p_grade):
                    grade = 'principal'
                elif re.search('(leading)|(senior)|([f|F]ull)', p_grade):
                    grade = 'senior'
                else:
                    grade = 'not specify'
            vacancy['salary_processed'] = salary
            vacancy['date_load'] = cur_date
            vacancy['grade'] = grade
            vacancy.pop('branded_description', None)
            try:
                post_id = VacancyMongo.insert_one(vacancy)
            except errors.DuplicateKeyError:
                print('Cant insert the duplicate vacancy_id:', vacancy['id'])

 
We will obtain the detailed information from the HH API, preprocess the received data and insert it into MongoDB in several threads, 500 vacancies per thread:
 
 
import threading

t_num = 1
threads = []
for vac_id_chunk in vac_id_chunks:
    print('starting', t_num)
    t_num = t_num + 1
    t = threading.Thread(target=vacancies_processing, kwargs={'vacancies_list': vac_id_chunk})
    threads.append(t)
    t.start()
for t in threads:
    t.join()

 
 
 
Each document in the completed MongoDB collection is the vacancy JSON returned by the HH API, enriched with the salary_processed, date_load and grade fields.
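
A quick way to peek at one of the stored documents (a sketch that projects only a few of the fields):

# Inspect a single stored vacancy, projecting a handful of fields
doc = VacancyMongo.find_one({}, {'id': 1, 'name': 1, 'grade': 1,
                                 'salary_processed': 1, 'date_load': 1, '_id': 0})
print(doc)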
 
 

 
 

A few more examples


 
With the collected database we can run various analytical queries. For example, here are the Top 10 highest-paid vacancies for Python developers in St. Petersburg:
 
 
cursor_mongo = VacancyMongo.find({"name": {"$regex": ".*[pP]ython.*"}})
df_mongo = pd.DataFrame(list(cursor_mongo))
del df_mongo['_id']
pd.concat([df_mongo.drop(['employer'], axis=1),
           df_mongo['employer'].apply(pd.Series)['name']],
          axis=1)[['grade', 'name', 'salary_processed']] \
    .sort_values('salary_processed', ascending=False)[:10]

 
Top 10 highest-paid Python vacancies

grade    name                                               employer                  salary_processed
senior   Web Team Lead / Architect (Python/Django/React)    Investex Ltd              …
senior   Senior Python developer in Montenegro              Betmaster                 …
senior   Senior Python developer in Montenegro              Betmaster                 …
middle   Back-End Web Developer (Python)                    Soshace                   …
middle   Back-End Web Developer (Python)                    Soshace                   …
senior   Lead Python Engineer for a Swiss Startup           Assaia International AG   …
middle   Back-End Web Developer (Python)                    Soshace                   …
middle   Back-End Web Developer (Python)                    Soshace                   …
senior   Python teamlead                                    DigitalHR                 …
senior   The leading developer (Python, PHP, JavaScript)    IK GROUP                  …
 
 
 
 
 
Now let's determine near which metro stations the concentration of Java developer vacancies is highest. Using a regular expression, I filter by the job title "Java" and also select only those vacancies where an address is specified:
 
 
cursor_mongo = VacancyMongo.find({"name": {"$regex": ".*[jJ]ava[^sS]"},
                                  "address": {"$ne": None}})
df_mongo = pd.DataFrame(list(cursor_mongo))
df_mongo['metro'] = df_mongo.apply(lambda x: x['address']['metro']['station_name']
                                   if x['address']['metro'] is not None
                                   else None, axis=1)
df_mongo.groupby('metro')['_id'] \
    .count() \
    .reset_index(name='count') \
    .sort_values(['count'], ascending=False)[:10]

 
Java developer vacancies by metro station

metro                   count
Vasileostrovskaya       87
Petrogradskaya          68
Vyborgskaya             46
Ploshchad Lenina        45
Gorkovskaya             45
Chkalovskaya            43
Narvskaya               32
Ploshchad Vosstaniya    29
Staraya Derevnya        29
Elizarovskaya           27
 
 
 
 
 

Results


 
The analytical capabilities of the developed system are quite broad and can be used, for example, when planning a startup or opening a new line of business.
 
 
Note that only the basic functionality of the system is presented here; further development is planned towards analysis by geographical coordinates and prediction of where in the city new vacancies will appear.
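
As a hint of that direction (just a sketch, not part of the current system; it assumes the address objects returned by the HH API carry lat/lng coordinates), the stored documents already contain enough data to start a geographical analysis:

# Sketch: collect coordinates of Java vacancies whose address includes lat/lng
cursor_mongo = VacancyMongo.find({"name": {"$regex": ".*[jJ]ava[^sS]"},
                                  "address.lat": {"$ne": None}})
points = [(v['address']['lat'], v['address']['lng']) for v in cursor_mongo]
print(len(points), 'vacancies with coordinates')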
 
 
The full source code for this article can be found in my GitHub repository.
 
 
P.S. Comments on the article are welcome; I will be glad to answer your questions and hear your opinions. Thank you!