Source code for wsknn.preprocessing.static_parsers.parse

from typing import Dict, Iterable, IO, Tuple, List
import pandas as pd
from more_itertools import locate
from tqdm import tqdm
from wsknn.preprocessing.static_parsers.checkers.validation import check_event_keys_and_values, is_user_item_interaction
from wsknn.preprocessing.static_parsers.cleaners.time_transform import clean_time
from wsknn.preprocessing.structure.item import Items
from wsknn.preprocessing.structure.session import Sessions


def parse_fn(dataset: Iterable,
             allowed_actions: Dict,
             purchase_action_name: str,
             session_id_key: str,
             product_key: str,
             action_key: str,
             time_key: str,
             time_to_numeric: bool,
             time_to_datetime: bool,
             datetime_format: str,
             progress_bar: bool) -> (Items, Sessions):
    """
    Build ``Items`` and ``Sessions`` containers from an iterable of events.

    Parameters
    ----------
    dataset : Iterable
        Events to parse. Each event is passed through
        ``check_event_keys_and_values`` and is skipped when that check
        returns a falsy value.

    allowed_actions : Dict
        Maps action names to their weights. May be ``None``; then the list
        of allowed action names is empty.

    purchase_action_name : str
        Name of the action that marks a purchase. Purchase events are not
        stored; they only update the weights of their session.

    session_id_key : str
        Event key that holds the session identifier.

    product_key : str
        Event key that holds the product identifier.

    action_key : str
        Event key that holds the action type.

    time_key : str
        Event key that holds the timestamp.

    time_to_numeric : bool
        Passed to ``clean_time``; requests numeric timestamps.

    time_to_datetime : bool
        Passed to ``clean_time``; requests datetime timestamps
        (``datetime_format`` is passed along with it).

    datetime_format : str
        Datetime format passed to ``clean_time``.

    progress_bar : bool
        Show a ``tqdm`` progress bar while parsing.

    Returns
    -------
    items, sessions : Items, Sessions
    """
    items = Items(event_session_key=session_id_key,
                  event_product_key=product_key,
                  event_time_key=time_key)
    sessions = Sessions(event_session_key=session_id_key,
                        event_product_key=product_key,
                        event_time_key=time_key,
                        event_action_key=action_key,
                        event_action_weights=allowed_actions)

    known_actions = (
        [] if allowed_actions is None else list(allowed_actions.keys())
    )
    convert_time = time_to_numeric or time_to_datetime

    def _store(parsed_event):
        # Every accepted event is registered in both containers.
        items.append(parsed_event)
        sessions.append(parsed_event)

    for raw_record in tqdm(dataset, disable=not progress_bar):
        record = check_event_keys_and_values(raw_record,
                                             session_id_key,
                                             product_key,
                                             time_key,
                                             action_key)
        # The checker returned nothing usable - skip this event.
        if not record:
            continue

        if convert_time:
            record[time_key] = clean_time(times=record[time_key],
                                          time_to_numeric=time_to_numeric,
                                          time_to_datetime=time_to_datetime,
                                          datetime_format=datetime_format)

        action = record.get(action_key, False)

        # Events without an action are always stored.
        if not action:
            _store(record)
            continue

        # Regular action: stored only when it is a user-item interaction.
        if action != purchase_action_name:
            if is_user_item_interaction(action, known_actions):
                _store(record)
            continue

        # Purchase: the event is not stored, only the session weights change.
        purchase_weight = allowed_actions[purchase_action_name]
        sessions.update_weights_of_purchase_session(record[session_id_key],
                                                    purchase_weight)

    return items, sessions
