Source code for lpmd.core.scrape

"""lpmd.core.scrape."""

import os
import shutil
from importlib import resources

import dask.dataframe as dd
import pandas as pd
import yaml

import lpmd.utils.check as check
import lpmd.utils.format as fmt


[docs]class BaseScraper: """ Base class for scraper. Other scraper classes should inherit from this class. The class has base methods for scraping data, and in other scraper subclass `get_scraped_data` should be customized. Parameters ---------- data_id : str String expressing which data should be scraped in data_catalogue.yml. """
[docs] def __init__(self, data_id): self.data_id = data_id # Read yml file on urls data_catalogue = yaml.safe_load( resources.read_text("lpmd", "data_catalogue.yml") ) self.data_catalogue = data_catalogue[self.data_id] self.columns = self.data_catalogue["columns"] self.datasets_path = "lpmd/datasets/{data_id}/".format(data_id=self.data_id)
[docs] def get_scraped_data(self, partition_id): """ Get scraped data corresponding to partition_id. Parameters ---------- partition_id : str String expressing which partition data should be scraped in data_catalogue.yml. Returns ------- df_scraped : pandas.core.frame.DataFrame Scraped data that are not cleansed. """ url = self.data_catalogue["partition"][partition_id] columns = self.columns.keys() # Check whether url is effective if not check.check_url(url): # ToDo: change logger print("Specified url is not effective.") return None # Read and cleanse Excel file on specified url df_scraped = pd.read_excel(url) df_scraped.columns = columns return df_scraped
def _cleanse_scraped_data(self, df_scraped): """ Cleanse scraped data. The function is assumed to be added in `get_scraped_data` of the class that inherit from `BaseScraper`. Parameters ---------- df_scraped : pandas.core.frame.DataFrame Scraped data. Returns ------- df_scraped : pandas.core.frame.DataFrame Scraped data that are cleansed. """ for col in df_scraped.columns: if col in self.columns.keys(): column_type = self.columns[col]["column_type"] if column_type == "str_year": df_scraped[col] = fmt.format_raw_str_year(df_scraped[col]) elif column_type == "qty": df_scraped[col] = fmt.format_raw_qty(df_scraped[col]) return df_scraped
[docs] def save_scraped_data(self, partition_id, path=None, **kwargs): """ Save scraped data. Parameters ---------- partition_id : str String expressing which partition data should be scraped in data_catalogue.yml. path : str, default None Sting expressing the path to save. If None, scraped data is stored at working current directory. kwargs Additional keyword arguments passed to ``pandas.DataFrame.to_csv``. Returns ------- has_saved : bool If scraped data is successfully saved, True. Otherwise, False. """ has_saved = False if path is None: path = "." else: if not isinstance(path, str): msg = "Specified path must be str." raise TypeError(msg) # データを取得し保存するディレクトリを作成. save_path = os.path.join(path, self.data_id) os.makedirs(save_path, exist_ok=True) df_scraped = self.get_scraped_data(partition_id) if df_scraped is not None: filename = self.data_id + "-" + partition_id + ".csv" df_scraped.to_csv(os.path.join(save_path, filename), **kwargs) has_saved = True return has_saved
[docs] def save_batch(self, path=None, **kwargs): """ Save scraped data in batches that are defined in partition section of data_catalogue.yml. Parameters ---------- path : str, default None Sting expressing the path to save. If None, scraped data is stored at working current directory. kwargs Additional keyword arguments passed to ``pandas.DataFrame.to_csv``. Returns ------- dict_result : dict Dict expressing whether partition_id in question is successfully saved. """ dict_result = dict() partition_id_list = self.data_catalogue["partition"].keys() for partition_id in partition_id_list: dict_result[partition_id] = self.save_scraped_data( partition_id, path=path, **kwargs ) return dict_result
[docs] def aggregate(self): """ Aggregate scraped data that are defined in partition section of data_catalogue.yml into a single data frame. Returns ------- df : pandas.core.frame.DataFrame Aggregated data frame. """ path = ".tmp/" self.save_batch(path=path, index=False, sep="\t") ddf = dd.read_csv(f"{path}/{self.data_id}/*.csv", sep="\t") df = ddf.compute() shutil.rmtree(path) return df.reset_index(drop=True)
[docs] def out_to_datasets(self): """ Write a DataFrame to the binary parquet format in `lpmd.datasets`. Returns ------- file_path : str The file path to be saved. """ file = "{data_id}.parquet.zstd".format(data_id=self.data_id) file_path = os.path.join(self.datasets_path, file) os.makedirs(self.datasets_path, exist_ok=True) df = self.aggregate() df.to_parquet(file_path, compression="zstd") return file_path
[docs]class ScraperShipment(BaseScraper): """Scraper class for data on livestock products shipment.""" _data_id = "shipment"
[docs] def __init__(self): super(ScraperShipment, self).__init__(self._data_id)
[docs] def get_scraped_data(self, partition_id): """ Get scraped data corresponding to partition_id for livestock products shipment. Parameters ---------- partition_id : str String expressing which partition data should be scraped in data_catalogue.yml. Returns ------- df_scraped : pandas.core.frame.DataFrame Scraped data that are not cleansed. """ df_scraped = super().get_scraped_data(partition_id) if df_scraped is not None: df_scraped = df_scraped[ (df_scraped.index >= 7) & (~df_scraped["year"].isna()) ] df_scraped = self._cleanse_scraped_data(df_scraped) df_scraped["data_source"] = self.data_catalogue["name"] df_scraped["prefecture"] = partition_id df_scraped["source_url"] = self.data_catalogue["partition"][partition_id] df_scraped.reset_index(drop=True, inplace=True) return df_scraped
[docs]class ScraperSlaughter(BaseScraper): """Scraper class for data on animals slaughtered and abattoirs.""" _data_id = "slaughter"
[docs] def __init__(self): super(ScraperSlaughter, self).__init__(self._data_id)
[docs] def get_scraped_data(self, partition_id): """ Get scraped data corresponding to partition_id for animals slaughtered and abattoirs. Parameters ---------- partition_id : str String expressing which partition data should be scraped in data_catalogue.yml. Returns ------- df_scraped : pandas.core.frame.DataFrame Scraped data that are not cleansed. """ df_scraped = super().get_scraped_data(partition_id) if df_scraped is not None: df_scraped = df_scraped[ (df_scraped.index >= 7) & (~df_scraped["year"].isna()) ] df_scraped = self._cleanse_scraped_data(df_scraped) df_scraped["data_source"] = self.data_catalogue["name"] df_scraped["prefecture"] = partition_id df_scraped["source_url"] = self.data_catalogue["partition"][partition_id] df_scraped.reset_index(drop=True, inplace=True) return df_scraped
[docs]class ScraperCarcass(BaseScraper): """Scraper class for data on carcass.""" _data_id = "carcass"
[docs] def __init__(self): super(ScraperCarcass, self).__init__(self._data_id)
[docs] def get_scraped_data(self, partition_id): """ Get scraped data corresponding to partition_id for carcass. Parameters ---------- partition_id : str String expressing which partition data should be scraped in data_catalogue.yml. Returns ------- df_scraped : pandas.core.frame.DataFrame Scraped data that are not cleansed. """ df_scraped = super().get_scraped_data(partition_id) if df_scraped is not None: df_scraped = df_scraped[ (df_scraped.index >= 7) & (~df_scraped["year"].isna()) ] df_scraped = self._cleanse_scraped_data(df_scraped) df_scraped["data_source"] = self.data_catalogue["name"] df_scraped["prefecture"] = partition_id df_scraped["source_url"] = self.data_catalogue["partition"][partition_id] df_scraped.reset_index(drop=True, inplace=True) return df_scraped