Initial import.
[gitosis.git] / gitosis / test / test_ssh.py
1 from nose.tools import eq_ as eq, assert_raises
2
3 import os, errno
4 from cStringIO import StringIO
5
6 from gitosis import ssh
7
def mkdir(path):
    """Create directory *path*, tolerating one that already exists.

    Any ``OSError`` other than ``EEXIST`` is propagated to the caller.
    """
    try:
        os.mkdir(path)
    except OSError as e:
        # The fixture directories have fixed names, so finding one
        # already present is expected; anything else is a real error.
        if e.errno != errno.EEXIST:
            raise
16
def maketemp():
    """Return this module's scratch directory, creating it if needed.

    The directory is ``tmp/<name>`` next to this file, where ``<name>``
    is this file's basename with its extension removed.
    """
    here = os.path.dirname(__file__)
    module_name, _ext = os.path.splitext(os.path.basename(__file__))
    root = os.path.join(here, 'tmp')
    scratch = os.path.join(root, module_name)
    # Parent first, then the per-module directory inside it.
    for directory in (root, scratch):
        mkdir(directory)
    return scratch
24
def writeFile(path, content):
    """Write *content* to *path* by way of a temporary sibling file.

    The data is first written to ``<path>.tmp``; only after that file has
    been written and closed is it renamed to *path*.
    """
    tmp = '%s.tmp' % path
    # ``open`` replaces the Python-2-only ``file`` builtin; the ``with``
    # block closes the handle even if the write raises.
    with open(tmp, 'w') as f:
        f.write(content)
    os.rename(tmp, path)
33
34 def _key(s):
35     return ''.join(s.split('\n')).strip()
36
# First sample public-key line used by the tests. It is spelled across
# several short literals and joined into one line by _key().
KEY_1 = _key(
    '\n'
    'ssh-rsa +v5XLsUrLsHOKy7Stob1lHZM17YCCNXplcKfbpIztS2PujyixOaBev1ku6H6ny\n'
    'gUXfuYVzY+PmfTLviSwD3UETxEkR/jlBURACDQARJdUxpgt9XG2Lbs8bhOjonAPapxrH0o\n'
    '9O8R0Y6Pm1Vh+H2U0B4UBhPgEframpeJYedijBxBV5aq3yUvHkXpcjM/P0gsKqr036k= j\n'
    'unk@gunk\n'
)
43
# Second sample public-key line used by the tests. It is spelled across
# several short literals and joined into one line by _key().
KEY_2 = _key(
    '\n'
    'ssh-rsa 4BX2TxZoD3Og2zNjHwaMhVEa5/NLnPcw+Z02TDR0IGJrrqXk7YlfR3oz+Wb/Eb\n'
    'Ctli20SoWY0Ur8kBEF/xR4hRslZ2U8t0PAJhr8cq5mifhok/gAdckmSzjD67QJ68uZbga8\n'
    'ZwIAo7y/BU7cD3Y9UdVZykG34NiijHZLlCBo/TnobXjFIPXvFbfgQ3y8g+akwocFVcQ= f\n'
    'roop@snoop\n'
)
50
class ReadKeys_Test(object):
    """Exercise ``ssh.readKeys`` against small key directories on disk.

    Each test builds its own directory under ``maketemp()`` and checks
    which ``(user, key)`` pairs the returned generator yields.
    """

    def test_empty(self):
        """An empty key directory yields nothing."""
        tmp = maketemp()
        empty = os.path.join(tmp, 'empty')
        mkdir(empty)
        gen = ssh.readKeys(keydir=empty)
        assert_raises(StopIteration, next, gen)

    def test_ignore_dot(self):
        """A ``.pub`` file whose name starts with a dot is skipped."""
        tmp = maketemp()
        keydir = os.path.join(tmp, 'ignore_dot')
        mkdir(keydir)
        writeFile(os.path.join(keydir, '.jdoe.pub'), KEY_1+'\n')
        gen = ssh.readKeys(keydir=keydir)
        assert_raises(StopIteration, next, gen)

    def test_ignore_nonpub(self):
        """A file without the ``.pub`` extension is skipped."""
        tmp = maketemp()
        # Own directory, so this test does not share fixtures with
        # test_ignore_dot.
        keydir = os.path.join(tmp, 'ignore_nonpub')
        mkdir(keydir)
        writeFile(os.path.join(keydir, 'jdoe.xub'), KEY_1+'\n')
        gen = ssh.readKeys(keydir=keydir)
        assert_raises(StopIteration, next, gen)

    def test_one(self):
        """A single ``jdoe.pub`` yields one ``('jdoe', key)`` pair."""
        tmp = maketemp()
        keydir = os.path.join(tmp, 'one')
        mkdir(keydir)
        writeFile(os.path.join(keydir, 'jdoe.pub'), KEY_1+'\n')

        gen = ssh.readKeys(keydir=keydir)
        # The expected key has no trailing newline even though the file does.
        eq(next(gen), ('jdoe', KEY_1))
        assert_raises(StopIteration, next, gen)

    def test_two(self):
        """Two key files yield two pairs; order is not asserted."""
        tmp = maketemp()
        keydir = os.path.join(tmp, 'two')
        mkdir(keydir)
        writeFile(os.path.join(keydir, 'jdoe.pub'), KEY_1+'\n')
        writeFile(os.path.join(keydir, 'wsmith.pub'), KEY_2+'\n')

        gen = ssh.readKeys(keydir=keydir)
        # Compare as sets so the test does not depend on listing order.
        got = frozenset(gen)

        eq(got,
           frozenset([
            ('jdoe', KEY_1),
            ('wsmith', KEY_2),
            ]))
