This repository has been archived on 2022-12-28. You can view files and clone it, but cannot push or open issues or pull requests.
gmclcore/Core/GMCLCore/main.py
2022-02-20 12:52:30 +08:00

251 lines
10 KiB
Python

# -*- coding: utf-8 -*-
#
# ________ _____ _________ .____ _________
# / _____/ / \ \_ ___ \| | \_ ___ \ ___________ ____
# / \ ___ / \ / \/ \ \/| | / \ \/ / _ \_ __ _/ __ \
# \ \_\ / Y \ \___| |__\ \___( <_> | | \\ ___/
# \______ \____|__ /\______ |_______ \______ /\____/|__| \___ >
# \/ \/ \/ \/ \/ \/
#
# Ghink Minecraft Launcher
# Copyleft Ghink Network Studio
# Bigsk (https://www.xiaxinzhe.cn)
# See the LICENSE
#
# Standard
import os, platform, hashlib, requests, json, time
import urllib.request, urllib.parse
from threading import Thread
from subprocess import Popen, PIPE
# Extend
import psutil
# Self
import GMCLCore.libs.log4py as log4py
import GMCLCore.libs.ariaAdapter as aria
from GMCLCore.config import *
class Core(object):
# Class-level constants shared by every Core instance
# Switch for debug log output
DEBUG = True
# Downloader mode identifiers (used as the value of `parallelType`)
ARIA = "ARIA"
# Download source mode identifiers
OFFICIAL = "OFFICIAL"
BMCLAPI = "BMCLAPI"
# Authentication mode identifiers (used as the value of `authType`)
OFFLINE = "OFFLINE"
MOJANG = "MOJANG"
MICROSOFT = "MICROSOFT"
def __init__(self, workDir = ".gmclcore"):
    """Set up the launcher core: work directory, logger, defaults and daemon.

    workDir -- directory holding the core's own files (logs, "env" and
    "config" sub-directories). It is created when missing and, on
    Windows, flagged as hidden.
    """
    # Base URL the runtime dependencies are downloaded from
    self.ENV_SOURCE = "https://resource.ghink.net/application/rely"
    # Launcher identity and host information
    self.__name = "GMCLCore"
    versionNumber = ".".join(str(c) for c in VERSION[1])
    self.__version = "{} {}".format(VERSION[0], versionNumber)
    self.__workDir = workDir
    self.__system = platform.system()
    self.parallel = psutil.cpu_count()
    self.parallelType = self.ARIA
    self.useragent = "{}/{}".format(self.__name, self.__version)
    # The work directory must exist before the logger is pointed at it
    os.makedirs(self.__workDir, exist_ok = True)
    if self.__system == "Windows":
        import win32api, win32con
        win32api.SetFileAttributes(self.__workDir, win32con.FILE_ATTRIBUTE_HIDDEN)
    self.l = log4py.log(self.__workDir)
    # Default game settings
    self.gameDir = os.path.join(os.path.dirname(__file__), ".minecraft")
    self.java = [[], None]
    self.gameMem = self.autoMemory()
    self.authType = self.OFFLINE
    self.id = "Steve"
    self.uuid = self.generateUUID(self.id)
    # Runtime environment
    self.l.info("Initing running env...")
    for subDir in ("env", "config"):
        os.makedirs(os.path.join(self.__workDir, subDir), exist_ok = True)
    self.initEnv()
    # Background daemon thread (does not keep the interpreter alive)
    self.l.info("Creating daemon thread...")
    self.__daemonThread = Thread(target = self.__daemon, name = "Daemon", daemon = True)
    self.__daemonThread.start()
