在Java's JDK中有并发列表吗?

我如何创建一个并发的列表实例,在那里我可以通过索引访问元素?JDK是否有我可以使用的类或工厂方法?

407952 次浏览

因为获取位置和从给定位置获取元素的行为自然需要一些锁定(您不能让列表在这两个操作之间发生结构变化)。

并发收集的思想是,每个操作本身都是原子的,可以在没有显式锁定/同步的情况下完成。

因此,从给定的List中获取位于n位置的元素作为原子操作在期望并发访问的情况下没有太大意义。

java . util . concurrent中有一个并发列表实现。CopyOnWriteArrayList尤其如此。

免责声明:这个答案发表于2011年,在JDK 5之前,在许多高级和优化的并发api之前。因此,虽然下面的可以工作,但它不是最好的选择。


如果你所需要的只是简单的调用同步,你可以很好地使用Collections.synchronizedList(列表):

 List<Object> objList = Collections.synchronizedList(new ArrayList<Object>());

ConcurrentLinkedQueue

如果你不关心是否有基于索引的访问,而只想要List的插入顺序保留特征,你可以考虑java.util.concurrent.ConcurrentLinkedQueue。因为它实现了Iterable,一旦你添加完所有的项,你可以使用增强的for语法循环内容:

Queue<String> globalQueue = new ConcurrentLinkedQueue<String>();


//Multiple threads can safely call globalQueue.add()...


for (String href : globalQueue) {
//do something with href
}

enter image description here

CopyOnWriteArrayList是ArrayList的线程安全变体 所有可变操作(添加、设置等)都由

.创建底层数组的新副本

CopyOnWriteArrayList是同步列表实现列表接口的并发替代,它是java.util.concurrent包的一部分,它是线程安全的集合。

public class CopyOnWriteArrayList<E>
implements List<E>, RandomAccess, Cloneable, java.io.Serializable

CopyOnWriteArrayList是故障安全的,当底层的CopyOnWriteArrayList在迭代期间使用ArrayList的单独副本被修改时,不会抛出ConcurrentModificationException。

这通常代价太大,因为拷贝数组涉及每个更新操作,将创建一个克隆副本。CopyOnWriteArrayList是频繁读操作的最佳选择。

/**
* Returns a shallow copy of this list.  (The elements themselves
* are not copied.)
*
* @return a clone of this list
*/
public Object clone() {
try {
@SuppressWarnings("unchecked")
CopyOnWriteArrayList<E> clone =
(CopyOnWriteArrayList<E>) super.clone();
clone.resetLock();
return clone;
} catch (CloneNotSupportedException e) {
// this shouldn't happen, since we are Cloneable
throw new InternalError();
}
}

你有这些选择:

  • Collections.synchronizedList():你可以包装任何List实现(ArrayListLinkedList或第三方列表)。对每个方法(读和写)的访问将使用synchronized来保护。当使用iterator()或增强的for循环时,必须手动同步整个迭代。在迭代时,其他线程甚至被完全阻止读取。你也可以为每个hasNextnext调用分别同步,但随后ConcurrentModificationException是可能的。

  • CopyOnWriteArrayList:修改是昂贵的,但读取是等待自由的。迭代器从不抛出ConcurrentModificationException,它们在创建迭代器时返回列表的快照,即使在迭代时列表被另一个线程修改。适用于不经常更新的列表。像addAll这样的批量操作更适合用于更新—内部数组被复制的次数更少。

  • Vector:非常像synchronizedList(new ArrayList<>()),但迭代也是同步的。然而,如果vector在迭代时被另一个线程修改,迭代器可以抛出ConcurrentModificationException

其他选项:

  • 如果你只在列表的末尾添加/删除或迭代它,QueueDeque可能是另一种选择。Queue只允许在一端添加和从另一端删除,Deque允许在两端添加和删除。没有索引访问。有多个实现具有比任何List所能提供的更好的并发属性。查看“所有已知的实现类”;在队列javadoc中,那些在java.util.concurrent包中的实现是并发的。你也可以看看JCTools,它包含了针对单个消费者或单个生产者的更快的队列实现。
  • Collections.unmodifiableList():无等待,线程安全,但不可修改
  • List.of,List.copyOf: Java 9及以后版本中的另一个不可变列表

