Conduct Legal Research with AI: Part 3
Testing TensorFlow models against the SCDB data set.
pythonlegaljsonscotusautomationmachine-learningaiml
7 Minutes, 31 Seconds
2022-05-18 14:40 +0000
Conduct Legal Research with AI: Part 3
This is the third post in a series documenting the process of building an ml pipeline that will be used to train models to predict the outcomes of Supreme Court Cases.
You can find the others at:
Introduction
In this post, I will be testing a sample TensorFlow pipeline against the Supreme Court Database maintained by the Washington University Law School in order to build a proof of concept model for a Supreme Court Graph Analysis project.
The Data Set
The data set is fairly limited. There are only about 250 issue areas to train against. It also does not include doctrinal trees to train an algorithm to predict when the court will break from precedent.
The data records primarily the voting behavior of justices, the policy direction of the outcome, and the political ideology score of the outcome.
Policy direction and political ideology are factors coded by individuals that may be prone to error an bias. In my analysis I will test against the findings of this model to determine which factors are most powerful.
The Model
I tested against case centric and justice centric data orientations.
Case Centric Model
The first is case centric. The power of this model is fairly limited, as it attempts to discern the outcome of a case from the aggregate behavior of the current justices on the bench. It essentially creates a score that is modified by each case outcome to determine if the court will vote conservatively or liberally towards a case of regarding a specific issue area. It is important to note that the database limits each case to one overarching issue area.
Case Centric Training Features
- issue
- issueArea
- naturalCourt
- petitioner
- petitionerState
- respondent
- jurisdiction
- adminAction
- adminActionState
- threeJudgeFdc
- caseOrigin
- caseOriginState
- respondentState
- caseSource
- caseSourceState
- lawType
- lawSupp
Case Centric Results
The models were able to predict the outcomes about 75 percent of the time following 1,000 epochs of training. It is likely that factors other than previous voting behavior determine the outcome of the case.
Justice Centric Model
The second is justice centric. The predictive power of this model, should in theory be greater as it attempts to determine the likely vote of an individual justice against an issue area. Again this is limited as only the overarching issue area is available in the data. It may very well prove that justices may favor free speech in most cases, except when the military is the target. The model could not predict that, because the data is limited.
Justice Centric Training Features
- justice
- issue
- issueArea
- lawType
- direction
- petitioner
- naturalCourt
- petitionerState
- respondent
- respondentState
- jurisdiction
- caseOrigin
- caseOriginState
- caseSource
- caseSourceState
- certReason
- lawSupp
Justice Centric Results
The justice centric pipeline results in accuracy exceeding 82 percent. This is better, but a model that could consider more factors and issue areas would likely improve the results.
Next Steps
I will integrate the models to my neo4j data set. As of now, the graph database exceeds 50,000 individuals nodes and over 400,000 relationships. It will allow me to train an algorithm that can determine a justices attitude towards, cases, clauses, sentences, states, petitioners, and any other feature integrated into the dataset.
The Code
Case Centric Training Pipeline
I’ve included the entire model below. Review my documentation site for more detail.
#supremeCourtPrediction.py
#a program that uses the segal and spaeth dataset to predict the outcomes of cases
import pandas as pd
import tensorflow as tf
import math
import seaborn as sns
import matplotlib.pyplot as plt
import shap
shap.initjs()
import os
import csv
import datetime
from tensorflow import feature_column
from tensorflow.keras import layers
from sklearn.model_selection import train_test_split
from itertools import permutations
import matplotlib.pyplot as plt
def get_environmental_variables():
cwd = os.getcwd()
log_dir = os.path.join(cwd,"logs/fit/") + datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
return cwd, log_dir
def load_data(cwd):
data_path = os.path.join(cwd,'data','citation.csv')
drop_path = os.path.join(cwd,'data','features','drop','drop.txt')
#print(path)
dataframe = pd.read_csv(data_path,encoding= 'unicode_escape')
#print(dataframe.head())
with open(drop_path) as fp:
for cnt, line in enumerate(fp):
#print("Dropping {} from dataframe".format(line))
line = line.strip('\n')
line = line.strip('\t')
line = line.strip("'")
dataframe.drop(line,axis=1,inplace=True)
dataframe.fillna(0, inplace=True)
print(dataframe)
#dataframe.issue = dataframe.astype({'issue': 'str'})
#dataframe[issue] = dataframe[issue].astype(str)
#print(dataframe)
#print(dataframe.head())
return dataframe
def split_datframe(dataframe):
train, test = train_test_split(dataframe, test_size=0.2)
train, val = train_test_split(train, test_size=0.2)
print(len(train), 'train examples')
print(len(val), 'validation examples')
print(len(test), 'test examples')
return train, test, val
# A utility method to create a tf.data dataset from a Pandas Dataframe
def df_to_dataset(dataframe, shuffle=True, batch_size=32):
dataframe = dataframe.copy()
labels = dataframe.pop('declarationUncon')
ds = tf.data.Dataset.from_tensor_slices((dict(dataframe), labels))
if shuffle:
ds = ds.shuffle(buffer_size=len(dataframe))
ds = ds.batch(batch_size)
print(ds)
return ds
def get_input_pipeline(train, test, val, batch_size=32, shuffle=True):
train_ds = df_to_dataset(train, batch_size=batch_size)
val_ds = df_to_dataset(val, shuffle=False, batch_size=batch_size)
test_ds = df_to_dataset(test, shuffle=False, batch_size=batch_size)
return train_ds, val_ds, test_ds
def get_feature_layer(cwd): #select the columns for analysis from dataset
feature_path = os.path.join(cwd,'data','features','use','features.txt')
dict_path = os.path.join(cwd,'data','features','use')
feature_columns = []
#embedded columns
with open(feature_path) as fp:
for cnt, line in enumerate(fp):
#print("Adding {} to features".format(line))
line = line.strip('\n')
line = line.strip('\t')
line = line.strip("'")
print(cnt)
print(line)
indicator = feature_column.numeric_column(line)
#feature_column.categorical_column_with_vocabulary_file(
# key = line, vocabulary_file = os.path.join(dict_path,"{}.txt".format(line)), default_value=0)
print(indicator)
feature_columns.append(indicator)
#feature_columns.append(feature_column.embedding_column(indicator, dimension=8))
crossed_feature = feature_column.crossed_column(['issue', 'issueArea', 'naturalCourt'], hash_bucket_size=1000)
crossed_feature = feature_column.indicator_column(crossed_feature)
feature_columns.append(crossed_feature)
crossed_feature = feature_column.crossed_column(['issue', 'naturalCourt'], hash_bucket_size=1000)
crossed_feature = feature_column.indicator_column(crossed_feature)
feature_columns.append(crossed_feature)
crossed_feature = feature_column.crossed_column(['issueArea', 'naturalCourt'],hash_bucket_size=1000)
crossed_feature = feature_column.indicator_column(crossed_feature)
feature_columns.append(crossed_feature)
# court_buckets = feature_column.bucketized_column(naturalCourt, boundaries=[18, 25, 30, 35, 40, 45, 50, 55, 60, 65])
#print(feature_columns)
feature_layer = tf.keras.layers.DenseFeatures(feature_columns)
#for item in feature_columns:
# print(item)
# print("\n")
return feature_layer
#crossed cols
#categorical_columns
def understand_input_pipeline(train_ds):
for feature_batch, label_batch in train_ds.take(1):
print('Every feature:', list(feature_batch.keys()))
#print('A batch of ages:', feature_batch['age'])
print('A batch of targets:', label_batch )
def create_model(log_dir,feature_layer, train_ds, val_ds, epochs = 5):
model = tf.keras.Sequential([
feature_layer,
layers.Dense(128, activation='relu'),
layers.Dense(128, activation='relu'),
layers.Dense(1)
])
model.compile(optimizer='adam',
loss=tf.keras.losses.BinaryCrossentropy(from_logits=True),
metrics=['accuracy', 'mae'])
history = model.fit(train_ds,
validation_data=val_ds,
epochs=epochs)
loss, accuracy,mae = model.evaluate(test_ds)
print("Accuracy", accuracy)
print(model.summary())
return model,history
def plot_history(history):
hist = pd.DataFrame(history.history)
print(hist)
hist['epoch'] = history.epoch
plt.figure()
plt.xlabel('Epoch')
plt.ylabel('Mean Absolute Error')
plt.plot(hist['epoch'], hist['mae'],
label='Train Error')
plt.plot(hist['epoch'], hist['val_mae'],
label = 'Val Error')
plt.legend()
def explain_kernal(df_train,model,train_ds):
# KernelExplainer is a general approach that can work with any ML framework
# Its inputs are the predictions and training data
# Summarize the training set to accelerate analysis
df_train_summary = shap.kmeans(df_train.values, 25)
# Instantiate an explainer with the model predictions and training data summary
explainer = shap.KernelExplainer(model.predict, df_train)
# Extract Shapley values from the explainer
#shap_values = explainer.shap_values(df_train.values)
if __name__ == "__main__":
cwd, log_dir = get_environmental_variables()
#print(cwd)
df = load_data(cwd)
train,test,val = split_datframe(df)
dataset = df_to_dataset(df)
train_ds, val_ds, test_ds = get_input_pipeline(train,test,val,32)
understand_input_pipeline(train_ds)
feature_layer = get_feature_layer(cwd)
model, model_history = create_model(log_dir,feature_layer, train_ds, val_ds)
plot_history(model_history)
#print(model.predict(train_ds))
#print(model.predict)
explain_kernal(df,model,train_ds)
The Justice Centric Training Pipeline
The entire pipeline is included below. Review the documentation for greater detail.
#supremeCourtPrediction.py
#a program that uses the segal and spaeth dataset to predict the outcomes of cases
import pandas as pd
import tensorflow as tf
import math
import seaborn as sns
import matplotlib.pyplot as plt
import shap
shap.initjs()
import os
import csv
import datetime
from tensorflow import feature_column
from tensorflow.keras import layers
from sklearn.model_selection import train_test_split
from itertools import permutations
from itertools import combinations_with_replacement
import matplotlib.pyplot as plt
def get_environmental_variables():
cwd = os.getcwd()
log_dir = os.path.join(cwd,"logs/fit/") + datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
return cwd, log_dir
def load_data(cwd):
data_path = os.path.join(cwd,'data','justice.csv')
drop_path = os.path.join(cwd,'data','features','drop','drop.txt')
target_path = os.path.join(cwd,'data','features','targets.txt')
print(data_path)
dataframe = pd.read_csv(data_path,encoding= 'unicode_escape')
#print(dataframe.head())
with open(drop_path) as fp:
for cnt, line in enumerate(fp):
#print("Dropping {} from dataframe".format(line))
line = line.strip('\n')
line = line.strip('\t')
line = line.strip("'")
dataframe.drop(line,axis=1,inplace=True)
dataframe.fillna(0, inplace=True)
print(dataframe)
#dataframe.issue = dataframe.astype({'issue': 'str'})
#dataframe[issue] = dataframe[issue].astype(str)
#print(dataframe)
#print(dataframe.head())
return dataframe
def split_datframe(dataframe):
train, test = train_test_split(dataframe, test_size=0.2)
train, val = train_test_split(train, test_size=0.2)
print(len(train), 'train examples')
print(len(val), 'validation examples')
print(len(test), 'test examples')
return train, test, val
# A utility method to create a tf.data dataset from a Pandas Dataframe
def df_to_dataset(dataframe, shuffle=True, batch_size=32):
dataframe = dataframe.copy()
labels = dataframe.pop('vote')
ds = tf.data.Dataset.from_tensor_slices((dict(dataframe), labels))
if shuffle:
ds = ds.shuffle(buffer_size=len(dataframe))
ds = ds.batch(batch_size)
print(ds)
return ds
def get_input_pipeline(train, test, val, batch_size=32, shuffle=True):
train_ds = df_to_dataset(train, batch_size=batch_size)
val_ds = df_to_dataset(val, shuffle=False, batch_size=batch_size)
test_ds = df_to_dataset(test, shuffle=False, batch_size=batch_size)
return train_ds, val_ds, test_ds
def get_feature_layer(cwd): #select the columns for analysis from dataset
feature_path = os.path.join(cwd,'data','features','use','features.txt')
dict_path = os.path.join(cwd,'data','features','use')
feature_columns = []
feature_list = []
#embedded columns
with open(feature_path) as fp:
for cnt, line in enumerate(fp):
#print("Adding {} to features".format(line))
line = line.strip('\n')
line = line.strip('\t')
line = line.strip("'")
print(cnt)
print(line)
feature_list.append(line)
indicator = feature_column.numeric_column(line)
#feature_column.categorical_column_with_vocabulary_file(
# key = line, vocabulary_file = os.path.join(dict_path,"{}.txt".format(line)), default_value=0)
print(indicator)
feature_columns.append(indicator)
#feature_columns.append(feature_column.embedding_column(indicator, dimension=8))
feature_comb = combinations_with_replacement(feature_list, 2)
lst = list(feature_comb)
limit = len(feature_list)
print(limit)
j = 0
k = limit - 1
lookup = []
for i in range(limit):
lookup.append(i+j)
j = j + k
k = k - 1
for i in range(len(lst)):
if i in lookup:
continue
else:
one, two = lst[i]
crossed_feature = feature_column.crossed_column([one, two], hash_bucket_size=1000)
crossed_feature = feature_column.indicator_column(crossed_feature)
feature_columns.append(crossed_feature)
"""
crossed_feature = feature_column.crossed_column(['issue', 'naturalCourt'], hash_bucket_size=1000)
crossed_feature = feature_column.indicator_column(crossed_feature)
feature_columns.append(crossed_feature)
crossed_feature = feature_column.crossed_column(['issueArea', 'naturalCourt'],hash_bucket_size=1000)
crossed_feature = feature_column.indicator_column(crossed_feature)
feature_columns.append(crossed_feature)
"""
# court_buckets = feature_column.bucketized_column(naturalCourt, boundaries=[18, 25, 30, 35, 40, 45, 50, 55, 60, 65])
#print(feature_columns)
feature_layer = tf.keras.layers.DenseFeatures(feature_columns)
#for item in feature_columns:
# print(item)
# print("\n")
return feature_layer
#crossed cols
#categorical_columns
def understand_input_pipeline(train_ds):
for feature_batch, label_batch in train_ds.take(1):
print('Every feature:', list(feature_batch.keys()))
#print('A batch of ages:', feature_batch['age'])
print('A batch of targets:', label_batch )
def create_model(log_dir,feature_layer, train_ds, val_ds, epochs = 8):
model = tf.keras.Sequential([
feature_layer,
layers.Dense(128, activation='relu'),
layers.Dense(128, activation='relu'),
layers.Dense(1)
])
model.compile(optimizer='adam',
loss=tf.keras.losses.BinaryCrossentropy(from_logits=True),
metrics=['accuracy', 'mae'])
history = model.fit(train_ds,
validation_data=val_ds,
epochs=epochs)
loss, accuracy,mae = model.evaluate(test_ds)
print("Accuracy", accuracy)
print(model.summary())
return model,history
def plot_history(history):
hist = pd.DataFrame(history.history)
print(hist)
hist['epoch'] = history.epoch
plt.figure()
plt.xlabel('Epoch')
plt.ylabel('Mean Absolute Error')
plt.plot(hist['epoch'], hist['mae'],
label='Train Error')
plt.plot(hist['epoch'], hist['val_mae'],
label = 'Val Error')
plt.legend()
def explain_kernal(model,train_ds):
model.predict(train_ds)
# KernelExplainer is a general approach that can work with any ML framework
# Its inputs are the predictions and training data
# Summarize the training set to accelerate analysis
df_train_summary = shap.kmeans(train_ds.values, 25)
#print(df_train_summary)
#model.predict(df_train_summary)
# Instantiate an explainer with the model predictions and training data summary
explainer = shap.KernelExplainer(model.predict, train_ds)
# Extract Shapley values from the explainer
#shap_values = explainer.shap_values(df_train.values)
if __name__ == "__main__":
cwd, log_dir = get_environmental_variables()
print(cwd)
df = load_data(cwd)
train,test,val = split_datframe(df)
dataset = df_to_dataset(df)
train_ds, val_ds, test_ds = get_input_pipeline(train,test,val,32)
understand_input_pipeline(train_ds)
feature_layer = get_feature_layer(cwd)
model, model_history = create_model(log_dir,feature_layer, train_ds, val_ds)
#plot_history(model_history)
#print(model.predict(train_ds))
#print(model.predict)
#explain_kernal(model,train_ds)