# Prepare the launcher's running environment
def initEnv(self):
    """Install the aria2c binary for this platform if needed and start its adapter.

    After this call ``self.aria`` always exists: it holds the
    ``aria.adapter`` instance on success and ``False`` in every failure
    case (no network, unsupported platform, adapter start-up error).
    """
    def install(target, url):
        """Download *url* to *target* unless *target* already exists."""
        if os.path.exists(target):
            return
        # Download first and write to a side file: a failed download must
        # not leave an empty/partial binary behind, because its mere
        # existence would suppress every later download attempt.
        partial = target + ".part"
        try:
            response = requests.get(url, headers = {"user-agent": self.useragent}, timeout = 30)
            # Do not store an HTTP error page as if it were the binary
            response.raise_for_status()
            with open(partial, "wb") as fb:
                fb.write(response.content)
            if self.__system != "Windows":
                # A freshly written file carries no execute permission
                os.chmod(partial, 0o755)
            os.replace(partial, target)
        except Exception as e:
            self.l.warn("Failed to install running env aria2c! System return: {}".format(e))
            # Best-effort cleanup of the side file
            try:
                os.remove(partial)
            except OSError:
                pass

    self.aria = False
    if not self.networkTest():
        return
    source = self.urljoin(self.ENV_SOURCE, "aria2c")
    machine = platform.machine()
    if self.__system == "Windows":
        if "64" in machine:
            localName, remoteDir = "aria2c64.exe", "win64"
        elif "32" in machine or "86" in machine:
            localName, remoteDir = "aria2c32.exe", "win32"
        else:
            self.l.fatal("Unsupported processor architecture! ")
            return
        remoteName = "aria2c.exe"
    elif self.__system == "Linux":
        localName, remoteDir, remoteName = "aria2clinux", "linux", "aria2c"
    elif self.__system == "Darwin":
        localName, remoteDir, remoteName = "aria2cdarwin", "darwin", "aria2c"
    else:
        self.l.warn("Unsupported operating system for aria2c: {}".format(self.__system))
        return
    # Local filesystem path, so build it with os.path.join on every platform
    self.__ariaPath = os.path.join(self.__workDir, "env", localName)
    install(self.__ariaPath, self.urljoin(source, remoteDir, remoteName))
    try:
        self.aria = aria.adapter(self.__ariaPath)
    except (OSError, TypeError) as e:
        # OSError covers FileNotFoundError as well as a missing execute
        # permission or a corrupt binary.
        self.l.warn("Failed to find running env aria2c! System return: {}".format(e))
# The daemon thread function
def __daemon(self):
    """Body of the background daemon thread; never returns.

    Starts every thread listed in ``threadPool`` (currently empty) as a
    daemon thread and then idles.
    """
    # Start some sub-daemon-thread
    threadPool = []
    for t in threadPool:
        t.daemon = True
        t.start()
    while True:
        # Sleep instead of spinning: an empty loop body keeps one CPU core
        # fully busy and competes with every other Python thread.
        time.sleep(1)
# Debug logs output
def __debug(self, *args) -> None :
    """Forward *args* to the logger's ``info`` when ``DEBUG`` is enabled.

    With ``DEBUG`` set to a false value the call is a no-op, so the
    class-level switch actually controls the debug output.
    """
    if self.DEBUG:
        self.l.info(*args)
# Determine whether @content is valid JSON
def isJson(self, content: str) -> bool:
    """Return True when *content* parses as JSON, False otherwise.

    Input that cannot be JSON at all (``None``, other non-string objects,
    undecodable bytes) yields False instead of raising.
    """
    try:
        json.loads(content)
    except (ValueError, TypeError):
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError;
        # TypeError is raised for None and other unsupported input types.
        return False
    return True
# Merge the url
def urljoin(self, *args: str) -> str:
    """Join *args* into one path/URL separated by single forward slashes.

    Backslashes are converted to "/" and empty segments are skipped.
    Unlike ``os.path.join`` the result does not depend on the host OS,
    and a segment starting with "/" is appended instead of discarding
    everything before it. Returns "" when no segment is given.
    """
    segments = [a.replace("\\", "/") for a in args if a]
    if not segments:
        return ""
    result = segments[0]
    for segment in segments[1:]:
        # A leading slash must not reset the result (URL joining)
        segment = segment.lstrip("/")
        if not segment:
            continue
        if not result.endswith("/"):
            result += "/"
        result += segment
    return result
# Test whether the network is working
def networkTest(self, source : str = r"http://www.baidu.com", retry : int = 3, timeout : float = 5) -> bool:
    """Return True as soon as *source* can be opened and read, else False.

    source  -- URL to probe.
    retry   -- maximum number of attempts; values <= 0 return False
               without probing.
    timeout -- seconds to wait per attempt, so an unresponsive network
               cannot block the caller indefinitely.
    """
    for _ in range(retry):
        try:
            # The with-block closes the response even if read() fails
            with urllib.request.urlopen(source, timeout = timeout) as fp:
                # Only reachability matters; the bytes are not decoded,
                # because a 100-byte slice may cut a multi-byte character
                # and that must not count as a network failure.
                fp.read(100)
        except Exception as e:
            self.__debug("networkTest Error:", str(e))
        else:
            return True
    return False
