How can I lazily read multiple JSON values from a file/stream in Python?

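I'd like to read multiple JSON objects from a file/stream in Python, one at a time. Unfortunately json.load() just .read()s until end-of-file; there doesn't seem to be any way to use it to read a single object or to lazily iterate over the objects.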

Is there any way to do this? Using the standard library would be ideal, but if there's a third-party library I'd use that instead.

At the moment I'm putting each object on a separate line and using json.loads(f.readline()), but I would really prefer not to need to do this.

Example Use

example.py

import my_json as json
import sys

for o in json.iterload(sys.stdin):
    print("Working on a", type(o))

in.txt

{"foo": ["bar", "baz"]} 1 2 [] 4 5 6

example session

$ python3.2 example.py < in.txt
Working on a dict
Working on a int
Working on a int
Working on a list
Working on a int
Working on a int
Working on a int

JSON generally isn't very good for this sort of incremental use; there's no standard way to serialise multiple objects so that they can easily be loaded one at a time, without parsing the whole lot.

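The object-per-line solution that you are using is seen elsewhere too. Scrapy calls it 'JSON lines'.

You can do it slightly more Pythonically:

for jsonline in f:
    yield json.loads(jsonline)   # or do the processing in this loop

I think this is the best way: it doesn't rely on any third-party libraries, and it's easy to understand what's going on. I've used it in some of my own code as well.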
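As a rough illustration of the one-object-per-line convention described above, here is a minimal round-trip sketch. The helper names `write_json_lines` and `read_json_lines` are made up for this example, and an in-memory `io.StringIO` stands in for a real file.

```python
import io
import json


def write_json_lines(values, fp):
    """Write each value as compact JSON on its own line."""
    for value in values:
        # json.dumps escapes newlines inside strings, so every value
        # is guaranteed to occupy exactly one line.
        fp.write(json.dumps(value) + "\n")


def read_json_lines(fp):
    """Lazily yield one decoded value per non-blank line."""
    for line in fp:
        if line.strip():
            yield json.loads(line)


buffer = io.StringIO()
write_json_lines([{"foo": ["bar", "baz"]}, 1, 2, [], "two\nlines"], buffer)
buffer.seek(0)
values = list(read_json_lines(buffer))
```

Because the reader is a generator, only one line is held in memory at a time; the cost is that the writer must never pretty-print a value across several lines.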

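Of course you can do this. You just have to go to raw_decode directly. This implementation loads the whole file into memory and operates on that string (much as json.load does); if you have large files you can modify it to read from the file only as necessary without much difficulty.

import json
from json.decoder import WHITESPACE


def iterload(string_or_fp, cls=json.JSONDecoder, **kwargs):
    # Accept anything file-like (has a read() method); otherwise treat
    # the argument as a string.
    if hasattr(string_or_fp, "read"):
        string = string_or_fp.read()
    else:
        string = str(string_or_fp)

    decoder = cls(**kwargs)
    idx = WHITESPACE.match(string, 0).end()
    while idx < len(string):
        obj, end = decoder.raw_decode(string, idx)
        yield obj
        idx = WHITESPACE.match(string, end).end()

Usage: just as you requested, it's a generator.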
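To make the usage concrete, here is a small self-contained sketch of the same raw_decode technique applied to the sample input from the question. It uses its own whitespace pattern instead of importing one, and the name `iter_json_values` is chosen only for this illustration.

```python
import json
import re

# Matches any (possibly empty) run of JSON whitespace.
_WHITESPACE = re.compile(r"[ \t\n\r]*")


def iter_json_values(text):
    """Yield each top-level JSON value found in the string `text`."""
    decoder = json.JSONDecoder()
    idx = _WHITESPACE.match(text, 0).end()
    while idx < len(text):
        # raw_decode returns the decoded value and the index just past it.
        obj, end = decoder.raw_decode(text, idx)
        yield obj
        idx = _WHITESPACE.match(text, end).end()


sample = '{"foo": ["bar", "baz"]} 1 2 [] 4 5 6'
gen = iter_json_values(sample)
first = next(gen)  # only the first value has been decoded at this point
type_names = [type(value).__name__ for value in gen]
```

Calling `next()` decodes a single value; the remaining values are decoded only as the generator is consumed.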

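This is actually a pretty nasty problem, because you have to stream in lines, but pattern match across multiple lines against braces, and also pattern match JSON. It's a sort of JSON pre-parse followed by a JSON parse. JSON is, in comparison to other formats, easy to parse, so it's not always necessary to reach for a parsing library. Nevertheless, how should we solve these conflicting issues?

Generators to the rescue!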

The beauty of generators for a problem like this is you can stack them on top of each other gradually abstracting away the difficulty of the problem whilst maintaining laziness. I also considered using the mechanism for passing back values into a generator (send()) but fortunately found I didn't need to use that.

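To solve the first of the problems you need some sort of streamingfinditer, a streaming version of re.finditer. My attempt at this below pulls in lines as needed (uncomment the debug statement to see) whilst still returning matches. I then modified it slightly to yield non-matched lines as well as matches (marked as 0 or 1 in the first part of the yielded tuple).

import re


def streamingfinditer(pat,stream):
    for s in stream:
        # print("Read next line: " + s)
        while 1:
            m = re.search(pat,s)
            if not m:
                yield (0,s)
                break
            yield (1,m.group())
            s = re.split(pat,s,1)[1]

With that, it's then possible to match up until braces, take account each time of whether the braces are balanced, and then return either simple or compound objects as appropriate.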

braces='{}[]'
whitespaceesc=' \t'
bracesesc='\\'+'\\'.join(braces)
balancemap=dict(zip(braces,[1,-1,1,-1]))
bracespat='['+bracesesc+']'
nobracespat='[^'+bracesesc+']*'
untilbracespat=nobracespat+bracespat


def simpleorcompoundobjects(stream):
    obj = ""
    unbalanced = 0
    for (c,m) in streamingfinditer(re.compile(untilbracespat),stream):
        if (c == 0): # remainder of line returned, nothing interesting
            if (unbalanced == 0):
                yield (0,m)
            else:
                obj += m
        if (c == 1): # match returned
            if (unbalanced == 0):
                yield (0,m[:-1])
                obj += m[-1]
            else:
                obj += m
            unbalanced += balancemap[m[-1]]
            if (unbalanced == 0):
                yield (1,obj)
                obj=""

This returns tuples as follows:

(0,"String of simple non-braced objects easy to parse")
(1,"{ 'Compound' : 'objects' }")

Basically that's the nasty part done. We now just have to do the final level of parsing as we see fit. For example we can use Jeremy Roman's iterload function (Thanks!) to do the parsing for a single line:

def streamingiterload(stream):
    for c,o in simpleorcompoundobjects(stream):
        for x in iterload(o):
            yield x

Test it:

of = open("test.json","w")
of.write("""[ "hello" ] { "goodbye" : 1 } 1 2 {
} 2
9 78
4 5 { "animals" : [ "dog" , "lots of mice" ,
"cat" ] }
""")
of.close()
# open & stream the json
f = open("test.json","r")
for o in streamingiterload(f.readlines()):
    print(o)
f.close()

I get these results (and if you turn on that debug line, you'll see it pulls in the lines as needed):

[u'hello']
{u'goodbye': 1}
1
2
{}
2
9
78
4
5
{u'animals': [u'dog', u'lots of mice', u'cat']}

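This won't work for all situations. Because of the way the json library is implemented, it is impossible to get this entirely right without reimplementing the parser yourself.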

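I'd like to provide a solution. The key idea is to "try" to decode: if it fails, feed it more input; otherwise use the offset information to prepare the next decoding.

However, the current json module can't tolerate whitespace at the head of the string to be decoded, so I have to strip it off.

import sys
import json


def iterload(file):
    buffer = ""
    dec = json.JSONDecoder()
    for line in file:
        buffer = buffer.strip(" \n\r\t") + line.strip(" \n\r\t")
        while(True):
            try:
                r = dec.raw_decode(buffer)
            except ValueError:
                # Incomplete value: wait for the next line.
                break
            yield r[0]
            buffer = buffer[r[1]:].strip(" \n\r\t")


for o in iterload(sys.stdin):
    print("Working on a", type(o),  o)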

I have tested it with several text files, and it works fine.

(in1.txt)

{"foo": ["bar", "baz"]
}
1 2 [
]  4
{"foo1": ["bar1", {"foo2":{"A":1, "B":3}, "DDD":4}]
}
5   6

(in2.txt)

{"foo"
: ["bar",
"baz"]
}
1 2 [
] 4 5 6

(in.txt, your initial)

{"foo": ["bar", "baz"]} 1 2 [] 4 5 6

(output for Benedict's test case)

python test.py < in.txt
('Working on a', <type 'list'>, [u'hello'])
('Working on a', <type 'dict'>, {u'goodbye': 1})
('Working on a', <type 'int'>, 1)
('Working on a', <type 'int'>, 2)
('Working on a', <type 'dict'>, {})
('Working on a', <type 'int'>, 2)
('Working on a', <type 'int'>, 9)
('Working on a', <type 'int'>, 78)
('Working on a', <type 'int'>, 4)
('Working on a', <type 'int'>, 5)
('Working on a', <type 'dict'>, {u'animals': [u'dog', u'lots of mice', u'cat']})
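The same try-to-decode-and-refill idea can also be applied to fixed-size chunks instead of lines. The following is only a sketch under that assumption, not the answer's code: the name `iterload_chunks` and the `chunk_size` parameter are invented here. It additionally holds back a value that touches the end of the buffer, since a number such as 12 could otherwise be split across two reads.

```python
import io
import json


def iterload_chunks(fp, chunk_size=4096):
    """Lazily yield JSON values from a text stream read in fixed-size chunks."""
    decoder = json.JSONDecoder()
    buffer = ""
    at_eof = False
    while not at_eof:
        chunk = fp.read(chunk_size)
        at_eof = chunk == ""
        # raw_decode does not skip leading whitespace, so strip it here.
        buffer = (buffer + chunk).lstrip()
        while buffer:
            try:
                obj, end = decoder.raw_decode(buffer)
            except ValueError:
                break  # incomplete value: read more input first
            # A value that ends at the buffer's end, or is followed by
            # '.', 'e' or 'E', may be a number continued in the next chunk.
            ambiguous = end == len(buffer) or buffer[end] in ".eE"
            if ambiguous and not at_eof:
                break
            yield obj
            buffer = buffer[end:].lstrip()
    if buffer:
        raise ValueError("trailing data is not valid JSON: %r" % buffer[:20])


data = '{"foo": ["bar", "baz"]}\n12 3.5 [\n] "a b" -7e2'
results = list(iterload_chunks(io.StringIO(data), chunk_size=3))
```

Note that a value larger than `chunk_size` is re-parsed from its start each time a new chunk arrives, so very large single values make this approach slow.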

Here's mine:

import simplejson as json
from simplejson import JSONDecodeError


class StreamJsonListLoader():
    """
    When you have a big JSON file containing a list, such as

    [{
        ...
    },
    {
        ...
    },
    {
        ...
    },
    ...
    ]

    And it's too big to be practically loaded into memory and parsed by json.load,
    This class comes to the rescue. It lets you lazy-load the large json list.
    """

    def __init__(self, filename_or_stream):
        if type(filename_or_stream) == str:
            self.stream = open(filename_or_stream)
        else:
            self.stream = filename_or_stream

        if not self.stream.read(1) == '[':
            raise NotImplementedError('Only JSON-streams of lists (that start with a [) are supported.')

    def __iter__(self):
        return self

    def next(self):
        read_buffer = self.stream.read(1)
        while True:
            try:
                json_obj = json.loads(read_buffer)

                if not self.stream.read(1) in [',',']']:
                    raise Exception('JSON seems to be malformed: object is not followed by comma (,) or end of list (]).')
                return json_obj
            except JSONDecodeError:
                # Not a complete object yet: read up to the next closing brace.
                next_char = self.stream.read(1)
                read_buffer += next_char
                while next_char != '}':
                    next_char = self.stream.read(1)
                    if next_char == '':
                        raise StopIteration
                    read_buffer += next_char

    # Python 3 looks for __next__ rather than next.
    __next__ = next

Maybe a bit late, but I had this exact problem (well, more or less). My standard solution for these problems is usually to do a regex split on some well-known root object, but in my case that was impossible. The only feasible way to do this generically is to implement a proper tokenizer.

After not finding a generic-enough and reasonably well-performing solution, I ended up doing this myself, writing the splitstream module. It is a pre-tokenizer that understands JSON and XML and splits a continuous stream into multiple chunks for parsing (it leaves the actual parsing up to you though). To get some kind of performance out of it, it is written as a C module.

Example:

import json
import sys

from splitstream import splitfile


for jsonstr in splitfile(sys.stdin, format="json"):
    yield json.loads(jsonstr)
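To illustrate what a pre-tokenizer of this kind has to track, here is a simplified pure-Python sketch. It is not splitstream's implementation, and the function name `split_json_documents` is hypothetical. It assumes every top-level value is an object or an array, and it only finds document boundaries, leaving the parsing to json.loads.

```python
import json


def split_json_documents(chunks):
    """Yield the text of each complete top-level JSON object or array.

    `chunks` is any iterable of strings (lines, fixed-size reads, ...).
    Top-level scalars such as bare numbers are not handled by this sketch.
    """
    parts = []         # characters of the document being collected
    depth = 0          # current nesting depth of {} and []
    in_string = False  # True while inside a string literal
    escaped = False    # True right after a backslash inside a string
    for chunk in chunks:
        for ch in chunk:
            if depth == 0 and ch not in "{[":
                continue  # skip whatever sits between documents
            parts.append(ch)
            if in_string:
                # Braces and brackets inside strings must not be counted.
                if escaped:
                    escaped = False
                elif ch == "\\":
                    escaped = True
                elif ch == '"':
                    in_string = False
            elif ch == '"':
                in_string = True
            elif ch in "{[":
                depth += 1
            elif ch in "}]":
                depth -= 1
                if depth == 0:
                    yield "".join(parts)
                    parts = []


lines = ['{"text": "a } in a string",\n', ' "n": [1, 2]} [\n', '"\\"]"]\n']
docs = [json.loads(doc) for doc in split_json_documents(lines)]
```

Tracking string and escape state is what a plain brace-counting regex cannot do, which is why a closing brace inside a string literal does not end the first document here.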

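I used @维朗's elegant solution. The simple approach (read a byte, try to decode, read a byte, try to decode, ...) worked, but unfortunately it was very slow.

In my case, I was trying to read "pretty-printed" JSON objects of the same object type from a file. This allowed me to optimize the approach; I could read the file line by line, decoding only when I found a line that contained exactly "}":

import json


def iterload(stream):
    buf = ""
    dec = json.JSONDecoder()
    for line in stream:
        line = line.rstrip()
        buf = buf + line
        if line == "}":
            # raw_decode returns (value, end_index); yield only the value.
            yield dec.raw_decode(buf)[0]
            buf = ""

If you happen to be working with one-per-line compact JSON that escapes newlines in string literals, then you can safely simplify this approach even more:

def iterload(stream):
    dec = json.JSONDecoder()
    for line in stream:
        yield dec.raw_decode(line)[0]

Obviously, these simple approaches only work for very specific kinds of JSON. However, if these assumptions hold, these solutions work correctly and quickly.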

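Here's a much, much simpler solution. The secret is to try, fail, and use the information in the exception to parse properly. The only limitation is that the file must be seekable.

def stream_read_json(fn):
    import json
    start_pos = 0
    with open(fn, 'r') as f:
        while True:
            try:
                obj = json.load(f)
                yield obj
                return
            except json.JSONDecodeError as e:
                # e.pos is where the extra data starts; re-read just the
                # part before it and decode that on its own.
                f.seek(start_pos)
                json_str = f.read(e.pos)
                obj = json.loads(json_str)
                start_pos += e.pos
                yield obj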

Edit: just noticed that this will only work for Python >=3.5. For earlier, failures return a ValueError, and you have to parse out the position from the string, e.g.

def stream_read_json(fn):
    import json
    import re
    start_pos = 0
    with open(fn, 'r') as f:
        while True:
            try:
                obj = json.load(f)
                yield obj
                return
            except ValueError as e:
                f.seek(start_pos)
                # Pull the character offset out of the error message.
                end_pos = int(re.match(r'Extra data: line \d+ column \d+ .*\(char (\d+).*\)',
                                       e.args[0]).groups()[0])
                json_str = f.read(end_pos)
                obj = json.loads(json_str)
                start_pos += end_pos
                yield obj
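The exception-driven approach above relies on the `pos` attribute of `json.JSONDecodeError` (Python 3.5+). A minimal sketch of what that attribute holds when a string contains one value followed by extra data:

```python
import json

text = '{"a": 1} 2'
try:
    json.loads(text)
except json.JSONDecodeError as exc:
    # For trailing data, exc.pos is the index where the unparsed
    # remainder begins, so everything before it is one complete value.
    error_message = exc.msg
    first = json.loads(text[:exc.pos])
    rest = text[exc.pos:]
```

The same exception type is raised for genuinely malformed input, in which case `pos` marks the error rather than a value boundary, so checking `exc.msg` before slicing is probably worthwhile.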

I believe a better way of doing it would be to use a state machine. Below is sample code that I worked out by converting the NodeJS code at the link below to Python 3 (it uses the nonlocal keyword, which is only available in Python 3, so that version won't work on Python 2).

Edit-1: Updated and made the code compatible with Python 2

Edit-2: Updated and added a Python 3 only version as well

https://gist.github.com/creationix/5992451

Python 3 only version

# A streaming byte oriented JSON parser.  Feed it a single byte at a time and
# it will emit complete objects as it comes across them.  Whitespace within and
# between objects is ignored.  This means it can parse newline delimited JSON.
import math


def json_machine(emit, next_func=None):
    def _value(byte_data):
        if not byte_data:
            return

        if byte_data == 0x09 or byte_data == 0x0a or byte_data == 0x0d or byte_data == 0x20:
            return _value  # Ignore whitespace

        if byte_data == 0x22:  # "
            return string_machine(on_value)

        if byte_data == 0x2d or (0x30 <= byte_data <= 0x39):  # - or 0-9
            return number_machine(byte_data, on_number)

        if byte_data == 0x7b:  # {
            return object_machine(on_value)

        if byte_data == 0x5b:  # [
            return array_machine(on_value)

        if byte_data == 0x74:  # t
            return constant_machine(TRUE, True, on_value)

        if byte_data == 0x66:  # f
            return constant_machine(FALSE, False, on_value)

        if byte_data == 0x6e:  # n
            return constant_machine(NULL, None, on_value)

        if next_func == _value:
            raise Exception("Unexpected 0x" + format(byte_data, "02x"))

        return next_func(byte_data)

    def on_value(value):
        emit(value)
        return next_func

    def on_number(number, byte):
        # A number ends at the first byte that is not part of it, so that
        # byte still has to be processed as the start of whatever follows.
        emit(number)
        return _value(byte)

    next_func = next_func or _value
    return _value




TRUE = [0x72, 0x75, 0x65]
FALSE = [0x61, 0x6c, 0x73, 0x65]
NULL = [0x75, 0x6c, 0x6c]




def constant_machine(bytes_data, value, emit):
    i = 0
    length = len(bytes_data)

    def _constant(byte_data):
        nonlocal i
        if byte_data != bytes_data[i]:
            i += 1
            raise Exception("Unexpected 0x" + format(byte_data, "02x"))

        i += 1
        if i < length:
            return _constant
        return emit(value)

    return _constant




def string_machine(emit):
    string = ""

    def _string(byte_data):
        nonlocal string

        if byte_data == 0x22:  # "
            return emit(string)

        if byte_data == 0x5c:  # \
            return _escaped_string

        if byte_data & 0x80:  # UTF-8 handling
            return utf8_machine(byte_data, on_char_code)

        if byte_data < 0x20:  # ASCII control character
            raise Exception("Unexpected control character: 0x" + format(byte_data, "02x"))

        string += chr(byte_data)
        return _string

    def _escaped_string(byte_data):
        nonlocal string

        if byte_data == 0x22 or byte_data == 0x5c or byte_data == 0x2f:  # " \ /
            string += chr(byte_data)
            return _string

        if byte_data == 0x62:  # b
            string += "\b"
            return _string

        if byte_data == 0x66:  # f
            string += "\f"
            return _string

        if byte_data == 0x6e:  # n
            string += "\n"
            return _string

        if byte_data == 0x72:  # r
            string += "\r"
            return _string

        if byte_data == 0x74:  # t
            string += "\t"
            return _string

        if byte_data == 0x75:  # u
            return hex_machine(on_char_code)

        raise Exception("Invalid escape character: 0x" + format(byte_data, "02x"))

    def on_char_code(char_code):
        nonlocal string
        string += chr(char_code)
        return _string

    return _string




# Nestable state machine for UTF-8 Decoding.
def utf8_machine(byte_data, emit):
    left = 0
    num = 0

    def _utf8(byte_data):
        nonlocal num, left
        if (byte_data & 0xc0) != 0x80:
            raise Exception("Invalid byte in UTF-8 character: 0x" + format(byte_data, "02x"))

        left = left - 1

        num |= (byte_data & 0x3f) << (left * 6)
        if left:
            return _utf8
        return emit(num)

    if 0xc0 <= byte_data < 0xe0:  # 2-byte UTF-8 Character
        left = 1
        num = (byte_data & 0x1f) << 6
        return _utf8

    if 0xe0 <= byte_data < 0xf0:  # 3-byte UTF-8 Character
        left = 2
        num = (byte_data & 0xf) << 12
        return _utf8

    if 0xf0 <= byte_data < 0xf8:  # 4-byte UTF-8 Character
        left = 3
        num = (byte_data & 0x07) << 18
        return _utf8

    raise Exception("Invalid byte in UTF-8 string: 0x" + format(byte_data, "02x"))




# Nestable state machine for hex escaped characters
def hex_machine(emit):
    left = 4
    num = 0

    def _hex(byte_data):
        nonlocal num, left

        if 0x30 <= byte_data <= 0x39:  # 0-9
            i = byte_data - 0x30
        elif 0x61 <= byte_data <= 0x66:  # a-f
            i = byte_data - 0x57
        elif 0x41 <= byte_data <= 0x46:  # A-F
            i = byte_data - 0x37
        else:
            raise Exception("Expected hex char in string hex escape")

        left -= 1
        num |= i << (left * 4)

        if left:
            return _hex
        return emit(num)

    return _hex




def number_machine(byte_data, emit):
    sign = 1
    number = 0
    decimal = 0         # digits after the decimal point, kept as an integer
    decimal_digits = 0  # how many digits `decimal` holds
    esign = 1
    exponent = 0

    def _mid(byte_data):
        if byte_data == 0x2e:  # .
            return _decimal

        return _later(byte_data)

    def _number(byte_data):
        nonlocal number
        if 0x30 <= byte_data <= 0x39:
            number = number * 10 + (byte_data - 0x30)
            return _number

        return _mid(byte_data)

    def _start(byte_data):
        if byte_data == 0x30:
            return _mid

        if 0x30 < byte_data <= 0x39:
            return _number(byte_data)

        raise Exception("Invalid number: 0x" + format(byte_data, "02x"))

    def _decimal(byte_data):
        nonlocal decimal, decimal_digits
        if 0x30 <= byte_data <= 0x39:
            decimal = decimal * 10 + (byte_data - 0x30)
            decimal_digits += 1
            return _decimal

        return _later(byte_data)

    def _later(byte_data):
        if byte_data == 0x45 or byte_data == 0x65:  # E e
            return _esign

        return _done(byte_data)

    def _esign(byte_data):
        nonlocal esign
        if byte_data == 0x2b:  # +
            return _exponent

        if byte_data == 0x2d:  # -
            esign = -1
            return _exponent

        return _exponent(byte_data)

    def _exponent(byte_data):
        nonlocal exponent
        if 0x30 <= byte_data <= 0x39:
            exponent = exponent * 10 + (byte_data - 0x30)
            return _exponent

        return _done(byte_data)

    def _done(byte_data):
        value = number
        if decimal_digits:
            value += decimal / 10.0 ** decimal_digits
        value *= sign
        if exponent:
            value *= math.pow(10, esign * exponent)

        return emit(value, byte_data)

    # Checked only after every state function is defined, so the closure
    # returned for a leading minus sign can reach all of them.
    if byte_data == 0x2d:  # -
        sign = -1
        return _start

    return _start(byte_data)




def array_machine(emit):
    array_data = []

    def _array(byte_data):
        if byte_data == 0x5d:  # ]
            return emit(array_data)

        return json_machine(on_value, _comma)(byte_data)

    def on_value(value):
        array_data.append(value)

    def _comma(byte_data):
        if byte_data == 0x09 or byte_data == 0x0a or byte_data == 0x0d or byte_data == 0x20:
            return _comma  # Ignore whitespace

        if byte_data == 0x2c:  # ,
            return json_machine(on_value, _comma)

        if byte_data == 0x5d:  # ]
            return emit(array_data)

        raise Exception("Unexpected byte: 0x" + format(byte_data, "02x") + " in array body")

    return _array




def object_machine(emit):
    object_data = {}
    key = None

    def _object(byte_data):
        if byte_data == 0x7d:  # }
            return emit(object_data)

        return _key(byte_data)

    def _key(byte_data):
        if byte_data == 0x09 or byte_data == 0x0a or byte_data == 0x0d or byte_data == 0x20:
            return _object  # Ignore whitespace

        if byte_data == 0x22:  # "
            return string_machine(on_key)

        raise Exception("Unexpected byte: 0x" + format(byte_data, "02x"))

    def on_key(result):
        nonlocal key
        key = result
        return _colon

    def _colon(byte_data):
        if byte_data == 0x09 or byte_data == 0x0a or byte_data == 0x0d or byte_data == 0x20:
            return _colon  # Ignore whitespace

        if byte_data == 0x3a:  # :
            return json_machine(on_value, _comma)

        raise Exception("Unexpected byte: 0x" + format(byte_data, "02x"))

    def on_value(value):
        object_data[key] = value

    def _comma(byte_data):
        if byte_data == 0x09 or byte_data == 0x0a or byte_data == 0x0d or byte_data == 0x20:
            return _comma  # Ignore whitespace

        if byte_data == 0x2c:  # ,
            return _key

        if byte_data == 0x7d:  # }
            return emit(object_data)

        raise Exception("Unexpected byte: 0x" + format(byte_data, "02x"))

    return _object

Python 2 compatible version

# A streaming byte oriented JSON parser.  Feed it a single byte at a time and
# it will emit complete objects as it comes across them.  Whitespace within and
# between objects is ignored.  This means it can parse newline delimited JSON.
import math


def json_machine(emit, next_func=None):
    def _value(byte_data):
        if not byte_data:
            return

        if byte_data == 0x09 or byte_data == 0x0a or byte_data == 0x0d or byte_data == 0x20:
            return _value  # Ignore whitespace

        if byte_data == 0x22:  # "
            return string_machine(on_value)

        if byte_data == 0x2d or (0x30 <= byte_data <= 0x39):  # - or 0-9
            return number_machine(byte_data, on_number)

        if byte_data == 0x7b:  # {
            return object_machine(on_value)

        if byte_data == 0x5b:  # [
            return array_machine(on_value)

        if byte_data == 0x74:  # t
            return constant_machine(TRUE, True, on_value)

        if byte_data == 0x66:  # f
            return constant_machine(FALSE, False, on_value)

        if byte_data == 0x6e:  # n
            return constant_machine(NULL, None, on_value)

        if next_func == _value:
            raise Exception("Unexpected 0x" + format(byte_data, "02x"))

        return next_func(byte_data)

    def on_value(value):
        emit(value)
        return next_func

    def on_number(number, byte):
        emit(number)
        return _value(byte)

    next_func = next_func or _value
    return _value




TRUE = [0x72, 0x75, 0x65]
FALSE = [0x61, 0x6c, 0x73, 0x65]
NULL = [0x75, 0x6c, 0x6c]




def constant_machine(bytes_data, value, emit):
    # A dict stands in for the nonlocal variables of the Python 3 version.
    local_data = {"i": 0, "length": len(bytes_data)}

    def _constant(byte_data):
        # nonlocal i, length
        if byte_data != bytes_data[local_data["i"]]:
            local_data["i"] += 1
            raise Exception("Unexpected 0x" + format(byte_data, "02x"))

        local_data["i"] += 1

        if local_data["i"] < local_data["length"]:
            return _constant
        return emit(value)

    return _constant




try:
    unichr  # Python 2: chr() only covers 0-255
except NameError:
    unichr = chr  # Python 3


def string_machine(emit):
    local_data = {"string": ""}

    def _string(byte_data):
        # nonlocal string

        if byte_data == 0x22:  # "
            return emit(local_data["string"])

        if byte_data == 0x5c:  # \
            return _escaped_string

        if byte_data & 0x80:  # UTF-8 handling
            return utf8_machine(byte_data, on_char_code)

        if byte_data < 0x20:  # ASCII control character
            raise Exception("Unexpected control character: 0x" + format(byte_data, "02x"))

        local_data["string"] += chr(byte_data)
        return _string

    def _escaped_string(byte_data):
        # nonlocal string

        if byte_data == 0x22 or byte_data == 0x5c or byte_data == 0x2f:  # " \ /
            local_data["string"] += chr(byte_data)
            return _string

        if byte_data == 0x62:  # b
            local_data["string"] += "\b"
            return _string

        if byte_data == 0x66:  # f
            local_data["string"] += "\f"
            return _string

        if byte_data == 0x6e:  # n
            local_data["string"] += "\n"
            return _string

        if byte_data == 0x72:  # r
            local_data["string"] += "\r"
            return _string

        if byte_data == 0x74:  # t
            local_data["string"] += "\t"
            return _string

        if byte_data == 0x75:  # u
            return hex_machine(on_char_code)

        raise Exception("Invalid escape character: 0x" + format(byte_data, "02x"))

    def on_char_code(char_code):
        # nonlocal string
        local_data["string"] += unichr(char_code)
        return _string

    return _string




# Nestable state machine for UTF-8 Decoding.
def utf8_machine(byte_data, emit):
    local_data = {"left": 0, "num": 0}

    def _utf8(byte_data):
        # nonlocal num, left
        if (byte_data & 0xc0) != 0x80:
            raise Exception("Invalid byte in UTF-8 character: 0x" + format(byte_data, "02x"))

        local_data["left"] -= 1

        local_data["num"] |= (byte_data & 0x3f) << (local_data["left"] * 6)
        if local_data["left"]:
            return _utf8
        return emit(local_data["num"])

    if 0xc0 <= byte_data < 0xe0:  # 2-byte UTF-8 Character
        local_data["left"] = 1
        local_data["num"] = (byte_data & 0x1f) << 6
        return _utf8

    if 0xe0 <= byte_data < 0xf0:  # 3-byte UTF-8 Character
        local_data["left"] = 2
        local_data["num"] = (byte_data & 0xf) << 12
        return _utf8

    if 0xf0 <= byte_data < 0xf8:  # 4-byte UTF-8 Character
        local_data["left"] = 3
        local_data["num"] = (byte_data & 0x07) << 18
        return _utf8

    raise Exception("Invalid byte in UTF-8 string: 0x" + format(byte_data, "02x"))




# Nestable state machine for hex escaped characters
def hex_machine(emit):
    local_data = {"left": 4, "num": 0}

    def _hex(byte_data):
        # nonlocal num, left
        i = 0  # Parse the hex byte
        if 0x30 <= byte_data <= 0x39:  # 0-9
            i = byte_data - 0x30
        elif 0x61 <= byte_data <= 0x66:  # a-f
            i = byte_data - 0x57
        elif 0x41 <= byte_data <= 0x46:  # A-F
            i = byte_data - 0x37
        else:
            raise Exception("Expected hex char in string hex escape")

        local_data["left"] -= 1
        local_data["num"] |= i << (local_data["left"] * 4)

        if local_data["left"]:
            return _hex
        return emit(local_data["num"])

    return _hex




def number_machine(byte_data, emit):
    # "decimal" holds the digits after the decimal point as an integer and
    # "decimal_digits" counts them.
    local_data = {"sign": 1, "number": 0, "decimal": 0, "decimal_digits": 0,
                  "esign": 1, "exponent": 0}

    def _mid(byte_data):
        if byte_data == 0x2e:  # .
            return _decimal

        return _later(byte_data)

    def _number(byte_data):
        # nonlocal number
        if 0x30 <= byte_data <= 0x39:
            local_data["number"] = local_data["number"] * 10 + (byte_data - 0x30)
            return _number

        return _mid(byte_data)

    def _start(byte_data):
        if byte_data == 0x30:
            return _mid

        if 0x30 < byte_data <= 0x39:
            return _number(byte_data)

        raise Exception("Invalid number: 0x" + format(byte_data, "02x"))

    def _decimal(byte_data):
        # nonlocal decimal
        if 0x30 <= byte_data <= 0x39:
            local_data["decimal"] = local_data["decimal"] * 10 + (byte_data - 0x30)
            local_data["decimal_digits"] += 1
            return _decimal

        return _later(byte_data)

    def _later(byte_data):
        if byte_data == 0x45 or byte_data == 0x65:  # E e
            return _esign

        return _done(byte_data)

    def _esign(byte_data):
        # nonlocal esign
        if byte_data == 0x2b:  # +
            return _exponent

        if byte_data == 0x2d:  # -
            local_data["esign"] = -1
            return _exponent

        return _exponent(byte_data)

    def _exponent(byte_data):
        # nonlocal exponent
        if 0x30 <= byte_data <= 0x39:
            local_data["exponent"] = local_data["exponent"] * 10 + (byte_data - 0x30)
            return _exponent

        return _done(byte_data)

    def _done(byte_data):
        value = local_data["number"]
        if local_data["decimal_digits"]:
            # 10.0 keeps this a float division on Python 2 as well.
            value += local_data["decimal"] / 10.0 ** local_data["decimal_digits"]
        value *= local_data["sign"]
        if local_data["exponent"]:
            value *= math.pow(10, local_data["esign"] * local_data["exponent"])

        return emit(value, byte_data)

    # Checked only after every state function is defined, so the closure
    # returned for a leading minus sign can reach all of them.
    if byte_data == 0x2d:  # -
        local_data["sign"] = -1
        return _start

    return _start(byte_data)




def array_machine(emit):
    local_data = {"array_data": []}

    def _array(byte_data):
        if byte_data == 0x5d:  # ]
            return emit(local_data["array_data"])

        return json_machine(on_value, _comma)(byte_data)

    def on_value(value):
        # nonlocal array_data
        local_data["array_data"].append(value)

    def _comma(byte_data):
        if byte_data == 0x09 or byte_data == 0x0a or byte_data == 0x0d or byte_data == 0x20:
            return _comma  # Ignore whitespace

        if byte_data == 0x2c:  # ,
            return json_machine(on_value, _comma)

        if byte_data == 0x5d:  # ]
            return emit(local_data["array_data"])

        raise Exception("Unexpected byte: 0x" + format(byte_data, "02x") + " in array body")

    return _array




def object_machine(emit):
    local_data = {"object_data": {}, "key": ""}

    def _object(byte_data):
        # nonlocal object_data, key
        if byte_data == 0x7d:  # }
            return emit(local_data["object_data"])

        return _key(byte_data)

    def _key(byte_data):
        if byte_data == 0x09 or byte_data == 0x0a or byte_data == 0x0d or byte_data == 0x20:
            return _object  # Ignore whitespace

        if byte_data == 0x22:  # "
            return string_machine(on_key)

        raise Exception("Unexpected byte: 0x" + format(byte_data, "02x"))

    def on_key(result):
        # nonlocal object_data, key
        local_data["key"] = result
        return _colon

    def _colon(byte_data):
        # nonlocal object_data, key
        if byte_data == 0x09 or byte_data == 0x0a or byte_data == 0x0d or byte_data == 0x20:
            return _colon  # Ignore whitespace

        if byte_data == 0x3a:  # :
            return json_machine(on_value, _comma)

        raise Exception("Unexpected byte: 0x" + format(byte_data, "02x"))

    def on_value(value):
        # nonlocal object_data, key
        local_data["object_data"][local_data["key"]] = value

    def _comma(byte_data):
        # nonlocal object_data
        if byte_data == 0x09 or byte_data == 0x0a or byte_data == 0x0d or byte_data == 0x20:
            return _comma  # Ignore whitespace

        if byte_data == 0x2c:  # ,
            return _key

        if byte_data == 0x7d:  # }
            return emit(local_data["object_data"])

        raise Exception("Unexpected byte: 0x" + format(byte_data, "02x"))

    return _object

Testing it

if __name__ == "__main__":
    test_json = """[1,2,"3"] {"name":
"tarun"} 1 2
3 [{"name":"a",
"data": [1,
null,2]}]
"""
    def found_json(data):
        print(data)

    state = json_machine(found_json)

    # Each call returns the state function that handles the next byte.
    for char in test_json:
        state = state(ord(char))

The output of the same is

[1, 2, '3']
{'name': 'tarun'}
1
2
3
[{'name': 'a', 'data': [1, None, 2]}]

If you use a json.JSONDecoder instance you can use the raw_decode member function. It returns a tuple of the Python representation of the JSON value and an index to where the parsing stopped. This makes it easy to slice (or seek in a stream object) the remaining JSON values. I'm not so happy about the extra while loop to skip over the whitespace between the different JSON values in the input, but it gets the job done in my opinion.

import json


def yield_multiple_value(f):
    '''
    parses multiple JSON values from a file.
    '''
    vals_str = f.read()
    decoder = json.JSONDecoder()
    try:
        nread = 0
        while nread < len(vals_str):
            val, n = decoder.raw_decode(vals_str[nread:])
            nread += n
            # Skip over whitespace because of bug, below.
            while nread < len(vals_str) and vals_str[nread].isspace():
                nread += 1
            yield val
    except json.JSONDecodeError:
        pass
    return

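The next version is much shorter and eats the part of the string that has already been parsed. It seems that, for some reason, a second call to json.JSONDecoder.raw_decode() fails when the first character in the string is whitespace, which is also why I skip over the whitespace in the while loop above...

def yield_multiple_value(f):
    '''
    parses multiple JSON values from a file.
    '''
    vals_str = f.read()
    decoder = json.JSONDecoder()
    while vals_str:
        val, n = decoder.raw_decode(vals_str)
        # remove the read characters from the start.
        vals_str = vals_str[n:]
        # remove leading white space because a second call to decoder.raw_decode()
        # fails when the string starts with whitespace, and
        # I don't understand why...
        vals_str = vals_str.lstrip()
        yield val
    return

In the documentation of the json.JSONDecoder class, the description of the raw_decode method (https://docs.python.org/3/library/json.html#encoders-and-decoders) contains the following:

This can be used to decode a JSON document from a string that may have extraneous data at the end.

And this extraneous data can easily be another JSON value. In other words, the method might have been written with this purpose in mind.

Using the function above on input.txt, I obtain the example output presented in the original question.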
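The whitespace behaviour mentioned above can be reproduced in isolation. As far as I can tell it is not specific to the second call: decode() skips leading whitespace itself, while raw_decode() expects the value to begin exactly at the index it is given. A short sketch:

```python
import json

decoder = json.JSONDecoder()

# decode() skips leading whitespace before parsing.
decoded = decoder.decode("  [1, 2]")

# raw_decode() does not, so the same input raises JSONDecodeError.
try:
    decoder.raw_decode("  [1, 2]")
    raw_decode_failed = False
except json.JSONDecodeError:
    raw_decode_failed = True

# Passing the index of the first non-whitespace character avoids
# both the failure and the need to slice the string.
value, end = decoder.raw_decode("  [1, 2] 3", 2)
```

Here `end` is the index just past the closing bracket, so the next call can start from the first non-whitespace character after it.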

You can use https://pypi.org/project/json-stream-parser/ for exactly that purpose.

import sys
from json_stream_parser import load_iter

for obj in load_iter(sys.stdin):
    print(obj)

output

{'foo': ['bar', 'baz']}
1
2
[]
4
5
6

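If you have control over how the data is generated, you may want to switch to an alternative format such as ndjson, which stands for Newline Delimited JSON and allows incremental streaming of JSON-formatted data. Each line is valid JSON by itself. There are two Python packages available: ndjson and jsonlines.

There is also json-stream, which allows you to process JSON as you read it, thus avoiding having to load the entire JSON upfront. You should be able to use it to read JSON data from a stream, and also to use the same stream before and after for any other I/O operation, or to read multiple JSON objects from the same stream.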