GCP Cloud Run: LOC Flattener
Flattening and ingesting JSON into a data lake. Autonomously.
git, python, gcp, bash, workflow automation, docker, containerization
26 Minutes, 10 Seconds
2024-07-11 00:00 +0000
Library of Congress Normalizer Job
This repo normalizes the existing Library of Congress schema into a database that will then be used to construct a knowledge graph of Supreme Court law.
Plan
- Set up a venv to run locally
- Install requirements
- Write out the script to interface with GCP
- Set up a Docker container and test locally
- Build the image
- Upload to GCP
- Create the job
Set up the venv
Install
I installed virtualenv locally on Ubuntu
Create
I then run virtualenv {path to venvs}
Activate
Then source the activate script in the venv's bin directory
source {path to venv}/bin/activate
Install requirements
pip install -r requirements.txt
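Putting those steps together, the whole local setup is roughly the following (the venv path is a placeholder, as above):
pip install virtualenv
virtualenv {path to venv}
source {path to venv}/bin/activate
pip install -r requirements.txt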
Write out the Script
Steps
- Access the loc_scraper Bucket
- Grab a json blob
- Process the blob
- Move the blob to a processed bucket
Data Organization
I want to create a workflow class with the following methods
- get_creds
- grab_blob
- process_blob
- move_blob
The process_blob method will be the bulk of the work. I might just flatten the JSON and dump it into a table, then write a separate normalization workflow later. A rough skeleton of the class is sketched below.
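Something like the following, where the class name and method bodies are placeholders rather than the final implementation (the finished script further down ends up using standalone functions plus the gcputils clients instead):
# Rough sketch of the planned workflow class -- names and bodies are placeholders,
# not the final implementation.
class LocFlattenerWorkflow:
    def __init__(self, project_id, bucket_name):
        self.project_id = project_id
        self.bucket_name = bucket_name

    def get_creds(self):
        """Load credentials from the environment for local runs."""
        raise NotImplementedError

    def grab_blob(self):
        """Pop the next unprocessed JSON blob from the source bucket."""
        raise NotImplementedError

    def process_blob(self, blob):
        """Flatten the JSON and dump it into staging tables."""
        raise NotImplementedError

    def move_blob(self, blob):
        """Copy the blob to a processed bucket and delete the original."""
        raise NotImplementedError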
Get Creds
If running locally I will need some credentials in the environment. I will create a service account key from the console and download it for local runs.
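For local runs the script below reads the key path from the GCP_CREDENTIALS_PATH environment variable, so pointing it at the downloaded key is enough; the path here is just an example:
# Point the script at the downloaded service account key (path is an example)
export GCP_CREDENTIALS_PATH=/path/to/secret.json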
Set up the Docker Container
The Dockerfile
Also available on GitHub
# # Use the Alpine Linux base image
# FROM alpine:latest
# # Set the working directory inside the container
# WORKDIR /app
# # Copy a simple script that prints "Hello, World!" into the container
# COPY /src/hello.sh .
# # Make the script executable
# RUN chmod +x hello.sh
# # Define the command to run when the container starts
# CMD ["./hello.sh"]
# Use the official Python image from Docker Hub
FROM python:3.10-slim
# Set the working directory in the container
WORKDIR /app
# Copy the current directory contents into the container at /app
COPY ./src /app
COPY requirements.txt /app
# Install any needed dependencies specified in requirements.txt
RUN pip install --no-cache-dir -r requirements.txt
# Run the Python script when the container launches
CMD ["python", "loc_scraper.py"]
Quickstart
gcloud CLI
After this you will have to install the gcloud CLI and configure your local environment. I will write up some scripts in a subsequent post to automate this process, but for the time being the official gcloud installation guide covers it.
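Once the gcloud CLI is installed, configuration roughly amounts to authenticating and pointing it at your project, region, and Artifact Registry; for example (yourproject is a placeholder):
gcloud auth login
gcloud config set project yourproject
gcloud config set run/region us-west2
gcloud auth configure-docker us-west2-docker.pkg.dev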
Create the image
In the repo there is a bash script called build.sh that will need to be updated according to your GCP project.
gcloud builds submit --region=us-west2 --config cloudbuild.yaml
It calls cloudbuild.yaml, which might need to be updated for your project, but the following should work.
steps:
- name: 'gcr.io/cloud-builders/docker'
  script: |
    docker build -t us-west2-docker.pkg.dev/$PROJECT_ID/supreme-court-scraper/supreme-court-scraper-image:dev .
  automapSubstitutions: true
images:
- 'us-west2-docker.pkg.dev/$PROJECT_ID/supreme-court-scraper/supreme-court-scraper-image:dev'
Following creation of the image
Next you can create a job on GCP by running the job_create.sh script, or by copying the code below and changing yourproject to your project name.
gcloud run jobs create supreme-court-scraper --image us-west2-docker.pkg.dev/yourproject/supreme-court-scraper/supreme-court-scraper-image:dev
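The script reads its configuration from environment variables, so depending on your setup you may also want to set those on the job; a possible follow-up, using the --set-env-vars flag (values are placeholders):
gcloud run jobs update supreme-court-scraper \
  --set-env-vars GCP_PROJECT_ID=yourproject,BUCKET_NAME=loc-scraper,PATTERNS_FILE=exclude.txt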
Executing the job
Once complete you can execute the job by running the execute_job.sh
script or by running
gcloud run jobs execute supreme-court-scraper
Putting it all together
In a perfect world the following should work. Note that src/.env should set your environment variables, such as GCP_PROJECT_ID.
source src/.env \
&& ./build.sh \
&& ./job_create.sh \
&& ./execute_job.sh
Running locally
The Python script in /src can be run locally; however, it should be modified if you choose not to use GCP. A number of functions within can easily be modified to write to the local directory instead, as sketched below.
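For example, a hypothetical local sink (write_local is not part of the repo, just an illustration) could stand in for the BigQuery load calls:
import os
import pandas as pd

def write_local(df: pd.DataFrame, table_id: str, out_dir: str = "output") -> str:
    """Write a normalized DataFrame to a local CSV instead of a BigQuery table."""
    os.makedirs(out_dir, exist_ok=True)
    path = os.path.join(out_dir, f"{table_id}.csv")
    df.to_csv(path, index=False)
    return path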
Documentation Sources
pip install -r ../requirements.txt
Requirement already satisfied: beautifulsoup4==4.12.3 in /home/cobra/.config/jupyterlab-desktop/jlab_server/lib/python3.12/site-packages (from -r ../requirements.txt (line 1)) (4.12.3)
Requirement already satisfied: bs4==0.0.2 in /home/cobra/.config/jupyterlab-desktop/jlab_server/lib/python3.12/site-packages (from -r ../requirements.txt (line 2)) (0.0.2)
Requirement already satisfied: cachetools==5.3.3 in /home/cobra/.config/jupyterlab-desktop/jlab_server/lib/python3.12/site-packages (from -r ../requirements.txt (line 3)) (5.3.3)
Requirement already satisfied: certifi==2024.2.2 in /home/cobra/.config/jupyterlab-desktop/jlab_server/lib/python3.12/site-packages (from -r ../requirements.txt (line 4)) (2024.2.2)
Requirement already satisfied: charset-normalizer==3.3.2 in /home/cobra/.config/jupyterlab-desktop/jlab_server/lib/python3.12/site-packages (from -r ../requirements.txt (line 5)) (3.3.2)
Requirement already satisfied: flatten-json==0.1.14 in /home/cobra/.config/jupyterlab-desktop/jlab_server/lib/python3.12/site-packages (from -r ../requirements.txt (line 6)) (0.1.14)
Requirement already satisfied: google-api-core==2.18.0 in /home/cobra/.config/jupyterlab-desktop/jlab_server/lib/python3.12/site-packages (from -r ../requirements.txt (line 7)) (2.18.0)
Requirement already satisfied: google-auth==2.29.0 in /home/cobra/.config/jupyterlab-desktop/jlab_server/lib/python3.12/site-packages (from -r ../requirements.txt (line 8)) (2.29.0)
Requirement already satisfied: google-cloud-appengine-logging==1.4.3 in /home/cobra/.config/jupyterlab-desktop/jlab_server/lib/python3.12/site-packages (from -r ../requirements.txt (line 9)) (1.4.3)
Requirement already satisfied: google-cloud-audit-log==0.2.5 in /home/cobra/.config/jupyterlab-desktop/jlab_server/lib/python3.12/site-packages (from -r ../requirements.txt (line 10)) (0.2.5)
Requirement already satisfied: google-cloud-core==2.4.1 in /home/cobra/.config/jupyterlab-desktop/jlab_server/lib/python3.12/site-packages (from -r ../requirements.txt (line 11)) (2.4.1)
Requirement already satisfied: google-cloud-logging==3.10.0 in /home/cobra/.config/jupyterlab-desktop/jlab_server/lib/python3.12/site-packages (from -r ../requirements.txt (line 12)) (3.10.0)
Requirement already satisfied: google-cloud-storage==2.16.0 in /home/cobra/.config/jupyterlab-desktop/jlab_server/lib/python3.12/site-packages (from -r ../requirements.txt (line 13)) (2.16.0)
Requirement already satisfied: google-crc32c==1.5.0 in /home/cobra/.config/jupyterlab-desktop/jlab_server/lib/python3.12/site-packages (from -r ../requirements.txt (line 14)) (1.5.0)
Requirement already satisfied: google-resumable-media==2.7.0 in /home/cobra/.config/jupyterlab-desktop/jlab_server/lib/python3.12/site-packages (from -r ../requirements.txt (line 15)) (2.7.0)
Requirement already satisfied: googleapis-common-protos==1.63.0 in /home/cobra/.config/jupyterlab-desktop/jlab_server/lib/python3.12/site-packages (from -r ../requirements.txt (line 16)) (1.63.0)
Requirement already satisfied: grpc-google-iam-v1==0.13.0 in /home/cobra/.config/jupyterlab-desktop/jlab_server/lib/python3.12/site-packages (from -r ../requirements.txt (line 17)) (0.13.0)
Requirement already satisfied: grpcio==1.62.2 in /home/cobra/.config/jupyterlab-desktop/jlab_server/lib/python3.12/site-packages (from -r ../requirements.txt (line 18)) (1.62.2)
Requirement already satisfied: grpcio-status==1.62.2 in /home/cobra/.config/jupyterlab-desktop/jlab_server/lib/python3.12/site-packages (from -r ../requirements.txt (line 19)) (1.62.2)
Requirement already satisfied: idna==3.7 in /home/cobra/.config/jupyterlab-desktop/jlab_server/lib/python3.12/site-packages (from -r ../requirements.txt (line 20)) (3.7)
Requirement already satisfied: proto-plus==1.23.0 in /home/cobra/.config/jupyterlab-desktop/jlab_server/lib/python3.12/site-packages (from -r ../requirements.txt (line 21)) (1.23.0)
Requirement already satisfied: protobuf==4.25.3 in /home/cobra/.config/jupyterlab-desktop/jlab_server/lib/python3.12/site-packages (from -r ../requirements.txt (line 22)) (4.25.3)
Requirement already satisfied: pyasn1==0.6.0 in /home/cobra/.config/jupyterlab-desktop/jlab_server/lib/python3.12/site-packages (from -r ../requirements.txt (line 23)) (0.6.0)
Requirement already satisfied: pyasn1_modules==0.4.0 in /home/cobra/.config/jupyterlab-desktop/jlab_server/lib/python3.12/site-packages (from -r ../requirements.txt (line 24)) (0.4.0)
Requirement already satisfied: requests==2.31.0 in /home/cobra/.config/jupyterlab-desktop/jlab_server/lib/python3.12/site-packages (from -r ../requirements.txt (line 25)) (2.31.0)
Requirement already satisfied: rsa==4.9 in /home/cobra/.config/jupyterlab-desktop/jlab_server/lib/python3.12/site-packages (from -r ../requirements.txt (line 26)) (4.9)
Requirement already satisfied: six==1.16.0 in /home/cobra/.config/jupyterlab-desktop/jlab_server/lib/python3.12/site-packages (from -r ../requirements.txt (line 27)) (1.16.0)
Requirement already satisfied: soupsieve==2.5 in /home/cobra/.config/jupyterlab-desktop/jlab_server/lib/python3.12/site-packages (from -r ../requirements.txt (line 28)) (2.5)
Requirement already satisfied: urllib3==2.2.1 in /home/cobra/.config/jupyterlab-desktop/jlab_server/lib/python3.12/site-packages (from -r ../requirements.txt (line 29)) (2.2.1)
Requirement already satisfied: google-cloud-bigquery==3.25.0 in /home/cobra/.config/jupyterlab-desktop/jlab_server/lib/python3.12/site-packages (from -r ../requirements.txt (line 40)) (3.25.0)
Collecting numpy==2.0.0 (from -r ../requirements.txt (line 51))
Downloading numpy-2.0.0-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (60 kB)
Collecting packaging==24.1 (from -r ../requirements.txt (line 52))
Downloading packaging-24.1-py3-none-any.whl.metadata (3.2 kB)
Requirement already satisfied: pandas==2.2.2 in /home/cobra/.config/jupyterlab-desktop/jlab_server/lib/python3.12/site-packages (from -r ../requirements.txt (line 53)) (2.2.2)
Requirement already satisfied: pyarrow==16.1.0 in /home/cobra/.config/jupyterlab-desktop/jlab_server/lib/python3.12/site-packages (from -r ../requirements.txt (line 56)) (16.1.0)
Collecting python-dateutil==2.9.0.post0 (from -r ../requirements.txt (line 59))
Downloading python_dateutil-2.9.0.post0-py2.py3-none-any.whl.metadata (8.4 kB)
Requirement already satisfied: pytz==2024.1 in /home/cobra/.config/jupyterlab-desktop/jlab_server/lib/python3.12/site-packages (from -r ../requirements.txt (line 60)) (2024.1)
Requirement already satisfied: tzdata==2024.1 in /home/cobra/.config/jupyterlab-desktop/jlab_server/lib/python3.12/site-packages (from -r ../requirements.txt (line 65)) (2024.1)
Downloading numpy-2.0.0-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (19.0 MB)
Downloading packaging-24.1-py3-none-any.whl (53 kB)
Downloading python_dateutil-2.9.0.post0-py2.py3-none-any.whl (229 kB)
Installing collected packages: python-dateutil, packaging, numpy
Attempting uninstall: python-dateutil
Found existing installation: python-dateutil 2.9.0
Uninstalling python-dateutil-2.9.0:
Successfully uninstalled python-dateutil-2.9.0
Attempting uninstall: packaging
Found existing installation: packaging 24.0
Uninstalling packaging-24.0:
Successfully uninstalled packaging-24.0
Attempting uninstall: numpy
Found existing installation: numpy 1.26.4
Uninstalling numpy-1.26.4:
Successfully uninstalled numpy-1.26.4
Successfully installed numpy-2.0.0 packaging-24.1 python-dateutil-2.9.0.post0
Note: you may need to restart the kernel to use updated packages.
Write out the Script
Steps
- Initialize the Google Logging Service
- Initialize The Google Cloud Storage Service
- Initialize the Bigquery Client
- Grab a json blob
- Process the blob
- Move the blob to a processed bucket
Initialize the Google Cloud Logging Service
I created a Google Cloud Logging client class, available at: https://github.com/justin-napolitano/gcputils/blob/bc421debf4c828522580ec79ab634b2e2bf402a4/GoogleCloudLogging.py
It is imported and tested below. Note that CLI-specific arguments are commented out for testing in the notebook.
# loc_flattener.py
# library_of_congress_scraper.py
from __future__ import print_function
from gcputils.gcpclient import GCSClient
from gcputils.GoogleCloudLogging import GoogleCloudLogging
from gcputils.BigQueryClient import BigQueryClient
from bs4 import BeautifulSoup
import requests
import json
import os
import time
from pprint import pprint
import html
from flatten_json import flatten
import google.cloud.logging
import logging
import argparse
import pandas as pd
from google.cloud import bigquery
def initialize_google_cloud_logging_client(project_id, credentials_path=None):
return GoogleCloudLogging(project_id, credentials_path=credentials_path)
def main():
# parser = argparse.ArgumentParser(description='Run the script locally or in the cloud.')
# parser.add_argument('--local', action='store_true', help='Run the script locally with credentials path')
# args = parser.parse_args()
project_id = os.getenv('GCP_PROJECT_ID', 'smart-axis-421517')
bucket_name = os.getenv('BUCKET_NAME', 'loc-scraper')
credentials_path = None
# if args.local:
credentials_path = os.getenv('GCP_CREDENTIALS_PATH', 'secret.json')
# Initialize logging
logging_client = initialize_google_cloud_logging_client(project_id,credentials_path)
logging_client.setup_logging()
if __name__ == "__main__":
main()
Initialize the Google Cloud Storage Client
The Google Cloud Storage Client is available at https://github.com/justin-napolitano/gcputils/blob/bc421debf4c828522580ec79ab634b2e2bf402a4/gcpclient.py
Below I call the client and list the buckets as a test.
def initialize_google_cloud_logging_client(project_id, credentials_path=None):
return GoogleCloudLogging(project_id, credentials_path=credentials_path)
def initialize_gcs_client(project_id, credentials_path=None):
return GCSClient(project_id, credentials_path=credentials_path)
def list_gcs_buckets(client):
try:
buckets = client.list_buckets()
print("Buckets:", buckets)
logging.info(f"Buckets: {buckets}")
except Exception as e:
logging.error(f"Error listing buckets: {e}")
def main():
# parser = argparse.ArgumentParser(description='Run the script locally or in the cloud.')
# parser.add_argument('--local', action='store_true', help='Run the script locally with credentials path')
# args = parser.parse_args()
project_id = os.getenv('GCP_PROJECT_ID', 'smart-axis-421517')
bucket_name = os.getenv('BUCKET_NAME', 'loc-scraper')
credentials_path = None
# if args.local:
credentials_path = os.getenv('GCP_CREDENTIALS_PATH', 'secret.json')
# Initialize logging
logging_client = initialize_google_cloud_logging_client(project_id,credentials_path)
logging_client.setup_logging()
gcs_client = initialize_gcs_client(project_id, credentials_path)
list_gcs_buckets(gcs_client)
if __name__ == "__main__":
main()
trying creds file
Buckets: ['loc-scraper', 'loc_flattener_processed', 'processed_results', 'smart-axis-421517_cloudbuild']
Access the Blobs within the bucket
Now I need to grab a blob from the bucket. In this case I just want to grab one from the top of the heap without pulling a lot of data into context.
Addition to the storage class
def list_blobs(self, bucket_name):
"""
Lists all blobs in the specified bucket in Google Cloud Storage.
Args:
bucket_name (str): Name of the bucket.
Returns:
list: A list of blob names.
"""
# Get the bucket
bucket = self.client.bucket(bucket_name)
# List all blobs in the bucket
blobs = list(bucket.list_blobs())
blob_names = [blob.name for blob in blobs]
return blob_names
def pop_blob(self, bucket_name):
"""
Selects and removes the first blob from the specified bucket in Google Cloud Storage.
Args:
bucket_name (str): Name of the bucket.
Returns:
google.cloud.storage.blob.Blob: The first blob from the bucket.
"""
# Get the bucket
bucket = self.client.bucket(bucket_name)
# List all blobs in the bucket
blobs = list(bucket.list_blobs())
if not blobs:
print(f"No blobs found in bucket '{bucket_name}'.")
return None
# Get the first blob
first_blob = blobs[0]
print(f"First blob selected: {first_blob.name}")
return first_blob
Test Run
def initialize_google_cloud_logging_client(project_id, credentials_path=None):
return GoogleCloudLogging(project_id, credentials_path=credentials_path)
def initialize_gcs_client(project_id, credentials_path=None):
return GCSClient(project_id, credentials_path=credentials_path)
def list_gcs_buckets(client):
try:
buckets = client.list_buckets()
print("Buckets:", buckets)
logging.info(f"Buckets: {buckets}")
except Exception as e:
logging.error(f"Error listing buckets: {e}")
def main():
# parser = argparse.ArgumentParser(description='Run the script locally or in the cloud.')
# parser.add_argument('--local', action='store_true', help='Run the script locally with credentials path')
# args = parser.parse_args()
project_id = os.getenv('GCP_PROJECT_ID', 'smart-axis-421517')
bucket_name = os.getenv('BUCKET_NAME', 'loc-scraper')
credentials_path = None
# if args.local:
credentials_path = os.getenv('GCP_CREDENTIALS_PATH', 'secret.json')
# Initialize logging
logging_client = initialize_google_cloud_logging_client(project_id,credentials_path)
logging_client.setup_logging()
# List Buckets for testing
gcs_client = initialize_gcs_client(project_id, credentials_path)
list_gcs_buckets(gcs_client)
# Grab A blob from the heap
first_blob = gcs_client.pop_blob(bucket_name)
if first_blob:
print(f"First blob name: {first_blob.name}")
if __name__ == "__main__":
main()
trying creds file
Buckets: ['loc-scraper', 'loc_flattener_processed', 'processed_results', 'smart-axis-421517_cloudbuild']
First valid blob selected: last_page.txt
First blob name: last_page.txt
Some additions to avoid last_page.txt
So there is a last_page.txt file that is used by the scraper program. I want to pass some regex patterns of blob names to exclude to the pop_blob method (note that gcpclient.py needs to import the re module for this).
def pop_blob(self, bucket_name, patterns_file = None):
"""
Selects and removes the first blob from the specified bucket in Google Cloud Storage,
excluding any blobs that match patterns from the provided file.
Args:
bucket_name (str): Name of the bucket.
patterns_file (str, optional): Path to the file containing regex patterns to exclude.
Returns:
google.cloud.storage.blob.Blob: The first blob from the bucket that doesn't match any pattern.
"""
# Load regex patterns from file
patterns = []
if patterns_file:
with open(patterns_file, 'r') as file:
patterns = [line.strip() for line in file]
# Get the bucket
bucket = self.client.bucket(bucket_name)
# List all blobs in the bucket
blobs = list(bucket.list_blobs())
if not blobs:
print(f"No blobs found in bucket '{bucket_name}'.")
return None
# Filter blobs based on regex patterns
for blob in blobs:
if not any(re.search(pattern, blob.name) for pattern in patterns):
print(f"First valid blob selected: {blob.name}")
return blob
print("No valid blobs found after applying regex patterns.")
return None
def initialize_google_cloud_logging_client(project_id, credentials_path=None):
return GoogleCloudLogging(project_id, credentials_path=credentials_path)
def initialize_gcs_client(project_id, credentials_path=None):
return GCSClient(project_id, credentials_path=credentials_path)
def list_gcs_buckets(client):
try:
buckets = client.list_buckets()
print("Buckets:", buckets)
logging.info(f"Buckets: {buckets}")
except Exception as e:
logging.error(f"Error listing buckets: {e}")
def main():
# parser = argparse.ArgumentParser(description='Run the script locally or in the cloud.')
# parser.add_argument('--local', action='store_true', help='Run the script locally with credentials path')
# args = parser.parse_args()
patterns_file = os.getenv('PATTERNS_FILE', 'exclude.txt')
project_id = os.getenv('GCP_PROJECT_ID', 'smart-axis-421517')
bucket_name = os.getenv('BUCKET_NAME', 'loc-scraper')
credentials_path = None
# if args.local:
credentials_path = os.getenv('GCP_CREDENTIALS_PATH', 'secret.json')
# Initialize logging
logging_client = initialize_google_cloud_logging_client(project_id,credentials_path)
logging_client.setup_logging()
# List Buckets for testing
gcs_client = initialize_gcs_client(project_id, credentials_path)
list_gcs_buckets(gcs_client)
# Grab A blob from the heap
first_blob = gcs_client.pop_blob(bucket_name,patterns_file )
if first_blob:
print(f"First blob name: {first_blob.name}")
if __name__ == "__main__":
main()
trying creds file
Buckets: ['loc-scraper', 'loc_flattener_processed', 'processed_results', 'smart-axis-421517_cloudbuild']
First valid blob selected: result-10.json
First blob name: result-10.json
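For reference, the patterns file referenced by PATTERNS_FILE (exclude.txt) is just a newline-delimited list of regex patterns; something like the following would be enough to skip the scraper's bookkeeping file:
last_page\.txt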
Download the data
Now I need to process the information. First I need to grab the data from the blob.
def download_blob_to_memory(self, bucket_name, blob_name):
"""
Downloads a blob from the specified bucket to memory.
Args:
bucket_name (str): Name of the bucket.
blob_name (str): Name of the blob to download.
Returns:
string: The string content of the blob.
"""
# Get the bucket
bucket = self.client.bucket(bucket_name)
# Get the blob
blob = bucket.blob(blob_name)
# Download the blob to a string
blob_data = blob.download_as_string()
# Parse the JSON content
# json_content = json.loads(blob_data)
print(f"Blob '{blob_name}' downloaded to memory.")
return blob_data
def initialize_google_cloud_logging_client(project_id, credentials_path=None):
return GoogleCloudLogging(project_id, credentials_path=credentials_path)
def initialize_gcs_client(project_id, credentials_path=None):
return GCSClient(project_id, credentials_path=credentials_path)
def list_gcs_buckets(client):
try:
buckets = client.list_buckets()
print("Buckets:", buckets)
logging.info(f"Buckets: {buckets}")
except Exception as e:
logging.error(f"Error listing buckets: {e}")
def main():
# parser = argparse.ArgumentParser(description='Run the script locally or in the cloud.')
# parser.add_argument('--local', action='store_true', help='Run the script locally with credentials path')
# args = parser.parse_args()
patterns_file = os.getenv('PATTERNS_FILE', 'exclude.txt')
project_id = os.getenv('GCP_PROJECT_ID', 'smart-axis-421517')
bucket_name = os.getenv('BUCKET_NAME', 'loc-scraper')
credentials_path = None
# if args.local:
credentials_path = os.getenv('GCP_CREDENTIALS_PATH', 'secret.json')
# Initialize logging
logging_client = initialize_google_cloud_logging_client(project_id,credentials_path)
logging_client.setup_logging()
# List Buckets for testing
gcs_client = initialize_gcs_client(project_id, credentials_path)
list_gcs_buckets(gcs_client)
# Grab A blob from the heap
first_blob = gcs_client.pop_blob(bucket_name,patterns_file )
if first_blob:
print(f"First blob name: {first_blob.name}")
#download to memory
blob_data = gcs_client.download_blob_to_memory(bucket_name, first_blob.name)
json_data = json.loads(blob_data)
print(blob_data[0:100])
# create_gcs_bucket(gcs_client, bucket_name)
if __name__ == "__main__":
main()
trying creds file
Buckets: ['loc-scraper', 'loc_flattener_processed', 'processed_results', 'smart-axis-421517_cloudbuild']
First valid blob selected: result-10.json
First blob name: result-10.json
Blob 'result-10.json' downloaded to memory.
b'{"breadcrumbs": [{"Library of Congress": "https://www.loc.gov"}, {"Digital Collections": "https://ww'
Flatten and process the JSON
There is a ton of information in the JSON. I need to explore it.
def initialize_google_cloud_logging_client(project_id, credentials_path=None):
return GoogleCloudLogging(project_id, credentials_path=credentials_path)
def initialize_gcs_client(project_id, credentials_path=None):
return GCSClient(project_id, credentials_path=credentials_path)
def list_gcs_buckets(client):
try:
buckets = client.list_buckets()
print("Buckets:", buckets)
logging.info(f"Buckets: {buckets}")
except Exception as e:
logging.error(f"Error listing buckets: {e}")
# parser = argparse.ArgumentParser(description='Run the script locally or in the cloud.')
# parser.add_argument('--local', action='store_true', help='Run the script locally with credentials path')
# args = parser.parse_args()
patterns_file = os.getenv('PATTERNS_FILE', 'exclude.txt')
project_id = os.getenv('GCP_PROJECT_ID', 'smart-axis-421517')
bucket_name = os.getenv('BUCKET_NAME', 'loc-scraper')
credentials_path = None
# if args.local:
credentials_path = os.getenv('GCP_CREDENTIALS_PATH', 'secret.json')
# Initialize logging
logging_client = initialize_google_cloud_logging_client(project_id,credentials_path)
logging_client.setup_logging()
# List Buckets for testing
gcs_client = initialize_gcs_client(project_id, credentials_path)
list_gcs_buckets(gcs_client)
# Grab A blob from the heap
first_blob = gcs_client.pop_blob(bucket_name,patterns_file )
if first_blob:
print(f"First blob name: {first_blob.name}")
#download to memory
blob_data = gcs_client.download_blob_to_memory(bucket_name, first_blob.name)
json_data = json.loads(blob_data)
print(blob_data[0:100])
# create_gcs_bucket(gcs_client, bucket_name)
trying creds file
Buckets: ['loc-scraper', 'loc_flattener_processed', 'processed_results', 'smart-axis-421517_cloudbuild']
First valid blob selected: result-10.json
First blob name: result-10.json
Blob 'result-10.json' downloaded to memory.
b'{"breadcrumbs": [{"Library of Congress": "https://www.loc.gov"}, {"Digital Collections": "https://ww'
json_data.keys()
dict_keys(['breadcrumbs', 'browse', 'categories', 'content', 'content_is_post', 'expert_resources', 'facet_trail', 'facet_views', 'facets', 'form_facets', 'next', 'next_sibling', 'options', 'original_formats', 'pages', 'pagination', 'partof', 'previous', 'previous_sibling', 'research-centers', 'results', 'search', 'shards', 'site_type', 'subjects', 'timestamp', 'title', 'topics', 'views'])
json_data["results"][0]
{'access_restricted': False,
'aka': ['http://www.loc.gov/item/usrep308213/',
'http://www.loc.gov/resource/usrep.usrep308213/',
'http://www.loc.gov/item/usrep.usrep308213/'],
'campaigns': [],
'contributor': ['stone, harlan fiske', 'supreme court of the united states'],
'date': '1939',
'dates': ['1939'],
'digitized': True,
'extract_timestamp': '2023-12-04T18:41:50.547Z',
'group': ['usrep103', 'us-report'],
'hassegments': False,
'id': 'http://www.loc.gov/item/usrep308213/',
'image_url': ['https://tile.loc.gov/storage-services/service/ll/usrep/usrep308/usrep308213/usrep308213.gif#h=150&w=100'],
'index': 631,
'item': {'call_number': ['Call Number: KF101',
'Series: Administrative Law',
'Series: Volume 308'],
'contributors': ['Stone, Harlan Fiske (Judge)',
'Supreme Court of the United States (Author)'],
'created_published': ['1939'],
'date': '19390000',
'format': 'periodical',
'genre': ['Periodical'],
'language': ['eng'],
'notes': ['Description: U.S. Reports Volume 308; October Term, 1939; Union Stock Yard & Transit Co. v. United States et al.'],
'rights': 'no known restrictions on use or reproduction',
'source_collection': 'U.S. Reports',
'subjects': ['Livestock',
'Law',
'Railroads',
'Law Library',
'Supreme Court',
'United States',
'Government Documents',
'Judicial review and appeals',
'Agency',
'Tariffs',
'Interstate commerce',
'Administrative law and regulatory procedure',
'U.S. Reports',
'Common law',
'Court opinions',
'Judicial decisions',
'Court cases',
'Court decisions',
'Interstate Commerce Commission (I.C.C.)',
'Agency jurisdiction',
'Periodical'],
'title': 'U.S. Reports: Union Stock Yard Co. v. U.S., 308 U.S. 213 (1939).'},
'language': ['english'],
'mime_type': ['image/gif', 'application/pdf'],
'online_format': ['image', 'pdf'],
'original_format': ['periodical'],
'other_title': [],
'partof': ['u.s. reports: volume 308',
'u.s. reports: administrative law',
'law library of congress',
'united states reports (official opinions of the u.s. supreme court)'],
'resources': [{'files': 1,
'image': 'https://tile.loc.gov/storage-services/service/ll/usrep/usrep308/usrep308213/usrep308213.gif',
'pdf': 'https://tile.loc.gov/storage-services/service/ll/usrep/usrep308/usrep308213/usrep308213.pdf',
'url': 'https://www.loc.gov/resource/usrep.usrep308213/'}],
'shelf_id': 'Call Number: KF101 Series: Administrative Law Series: Volume 308',
'subject': ['administrative law',
'livestock',
'railroads',
'supreme court',
'united states',
'court opinions',
'periodical',
'agency',
'interstate commerce',
'court cases',
'judicial decisions',
'law library',
'interstate commerce commission (i.c.c.)',
'judicial review and appeals',
'government documents',
'administrative law and regulatory procedure',
'law',
'common law',
'court decisions',
'u.s. reports',
'tariffs',
'agency jurisdiction'],
'subject_major_case_topic': ['administrative law'],
'timestamp': '2023-12-04T19:05:12.397Z',
'title': 'U.S. Reports: Union Stock Yard Co. v. U.S., 308 U.S. 213 (1939).',
'type': ['periodical'],
'url': 'https://www.loc.gov/item/usrep308213/'}
Use Pandas to normalize the data
# Flatten the JSON content
df_main = pd.json_normalize(json_data["results"][0])
# Normalize nested structures
item_data = json_data["results"][0]['item']
resources_data = json_data["results"][0]['resources']
df_item = pd.json_normalize(item_data)
df_item['id'] = json_data["results"][0]['id'] # Add 'id' for joining
df_resources = pd.json_normalize(resources_data)
df_resources['id'] = json_data["results"][0]['id'] # Add 'id' for joining
df_item
df_resources
df_main
# df_call_number = pd.DataFrame(df_item["call_number"], columns=['call_number'])
# df_call_number['id'] = json_data["results"][0]['id'] # Add 'id' for joining
call_numbers = item_data.get('call_number', [])
df_call_number = pd.DataFrame(call_numbers, columns=['call_number'])
df_call_number['id'] = json_data["results"][0]['id'] # Add 'id' for joining
df_call_number
subjects = item_data.get('subjects', [])
df_subjects = pd.DataFrame(subjects, columns=['subjects'])
df_subjects['id'] = json_data["results"][0]['id'] # Add 'id' for joining
df_subjects
notes = item_data.get('notes', [])
df_notes = pd.DataFrame(notes, columns=['notes'])
df_notes['id'] = json_data["results"][0]['id'] # Add 'id' for joining
df_notes
Putting it together
def normalize_main(result):
df_main = pd.json_normalize(result)
return df_main
def normalize_item(result):
item_data = result['item']
df_item = pd.json_normalize(item_data)
df_item['id'] = result['id'] # Add 'id' for joining
return df_item
def normalize_resources(result):
resources_data = result['resources']
df_resources = pd.json_normalize(resources_data)
df_resources['id'] = result['id'] # Add 'id' for joining
return df_resources
def normalize_call_numbers(result):
item_data = result['item']
call_numbers = item_data.get('call_number', [])
df_call_number = pd.DataFrame(call_numbers, columns=['call_number'])
df_call_number['id'] = result['id'] # Add 'id' for joining
return df_call_number
def normalize_contributors(result):
item_data = result['item']
contributors = item_data.get('contributors', [])
df_contributors = pd.DataFrame(contributors, columns=['contributors'])
df_contributors['id'] = result['id'] # Add 'id' for joining
return df_contributors
def normalize_subjects(result):
item_data = result['item']
subjects = item_data.get('subjects', [])
df_subjects = pd.DataFrame(subjects, columns=['subjects'])
df_subjects['id'] = result['id'] # Add 'id' for joining
return df_subjects
def normalize_notes(result):
item_data = result['item']
notes = item_data.get('notes', [])
df_notes = pd.DataFrame(notes, columns=['notes'])
df_notes['id'] = result['id'] # Add 'id' for joining
return df_notes
def initialize_google_cloud_logging_client(project_id, credentials_path=None):
return GoogleCloudLogging(project_id, credentials_path=credentials_path)
def initialize_gcs_client(project_id, credentials_path=None):
return GCSClient(project_id, credentials_path=credentials_path)
def initialize_bq_client(project_id,credentials_path=None):
return BigQueryClient(project_id,credentials_path = credentials_path)
def list_gcs_buckets(client):
try:
buckets = client.list_buckets()
print("Buckets:", buckets)
logging.info(f"Buckets: {buckets}")
except Exception as e:
logging.error(f"Error listing buckets: {e}")
def main():
# parser = argparse.ArgumentParser(description='Run the script locally or in the cloud.')
# parser.add_argument('--local', action='store_true', help='Run the script locally with credentials path')
# args = parser.parse_args()
patterns_file = os.getenv('PATTERNS_FILE', 'exclude.txt')
project_id = os.getenv('GCP_PROJECT_ID', 'smart-axis-421517')
bucket_name = os.getenv('BUCKET_NAME', 'loc-scraper')
credentials_path = None
# if args.local:
credentials_path = os.getenv('GCP_CREDENTIALS_PATH', 'secret.json')
# Initialize logging
logging_client = initialize_google_cloud_logging_client(project_id,credentials_path)
logging_client.setup_logging()
# List Buckets for testing
gcs_client = initialize_gcs_client(project_id, credentials_path)
list_gcs_buckets(gcs_client)
# initialize bq
bq_client = initialize_bq_client(project_id,credentials_path)
# Grab A blob from the heap
first_blob = gcs_client.pop_blob(bucket_name,patterns_file )
if first_blob:
print(f"First blob name: {first_blob.name}")
#download to memory
blob_data = gcs_client.download_blob_to_memory(bucket_name, first_blob.name)
json_data = json.loads(blob_data)
print(blob_data[0:100])
# create_gcs_bucket(gcs_client, bucket_name)
results = json_data["results"]
# Initialize lists to hold DataFrames
df_main_list = []
df_item_list = []
df_resources_list = []
df_call_number_list = []
df_contributors_list = []
df_subjects_list = []
df_notes_list = []
for result in results:
df_main_list.append(normalize_main(result))
df_item_list.append(normalize_item(result))
df_resources_list.append(normalize_resources(result))
df_call_number_list.append(normalize_call_numbers(result))
df_contributors_list.append(normalize_contributors(result))
df_subjects_list.append(normalize_subjects(result))
df_notes_list.append(normalize_notes(result))
# Concatenate all DataFrames
df_main = pd.concat(df_main_list, ignore_index=True)
df_item = pd.concat(df_item_list, ignore_index=True)
df_resources = pd.concat(df_resources_list, ignore_index=True)
df_call_number = pd.concat(df_call_number_list, ignore_index=True)
df_contributors = pd.concat(df_contributors_list, ignore_index=True)
df_subjects = pd.concat(df_subjects_list, ignore_index=True)
df_notes = pd.concat(df_notes_list, ignore_index=True)
if __name__ == "__main__":
main()
trying creds file
Buckets: ['loc-scraper', 'loc_flattener_processed', 'processed_results', 'smart-axis-421517_cloudbuild']
First valid blob selected: result-10.json
First blob name: result-10.json
Blob 'result-10.json' downloaded to memory.
b'{"breadcrumbs": [{"Library of Congress": "https://www.loc.gov"}, {"Digital Collections": "https://ww'
# parser = argparse.ArgumentParser(description='Run the script locally or in the cloud.')
# parser.add_argument('--local', action='store_true', help='Run the script locally with credentials path')
# args = parser.parse_args()
def initialize_google_cloud_logging_client(project_id, credentials_path=None):
return GoogleCloudLogging(project_id, credentials_path=credentials_path)
def initialize_gcs_client(project_id, credentials_path=None):
return GCSClient(project_id, credentials_path=credentials_path)
def initialize_bq_client(project_id,credentials_path=None):
return BigQueryClient(project_id,credentials_path = credentials_path)
def list_gcs_buckets(client):
try:
buckets = client.list_buckets()
print("Buckets:", buckets)
logging.info(f"Buckets: {buckets}")
except Exception as e:
logging.error(f"Error listing buckets: {e}")
def main():
# parser = argparse.ArgumentParser(description='Run the script locally or in the cloud.')
# parser.add_argument('--local', action='store_true', help='Run the script locally with credentials path')
# args = parser.parse_args()
main_table_id = "results_staging"
item_table_id = "items_staging"
resources_table_id = "resources_staging"
call_number_table_id = "call_numbers_staging"
contributors_table_id = "contributors_staging"
subjects_table_id = "subjects_staging"
notes_table_id = "notes_staging"
dataset_id = "supreme_court"
patterns_file = os.getenv('PATTERNS_FILE', 'exclude.txt')
project_id = os.getenv('GCP_PROJECT_ID', 'smart-axis-421517')
bucket_name = os.getenv('BUCKET_NAME', 'loc-scraper')
credentials_path = None
# if args.local:
credentials_path = os.getenv('GCP_CREDENTIALS_PATH', 'secret.json')
# Initialize logging
logging_client = initialize_google_cloud_logging_client(project_id,credentials_path)
logging_client.setup_logging()
# List Buckets for testing
gcs_client = initialize_gcs_client(project_id, credentials_path)
list_gcs_buckets(gcs_client)
bq_client = initialize_bq_client(project_id,credentials_path)
# Create the dataset if not exists
bq_client.create_dataset(dataset_id)
# Grab A blob from the heap
first_blob = gcs_client.pop_blob(bucket_name,patterns_file )
if first_blob:
print(f"First blob name: {first_blob.name}")
#download to memory
blob_data = gcs_client.download_blob_to_memory(bucket_name, first_blob.name)
json_data = json.loads(blob_data)
# print(blob_data[0:100])
# create_gcs_bucket(gcs_client, bucket_name)
results = json_data["results"]
# Initialize lists to hold DataFrames
df_main_list = []
df_item_list = []
df_resources_list = []
df_call_number_list = []
df_contributors_list = []
df_subjects_list = []
df_notes_list = []
for result in results:
df_main_list.append(normalize_main(result))
df_item_list.append(normalize_item(result))
df_resources_list.append(normalize_resources(result))
df_call_number_list.append(normalize_call_numbers(result))
df_contributors_list.append(normalize_contributors(result))
df_subjects_list.append(normalize_subjects(result))
df_notes_list.append(normalize_notes(result))
# Concatenate all DataFrames
df_main = pd.concat(df_main_list, ignore_index=True)
df_item = pd.concat(df_item_list, ignore_index=True)
df_resources = pd.concat(df_resources_list, ignore_index=True)
df_call_number = pd.concat(df_call_number_list, ignore_index=True)
df_contributors = pd.concat(df_contributors_list, ignore_index=True)
df_subjects = pd.concat(df_subjects_list, ignore_index=True)
df_notes = pd.concat(df_notes_list, ignore_index=True)
# Define the BigQuery table schema
# main_schema = [bigquery.SchemaField(name, bigquery.enums.SqlTypeNames.STRING) for name in df_main.columns]
item_schema = [bigquery.SchemaField(name, bigquery.enums.SqlTypeNames.STRING) for name in df_item.columns]
resources_schema = [bigquery.SchemaField(name, bigquery.enums.SqlTypeNames.STRING) for name in df_resources.columns]
call_number_schema = [bigquery.SchemaField(name, bigquery.enums.SqlTypeNames.STRING) for name in df_call_number.columns]
contributors_schema = [bigquery.SchemaField(name, bigquery.enums.SqlTypeNames.STRING) for name in df_contributors.columns]
subjects_schema = [bigquery.SchemaField(name, bigquery.enums.SqlTypeNames.STRING) for name in df_subjects.columns]
notes_schema = [bigquery.SchemaField(name, bigquery.enums.SqlTypeNames.STRING) for name in df_notes.columns]
# Create BigQuery tables
# bq_client.create_table(dataset_id, main_table_id, main_schema)
bq_client.create_table(dataset_id, item_table_id, item_schema)
bq_client.create_table(dataset_id, resources_table_id, resources_schema)
bq_client.create_table(dataset_id, call_number_table_id, call_number_schema)
bq_client.create_table(dataset_id, contributors_table_id, contributors_schema)
bq_client.create_table(dataset_id, subjects_table_id, subjects_schema)
bq_client.create_table(dataset_id, notes_table_id, notes_schema)
# Load DataFrames into BigQuery tables
# bq_client.load_dataframe_to_table(dataset_id, main_table_id, df_main)
bq_client.load_dataframe_to_table(dataset_id, item_table_id, df_item)
bq_client.load_dataframe_to_table(dataset_id, resources_table_id, df_resources)
bq_client.load_dataframe_to_table(dataset_id, call_number_table_id, df_call_number)
bq_client.load_dataframe_to_table(dataset_id, contributors_table_id, df_contributors)
bq_client.load_dataframe_to_table(dataset_id, subjects_table_id, df_subjects)
bq_client.load_dataframe_to_table(dataset_id, notes_table_id, df_notes)
if __name__ == "__main__":
main()
trying creds file
Buckets: ['loc-scraper', 'loc_flattener_processed', 'processed_results', 'smart-axis-421517_cloudbuild']
Dataset supreme_court created.
First valid blob selected: result-10.json
First blob name: result-10.json
Blob 'result-10.json' downloaded to memory.
Table items_staging created in dataset supreme_court.
Table resources_staging created in dataset supreme_court.
Table call_numbers_staging created in dataset supreme_court.
Table contributors_staging created in dataset supreme_court.
Table subjects_staging created in dataset supreme_court.
Table notes_staging created in dataset supreme_court.
Loaded 70 rows into supreme_court:items_staging.
Loaded 70 rows into supreme_court:resources_staging.
Loaded 210 rows into supreme_court:call_numbers_staging.
Loaded 138 rows into supreme_court:contributors_staging.
Loaded 1570 rows into supreme_court:subjects_staging.
Loaded 70 rows into supreme_court:notes_staging.
Moving the Processed blobs to a Processed Bucket
Add code to the GCS client to enable copying and deleting. GCS has no native move operation, so moving a blob is a copy followed by a delete.
def copy_blob(self, source_bucket_name, source_blob_name, destination_bucket_name, destination_blob_name):
"""
Copies a blob from one bucket to another.
Args:
source_bucket_name (str): Name of the source bucket.
source_blob_name (str): Name of the source blob.
destination_bucket_name (str): Name of the destination bucket.
destination_blob_name (str): Name of the destination blob.
Returns:
google.cloud.storage.blob.Blob: The copied blob.
"""
source_bucket = self.client.bucket(source_bucket_name)
source_blob = source_bucket.blob(source_blob_name)
destination_bucket = self.client.bucket(destination_bucket_name)
blob_copy = source_bucket.copy_blob(source_blob, destination_bucket, destination_blob_name)
return blob_copy
def delete_blob(self, bucket_name, blob_name):
"""
Deletes a blob from the specified bucket.
Args:
bucket_name (str): Name of the bucket.
blob_name (str): Name of the blob to delete.
"""
bucket = self.client.bucket(bucket_name)
blob = bucket.blob(blob_name)
blob.delete()
Add a couple of lines to the main script to call the new methods
# Move the blob to the processed_results bucket
gcs_client.copy_blob(bucket_name, first_blob.name, processed_bucket_name, first_blob.name)
gcs_client.delete_blob(bucket_name, first_blob.name)
print(f"Blob {first_blob.name} moved to {processed_bucket_name} and deleted from {bucket_name}")
Testing
# loc_flattener.py
# library_of_congress_scraper.py
# loc_flattener.py
# library_of_congress_scraper.py
from __future__ import print_function
from gcputils.gcpclient import GCSClient
from gcputils.GoogleCloudLogging import GoogleCloudLogging
from gcputils.BigQueryClient import BigQueryClient
from bs4 import BeautifulSoup
import requests
import json
import os
import time
from pprint import pprint
import html
from flatten_json import flatten
import google.cloud.logging
import logging
import argparse
import pandas as pd
from google.cloud import bigquery
def initialize_gcs_client(project_id, credentials_path=None):
return GCSClient(project_id, credentials_path=credentials_path)
def initialize_google_cloud_logging_client(project_id, credentials_path=None):
return GoogleCloudLogging(project_id, credentials_path=credentials_path)
def initialize_bq_client(project_id,credentials_path=None):
return BigQueryClient(project_id,credentials_path = credentials_path)
def list_gcs_buckets(client):
try:
buckets = client.list_buckets()
print("Buckets:", buckets)
logging.info(f"Buckets: {buckets}")
except Exception as e:
logging.error(f"Error listing buckets: {e}")
def create_gcs_bucket(client, bucket_name):
try:
bucket = client.create_bucket(bucket_name=bucket_name)
logging.info(bucket)
print(bucket)
except Exception as e:
logging.error(f"Error creating bucket: {e}")
def normalize_main(result):
df_main = pd.json_normalize(result)
return df_main
def normalize_item(result):
item_data = result['item']
df_item = pd.json_normalize(item_data)
df_item['id'] = result['id'] # Add 'id' for joining
return df_item
def normalize_resources(result):
resources_data = result['resources']
df_resources = pd.json_normalize(resources_data)
df_resources['id'] = result['id'] # Add 'id' for joining
return df_resources
def normalize_call_numbers(result):
item_data = result['item']
call_numbers = item_data.get('call_number', [])
df_call_number = pd.DataFrame(call_numbers, columns=['call_number'])
df_call_number['id'] = result['id'] # Add 'id' for joining
return df_call_number
def normalize_contributors(result):
item_data = result['item']
contributors = item_data.get('contributors', [])
df_contributors = pd.DataFrame(contributors, columns=['contributors'])
df_contributors['id'] = result['id'] # Add 'id' for joining
return df_contributors
def normalize_subjects(result):
item_data = result['item']
subjects = item_data.get('subjects', [])
df_subjects = pd.DataFrame(subjects, columns=['subjects'])
df_subjects['id'] = result['id'] # Add 'id' for joining
return df_subjects
def normalize_notes(result):
item_data = result['item']
notes = item_data.get('notes', [])
df_notes = pd.DataFrame(notes, columns=['notes'])
df_notes['id'] = result['id'] # Add 'id' for joining
return df_notes
def main():
# parser = argparse.ArgumentParser(description='Run the script locally or in the cloud.')
# parser.add_argument('--local', action='store_true', help='Run the script locally with credentials path')
# args = parser.parse_args()
main_table_id = "results_staging"
item_table_id = "items_staging"
resources_table_id = "resources_staging"
call_number_table_id = "call_numbers_staging"
contributors_table_id = "contributors_staging"
subjects_table_id = "subjects_staging"
notes_table_id = "notes_staging"
processed_bucket_name = "loc_flattener_processed"
dataset_id = "supreme_court"
patterns_file = os.getenv('PATTERNS_FILE', 'exclude.txt')
project_id = os.getenv('GCP_PROJECT_ID', 'smart-axis-421517')
bucket_name = os.getenv('BUCKET_NAME', 'loc-scraper')
credentials_path = None
# if args.local:
credentials_path = os.getenv('GCP_CREDENTIALS_PATH', 'secret.json')
# Initialize logging
logging_client = initialize_google_cloud_logging_client(project_id,credentials_path)
logging_client.setup_logging()
# List Buckets for testing
gcs_client = initialize_gcs_client(project_id, credentials_path)
list_gcs_buckets(gcs_client)
bq_client = initialize_bq_client(project_id,credentials_path)
# Create the dataset if not exists
bq_client.create_dataset(dataset_id)
# create the processed_bucket if not exists
# print(gcs_client.create_bucket(processed_bucket_name))
# Grab A blob from the heap
first_blob = gcs_client.pop_blob(bucket_name,patterns_file )
if first_blob:
print(f"First blob name: {first_blob.name}")
#download to memory
blob_data = gcs_client.download_blob_to_memory(bucket_name, first_blob.name)
json_data = json.loads(blob_data)
# print(blob_data[0:100])
# create_gcs_bucket(gcs_client, bucket_name)
results = json_data["results"]
# Initialize lists to hold DataFrames
df_main_list = []
df_item_list = []
df_resources_list = []
df_call_number_list = []
df_contributors_list = []
df_subjects_list = []
df_notes_list = []
for result in results:
df_main_list.append(normalize_main(result))
df_item_list.append(normalize_item(result))
df_resources_list.append(normalize_resources(result))
df_call_number_list.append(normalize_call_numbers(result))
df_contributors_list.append(normalize_contributors(result))
df_subjects_list.append(normalize_subjects(result))
df_notes_list.append(normalize_notes(result))
# Concatenate all DataFrames
df_main = pd.concat(df_main_list, ignore_index=True)
df_item = pd.concat(df_item_list, ignore_index=True)
df_resources = pd.concat(df_resources_list, ignore_index=True)
df_call_number = pd.concat(df_call_number_list, ignore_index=True)
df_contributors = pd.concat(df_contributors_list, ignore_index=True)
df_subjects = pd.concat(df_subjects_list, ignore_index=True)
df_notes = pd.concat(df_notes_list, ignore_index=True)
# Define the BigQuery table schema
# main_schema = [bigquery.SchemaField(name, bigquery.enums.SqlTypeNames.STRING) for name in df_main.columns]
item_schema = [bigquery.SchemaField(name, bigquery.enums.SqlTypeNames.STRING) for name in df_item.columns]
resources_schema = [bigquery.SchemaField(name, bigquery.enums.SqlTypeNames.STRING) for name in df_resources.columns]
call_number_schema = [bigquery.SchemaField(name, bigquery.enums.SqlTypeNames.STRING) for name in df_call_number.columns]
contributors_schema = [bigquery.SchemaField(name, bigquery.enums.SqlTypeNames.STRING) for name in df_contributors.columns]
subjects_schema = [bigquery.SchemaField(name, bigquery.enums.SqlTypeNames.STRING) for name in df_subjects.columns]
notes_schema = [bigquery.SchemaField(name, bigquery.enums.SqlTypeNames.STRING) for name in df_notes.columns]
# Create BigQuery tables
# bq_client.create_table(dataset_id, main_table_id, main_schema)
bq_client.create_table(dataset_id, item_table_id, item_schema)
bq_client.create_table(dataset_id, resources_table_id, resources_schema)
bq_client.create_table(dataset_id, call_number_table_id, call_number_schema)
bq_client.create_table(dataset_id, contributors_table_id, contributors_schema)
bq_client.create_table(dataset_id, subjects_table_id, subjects_schema)
bq_client.create_table(dataset_id, notes_table_id, notes_schema)
# Load DataFrames into BigQuery tables
# bq_client.load_dataframe_to_table(dataset_id, main_table_id, df_main)
bq_client.load_dataframe_to_table(dataset_id, item_table_id, df_item)
bq_client.load_dataframe_to_table(dataset_id, resources_table_id, df_resources)
bq_client.load_dataframe_to_table(dataset_id, call_number_table_id, df_call_number)
bq_client.load_dataframe_to_table(dataset_id, contributors_table_id, df_contributors)
bq_client.load_dataframe_to_table(dataset_id, subjects_table_id, df_subjects)
bq_client.load_dataframe_to_table(dataset_id, notes_table_id, df_notes)
# Move the blob to the processed_results bucket
gcs_client.copy_blob(bucket_name, first_blob.name, processed_bucket_name, first_blob.name)
# gcs_client.delete_blob(bucket_name, first_blob.name)
print(f"Blob {first_blob.name} moved to {processed_bucket_name} and deleted from {bucket_name}")
if __name__ == "__main__":
main()
trying creds file
Buckets: ['loc-scraper', 'loc_flattener_processed', 'processed_results', 'smart-axis-421517_cloudbuild']
Dataset supreme_court created.
First valid blob selected: result-10.json
First blob name: result-10.json
Blob 'result-10.json' downloaded to memory.
Table items_staging created in dataset supreme_court.
Table resources_staging created in dataset supreme_court.
Table call_numbers_staging created in dataset supreme_court.
Table contributors_staging created in dataset supreme_court.
Table subjects_staging created in dataset supreme_court.
Table notes_staging created in dataset supreme_court.
Loaded 70 rows into supreme_court:items_staging.
Loaded 70 rows into supreme_court:resources_staging.
Loaded 210 rows into supreme_court:call_numbers_staging.
Loaded 138 rows into supreme_court:contributors_staging.
Loaded 1570 rows into supreme_court:subjects_staging.
Loaded 70 rows into supreme_court:notes_staging.
Blob result-10.json moved to loc_flattener_processed and deleted from loc-scraper
Add while-loop logic to process every blob and clean up the script
import os
import pandas as pd
import argparse
import json
import logging
from google.cloud import bigquery
from gcputils.gcpclient import GCSClient
from gcputils.GoogleCloudLogging import GoogleCloudLogging
from gcputils.BigQueryClient import BigQueryClient
def initialize_gcs_client(project_id, credentials_path=None):
return GCSClient(project_id, credentials_path=credentials_path)
def initialize_google_cloud_logging_client(project_id, credentials_path=None):
return GoogleCloudLogging(project_id, credentials_path=credentials_path)
def initialize_bq_client(project_id, credentials_path=None):
return BigQueryClient(project_id, credentials_path=credentials_path)
def list_gcs_buckets(client):
try:
buckets = client.list_buckets()
print("Buckets:", buckets)
logging.info(f"Buckets: {buckets}")
except Exception as e:
logging.error(f"Error listing buckets: {e}")
def create_gcs_bucket(client, bucket_name):
try:
bucket = client.create_bucket(bucket_name=bucket_name)
logging.info(bucket)
print(bucket)
except Exception as e:
logging.error(f"Error creating bucket: {e}")
def normalize_main(result):
df_main = pd.json_normalize(result)
return df_main
def normalize_item(result):
item_data = result['item']
df_item = pd.json_normalize(item_data)
df_item['id'] = result['id'] # Add 'id' for joining
return df_item
def normalize_resources(result):
resources_data = result['resources']
df_resources = pd.json_normalize(resources_data)
df_resources['id'] = result['id'] # Add 'id' for joining
return df_resources
def normalize_call_numbers(result):
item_data = result['item']
call_numbers = item_data.get('call_number', [])
df_call_number = pd.DataFrame(call_numbers, columns=['call_number'])
df_call_number['id'] = result['id'] # Add 'id' for joining
return df_call_number
def normalize_contributors(result):
item_data = result['item']
contributors = item_data.get('contributors', [])
df_contributors = pd.DataFrame(contributors, columns=['contributors'])
df_contributors['id'] = result['id'] # Add 'id' for joining
return df_contributors
def normalize_subjects(result):
item_data = result['item']
subjects = item_data.get('subjects', [])
df_subjects = pd.DataFrame(subjects, columns=['subjects'])
df_subjects['id'] = result['id'] # Add 'id' for joining
return df_subjects
def normalize_notes(result):
item_data = result['item']
notes = item_data.get('notes', [])
df_notes = pd.DataFrame(notes, columns=['notes'])
df_notes['id'] = result['id'] # Add 'id' for joining
return df_notes
def create_tables_and_schemas(bq_client, bucket_name, patterns_file, gcs_client, dataset_id):
# Define the BigQuery table schema
main_table_id = "results_staging"
item_table_id = "items_staging"
resources_table_id = "resources_staging"
call_number_table_id = "call_numbers_staging"
contributors_table_id = "contributors_staging"
subjects_table_id = "subjects_staging"
notes_table_id = "notes_staging"
# Assuming the first blob provides a sample structure
sample_blob = gcs_client.pop_blob(bucket_name, patterns_file)
blob_data = gcs_client.download_blob_to_memory(bucket_name, sample_blob.name)
json_data = json.loads(blob_data)
result = json_data["results"][0] # Use the first result as a sample
df_main = normalize_main(result)
df_item = normalize_item(result)
df_resources = normalize_resources(result)
df_call_number = normalize_call_numbers(result)
df_contributors = normalize_contributors(result)
df_subjects = normalize_subjects(result)
df_notes = normalize_notes(result)
main_schema = [bigquery.SchemaField(name, bigquery.enums.SqlTypeNames.STRING) for name in df_main.columns]
item_schema = [bigquery.SchemaField(name, bigquery.enums.SqlTypeNames.STRING) for name in df_item.columns]
resources_schema = [bigquery.SchemaField(name, bigquery.enums.SqlTypeNames.STRING) for name in df_resources.columns]
call_number_schema = [bigquery.SchemaField(name, bigquery.enums.SqlTypeNames.STRING) for name in df_call_number.columns]
contributors_schema = [bigquery.SchemaField(name, bigquery.enums.SqlTypeNames.STRING) for name in df_contributors.columns]
subjects_schema = [bigquery.SchemaField(name, bigquery.enums.SqlTypeNames.STRING) for name in df_subjects.columns]
notes_schema = [bigquery.SchemaField(name, bigquery.enums.SqlTypeNames.STRING) for name in df_notes.columns]
# Create BigQuery tables
# bq_client.create_table(dataset_id, main_table_id, main_schema)
bq_client.create_table(dataset_id, item_table_id, item_schema)
bq_client.create_table(dataset_id, resources_table_id, resources_schema)
bq_client.create_table(dataset_id, call_number_table_id, call_number_schema)
bq_client.create_table(dataset_id, contributors_table_id, contributors_schema)
bq_client.create_table(dataset_id, subjects_table_id, subjects_schema)
bq_client.create_table(dataset_id, notes_table_id, notes_schema)
def process_blob(gcs_client, bq_client, bucket_name, processed_bucket_name, patterns_file, dataset_id):
main_table_id = "results_staging"
item_table_id = "items_staging"
resources_table_id = "resources_staging"
call_number_table_id = "call_numbers_staging"
contributors_table_id = "contributors_staging"
subjects_table_id = "subjects_staging"
notes_table_id = "notes_staging"
# Grab a blob from the heap
first_blob = gcs_client.pop_blob(bucket_name, patterns_file)
if not first_blob:
return False
print(f"Processing blob: {first_blob.name}")
# Download to memory
blob_data = gcs_client.download_blob_to_memory(bucket_name, first_blob.name)
json_data = json.loads(blob_data)
results = json_data["results"]
# Initialize lists to hold DataFrames
df_main_list = []
df_item_list = []
df_resources_list = []
df_call_number_list = []
df_contributors_list = []
df_subjects_list = []
df_notes_list = []
for result in results:
df_main_list.append(normalize_main(result))
df_item_list.append(normalize_item(result))
df_resources_list.append(normalize_resources(result))
df_call_number_list.append(normalize_call_numbers(result))
df_contributors_list.append(normalize_contributors(result))
df_subjects_list.append(normalize_subjects(result))
df_notes_list.append(normalize_notes(result))
# Concatenate all DataFrames
df_main = pd.concat(df_main_list, ignore_index=True)
df_item = pd.concat(df_item_list, ignore_index=True)
df_resources = pd.concat(df_resources_list, ignore_index=True)
df_call_number = pd.concat(df_call_number_list, ignore_index=True)
df_contributors = pd.concat(df_contributors_list, ignore_index=True)
df_subjects = pd.concat(df_subjects_list, ignore_index=True)
df_notes = pd.concat(df_notes_list, ignore_index=True)
# Load DataFrames into BigQuery tables
# bq_client.load_dataframe_to_table(dataset_id, main_table_id, df_main)
bq_client.load_dataframe_to_table(dataset_id, item_table_id, df_item)
bq_client.load_dataframe_to_table(dataset_id, resources_table_id, df_resources)
bq_client.load_dataframe_to_table(dataset_id, call_number_table_id, df_call_number)
bq_client.load_dataframe_to_table(dataset_id, contributors_table_id, df_contributors)
bq_client.load_dataframe_to_table(dataset_id, subjects_table_id, df_subjects)
bq_client.load_dataframe_to_table(dataset_id, notes_table_id, df_notes)
# Move the blob to the processed_results bucket
gcs_client.copy_blob(bucket_name, first_blob.name, processed_bucket_name, first_blob.name)
gcs_client.delete_blob(bucket_name, first_blob.name)
print(f"Blob {first_blob.name} moved to {processed_bucket_name} and deleted from {bucket_name}")
return True
def main():
parser = argparse.ArgumentParser(description='Run the script locally or in the cloud.')
parser.add_argument('--local', action='store_true', help='Run the script locally with credentials path')
args = parser.parse_args()
dataset_id = "supreme_court"
patterns_file = os.getenv('PATTERNS_FILE', 'exclude.txt')
project_id = os.getenv('GCP_PROJECT_ID', 'smart-axis-421517')
bucket_name = os.getenv('BUCKET_NAME', 'loc-scraper')
processed_bucket_name = "processed_results"
credentials_path = None
if args.local:
credentials_path = os.getenv('GCP_CREDENTIALS_PATH', 'secret.json')
# Initialize logging
logging_client = initialize_google_cloud_logging_client(project_id, credentials_path)
logging_client.setup_logging()
# List Buckets for testing
gcs_client = initialize_gcs_client(project_id, credentials_path)
list_gcs_buckets(gcs_client)
# Create the processed_results bucket if not exists
# gcs_client.create_bucket(processed_bucket_name)
bq_client = initialize_bq_client(project_id, credentials_path)
# Create the dataset if not exists
bq_client.create_dataset(dataset_id)
# Create tables and schemas
create_tables_and_schemas(bq_client, bucket_name, patterns_file, gcs_client, dataset_id)
# def create_tables_and_schemas(bq_client, bucket_name, patterns_file, gcs_client, dataset_id):
# Process blobs in a loop
while process_blob(gcs_client, bq_client, bucket_name, processed_bucket_name, patterns_file, dataset_id):
print("Processed a blob, checking for more...")
if __name__ == "__main__":
main()
GCP Cloud Run
I want this to run autonomously for me on GCP.
To do this I will need to:
- Create a Dockerfile
- Build the image on GCP
- Create a job to run it
Create the Dockerfile
My Dockerfile looks something like this. Ignore the quickstart code that I've commented out; I keep it as a reference.
# # Use the Alpine Linux base image
# FROM alpine:latest
# # Set the working directory inside the container
# WORKDIR /app
# # Copy a simple script that prints "Hello, World!" into the container
# COPY /src/hello.sh .
# # Make the script executable
# RUN chmod +x hello.sh
# # Define the command to run when the container starts
# CMD ["./hello.sh"]
# Use the official Python image from Docker Hub
FROM python:3.10-slim
# Set the working directory in the container
WORKDIR /app
# Copy the current directory contents into the container at /app
COPY ./src /app
COPY requirements.txt /app
# Install any needed dependencies specified in requirements.txt
RUN pip install --no-cache-dir -r requirements.txt
# Run the Python script when the container launches
CMD ["python", "loc_scraper.py"]
Using Cloud Build
I want to automate the entire build and deploy process by passing the steps to Google's Cloud Build service. Note that custom substitutions must start with an underscore and be referenced as $_NAME in the steps, while $PROJECT_ID is provided by Cloud Build itself.
My file looks like this…
steps:
- name: 'gcr.io/cloud-builders/docker'
  args: ['build', '-t', 'gcr.io/$PROJECT_ID/$_IMAGE_NAME', '.']
- name: 'gcr.io/cloud-builders/docker'
  args: ['push', 'gcr.io/$PROJECT_ID/$_IMAGE_NAME']
- name: 'gcr.io/cloud-builders/gcloud'
  args: ['run', 'deploy', '$_SERVICE_NAME',
         '--image', 'gcr.io/$PROJECT_ID/$_IMAGE_NAME',
         '--platform', 'managed',
         '--region', '$_REGION',
         '--allow-unauthenticated']
substitutions:
  _IMAGE_NAME: 'loc-flattener-image'
  _SERVICE_NAME: 'loc-flattener'
  _REGION: 'us-west2' # e.g., us-central1
timeout: '1200s'
Submit the build
To submit the build, run the following from the CLI or save it as a script.
gcloud builds submit --config cloudbuild.yaml .
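Since the image name, service name, and region are defined as substitutions, they can also be overridden at submit time; for example:
gcloud builds submit --config cloudbuild.yaml \
  --substitutions=_IMAGE_NAME=loc-flattener-image,_SERVICE_NAME=loc-flattener,_REGION=us-west2 .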