First commit
This commit is contained in:
parent
0a9fee9c07
commit
0ce5f912bc
1
libwyag/__init__.py
Normal file
1
libwyag/__init__.py
Normal file
@ -0,0 +1 @@
|
||||
from .commands import main
|
38
libwyag/commands.py
Normal file
38
libwyag/commands.py
Normal file
@ -0,0 +1,38 @@
|
||||
import sys
|
||||
from libwyag import repository, objects
|
||||
from libwyag.parsers import argparser
|
||||
|
||||
|
||||
def main(argv=sys.argv[1:]):
|
||||
args = argparser.parse_args(argv)
|
||||
|
||||
if args.command == "init":
|
||||
cmd_init(args)
|
||||
elif args.command == "cat-file":
|
||||
cmd_cat_file(args)
|
||||
|
||||
|
||||
def cmd_init(args):
|
||||
repository.create_repo(args.path)
|
||||
|
||||
|
||||
def cmd_cat_file(args):
|
||||
repo = repository.repo_find()
|
||||
cat_file(repo, args.object, fmt=args.type.encode())
|
||||
|
||||
|
||||
def cat_file(repo, obj, fmt=None):
|
||||
obj = objects.read_object(repo, objects.find_object(repo, obj, fmt=fmt))
|
||||
sys.stdout.buffer.write(obj.serialize())
|
||||
|
||||
|
||||
def cmd_hash_object(args):
|
||||
if args.write:
|
||||
repo = repository.GitRepository(".")
|
||||
else:
|
||||
repo = None
|
||||
|
||||
with open(args.path, "rb") as fd:
|
||||
sha = objects.hash_object(fd, args.type.encode(), repo)
|
||||
print(sha)
|
||||
|
131
libwyag/objects.py
Normal file
131
libwyag/objects.py
Normal file
@ -0,0 +1,131 @@
|
||||
import zlib
|
||||
import hashlib
|
||||
from abc import ABC, abstractmethod
|
||||
from libwyag import repository
|
||||
|
||||
|
||||
class GitObject:
|
||||
|
||||
fmt: bytes = b""
|
||||
repo = None
|
||||
|
||||
def __init__(self, repo, data=None):
|
||||
self.repo = repo
|
||||
if data is not None:
|
||||
self.deserialize(data)
|
||||
|
||||
@abstractmethod
|
||||
def serialize(self):
|
||||
"""
|
||||
This function MUST be implemented by subclasses.
|
||||
It must read the object's contents from self.data, a byte string, and do
|
||||
whatever it takes to convert it into a meaningful representation.
|
||||
What exactly that means depend on each subclass.
|
||||
"""
|
||||
return NotImplemented
|
||||
|
||||
@abstractmethod
|
||||
def deserialize(self, data):
|
||||
return NotImplemented
|
||||
|
||||
|
||||
class GitBlob(GitObject):
|
||||
fmt = b"blob"
|
||||
|
||||
def serialize(self):
|
||||
return self.blobdata
|
||||
|
||||
def deserialize(self, data):
|
||||
self.blobdata = data
|
||||
|
||||
|
||||
class GitCommit(GitObject):
|
||||
pass
|
||||
|
||||
|
||||
class GitTree(GitObject):
|
||||
pass
|
||||
|
||||
|
||||
class GitTag(GitObject):
|
||||
pass
|
||||
|
||||
|
||||
def read_object(repo, sha):
|
||||
"""Read object object_id from Git repository repo. Return a
|
||||
GitObject whose exact type depends on the object."""
|
||||
|
||||
path = repository.repo_file(repo, "objects", sha[0:2], sha[2:])
|
||||
|
||||
with open(path, "rb") as f:
|
||||
raw = zlib.decompress(f.read())
|
||||
|
||||
# Read object type
|
||||
x = raw.find(b" ")
|
||||
fmt = raw[0:x]
|
||||
|
||||
# Read and validate object size
|
||||
y = raw.find(b"\x00", x)
|
||||
size = int(raw[x:y].decode("ascii"))
|
||||
if size != len(raw) - y - 1:
|
||||
raise Exception("Malformed object {0}: bad length".format(sha))
|
||||
|
||||
# Pick constructor
|
||||
if fmt == b"commit":
|
||||
c = GitCommit
|
||||
elif fmt == b"tree":
|
||||
c = GitTree
|
||||
elif fmt == b"tag":
|
||||
c = GitTag
|
||||
elif fmt == b"blob":
|
||||
c = GitBlob
|
||||
else:
|
||||
raise Exception(f"Unknown type {fmt.decode('ascii')} for object {sha}")
|
||||
|
||||
# Call constructor and return object
|
||||
return c(repo, raw[y + 1 :])
|
||||
|
||||
|
||||
def find_object(repo, name: str, fmt=None, follow=True):
|
||||
""" Name resolution function """
|
||||
pass
|
||||
|
||||
|
||||
def write_object(obj: GitObject, actually_write=True):
|
||||
# Serialize object data
|
||||
data = obj.serialize()
|
||||
# Add header
|
||||
result = obj.fmt + b" " + str(len(data)).encode() + b"\x00" + data
|
||||
# Compute hash
|
||||
sha = hashlib.sha1(result).hexdigest()
|
||||
|
||||
if actually_write:
|
||||
# Compute path
|
||||
path = repository.repo_file(
|
||||
obj.repo, "objects", sha[0:2], sha[2:], mkdir=actually_write
|
||||
)
|
||||
|
||||
with open(path, "wb") as f:
|
||||
# Compress and write
|
||||
f.write(zlib.compress(result))
|
||||
|
||||
return sha
|
||||
|
||||
|
||||
def hash_object(fd, fmt, repo=None):
|
||||
data = fd.read()
|
||||
|
||||
# Choose constructor depending on
|
||||
# object type found in header.
|
||||
if fmt == b"commit":
|
||||
obj = GitCommit(repo, data)
|
||||
elif fmt == b"tree":
|
||||
obj = GitTree(repo, data)
|
||||
elif fmt == b"tag":
|
||||
obj = GitTag(repo, data)
|
||||
elif fmt == b"blob":
|
||||
obj = GitBlob(repo, data)
|
||||
else:
|
||||
raise Exception("Unknown type %s!" % fmt)
|
||||
|
||||
return write_object(obj, repo)
|
52
libwyag/parsers.py
Normal file
52
libwyag/parsers.py
Normal file
@ -0,0 +1,52 @@
|
||||
import argparse
|
||||
|
||||
argparser = argparse.ArgumentParser(description="The stupid content tracker")
|
||||
argsubparsers = argparser.add_subparsers(title="Commands", dest="command")
|
||||
argsubparsers.required = True
|
||||
|
||||
# Init
|
||||
argsp = argsubparsers.add_parser("init", help="Initialize a new, empty repository.")
|
||||
argsp.add_argument(
|
||||
"path",
|
||||
metavar="directory",
|
||||
nargs="?",
|
||||
default=".",
|
||||
help="Where to create the repository.",
|
||||
)
|
||||
|
||||
# Cat-file
|
||||
argsp = argsubparsers.add_parser(
|
||||
"cat-file", help="Provide content of repository objects"
|
||||
)
|
||||
|
||||
argsp.add_argument(
|
||||
"type",
|
||||
metavar="type",
|
||||
choices=["blob", "commit", "tag", "tree"],
|
||||
help="Specify the type",
|
||||
)
|
||||
|
||||
argsp.add_argument("object", metavar="object", help="The object to display")
|
||||
|
||||
# Hash-object
|
||||
argsp = argsubparsers.add_parser(
|
||||
"hash-object", help="Compute object ID and optionally creates a blob from a file"
|
||||
)
|
||||
|
||||
argsp.add_argument(
|
||||
"-t",
|
||||
metavar="type",
|
||||
dest="type",
|
||||
choices=["blob", "commit", "tag", "tree"],
|
||||
default="blob",
|
||||
help="Specify the type",
|
||||
)
|
||||
|
||||
argsp.add_argument(
|
||||
"-w",
|
||||
dest="write",
|
||||
action="store_true",
|
||||
help="Actually write the object into the database",
|
||||
)
|
||||
|
||||
argsp.add_argument("path", help="Read object from <file>")
|
143
libwyag/repository.py
Normal file
143
libwyag/repository.py
Normal file
@ -0,0 +1,143 @@
|
||||
import os
|
||||
import configparser
|
||||
|
||||
|
||||
class GitRepository(object):
|
||||
"""A git repository"""
|
||||
|
||||
"foo/"
|
||||
worktree = None
|
||||
"foo/.git"
|
||||
gitdir = None
|
||||
"foo/.git/config"
|
||||
conf = None
|
||||
|
||||
def __init__(self, path, force=False):
|
||||
self.worktree = path
|
||||
self.gitdir = os.path.join(path, ".git")
|
||||
|
||||
if not (force or os.path.isdir(self.gitdir)):
|
||||
raise Exception("Not a Git repository %s" % path)
|
||||
|
||||
# Read configuration file in .git/config
|
||||
self.conf = configparser.ConfigParser()
|
||||
cf = repo_file(self, "config")
|
||||
|
||||
if cf and os.path.exists(cf):
|
||||
self.conf.read([cf])
|
||||
elif not force:
|
||||
raise Exception("Configuration file missing")
|
||||
|
||||
if not force:
|
||||
vers = int(self.conf.get("core", "repositoryformatversion"))
|
||||
# control that core.repositoryformatversion is 0
|
||||
if vers != 0 and not force:
|
||||
raise Exception("Unsupported repositoryformatversion %s" % vers)
|
||||
|
||||
|
||||
def repo_path(repo, *path):
|
||||
"""Compute path under repo's gitdir."""
|
||||
return os.path.join(repo.gitdir, *path)
|
||||
|
||||
|
||||
def repo_file(repo, *path, mkdir=False):
|
||||
"""
|
||||
Same as repo_path, but create dirname(*path) if absent. For
|
||||
example, repo_file(r, \"refs\" \"remotes\", \"origin\", \"HEAD\") will create
|
||||
.git/refs/remotes/origin.
|
||||
"""
|
||||
|
||||
if repo_dir(repo, *path[:-1], mkdir=mkdir):
|
||||
return repo_path(repo, *path)
|
||||
|
||||
|
||||
def repo_dir(repo, *path, mkdir=False):
|
||||
"""Same as repo_path, but mkdir *path if absent if mkdir."""
|
||||
|
||||
path = repo_path(repo, *path)
|
||||
|
||||
if os.path.exists(path):
|
||||
if os.path.isdir(path):
|
||||
return path
|
||||
else:
|
||||
raise Exception("Not a directory %s" % path)
|
||||
|
||||
if mkdir:
|
||||
os.makedirs(path)
|
||||
return path
|
||||
else:
|
||||
return None
|
||||
|
||||
|
||||
def create_repo(path):
|
||||
"""Create a new repository at path."""
|
||||
|
||||
repo = GitRepository(path, True)
|
||||
|
||||
# First, we make sure the path either doesn't exist or is an
|
||||
# empty dir.
|
||||
|
||||
if os.path.exists(repo.worktree):
|
||||
if not os.path.isdir(repo.worktree):
|
||||
raise Exception("%s is not a directory!" % path)
|
||||
if os.listdir(repo.worktree):
|
||||
raise Exception("%s is not empty!" % path)
|
||||
else:
|
||||
os.makedirs(repo.worktree)
|
||||
|
||||
repo_dir(repo, "branches", mkdir=True)
|
||||
repo_dir(repo, "objects", mkdir=True)
|
||||
repo_dir(repo, "refs", "tags", mkdir=True)
|
||||
repo_dir(repo, "refs", "heads", mkdir=True)
|
||||
|
||||
# .git/description
|
||||
with open(repo_file(repo, "description"), "w") as f:
|
||||
f.write(
|
||||
"Unnamed repository; edit this file 'description' to name the repository.\n"
|
||||
)
|
||||
|
||||
# .git/HEAD
|
||||
with open(repo_file(repo, "HEAD"), "w") as f:
|
||||
f.write("ref: refs/heads/master\n")
|
||||
|
||||
with open(repo_file(repo, "config"), "w") as f:
|
||||
config = repo_default_config()
|
||||
config.write(f)
|
||||
|
||||
return repo
|
||||
|
||||
|
||||
def repo_default_config():
|
||||
ret = configparser.ConfigParser()
|
||||
|
||||
ret.add_section("core")
|
||||
ret.set("core", "repositoryformatversion", "0")
|
||||
ret.set("core", "filemode", "false")
|
||||
ret.set("core", "bare", "false")
|
||||
|
||||
return ret
|
||||
|
||||
|
||||
def repo_find(path=".", required=True):
|
||||
"""
|
||||
Look for a repository, starting at current directory and recursing back until /.
|
||||
"""
|
||||
path = os.path.realpath(path)
|
||||
|
||||
if os.path.isdir(os.path.join(path, ".git")):
|
||||
return GitRepository(path)
|
||||
|
||||
# If we haven't returned, recurse in parent, if w
|
||||
parent = os.path.realpath(os.path.join(path, ".."))
|
||||
|
||||
if parent == path:
|
||||
# Bottom case
|
||||
# os.path.join("/", "..") == "/":
|
||||
# If parent==path, then path is root.
|
||||
if required:
|
||||
raise Exception("No git directory.")
|
||||
else:
|
||||
return None
|
||||
|
||||
# Recursive case
|
||||
return repo_find(parent, required)
|
Loading…
Reference in New Issue
Block a user