100
class GenerateAuthorizedKeys_Test(object):
    """Check the lines produced by ``ssh.generateAuthorizedKeys``."""

    def test_simple(self):
        """The marker comment comes first, then one restricted line per key."""
        def k():
            yield ('jdoe', KEY_1)
            yield ('wsmith', KEY_2)
        gen = ssh.generateAuthorizedKeys(k())
        # Builtin next() works on both Python 2.6+ and Python 3 iterators,
        # unlike the Python-2-only .next() method.
        eq(next(gen), ssh.COMMENT)
        eq(next(gen), (
            'command="gitosis-serve jdoe",no-port-forwarding,no-X11-f'
            +'orwarding,no-agent-forwarding,no-pty %s' % KEY_1))
        eq(next(gen), (
            'command="gitosis-serve wsmith",no-port-forwarding,no-X11'
            +'-forwarding,no-agent-forwarding,no-pty %s' % KEY_2))
        assert_raises(StopIteration, next, gen)
115
116
class FilterAuthorizedKeys_Test(object):
    """Check which lines ``ssh.filterAuthorizedKeys`` keeps and drops."""

    def run(self, s):
        """Filter the text *s* and return the kept lines, each with a newline."""
        source = StringIO(s)
        kept = ssh.filterAuthorizedKeys(source)
        return ''.join('%s\n' % line for line in kept)

    def check_no_change(self, s):
        """Assert that filtering *s* returns it unchanged."""
        eq(self.run(s), s)

    def test_notFiltered_comment(self):
        """An ordinary comment line is kept."""
        self.check_no_change('#comment\n')

    def test_notFiltered_junk(self):
        """An unrecognised line is kept."""
        self.check_no_change('junk\n')

    def test_notFiltered_key(self):
        """A plain key line is kept."""
        self.check_no_change('%s\n' % KEY_1)

    def test_notFiltered_keyWithCommand(self):
        """A key whose command is not gitosis-serve is kept."""
        entry = (
            'command="faketosis-serve wsmith",no-port-forwarding,'
            'no-X11-forwarding,no-agent-forwarding,no-pty %(key_1)s\n'
        ) % {'key_1': KEY_1}
        self.check_no_change(entry)

    def test_filter_autogeneratedComment_backwardsCompat(self):
        """The hard-coded autogenerated marker comment is dropped."""
        remaining = self.run('### autogenerated by gitosis, DO NOT EDIT\n')
        eq(remaining, '')

    def test_filter_autogeneratedComment_current(self):
        """The marker comment from ``ssh.COMMENT`` is dropped."""
        remaining = self.run(ssh.COMMENT+'\n')
        eq(remaining, '')

    def test_filter_simple(self):
        """A gitosis-serve command line is dropped."""
        entry = (
            'command="gitosis-serve wsmith",no-port-forwarding,'
            'no-X11-forwarding,no-agent-forwarding,no-pty %(key_1)s\n'
        ) % {'key_1': KEY_1}
        remaining = self.run(entry)
        eq(remaining, '')

    def test_filter_withPath(self):
        """A gitosis-serve command given with a path is dropped."""
        entry = (
            'command="/foo/bar/baz/gitosis-serve wsmith",no-port-forwarding,'
            'no-X11-forwarding,no-agent-forwarding,no-pty %(key_1)s\n'
        ) % {'key_1': KEY_1}
        remaining = self.run(entry)
        eq(remaining, '')
168
169
class WriteAuthorizedKeys_Test(object):
    """End-to-end check of ``ssh.writeAuthorizedKeys``."""

    def test_simple(self):
        """Old gitosis lines are replaced; unrelated lines are kept.

        The old file mixes unrelated lines with a previously generated
        marker comment and a ``wsmith`` entry. The expected result keeps
        the unrelated lines in order, followed by the marker comment and
        one entry for the single key in the key directory.
        """
        tmp = maketemp()
        path = os.path.join(tmp, 'authorized_keys')
        oldfp = StringIO('''\
# foo
bar
### autogenerated by gitosis, DO NOT EDIT
command="/foo/bar/baz/gitosis-serve wsmith",no-port-forwarding,\
no-X11-forwarding,no-agent-forwarding,no-pty %(key_2)s
baz
''' % dict(key_2=KEY_2))
        keydir = os.path.join(tmp, 'one')
        mkdir(keydir)
        writeFile(os.path.join(keydir, 'jdoe.pub'), KEY_1+'\n')

        ssh.writeAuthorizedKeys(
            oldfp=oldfp, newpath=path, keydir=keydir)

        # ``open`` replaces the Python-2-only ``file`` builtin; ``with``
        # closes the handle even if the read raises.
        with open(path) as f:
            got = f.read()

        eq(got, '''\
# foo
bar
baz
### autogenerated by gitosis, DO NOT EDIT
command="gitosis-serve jdoe",no-port-forwarding,\
no-X11-forwarding,no-agent-forwarding,no-pty %(key_1)s
''' % dict(key_1=KEY_1))