166 lines
6.4 KiB
Python
166 lines
6.4 KiB
Python
import json, re, sys
|
|
from app.models import *
|
|
from app.tasks import celery
|
|
from app.utils import is_username_valid
|
|
import urllib.request
|
|
import gi
|
|
import PIL
|
|
import requests
|
|
import os
|
|
import sys
|
|
import inspect
|
|
import shutil
|
|
import urllib.request
|
|
from sqlalchemy.sql import func
|
|
from gi.repository import Gio
|
|
gi.require_version('AppStreamGlib', '1.0')
|
|
from gi.repository import AppStreamGlib
|
|
from app.utils.lists import alwaysAccept, alwaysDeny, badLicenses, badCategories, nonFreeAssets, nonFreeNetworkServices
|
|
import itertools
|
|
from app.utils import make_flask_login_password
|
|
from app.utils.image import get_image_size
|
|
from app.utils import randomString
|
|
|
|
#Workaround to get the urls because app.get_urls() doesn't work :|
|
|
def get_urls(app):
|
|
kinds = [AppStreamGlib.UrlKind(kind) for kind in range(11)]
|
|
urls = [(app.get_url_item(kind),kind.value_nick) for kind in kinds]
|
|
return list(filter(lambda a: a[0] is not None, urls))
|
|
|
|
def acceptedGame(app):
|
|
#return 'Game' in app.get_categories()
|
|
if app.get_id() in alwaysAccept:
|
|
return True
|
|
if app.get_id() in alwaysDeny:
|
|
return False
|
|
|
|
return app.get_project_license() and \
|
|
not [x for x in badLicenses if x in app.get_project_license()] and \
|
|
'Game' in app.get_categories() and \
|
|
not [x for x in badCategories if x in app.get_categories()]
|
|
|
|
def getScreenshots(app):
|
|
return [images.get_source() for images in app.get_screenshots()]
|
|
|
|
@celery.task()
|
|
def importFromFlathub():
|
|
url = "https://flathub.org/repo/appstream/x86_64/appstream.xml.gz"
|
|
with urllib.request.urlopen(url) as response, open("/var/cdb/uploads/appstream.xml.gz", 'wb') as out_file:
|
|
shutil.copyfileobj(response, out_file)
|
|
store = AppStreamGlib.Store()
|
|
file = Gio.File.new_for_path("/var/cdb/uploads/appstream.xml.gz")
|
|
file.load_contents()
|
|
AppStreamGlib.Store.from_file(store, file, ".", None)
|
|
apps = list(filter(acceptedGame, store.get_apps()))
|
|
session=db.session
|
|
licenses = { x.name : x for x in License.query.all() }
|
|
tags = { x.name : x for x in Tag.query.all() }
|
|
admin_user = User.query.filter_by(username="AppStreamBot").first()
|
|
|
|
if not admin_user:
|
|
admin_user = User("AppStreamBot")
|
|
admin_user.is_active = True
|
|
admin_user.password = make_flask_login_password("AppStreamBot")
|
|
admin_user.github_username = "AppStreamBot"
|
|
admin_user.forums_username = "AppStreamBot"
|
|
admin_user.rank = UserRank.ADMIN
|
|
session.add(admin_user)
|
|
|
|
|
|
for app in apps:
|
|
screenshots = getScreenshots(app)
|
|
urls = get_urls(app)
|
|
filename = app.get_name().replace(':', '').replace('/','') + ".html"
|
|
print("APPLICATION: ",app.get_name())
|
|
package_exists = Package.query.filter_by(name=app.get_id()).first()
|
|
if package_exists:
|
|
print(f"Package {app.get_id()} exists, skipping.")
|
|
else:
|
|
game1 = Package()
|
|
game1.state = PackageState.APPROVED
|
|
game1.name = app.get_id()
|
|
game1.title = app.get_name()
|
|
if "Development" in app.get_categories():
|
|
game1.type = PackageType.TOOL
|
|
else:
|
|
game1.type = PackageType.GAME
|
|
license = "Uknown" if app.get_project_license() is None else app.get_project_license().split("AND")[0].split("and")[0]
|
|
if license not in licenses:
|
|
row = License(license)
|
|
licenses[row.name] = row
|
|
session.add(row)
|
|
session.commit()
|
|
for category in app.get_categories():
|
|
if category.lower() not in tags:
|
|
row = Tag(category.lower())
|
|
tags[row.name] = row
|
|
print("adding tag: ", row.name)
|
|
session.add(row)
|
|
game1.tags.append(tags[category.lower()])
|
|
game1.license = licenses[license]
|
|
game1.media_license = licenses["MIT"]
|
|
game1.author = admin_user
|
|
|
|
|
|
for url,t in urls:
|
|
if t == "bugtracker":
|
|
game1.issueTracker = url
|
|
elif t == "homepage":
|
|
game1.repo = url
|
|
|
|
game1.forums = 12835
|
|
game1.short_desc = "" or app.get_comment()
|
|
game1.desc = app.get_description()
|
|
session.add(game1)
|
|
|
|
install_url = f"https://dl.flathub.org/repo/appstream/{app.get_id()}.flatpakref"
|
|
release = PackageRelease()
|
|
release.package = game1
|
|
release.title = "Flathub Install"
|
|
release.url = install_url
|
|
release.approved = True
|
|
release.downloads = 0
|
|
release.releaseDate = func.now()
|
|
|
|
session.add(release)
|
|
|
|
for screenshot in screenshots:
|
|
counter = 1
|
|
url = screenshot.get_url()
|
|
try:
|
|
r = requests.get(url,timeout=10)
|
|
r.raise_for_status()
|
|
filename = randomString(10) + "." + "png"
|
|
filepath = os.path.join("/var/cdb/uploads", filename)
|
|
print("Screenshot url: ", url)
|
|
with open(filepath,"wb") as f:
|
|
f.write(r.content)
|
|
|
|
width, height = get_image_size(filepath)
|
|
|
|
if (width is not None) and (height is not None):
|
|
ss = PackageScreenshot()
|
|
ss.package = game1
|
|
ss.title = "Untitled"
|
|
ss.url = "/uploads/" + filename
|
|
ss.width = width
|
|
ss.height = height
|
|
ss.approved = True
|
|
ss.order = counter
|
|
session.add(ss)
|
|
session.commit()
|
|
game1.cover_image = ss
|
|
session.commit()
|
|
counter += 1
|
|
except requests.exceptions.HTTPError as err:
|
|
print("HTTP error downloading the screenshot ", err)
|
|
except requests.exceptions.ConnectionError as err:
|
|
print("HTTP error downloading the screenshot ", err)
|
|
except requests.exceptions.ReadTimeout as err:
|
|
print("Screenshot timeout ", err)
|
|
except PIL.UnidentifiedImageError as err:
|
|
print("Corrupt image ", err)
|
|
|
|
session.commit()
|
|
def importAppstream():
|
|
pass |