如果你不打算从列表中删除元素(因为这需要在删除的元素之后更改所有元素的索引),那么你可以使用ConcurrentSkipListMap<Integer, T>来代替ArrayList<T>,例如:

NavigableMap<Integer, T> map = new ConcurrentSkipListMap<>();

这将允许您将项目添加到“列表”的末尾。如下所示,只要只有一个写线程(否则在map.size()map.put()之间存在竞争条件):

// Add item to end of the "list":
map.put(map.size(), item);

显然,您还可以修改“列表”中任何项的值。(即映射)通过简单地调用map.put(index, item)

将项放入映射或按索引检索它们的平均代价是O(log(n)),并且ConcurrentSkipListMap是无锁的,这使得它明显优于Vector (ArrayList的旧同步版本)。

您可以在“列表”中来回迭代。通过使用NavigableMap接口的方法。

只要你理解竞态条件警告(或者你可以只同步写入方法),你可以将上述所有内容包装到一个实现List接口的类中——并且你需要为remove方法抛出一个不支持的操作异常。实现所有必需的方法需要相当多的样板文件,但这里有一个快速的实现尝试。

import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;
import java.util.NavigableMap;
import java.util.Objects;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentSkipListMap;


public class ConcurrentAddOnlyList<V> implements List<V> {
private NavigableMap<Integer, V> map = new ConcurrentSkipListMap<>();


@Override
public int size() {
return map.size();
}


@Override
public boolean isEmpty() {
return map.isEmpty();
}


@Override
public boolean contains(Object o) {
return map.values().contains(o);
}


@Override
public Iterator<V> iterator() {
return map.values().iterator();
}


@Override
public Object[] toArray() {
return map.values().toArray();
}


@Override
public <T> T[] toArray(T[] a) {
return map.values().toArray(a);
}


@Override
public V get(int index) {
return map.get(index);
}


@Override
public boolean containsAll(Collection<?> c) {
return map.values().containsAll(c);
}


@Override
public int indexOf(Object o) {
for (Entry<Integer, V> ent : map.entrySet()) {
if (Objects.equals(ent.getValue(), o)) {
return ent.getKey();
}
}
return -1;
}


@Override
public int lastIndexOf(Object o) {
for (Entry<Integer, V> ent : map.descendingMap().entrySet()) {
if (Objects.equals(ent.getValue(), o)) {
return ent.getKey();
}
}
return -1;
}


@Override
public ListIterator<V> listIterator(int index) {
return new ListIterator<V>() {
private int currIdx = 0;


@Override
public boolean hasNext() {
return currIdx < map.size();
}


@Override
public V next() {
if (currIdx >= map.size()) {
throw new IllegalArgumentException(
"next() called at end of list");
}
return map.get(currIdx++);
}


@Override
public boolean hasPrevious() {
return currIdx > 0;
}


@Override
public V previous() {
if (currIdx <= 0) {
throw new IllegalArgumentException(
"previous() called at beginning of list");
}
return map.get(--currIdx);
}


@Override
public int nextIndex() {
return currIdx + 1;
}


@Override
public int previousIndex() {
return currIdx - 1;
}


@Override
public void remove() {
throw new UnsupportedOperationException();
}


@Override
public void set(V e) {
// Might change size of map if currIdx == map.size(),
// so need to synchronize
synchronized (map) {
map.put(currIdx, e);
}
}


@Override
public void add(V e) {
synchronized (map) {
// Insertion is not supported except at end of list
if (currIdx < map.size()) {
throw new UnsupportedOperationException();
}
map.put(currIdx++, e);
}
}
};
}


@Override
public ListIterator<V> listIterator() {
return listIterator(0);
}


@Override
public List<V> subList(int fromIndex, int toIndex) {
// TODO Auto-generated method stub
return null;
}


@Override
public boolean add(V e) {
synchronized (map) {
map.put(map.size(), e);
return true;
}
}


@Override
public boolean addAll(Collection<? extends V> c) {
synchronized (map) {
for (V val : c) {
add(val);
}
return true;
}
}


@Override
public V set(int index, V element) {
synchronized (map) {
if (index < 0 || index > map.size()) {
throw new IllegalArgumentException("Index out of range");
}
return map.put(index, element);
}
}


@Override
public void clear() {
synchronized (map) {
map.clear();
}
}


@Override
public synchronized void add(int index, V element) {
synchronized (map) {
if (index < map.size()) {
// Insertion is not supported except at end of list
throw new UnsupportedOperationException();
} else if (index < 0 || index > map.size()) {
throw new IllegalArgumentException("Index out of range");
}
// index == map.size()
add(element);
}
}


@Override
public synchronized boolean addAll(
int index, Collection<? extends V> c) {
synchronized (map) {
if (index < map.size()) {
// Insertion is not supported except at end of list
throw new UnsupportedOperationException();
} else if (index < 0 || index > map.size()) {
throw new IllegalArgumentException("Index out of range");
}
// index == map.size()
for (V val : c) {
add(val);
}
return true;
}
}


@Override
public boolean remove(Object o) {
throw new UnsupportedOperationException();
}


@Override
public V remove(int index) {
throw new UnsupportedOperationException();
}


@Override
public boolean removeAll(Collection<?> c) {
throw new UnsupportedOperationException();
}


@Override
public boolean retainAll(Collection<?> c) {
throw new UnsupportedOperationException();
}
}