def parse_stream(events: IO,
                 sep: str,
                 allowed_actions: Dict,
                 purchase_action_name: str,
                 session_index: int,
                 product_index: int,
                 time_index: int,
                 action_index: int,
                 time_to_numeric: bool,
                 time_to_datetime: bool,
                 datetime_format: str,
                 ignore_errors: bool = True,
                 header_names: Dict = None,
                 progress_bar: bool = False):
    """
    Parse a stream of delimited text records into Items and Sessions.

    Parameters
    ----------
    events : IO
        Stream (or any iterable) yielding one text record per iteration.

    sep : str
        Separator between the fields of a record.

    allowed_actions : Dict
        Maps action names to their weights. May be ``None``; then the list
        of allowed action names is empty.

    purchase_action_name : str
        Name of the action that marks a purchase. Purchase events are not
        stored; they only update the weights of their session.

    session_index : int
        Column index of the session identifier.

    product_index : int
        Column index of the product identifier.

    time_index : int
        Column index of the event timestamp.

    action_index : int
        Column index of the event action, or ``None`` when records carry
        no action column.

    time_to_numeric : bool
        Passed to ``clean_time``; requests numeric timestamps.

    time_to_datetime : bool
        Passed to ``clean_time``; requests datetime timestamps
        (``datetime_format`` is passed along with it).

    datetime_format : str
        Datetime format passed to ``clean_time``.

    ignore_errors : bool, default = True
        Skip records that cannot be split into the requested columns
        (for example blank or truncated lines). When ``False`` the
        exception is re-raised.

    header_names : Dict, default = None
        Maps a column index to the key name used in the parsed event.
        When ``None``, the names ``'session'``, ``'item'``, ``'ts'`` and
        (if ``action_index`` is given) ``'action'`` are used.

    progress_bar : bool, default = False
        Show a ``tqdm`` progress bar while parsing.

    Returns
    -------
    items_obj, sessions_obj : Items, Sessions
    """
    has_action = action_index is not None

    if header_names is None:
        header_names = {
            session_index: 'session',
            product_index: 'item',
            time_index: 'ts'
        }
        if has_action:
            header_names[action_index] = 'action'

    # Resolve key names once; events are keyed by these names, not by index.
    session_key = header_names[session_index]
    product_key = header_names[product_index]
    time_key = header_names[time_index]
    action_key = header_names[action_index] if has_action else None

    items_obj = Items(event_session_key=session_key,
                      event_product_key=product_key,
                      event_time_key=time_key)
    sessions_obj = Sessions(event_session_key=session_key,
                            event_product_key=product_key,
                            event_time_key=time_key,
                            event_action_key=action_key,
                            event_action_weights=allowed_actions)

    # Same convention as parse_fn: an empty list when no actions are given.
    possible_actions_list = []
    if allowed_actions is not None:
        possible_actions_list = list(allowed_actions.keys())

    convert_time = time_to_numeric or time_to_datetime

    for raw_event in tqdm(events, disable=(not progress_bar)):
        # Splitting AND column extraction are guarded, so a blank or
        # truncated row is skipped (ignore_errors=True) instead of raising
        # IndexError.
        try:
            fields = [field.strip() for field in raw_event.split(sep)]
            event = {
                session_key: fields[session_index],
                product_key: fields[product_index]
            }
            if has_action:
                event[action_key] = fields[action_index]
            event[time_key] = fields[time_index]
        except Exception:
            if ignore_errors:
                continue
            raise

        event = check_event_keys_and_values(event,
                                            session_key,
                                            product_key,
                                            time_key,
                                            action_key)
        # The checker returned nothing usable - skip this record.
        if not event:
            continue

        if convert_time:
            event[time_key] = clean_time(times=event[time_key],
                                         time_to_numeric=time_to_numeric,
                                         time_to_datetime=time_to_datetime,
                                         datetime_format=datetime_format)

        # Look the action up by its key NAME; looking it up by the integer
        # column index never matches, because events are keyed by names.
        action = event.get(action_key, False) if has_action else False

        # Events without an action are always stored.
        if not action:
            items_obj.append(event)
            sessions_obj.append(event)
            continue

        # Regular action: stored only when it is a user-item interaction.
        if action != purchase_action_name:
            if is_user_item_interaction(action, possible_actions_list):
                items_obj.append(event)
                sessions_obj.append(event)
            continue

        # Purchase: the event is not stored, only the session weights change.
        purchase_additive_factor = allowed_actions[purchase_action_name]
        sessions_obj.update_weights_of_purchase_session(
            event[session_key],
            purchase_additive_factor
        )

    return items_obj, sessions_obj