# Scan java in the Path and whole disk
def searchJava(self) -> None:
    """Start two daemon threads that look for Java executables.

    One thread scans the directories listed in the PATH environment
    variable, the other walks every disk partition (Windows) or "/"
    (other systems). Each executable whose version can be read is
    appended once to ``self.java[0]`` as ``(path, version)``. Both
    threads rescan forever, pausing 0.5 s between passes; the method
    itself returns immediately.
    """
    from threading import Lock
    keywords = ("javaw", "java", "javaw.exe", "java.exe")
    # Paths already probed, so the endless rescans neither re-run the same
    # binary nor append duplicates. Shared by both threads, hence the lock.
    seen = {known for known, _ in self.java[0]}
    seenLock = Lock()

    def record(candidate : str) -> None:
        """Probe *candidate* and store it when its version can be read."""
        # NOTE(review): paths containing "jdk" are skipped, as in the
        # original code; confirm this exclusion is intended.
        if "jdk" in candidate or not os.path.isfile(candidate):
            return
        with seenLock:
            if candidate in seen:
                return
            seen.add(candidate)
        try:
            version = self.javaVersion(candidate)
        except Exception as e:
            # One broken file must not kill the whole scan thread
            self.__debug("searchJava Error:", str(e))
            return
        # Keep only executables whose version was actually determined
        if version:
            self.java[0].append((candidate, version))

    # Search in Env Vars
    def env() -> None :
        while True:
            for entry in os.environ.get("PATH", "").split(os.pathsep):
                if not entry:
                    continue
                # PATH normally lists directories, so look inside each one;
                # an entry that itself names the executable is accepted too.
                if os.path.basename(entry) in keywords:
                    record(entry)
                for name in keywords:
                    record(os.path.join(entry, name))
            time.sleep(0.5)

    # Scan in whole disk
    def disk() -> None:
        while True:
            if self.__system == "Windows":
                roots = [p.mountpoint for p in psutil.disk_partitions()]
            else:
                roots = ["/"]
            for mount in roots:
                for root, _, files in os.walk(mount):
                    for f in files:
                        if f in keywords:
                            record(os.path.join(root, f))
            time.sleep(0.5)

    envThread = Thread(target = env, name = "scanJavaEnv")
    diskThread = Thread(target = disk, name = "scanJavaDisk")
    envThread.daemon = True
    diskThread.daemon = True
    envThread.start()
    diskThread.start()
# Read the major version of a Java executable
def javaVersion(self, path : str):
    """Return the major version of the Java executable at *path*.

    Runs ``<path> -version`` and parses the ``version "..."`` banner.
    Legacy "1.x" version strings map to x (e.g. "1.8.0_292" -> 8).
    Returns False when the program cannot be started, does not answer
    within 10 seconds, or prints no recognisable version banner.
    """
    import re
    from subprocess import DEVNULL, STDOUT, TimeoutExpired
    try:
        # Argument list without a shell: with shell=True on POSIX only the
        # first item is executed and the option would be dropped.
        # "-version" is understood by every Java release ("--version" only
        # since Java 9) and prints to stderr, which is merged into stdout.
        p = Popen([path, "-version"], stdin = DEVNULL, stdout = PIPE, stderr = STDOUT)
    except (OSError, ValueError):
        return False
    try:
        out = p.communicate(timeout = 10)[0]
    except TimeoutExpired:
        # The file may not be Java at all; do not wait for it forever
        p.kill()
        p.communicate()
        return False
    found = re.search(r'version "(\d+)(?:\.(\d+))?', out.decode(errors = "replace"))
    if found is None:
        return False
    major = int(found.group(1))
    if major == 1 and found.group(2):
        return int(found.group(2))
    return major
# Return an automatically determined memory value for the game
def autoMemory(self) -> float:
    """Return 80 % of the currently available system memory, in MiB.

    Uses psutil's ``available`` field rather than ``free``: ``free`` only
    counts memory that is not used at all and, per the psutil
    documentation, does not reflect the memory that can actually be
    handed to a new process.
    """
    return 0.8 * (psutil.virtual_memory().available / 1024 ** 2)
# Generate an offline player UUID
def generateUUID(self, id : str) -> str:
    """Return the MD5 hex digest of "OfflinePlayer:" followed by *id*.

    The result is a 32-character lowercase hexadecimal string.
    """
    # NOTE(review): this is the raw MD5 digest; no UUID version/variant
    # bits are set -- confirm whether consumers expect a version-3 UUID.
    seed = "OfflinePlayer:{}".format(id)
    return hashlib.md5(seed.encode()).hexdigest()