不要忘记,即使使用如上所示的写入线程同步,您也需要小心不要遇到可能导致删除项的竞态条件,例如,如果您试图在读取线程中遍历列表,而写入线程正在向列表的末尾添加内容。

你甚至可以使用ConcurrentSkipListMap作为双结束列表,只要你不需要每个项的键来表示列表中的实际位置(即添加到列表的开头将为项分配负键)。(同样的竞争条件警告也适用于这里,即应该只有一个写入线程。)

// Add item after last item in the "list":
map.put(map.isEmpty() ? 0 : map.lastKey() + 1, item);


// Add item before first item in the "list":
map.put(map.isEmpty() ? 0 : map.firstKey() - 1, item);

大多数情况下,如果您需要并发列表,则它位于模型对象内部(因为您不应该使用抽象数据类型(如列表)来表示应用程序模型图中的节点),或者它是特定服务的一部分,您可以自己同步访问。

class MyClass {
List<MyType> myConcurrentList = new ArrayList<>();
void myMethod() {
synchronzied(myConcurrentList) {
doSomethingWithList;
}
}
}

通常这就足够让你继续下去了。如果你需要迭代,迭代列表的一个副本,而不是列表本身,并且只同步你复制列表的部分,而不是在你迭代它的时候。

另外,当并发处理一个列表时,通常要做的事情不仅仅是添加、删除或复制,这意味着该操作变得足够有意义,以保证它自己的方法,并且该列表成为一个特殊类的成员,仅表示这个具有线程安全行为的特定列表。

即使我同意需要一个并发列表实现和Vector / collections . synchronizelist (list)不做的技巧,你肯定需要像compareAndAdd或compareAndRemove或get(…, ifAbsentDo),即使您有ConcurrentList实现,开发人员在使用并发列表(和映射)时也经常会因为没有考虑真正的事务而引入错误。

在这些场景中,事务对于与并发ADT(抽象数据类型)交互的预期目的来说太小,总是导致我将列表隐藏在一个特殊的类中,并在方法级别上使用synchronized同步访问这个类对象方法。这是确保交易正确的唯一方法。

我已经看到了太多的错误,不能用其他方法来做——至少如果代码很重要,处理一些像金钱或安全或保证一些服务质量措施(例如至少发送一次且只发送一次消息)。