Add steam review fetching

This commit is contained in:
LunaChocken
2026-01-04 17:34:33 +00:00
parent 6e68fcf0cb
commit d980eecf81
3 changed files with 157 additions and 13 deletions
+2 -2
View File
@@ -49,7 +49,7 @@ class IGDB(TwitchOAuth):
super().__init__()
log.info("Initializing IGDB")
@cachier()
# @cachier()
@sleep_and_retry
@limits(calls=4, period=1)
def get_game_data(self, body="fields id,name,rating,category,rating_count"):
@@ -71,7 +71,7 @@ class IGDB(TwitchOAuth):
try:
js = json.loads(data.decode("utf-8"))[0]
if {'id','name','rating','rating_count'}.issubset(js):
return Game(js["id"], js["name"], js["rating"], js["rating_count"])
return dict({"id" : js["id"], "name": js["name"], "rating":js["rating"], "rating_count":js["rating_count"]})
else:
log.error("Missing Tags: %s" % js)
return None
+82
View File
@@ -0,0 +1,82 @@
import json
from cachier import cachier
import os
from steam_web_api import Steam
from dotenv import load_dotenv
import requests
load_dotenv()
def check_key():
return os.environ.get("STEAM_API_KEY") is not None
class SteamAPI:
def __init__(self):
if check_key() is False:
raise KeyError("Steam Key env Missing")
key = os.getenv("STEAM_KEY")
self.steam = Steam(key)
@cachier()
def search_game(self, query):
search = self.steam.apps.search_games(query)
# print(search)
return search
@cachier()
def game_details(self, game_id):
user = self.steam.apps.get_app_details(game_id)
if user is None:
return None
id=list(user.keys())[0]
user['data'] = user[id]['data']
del user[id]
return user
@cachier()
def get_reviews(self, game_id):
url = f"https://store.steampowered.com/appreviews/{game_id}?json=1"
r = requests.get(url).text
reviews = json.loads(r)
reviews = reviews['query_summary'] # dump review text
data = dict()
data['review_score'] = reviews['review_score']
data['review_score_desc'] = reviews['review_score_desc']
data['total_positive'] = reviews['total_positive']
data['total_negative'] = reviews['total_negative']
data['total_reviews'] = reviews['total_reviews']
return data
def search_details(self, query):
# search for game then get details
data = self.search_game(query)
if len(data['apps']) == 0:
return None
# print(data)
game = dict()
game['name'] = data['apps'][0]['name']
game['id'] = data['apps'][0]['id'][0]
game.update(self.game_details(game['id']))
game.update(self.get_reviews(game['id']))
return game
# print(self.game_details(game['id']))
# print(self.get_reviews(game['id']))
# Testing
if __name__ == "__main__":
from pprint import pprint
steam = SteamAPI()
steam.get_reviews.clear_cache()
steam.game_details.clear_cache()
steam.get_reviews.clear_cache()
pprint(steam.search_details("Batman"))
# print(search_details())
+73 -11
View File
@@ -1,56 +1,118 @@
import ftfy
import tqdm
from cachier import cachier
from libs.igdb import IGDB
import libs.epiclibrary as ep
from libs.steam import SteamAPI
from loguru import logger as log
# Set logger level
import sys
log.remove()
log.add(sys.stderr, level="INFO")
# IGDB = IGDB()
# game_title = "Sea of thieves"
# body = f"""search "{game_title}";\nfields id,name,rating,category;"""
# print(IGDB.get_game_data(body=body))
games = ep.EpicLibrary().fetch_games()
igdb = IGDB()
CLEAR_CACHE = False
# games = games[5:10]
igdb = IGDB()
steam = SteamAPI()
# @cachier()
def get_game(title):
def igdb_get_game(title):
body = f"""search "{title}";\nfields id,name,rating,category,rating_count;""".encode("utf-8")
return igdb.get_game_data(body=body)
def steam_get_game(title):
return steam.search_details(title)
def get_game_data(title):
# grabs from IGDB AND steam
igdb = igdb_get_game(title)
steam_response = steam.search_details(title)
game = dict()
try:
game.update(igdb)
except TypeError as e:
exc_type, exc_value, exc_traceback = sys.exc_info()
# log.error(f"Could not get game data for {title}. Full traceback: {exc_traceback, exc_value}")
try:
game.update(steam_response)
except TypeError as e:
exc_type, exc_value, exc_traceback = sys.exc_info()
log.error(f"STEAM could not get game data for {title}. Full traceback: {exc_traceback, exc_value}")
if "name" not in game:
game["name"] = title
return game
@cachier()
def scan_library():
log.info(f"Scanning {len(games)} games")
log.info(f"ppScanning {len(games)} games")
game_data = []
for game in games:
for game in tqdm.tqdm(games):
title = game['title']
log.info(f"Title: {title}")
game_data.append(get_game(title=title))
log.debug(f"Title: {title}")
game_data.append(get_game_data(title))
# game_data.append(igdb_get_game(title=title))
return game_data
def sort_games(game_data):
if game_data == None:
return 9999
return game_data.rating_count
if "rating_count" in game_data:
return game_data['rating_count']
elif "total_reviews" in game_data:
return game_data['total_reviews']
else:
return 9999
CLEAR_CACHE = False
if CLEAR_CACHE:
scan_library.clear_cache()
steam.search_game.clear_cache()
steam.game_details.clear_cache()
steam.get_reviews.clear_cache()
log.info("Cleared Cache")
games = scan_library()
games.sort(key=sort_games, reverse=True)
# sort games
# reorder data to be at the end
keys = list(games[0].keys())
keys.pop(keys.index("data"))
# keys.append("data")
log.info(f"Found {len(games)-games.count(None)}/{len(games)}. Errors: {games.count(None)}")
import csv
with open('games.csv', 'w', newline='\n') as csvfile:
with open('games.csv', 'w', newline='\n', errors='replace') as csvfile:
writer = csv.writer(csvfile)
writer.writerow(["Title", "Rating", "Rating_Count"])
writer.writerow(keys)
# writer.writerow(["Title", "Rating", "Rating_Count"])
for game in games:
if game == None:
continue
writer.writerow([game.name, round(float(game.rating)), game.rating_count])
cleaned = game
if "id" not in cleaned:
cleaned["id"]= ""
row = [
cleaned.get("name", ""),
cleaned.get("id", ""),
cleaned.get("review_score", ""),
cleaned.get("review_score_desc", ""),
cleaned.get("total_positive", ""),
cleaned.get("total_negative", ""),
cleaned.get("total_reviews", ""),
]
writer.writerow(row)